Basic Support for Merge with user-defined timestamp (#10819)

Summary:
This PR implements the `Merge()` APIs that were previously disabled when user-defined timestamp is enabled.

Simplest usage:
```cpp
// assume string append merge op is used with '.' as delimiter.
// ts1 < ts2
db->Put(WriteOptions(), "key", ts1, "v0");
db->Merge(WriteOptions(), "key", ts2, "1");
ReadOptions ro;
ro.timestamp = &ts2;
db->Get(ro, "key", &value);
ASSERT_EQ("v0.1", value);
```

Some code comments are added for clarity.

Note: support for timestamp in `DB::GetMergeOperands()` will be done in a follow-up PR.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10819

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D40603195

Pulled By: riversand963

fbshipit-source-id: f96d6f183258f3392d80377025529f7660503013
main
Yanqin Jin 2 years ago committed by Facebook GitHub Bot
parent 9f3475eccf
commit 7d26e4c5a3
  1. 3
      HISTORY.md
  2. 11
      db/compaction/compaction_iterator.cc
  3. 114
      db/compaction/compaction_iterator_test.cc
  4. 3
      db/db_impl/db_impl.h
  5. 26
      db/db_impl/db_impl_write.cc
  6. 6
      db/db_iter.cc
  7. 1
      db/db_iter.h
  8. 5
      db/db_test.cc
  9. 113
      db/db_with_timestamp_basic_test.cc
  10. 15
      db/dbformat.h
  11. 4
      db/lookup_key.h
  12. 2
      db/memtable.cc
  13. 53
      db/merge_helper.cc
  14. 4
      db/merge_helper.h
  15. 3
      db/merge_helper_test.cc
  16. 24
      db/write_batch.cc
  17. 6
      db/write_batch_test.cc
  18. 1
      include/rocksdb/compaction_filter.h
  19. 5
      include/rocksdb/db.h
  20. 1
      include/rocksdb/merge_operator.h
  21. 4
      include/rocksdb/utilities/stackable_db.h
  22. 18
      include/rocksdb/write_batch.h
  23. 1
      table/get_context.cc
  24. 12
      utilities/transactions/write_committed_transaction_ts_test.cc

@ -6,6 +6,9 @@
### Bug Fixes
* Fix FIFO compaction causing corruption of overlapping seqnos in L0 files due to ingesting files whose seqnos overlap with the memtable's when `CompactionOptionsFIFO::allow_compaction=true`, `CompactionOptionsFIFO::age_for_warm>0`, or `CompactRange()/CompactFiles()` is used. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected.
### New Features
* Add basic support for user-defined timestamp to Merge (#10819).
## 7.8.0 (10/22/2022)
### New Features
* `DeleteRange()` now supports user-defined timestamp.

@ -168,7 +168,12 @@ void CompactionIterator::Next() {
}
// Keep current_key_ in sync.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
if (0 == timestamp_size_) {
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
} else {
Slice ts = ikey_.GetTimestamp(timestamp_size_);
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type, &ts);
}
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();
validity_info_.SetValid(ValidContext::kMerge1);
@ -877,8 +882,8 @@ void CompactionIterator::NextFromInput() {
// object to minimize change to the existing flow.
Status s = merge_helper_->MergeUntil(
&input_, range_del_agg_, prev_snapshot, bottommost_level_,
allow_data_in_errors_, blob_fetcher_.get(), prefetch_buffers_.get(),
&iter_stats_);
allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_,
prefetch_buffers_.get(), &iter_stats_);
merge_out_iter_.SeekToFirst();
if (!s.ok() && !s.IsMergeInProgress()) {

@ -812,6 +812,8 @@ TEST_P(PerKeyPlacementCompIteratorTest, SplitLastLevelData) {
c_iter_->Next();
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(PerKeyPlacementCompIteratorTest, SnapshotData) {
@ -877,6 +879,8 @@ TEST_P(PerKeyPlacementCompIteratorTest, ConflictWithSnapshot) {
// output_to_penultimate_level.
c_iter_->Next();
ASSERT_TRUE(c_iter_->status().IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
INSTANTIATE_TEST_CASE_P(PerKeyPlacementCompIteratorTest,
@ -1251,6 +1255,31 @@ TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) {
}
}
// Feeds two merge operands stacked on top of a base value for a single user
// key through the compaction iterator with full_history_ts_low == nullptr,
// and expects every input entry to come out unchanged.
TEST_P(CompactionIteratorTsGcTest, NoMergeEligibleForGc) {
constexpr char user_key[] = "a";
// NOTE(review): test::KeyStr arguments are presumably
// (timestamp, user_key, sequence, type) -- confirm against the helper.
const std::vector<std::string> input_keys = {
test::KeyStr(10002, user_key, 102, kTypeMerge),
test::KeyStr(10001, user_key, 101, kTypeMerge),
test::KeyStr(10000, user_key, 100, kTypeValue)};
const std::vector<std::string> input_values = {"2", "1", "a0"};
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendTESTOperator();
// The expected output aliases the input: no entry may be merged or dropped.
const std::vector<std::string>& expected_keys = input_keys;
const std::vector<std::string>& expected_values = input_values;
// Each pair is {bottommost_level, key_not_exists_beyond_output_level}.
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
for (const auto& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(),
/*compaction_filter=*/nullptr, bottommost_level,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
key_not_exists_beyond_output_level,
/*full_history_ts_low=*/nullptr);
}
}
TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
const std::vector<std::string> input_keys = {
@ -1304,6 +1333,91 @@ TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
}
}
// Exercises the compaction iterator on two user keys ("a" and "f") whose
// histories mix merge operands with a base Put / timestamped Delete, using
// full_history_ts_low = 20000. Only the newest entry of "a" (ts 25000) is at
// or above that threshold; every other input entry has a smaller timestamp.
// The test runs twice: once with snapshots registered and once without.
TEST_P(CompactionIteratorTsGcTest, SomeMergesOlderThanThreshold) {
constexpr char user_key[][2] = {"a", "f"};
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/25000, user_key[0], /*seq=*/2500, kTypeMerge),
test::KeyStr(/*ts=*/19000, user_key[0], /*seq=*/2300, kTypeMerge),
test::KeyStr(/*ts=*/18000, user_key[0], /*seq=*/1800, kTypeMerge),
test::KeyStr(/*ts=*/16000, user_key[0], /*seq=*/1600, kTypeValue),
test::KeyStr(/*ts=*/19000, user_key[1], /*seq=*/2000, kTypeMerge),
test::KeyStr(/*ts=*/17000, user_key[1], /*seq=*/1700, kTypeMerge),
test::KeyStr(/*ts=*/15000, user_key[1], /*seq=*/1600,
kTypeDeletionWithTimestamp)};
const std::vector<std::string> input_values = {"25", "19", "18", "16",
"19", "17", ""};
std::shared_ptr<MergeOperator> merge_op =
MergeOperators::CreateStringAppendTESTOperator();
// Threshold timestamp, encoded with PutFixed64.
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 20000);
// Each pair is {bottommost_level, key_not_exists_beyond_output_level}.
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
// Case 1: snapshots at seq 1600 and 1900 are registered. The baseline
// expectation is that the output equals the input.
{
AddSnapshot(1600);
AddSnapshot(1900);
const std::vector<std::string> expected_keys = {
test::KeyStr(/*ts=*/25000, user_key[0], /*seq=*/2500, kTypeMerge),
test::KeyStr(/*ts=*/19000, user_key[0], /*seq=*/2300, kTypeMerge),
test::KeyStr(/*ts=*/18000, user_key[0], /*seq=*/1800, kTypeMerge),
test::KeyStr(/*ts=*/16000, user_key[0], /*seq=*/1600, kTypeValue),
test::KeyStr(/*ts=*/19000, user_key[1], /*seq=*/2000, kTypeMerge),
test::KeyStr(/*ts=*/17000, user_key[1], /*seq=*/1700, kTypeMerge),
test::KeyStr(/*ts=*/15000, user_key[1], /*seq=*/1600,
kTypeDeletionWithTimestamp)};
const std::vector<std::string> expected_values = {"25", "19", "18", "16",
"19", "17", ""};
for (const auto& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
// Work on copies so each parameter combination starts from the baseline.
auto expected_keys_copy = expected_keys;
auto expected_values_copy = expected_values;
if (bottommost_level || key_not_exists_beyond_output_level) {
// the kTypeDeletionWithTimestamp will be dropped
expected_keys_copy.pop_back();
expected_values_copy.pop_back();
if (bottommost_level) {
// seq zero
// Index 3 is the base Put of user_key[0]; it is expected with both
// timestamp and sequence number set to 0.
expected_keys_copy[3] =
test::KeyStr(/*ts=*/0, user_key[0], /*seq=*/0, kTypeValue);
}
}
RunTest(input_keys, input_values, expected_keys_copy,
expected_values_copy,
/*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(),
/*compaction_filter=*/nullptr, bottommost_level,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
ClearSnapshots();
}
// No snapshots
// Case 2: each user key is expected to collapse into a single kTypeValue
// entry that carries the timestamp/sequence of its newest input entry and a
// ','-joined value ordered oldest to newest.
{
const std::vector<std::string> expected_keys = {
test::KeyStr(/*ts=*/25000, user_key[0], /*seq=*/2500, kTypeValue),
test::KeyStr(/*ts=*/19000, user_key[1], /*seq=*/2000, kTypeValue)};
const std::vector<std::string> expected_values = {"16,18,19,25", "17,19"};
for (const auto& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
auto expected_keys_copy = expected_keys;
auto expected_values_copy = expected_values;
if (bottommost_level) {
// Only the entry for user_key[1] is expected with ts/seq zeroed.
// NOTE(review): presumably user_key[0] keeps ts 25000 because it is not
// below full_history_ts_low -- confirm.
expected_keys_copy[1] =
test::KeyStr(/*ts=*/0, user_key[1], /*seq=*/0, kTypeValue);
}
RunTest(input_keys, input_values, expected_keys_copy,
expected_values_copy,
/*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(),
/*compaction_filter=*/nullptr, bottommost_level,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
}
}
TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {

@ -204,6 +204,9 @@ class DBImpl : public DB {
using DB::Merge;
Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) override;
Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& value) override;
using DB::Delete;
Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key) override;

@ -62,6 +62,15 @@ Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
}
}
// Merge overload that takes an explicit user-defined timestamp.
// The timestamp is first validated against the target column family via
// FailIfTsMismatchCf() (as a write-side timestamp); on failure that status is
// returned as-is, otherwise the request is handed to DB::Merge().
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
                     const Slice& key, const Slice& ts, const Slice& val) {
  const Status ts_check =
      FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
  if (ts_check.ok()) {
    return DB::Merge(o, column_family, key, ts, val);
  }
  return ts_check;
}
Status DBImpl::Delete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family, const Slice& key) {
const Status s = FailIfCfHasTs(column_family);
@ -2406,4 +2415,21 @@ Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
}
return Write(opt, &batch);
}
// Default implementation of the timestamp-aware Merge: wraps the single
// operation in a WriteBatch and submits it through Write().
// The batch is constructed with the timestamp size reported by the default
// column family's comparator.
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                 const Slice& key, const Slice& ts, const Slice& value) {
  ColumnFamilyHandle* const dflt_cfh = DefaultColumnFamily();
  assert(dflt_cfh);
  const Comparator* const dflt_ucmp = dflt_cfh->GetComparator();
  assert(dflt_ucmp);
  const size_t dflt_ts_sz = dflt_ucmp->timestamp_size();

  WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0,
                   opt.protection_bytes_per_key, dflt_ts_sz);
  Status merge_status = batch.Merge(column_family, key, ts, value);
  if (merge_status.ok()) {
    // Only a successfully built batch is written to the DB.
    return Write(opt, &batch);
  }
  return merge_status;
}
} // namespace ROCKSDB_NAMESPACE

@ -523,7 +523,8 @@ bool DBIter::MergeValuesNewToOld() {
return false;
}
if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
saved_key_.GetUserKey())) {
// hit the next user key, stop right here
break;
}
@ -1159,7 +1160,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!ParseKey(&ikey)) {
return false;
}
if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
saved_key_.GetUserKey())) {
break;
}
if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {

@ -317,6 +317,7 @@ class DBIter final : public Iterator {
wide_columns_.clear();
}
// If user-defined timestamp is enabled, `user_key` includes timestamp.
Status Merge(const Slice* val, const Slice& user_key);
const SliceTransform* prefix_extractor_;

@ -3075,6 +3075,11 @@ class ModelDB : public DB {
}
return Write(o, &batch);
}
// Merge with a user-defined timestamp is not implemented by this model DB:
// all arguments are ignored and Status::NotSupported() is returned.
Status Merge(const WriteOptions& /*o*/, ColumnFamilyHandle* /*cf*/,
const Slice& /*k*/, const Slice& /*ts*/,
const Slice& /*value*/) override {
return Status::NotSupported();
}
using DB::Get;
Status Get(const ReadOptions& /*options*/, ColumnFamilyHandle* /*cf*/,
const Slice& key, PinnableSlice* /*value*/) override {

@ -18,6 +18,7 @@
#endif
#include "test_util/testutil.h"
#include "utilities/fault_injection_env.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
namespace ROCKSDB_NAMESPACE {
class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
@ -50,7 +51,7 @@ TEST_F(DBBasicTestWithTimestamp, SanityChecks) {
db_->Put(WriteOptions(), "key", dummy_ts, "value").IsInvalidArgument());
ASSERT_TRUE(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), "key",
dummy_ts, "value")
.IsNotSupported());
.IsInvalidArgument());
ASSERT_TRUE(db_->Delete(WriteOptions(), "key", dummy_ts).IsInvalidArgument());
ASSERT_TRUE(
db_->SingleDelete(WriteOptions(), "key", dummy_ts).IsInvalidArgument());
@ -96,7 +97,7 @@ TEST_F(DBBasicTestWithTimestamp, SanityChecks) {
ASSERT_TRUE(db_->Put(WriteOptions(), handle, "key", wrong_ts, "value")
.IsInvalidArgument());
ASSERT_TRUE(db_->Merge(WriteOptions(), handle, "key", wrong_ts, "value")
.IsNotSupported());
.IsInvalidArgument());
ASSERT_TRUE(
db_->Delete(WriteOptions(), handle, "key", wrong_ts).IsInvalidArgument());
ASSERT_TRUE(db_->SingleDelete(WriteOptions(), handle, "key", wrong_ts)
@ -3690,6 +3691,114 @@ TEST_F(DBBasicTestWithTimestamp, DeleteRangeGetIteratorWithSnapshot) {
db_->ReleaseSnapshot(after_tombstone);
Close();
}
// End-to-end check of Merge() with user-defined timestamps.
// For each of kNumOfUniqKeys keys, the test writes a base value with Put at
// the first write timestamp and then one merge operand at each later write
// timestamp. It then reads at three read timestamps through Get, iterator
// Seek/SeekForPrev and full forward/backward scans, both before and after a
// flush, expecting the value (joined with '.') and the returned timestamp to
// match the newest write visible at that read timestamp.
TEST_F(DBBasicTestWithTimestamp, MergeBasic) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.merge_operator = std::make_shared<StringAppendTESTOperator>('.');
DestroyAndReopen(options);
const std::array<std::string, 3> write_ts_strs = {
Timestamp(100, 0), Timestamp(200, 0), Timestamp(300, 0)};
constexpr size_t kNumOfUniqKeys = 100;
ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
// Populate: round 0 is a Put of "v<j>_0"; rounds 1 and 2 are Merges whose
// operand is the round index ("1", "2").
for (size_t i = 0; i < write_ts_strs.size(); ++i) {
for (size_t j = 0; j < kNumOfUniqKeys; ++j) {
Status s;
if (i == 0) {
const std::string val = "v" + std::to_string(j) + "_0";
s = db_->Put(WriteOptions(), Key1(j), write_ts_strs[i], val);
} else {
const std::string merge_op = std::to_string(i);
s = db_->Merge(WriteOptions(), default_cf, Key1(j), write_ts_strs[i],
merge_op);
}
ASSERT_OK(s);
}
}
// NOTE(review): each read timestamp presumably sorts just after the
// corresponding write timestamp (150 > 100, 250 > 200, 350 > 300) -- this
// relies on the ordering implemented by Timestamp()/TestComparator.
std::array<std::string, 3> read_ts_strs = {
Timestamp(150, 0), Timestamp(250, 0), Timestamp(350, 0)};
// Point-lookup verification: for every key and every read timestamp, check
// Get() as well as an iterator positioned with Seek() and SeekForPrev().
const auto verify_db_with_get = [&]() {
for (size_t i = 0; i < kNumOfUniqKeys; ++i) {
const std::string base_val = "v" + std::to_string(i) + "_0";
const std::array<std::string, 3> expected_values = {
base_val, base_val + ".1", base_val + ".1.2"};
// The timestamp returned for read j is expected to be write timestamp j.
const std::array<std::string, 3>& expected_ts = write_ts_strs;
ReadOptions read_opts;
for (size_t j = 0; j < read_ts_strs.size(); ++j) {
Slice read_ts = read_ts_strs[j];
read_opts.timestamp = &read_ts;
std::string value;
std::string ts;
const Status s = db_->Get(read_opts, Key1(i), &value, &ts);
ASSERT_OK(s);
ASSERT_EQ(expected_values[j], value);
ASSERT_EQ(expected_ts[j], ts);
// Do Seek/SeekForPrev
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
it->Seek(Key1(i));
ASSERT_TRUE(it->Valid());
ASSERT_EQ(expected_values[j], it->value());
ASSERT_EQ(expected_ts[j], it->timestamp());
it->SeekForPrev(Key1(i));
ASSERT_TRUE(it->Valid());
ASSERT_EQ(expected_values[j], it->value());
ASSERT_EQ(expected_ts[j], it->timestamp());
}
}
};
// Scan verification: for every read timestamp, walk the whole DB forward
// and then backward, checking key, merged value and timestamp at each step.
const auto verify_db_with_iterator = [&]() {
// Accumulates "", ".1", ".1.2" as the read timestamp advances.
std::string value_suffix;
for (size_t i = 0; i < read_ts_strs.size(); ++i) {
ReadOptions read_opts;
Slice read_ts = read_ts_strs[i];
read_opts.timestamp = &read_ts;
std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
size_t key_int_val = 0;
for (it->SeekToFirst(); it->Valid(); it->Next(), ++key_int_val) {
const std::string key = Key1(key_int_val);
const std::string value =
"v" + std::to_string(key_int_val) + "_0" + value_suffix;
ASSERT_EQ(key, it->key());
ASSERT_EQ(value, it->value());
ASSERT_EQ(write_ts_strs[i], it->timestamp());
}
ASSERT_EQ(kNumOfUniqKeys, key_int_val);
key_int_val = kNumOfUniqKeys - 1;
for (it->SeekToLast(); it->Valid(); it->Prev(), --key_int_val) {
const std::string key = Key1(key_int_val);
const std::string value =
"v" + std::to_string(key_int_val) + "_0" + value_suffix;
ASSERT_EQ(key, it->key());
ASSERT_EQ(value, it->value());
ASSERT_EQ(write_ts_strs[i], it->timestamp());
}
// After visiting key 0 the unsigned counter is decremented once more and
// wraps around; the wrapped value proves exactly kNumOfUniqKeys entries
// were visited in reverse.
ASSERT_EQ(std::numeric_limits<size_t>::max(), key_int_val);
value_suffix = value_suffix + "." + std::to_string(i + 1);
}
};
// Verify once before the flush and once after it.
verify_db_with_get();
verify_db_with_iterator();
ASSERT_OK(db_->Flush(FlushOptions()));
verify_db_with_get();
verify_db_with_iterator();
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -129,6 +129,12 @@ struct ParsedInternalKey {
const char* addr = user_key.data() + user_key.size() - ts.size();
memcpy(const_cast<char*>(addr), ts.data(), ts.size());
}
// Returns a non-owning view of the last `ts_sz` bytes of `user_key`, i.e. the
// region where the timestamp is stored when user-defined timestamp is in use.
// `ts_sz` must not exceed `user_key.size()` (checked by assertion only).
// The returned Slice points into the buffer referenced by `user_key` and is
// only valid while that buffer is.
// This accessor does not modify the object, so it is const-qualified and
// builds the Slice directly from the const pointer without a const_cast.
Slice GetTimestamp(size_t ts_sz) const {
  assert(ts_sz <= user_key.size());
  return Slice(user_key.data() + user_key.size() - ts_sz, ts_sz);
}
};
// Return the length of the encoding of "key".
@ -439,6 +445,8 @@ class IterKey {
void SetIsUserKey(bool is_user_key) { is_user_key_ = is_user_key; }
// Returns the key in whichever format that was provided to KeyIter
// If user-defined timestamp is enabled, then timestamp is included in the
// return result.
Slice GetKey() const { return Slice(key_, key_size_); }
Slice GetInternalKey() const {
@ -446,6 +454,8 @@ class IterKey {
return Slice(key_, key_size_);
}
// If user-defined timestamp is enabled, then timestamp is included in the
// return result of GetUserKey();
Slice GetUserKey() const {
if (IsUserKey()) {
return Slice(key_, key_size_);
@ -495,6 +505,9 @@ class IterKey {
return SetKeyImpl(key, copy);
}
// If user-defined timestamp is enabled, then `key` includes timestamp.
// TODO(yanqin) this is also used to set a prefix, which does not include a
// timestamp. This case should be handled.
Slice SetUserKey(const Slice& key, bool copy = true) {
is_user_key_ = true;
return SetKeyImpl(key, copy);
@ -689,6 +702,8 @@ extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key,
// slice they point to.
// Tag is defined as ValueType.
// input will be advanced to after the record.
// If user-defined timestamp is enabled for a column family, then the `key`
// resulting from this call will include timestamp.
extern Status ReadRecordFromWriteBatch(Slice* input, char* tag,
uint32_t* column_family, Slice* key,
Slice* value, Slice* blob, Slice* xid);

@ -35,7 +35,9 @@ class LookupKey {
return Slice(kstart_, static_cast<size_t>(end_ - kstart_));
}
// Return the user key
// Return the user key.
// If user-defined timestamp is enabled, then timestamp is included in the
// result.
Slice user_key() const {
return Slice(kstart_, static_cast<size_t>(end_ - kstart_ - 8));
}

@ -1058,6 +1058,8 @@ static bool SaveValue(void* arg, const char* entry) {
if (!s->do_merge) {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
// TODO(yanqin) update MergeContext so that timestamps information
// can also be retained.
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);

@ -63,7 +63,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
bool update_num_ops_stats) {
assert(merge_operator != nullptr);
if (operands.size() == 0) {
if (operands.empty()) {
assert(value != nullptr && result != nullptr);
result->assign(value->data(), value->size());
return Status::OK();
@ -74,7 +74,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
static_cast<uint64_t>(operands.size()));
}
bool success;
bool success = false;
Slice tmp_result_operand(nullptr, 0);
const MergeOperator::MergeOperationInput merge_in(key, value, operands,
logger);
@ -155,6 +155,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
const bool at_bottom,
const bool allow_data_in_errors,
const BlobFetcher* blob_fetcher,
const std::string* const full_history_ts_low,
PrefetchBufferCollection* prefetch_buffers,
CompactionIterationStats* c_iter_stats) {
// Get a copy of the internal key, before it's invalidated by iter->Next()
@ -164,6 +165,12 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
merge_context_.Clear();
has_compaction_filter_skip_until_ = false;
assert(user_merge_operator_);
assert(user_comparator_);
const size_t ts_sz = user_comparator_->timestamp_size();
if (full_history_ts_low) {
assert(ts_sz > 0);
assert(ts_sz == full_history_ts_low->size());
}
bool first_key = true;
// We need to parse the internal key again as the parsed key is
@ -184,6 +191,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
if (!s.ok()) return s;
bool hit_the_next_user_key = false;
int cmp_with_full_history_ts_low = 0;
for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
if (IsShuttingDown()) {
s = Status::ShutdownInProgress();
@ -195,6 +203,14 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
Status pik_status =
ParseInternalKey(iter->key(), &ikey, allow_data_in_errors);
Slice ts;
if (pik_status.ok()) {
ts = ExtractTimestampFromUserKey(ikey.user_key, ts_sz);
if (full_history_ts_low) {
cmp_with_full_history_ts_low =
user_comparator_->CompareTimestamp(ts, *full_history_ts_low);
}
}
if (!pik_status.ok()) {
// stop at corrupted key
if (assert_valid_internal_key_) {
@ -202,10 +218,18 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
}
break;
} else if (first_key) {
// If user-defined timestamp is enabled, we expect both the user key and
// the timestamp to be equal, as a sanity check.
assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key));
first_key = false;
} else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) {
// hit a different user key, stop right here
} else if (!user_comparator_->EqualWithoutTimestamp(ikey.user_key,
orig_ikey.user_key) ||
(ts_sz > 0 &&
!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key) &&
cmp_with_full_history_ts_low >= 0)) {
// 1) hit a different user key, or
// 2) user-defined timestamp is enabled, and hit a version of user key NOT
// eligible for GC, then stop right here.
hit_the_next_user_key = true;
break;
} else if (stop_before > 0 && ikey.sequence <= stop_before &&
@ -338,9 +362,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
filter == CompactionFilter::Decision::kChangeValue) {
if (original_key_is_iter) {
// this is just an optimization that saves us one memcpy
keys_.push_front(std::move(original_key));
keys_.emplace_front(original_key);
} else {
keys_.push_front(iter->key().ToString());
keys_.emplace_front(iter->key().ToString());
}
if (keys_.size() == 1) {
// we need to re-anchor the orig_ikey because it was anchored by
@ -353,7 +377,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
if (filter == CompactionFilter::Decision::kKeep) {
merge_context_.PushOperand(
value_slice, iter->IsValuePinned() /* operand_pinned */);
} else { // kChangeValue
} else {
assert(filter == CompactionFilter::Decision::kChangeValue);
// Compaction filter asked us to change the operand from value_slice
// to compaction_filter_value_.
merge_context_.PushOperand(compaction_filter_value_, false);
@ -369,6 +394,13 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
}
}
if (cmp_with_full_history_ts_low >= 0) {
// If we reach here and ts_sz > 0 (user-defined timestamp is enabled), it
// means compaction cannot perform merge with an earlier internal key, thus
// merge_context_.GetNumOperands() is 1.
assert(ts_sz == 0 || merge_context_.GetNumOperands() == 1);
}
if (merge_context_.GetNumOperands() == 0) {
// we filtered out all the merge operands
return s;
@ -382,6 +414,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// AND
// we have either encountered another key or end of key history on this
// layer.
// Note that if user-defined timestamp is enabled, we need some extra caution
// here: if full_history_ts_low is nullptr, or it's not null but the key's
// timestamp is greater than or equal to full_history_ts_low, it means this
// key cannot be dropped. We may not have seen the beginning of the key.
//
// When these conditions are true we are able to merge all the keys
// using full merge.
@ -391,7 +427,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
// sure that all merge-operands on the same level get compacted together,
// this will simply lead to these merge operands moving to the next level.
bool surely_seen_the_beginning =
(hit_the_next_user_key || !iter->Valid()) && at_bottom;
(hit_the_next_user_key || !iter->Valid()) && at_bottom &&
(ts_sz == 0 || cmp_with_full_history_ts_low < 0);
if (surely_seen_the_beginning) {
// do a final merge with nullptr as the existing value and say
// bye to the merge type (it's now converted to a Put)

@ -65,7 +65,7 @@ class MergeHelper {
Slice* result_operand = nullptr,
bool update_num_ops_stats = false);
// Merge entries until we hit
// During compaction, merge entries until we hit
// - a corrupted key
// - a Put/Delete,
// - a different user key,
@ -101,6 +101,7 @@ class MergeHelper {
const SequenceNumber stop_before, const bool at_bottom,
const bool allow_data_in_errors,
const BlobFetcher* blob_fetcher,
const std::string* const full_history_ts_low,
PrefetchBufferCollection* prefetch_buffers,
CompactionIterationStats* c_iter_stats);
@ -108,6 +109,7 @@ class MergeHelper {
// in the constructor. Returns the decision that the filter made.
// Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
// optional outputs of compaction filter.
// user_key includes timestamp if user-defined timestamp is enabled.
CompactionFilter::Decision FilterMerge(const Slice& user_key,
const Slice& value_slice);

@ -35,7 +35,8 @@ class MergeHelperTest : public testing::Test {
return merge_helper_->MergeUntil(
iter_.get(), nullptr /* range_del_agg */, stop_before, at_bottom,
false /* allow_data_in_errors */, nullptr /* blob_fetcher */,
nullptr /* prefetch_buffers */, nullptr /* c_iter_stats */);
nullptr /* full_history_ts_low */, nullptr /* prefetch_buffers */,
nullptr /* c_iter_stats */);
}
void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,

@ -1481,8 +1481,27 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
return WriteBatchInternal::Merge(this, cf_id, key, value);
}
return Status::InvalidArgument(
"Cannot call this method on column family enabling timestamp");
needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> key_with_ts{{key, dummy_ts}};
return WriteBatchInternal::Merge(
this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1));
}
// Appends a Merge record whose key carries an explicit user-defined
// timestamp. The timestamp is validated against the column family with
// CheckColumnFamilyTimestampSize(); a failing status is returned unchanged
// and leaves the batch untouched. On success the batch is flagged as holding
// a key with timestamp, and `key` followed by `ts` is written as one
// two-part key.
Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& ts, const Slice& value) {
  const Status ts_check = CheckColumnFamilyTimestampSize(column_family, ts);
  if (ts_check.ok()) {
    has_key_with_ts_ = true;
    assert(column_family);
    const uint32_t target_cf = column_family->GetID();
    // Key and timestamp are passed as adjacent parts so they are stored
    // back to back without building a temporary concatenated string.
    std::array<Slice, 2> key_and_ts{{key, ts}};
    const SliceParts key_parts(key_and_ts.data(), 2);
    const SliceParts value_parts(&value, 1);
    return WriteBatchInternal::Merge(this, target_cf, key_parts, value_parts);
  }
  return ts_check;
}
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
@ -2036,6 +2055,7 @@ class MemTableInserter : public WriteBatch::Handler {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
// TODO (yanqin): fix when user-defined timestamp is enabled.
get_status = db_->Get(ropts, cf_handle, key, &prev_value);
}
// Intentionally overwrites the `NotFound` in `ret_status`.

@ -961,14 +961,14 @@ TEST_F(WriteBatchTest, SanityChecks) {
ASSERT_TRUE(wb.Put(nullptr, "key", "ts", "value").IsInvalidArgument());
ASSERT_TRUE(wb.Delete(nullptr, "key", "ts").IsInvalidArgument());
ASSERT_TRUE(wb.SingleDelete(nullptr, "key", "ts").IsInvalidArgument());
ASSERT_TRUE(wb.Merge(nullptr, "key", "ts", "value").IsNotSupported());
ASSERT_TRUE(wb.Merge(nullptr, "key", "ts", "value").IsInvalidArgument());
ASSERT_TRUE(wb.DeleteRange(nullptr, "begin_key", "end_key", "ts")
.IsInvalidArgument());
ASSERT_TRUE(wb.Put(&cf4, "key", "ts", "value").IsInvalidArgument());
ASSERT_TRUE(wb.Delete(&cf4, "key", "ts").IsInvalidArgument());
ASSERT_TRUE(wb.SingleDelete(&cf4, "key", "ts").IsInvalidArgument());
ASSERT_TRUE(wb.Merge(&cf4, "key", "ts", "value").IsNotSupported());
ASSERT_TRUE(wb.Merge(&cf4, "key", "ts", "value").IsInvalidArgument());
ASSERT_TRUE(
wb.DeleteRange(&cf4, "begin_key", "end_key", "ts").IsInvalidArgument());
@ -978,7 +978,7 @@ TEST_F(WriteBatchTest, SanityChecks) {
ASSERT_TRUE(wb.Put(&cf0, "key", ts, "value").IsInvalidArgument());
ASSERT_TRUE(wb.Delete(&cf0, "key", ts).IsInvalidArgument());
ASSERT_TRUE(wb.SingleDelete(&cf0, "key", ts).IsInvalidArgument());
ASSERT_TRUE(wb.Merge(&cf0, "key", ts, "value").IsNotSupported());
ASSERT_TRUE(wb.Merge(&cf0, "key", ts, "value").IsInvalidArgument());
ASSERT_TRUE(
wb.DeleteRange(&cf0, "begin_key", "end_key", ts).IsInvalidArgument());

@ -163,6 +163,7 @@ class CompactionFilter : public Customizable {
// is a write conflict and may allow a Transaction to Commit that should have
// failed. Instead, it is better to implement any Merge filtering inside the
// MergeOperator.
// key includes timestamp if user-defined timestamp is enabled.
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* /*skip_until*/) const {

@ -500,10 +500,7 @@ class DB {
virtual Status Merge(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/, const Slice& /*ts*/,
const Slice& /*value*/) {
return Status::NotSupported(
"Merge does not support user-defined timestamp yet");
}
const Slice& /*value*/);
// Apply the specified updates to the database.
// If `updates` contains no update, WAL will still be synced if

@ -82,6 +82,7 @@ class MergeOperator : public Customizable {
}
struct MergeOperationInput {
// If user-defined timestamp is enabled, `_key` includes timestamp.
explicit MergeOperationInput(const Slice& _key,
const Slice* _existing_value,
const std::vector<Slice>& _operand_list,

@ -215,6 +215,10 @@ class StackableDB : public DB {
const Slice& value) override {
return db_->Merge(options, column_family, key, value);
}
// Timestamp-aware Merge: forwards all arguments unchanged to the wrapped
// database `db_` and returns its status.
Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& value) override {
return db_->Merge(options, column_family, key, ts, value);
}
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override {
return db_->Write(opts, updates);

@ -172,10 +172,7 @@ class WriteBatch : public WriteBatchBase {
return Merge(nullptr, key, value);
}
Status Merge(ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
const Slice& /*ts*/, const Slice& /*value*/) override {
return Status::NotSupported(
"Merge does not support user-defined timestamp");
}
const Slice& /*ts*/, const Slice& /*value*/) override;
// variant that takes SliceParts
Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key,
@ -219,6 +216,7 @@ class WriteBatch : public WriteBatchBase {
Status PopSavePoint() override;
// Support for iterating over the contents of a batch.
// Objects of subclasses of Handler will be used by WriteBatch::Iterate().
class Handler {
public:
virtual ~Handler();
@ -229,6 +227,7 @@ class WriteBatch : public WriteBatchBase {
// default implementation will just call Put without column family for
// backwards compatibility. If the column family is not default,
// the function is noop
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
@ -241,14 +240,17 @@ class WriteBatch : public WriteBatchBase {
return Status::InvalidArgument(
"non-default column family and PutCF not implemented");
}
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {}
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual Status PutEntityCF(uint32_t /* column_family_id */,
const Slice& /* key */,
const Slice& /* entity */) {
return Status::NotSupported("PutEntityCF not implemented");
}
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
if (column_family_id == 0) {
Delete(key);
@ -257,8 +259,10 @@ class WriteBatch : public WriteBatchBase {
return Status::InvalidArgument(
"non-default column family and DeleteCF not implemented");
}
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual void Delete(const Slice& /*key*/) {}
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) {
if (column_family_id == 0) {
SingleDelete(key);
@ -267,14 +271,18 @@ class WriteBatch : public WriteBatchBase {
return Status::InvalidArgument(
"non-default column family and SingleDeleteCF not implemented");
}
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual void SingleDelete(const Slice& /*key*/) {}
// If user-defined timestamp is enabled, then `begin_key` and `end_key`
// both include timestamp.
virtual Status DeleteRangeCF(uint32_t /*column_family_id*/,
const Slice& /*begin_key*/,
const Slice& /*end_key*/) {
return Status::InvalidArgument("DeleteRangeCF not implemented");
}
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
@ -284,8 +292,10 @@ class WriteBatch : public WriteBatchBase {
return Status::InvalidArgument(
"non-default column family and MergeCF not implemented");
}
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {}
// If user-defined timestamp is enabled, then `key` includes timestamp.
virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/,
const Slice& /*key*/,
const Slice& /*value*/) {

@ -478,6 +478,7 @@ bool GetContext::GetBlobValue(const Slice& blob_index,
}
void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
// TODO(yanqin) preserve timestamps information in merge_context
if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
value_pinner != nullptr) {
value_pinner->DelegateCleanupsTo(pinned_iters_mgr());

@ -320,6 +320,7 @@ TEST_P(WriteCommittedTxnWithTsTest, Merge) {
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
@ -338,8 +339,17 @@ TEST_P(WriteCommittedTxnWithTsTest, Merge) {
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn);
ASSERT_OK(txn->Put(handles_[1], "foo", "bar"));
ASSERT_TRUE(txn->Merge(handles_[1], "foo", "1").IsInvalidArgument());
ASSERT_OK(txn->Merge(handles_[1], "foo", "1"));
ASSERT_OK(txn->SetCommitTimestamp(24));
ASSERT_OK(txn->Commit());
txn.reset();
{
std::string value;
const Status s =
GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/24, &value);
ASSERT_OK(s);
ASSERT_EQ("bar,1", value);
}
}
TEST_P(WriteCommittedTxnWithTsTest, GetForUpdate) {

Loading…
Cancel
Save