From 7d26e4c5a39edb7e805621ccd0b0fec46b9c8f9d Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Mon, 31 Oct 2022 22:28:58 -0700 Subject: [PATCH] Basic Support for Merge with user-defined timestamp (#10819) Summary: This PR implements the originally disabled `Merge()` APIs when user-defined timestamp is enabled. Simplest usage: ```cpp // assume string append merge op is used with '.' as delimiter. // ts1 < ts2 db->Put(WriteOptions(), "key", ts1, "v0"); db->Merge(WriteOptions(), "key", ts2, "1"); ReadOptions ro; ro.timestamp = &ts2; db->Get(ro, "key", &value); ASSERT_EQ("v0.1", value); ``` Some code comments are added for clarity. Note: support for timestamp in `DB::GetMergeOperands()` will be done in a follow-up PR. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10819 Test Plan: make check Reviewed By: ltamasi Differential Revision: D40603195 Pulled By: riversand963 fbshipit-source-id: f96d6f183258f3392d80377025529f7660503013 --- HISTORY.md | 3 + db/compaction/compaction_iterator.cc | 11 +- db/compaction/compaction_iterator_test.cc | 114 ++++++++++++++++++ db/db_impl/db_impl.h | 3 + db/db_impl/db_impl_write.cc | 26 ++++ db/db_iter.cc | 6 +- db/db_iter.h | 1 + db/db_test.cc | 5 + db/db_with_timestamp_basic_test.cc | 113 ++++++++++++++++- db/dbformat.h | 15 +++ db/lookup_key.h | 4 +- db/memtable.cc | 2 + db/merge_helper.cc | 53 ++++++-- db/merge_helper.h | 4 +- db/merge_helper_test.cc | 3 +- db/write_batch.cc | 24 +++- db/write_batch_test.cc | 6 +- include/rocksdb/compaction_filter.h | 1 + include/rocksdb/db.h | 5 +- include/rocksdb/merge_operator.h | 1 + include/rocksdb/utilities/stackable_db.h | 4 + include/rocksdb/write_batch.h | 18 ++- table/get_context.cc | 1 + .../write_committed_transaction_ts_test.cc | 12 +- 24 files changed, 403 insertions(+), 32 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 553eeb463..2dfce83a1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,9 @@ ### Bug Fixes * Fix FIFO compaction causing corruption of 
overlapping seqnos in L0 files due to ingesting files of overlapping seqnos with memtable's under `CompactionOptionsFIFO::allow_compaction=true` or `CompactionOptionsFIFO::age_for_warm>0` or `CompactRange()/CompactFiles()` is used. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected. +### New Features +* Add basic support for user-defined timestamp to Merge (#10819). + ## 7.8.0 (10/22/2022) ### New Features * `DeleteRange()` now supports user-defined timestamp. diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index ad4cac844..788c8876c 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -168,7 +168,12 @@ void CompactionIterator::Next() { } // Keep current_key_ in sync. - current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + if (0 == timestamp_size_) { + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + } else { + Slice ts = ikey_.GetTimestamp(timestamp_size_); + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type, &ts); + } key_ = current_key_.GetInternalKey(); ikey_.user_key = current_key_.GetUserKey(); validity_info_.SetValid(ValidContext::kMerge1); @@ -877,8 +882,8 @@ void CompactionIterator::NextFromInput() { // object to minimize change to the existing flow. 
Status s = merge_helper_->MergeUntil( &input_, range_del_agg_, prev_snapshot, bottommost_level_, - allow_data_in_errors_, blob_fetcher_.get(), prefetch_buffers_.get(), - &iter_stats_); + allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_, + prefetch_buffers_.get(), &iter_stats_); merge_out_iter_.SeekToFirst(); if (!s.ok() && !s.IsMergeInProgress()) { diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 5e0768d15..81362d792 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -812,6 +812,8 @@ TEST_P(PerKeyPlacementCompIteratorTest, SplitLastLevelData) { c_iter_->Next(); ASSERT_OK(c_iter_->status()); ASSERT_FALSE(c_iter_->Valid()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); } TEST_P(PerKeyPlacementCompIteratorTest, SnapshotData) { @@ -877,6 +879,8 @@ TEST_P(PerKeyPlacementCompIteratorTest, ConflictWithSnapshot) { // output_to_penultimate_level. 
c_iter_->Next(); ASSERT_TRUE(c_iter_->status().IsCorruption()); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); } INSTANTIATE_TEST_CASE_P(PerKeyPlacementCompIteratorTest, @@ -1251,6 +1255,31 @@ TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) { } } +TEST_P(CompactionIteratorTsGcTest, NoMergeEligibleForGc) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(10002, user_key, 102, kTypeMerge), + test::KeyStr(10001, user_key, 101, kTypeMerge), + test::KeyStr(10000, user_key, 100, kTypeValue)}; + const std::vector input_values = {"2", "1", "a0"}; + std::shared_ptr merge_op = + MergeOperators::CreateStringAppendTESTOperator(); + const std::vector& expected_keys = input_keys; + const std::vector& expected_values = input_values; + const std::vector> params = { + {false, false}, {false, true}, {true, true}}; + for (const auto& param : params) { + const bool bottommost_level = param.first; + const bool key_not_exists_beyond_output_level = param.second; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(), + /*compaction_filter=*/nullptr, bottommost_level, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + key_not_exists_beyond_output_level, + /*full_history_ts_low=*/nullptr); + } +} + TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) { constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}}; const std::vector input_keys = { @@ -1304,6 +1333,91 @@ TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) { } } +TEST_P(CompactionIteratorTsGcTest, SomeMergesOlderThanThreshold) { + constexpr char user_key[][2] = {"a", "f"}; + const std::vector input_keys = { + test::KeyStr(/*ts=*/25000, user_key[0], /*seq=*/2500, kTypeMerge), + test::KeyStr(/*ts=*/19000, user_key[0], /*seq=*/2300, kTypeMerge), + test::KeyStr(/*ts=*/18000, user_key[0], /*seq=*/1800, kTypeMerge), + 
test::KeyStr(/*ts=*/16000, user_key[0], /*seq=*/1600, kTypeValue), + test::KeyStr(/*ts=*/19000, user_key[1], /*seq=*/2000, kTypeMerge), + test::KeyStr(/*ts=*/17000, user_key[1], /*seq=*/1700, kTypeMerge), + test::KeyStr(/*ts=*/15000, user_key[1], /*seq=*/1600, + kTypeDeletionWithTimestamp)}; + const std::vector input_values = {"25", "19", "18", "16", + "19", "17", ""}; + std::shared_ptr merge_op = + MergeOperators::CreateStringAppendTESTOperator(); + std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, 20000); + + const std::vector> params = { + {false, false}, {false, true}, {true, true}}; + + { + AddSnapshot(1600); + AddSnapshot(1900); + const std::vector expected_keys = { + test::KeyStr(/*ts=*/25000, user_key[0], /*seq=*/2500, kTypeMerge), + test::KeyStr(/*ts=*/19000, user_key[0], /*seq=*/2300, kTypeMerge), + test::KeyStr(/*ts=*/18000, user_key[0], /*seq=*/1800, kTypeMerge), + test::KeyStr(/*ts=*/16000, user_key[0], /*seq=*/1600, kTypeValue), + test::KeyStr(/*ts=*/19000, user_key[1], /*seq=*/2000, kTypeMerge), + test::KeyStr(/*ts=*/17000, user_key[1], /*seq=*/1700, kTypeMerge), + test::KeyStr(/*ts=*/15000, user_key[1], /*seq=*/1600, + kTypeDeletionWithTimestamp)}; + const std::vector expected_values = {"25", "19", "18", "16", + "19", "17", ""}; + for (const auto& param : params) { + const bool bottommost_level = param.first; + const bool key_not_exists_beyond_output_level = param.second; + auto expected_keys_copy = expected_keys; + auto expected_values_copy = expected_values; + if (bottommost_level || key_not_exists_beyond_output_level) { + // the kTypeDeletionWithTimestamp will be dropped + expected_keys_copy.pop_back(); + expected_values_copy.pop_back(); + if (bottommost_level) { + // seq zero + expected_keys_copy[3] = + test::KeyStr(/*ts=*/0, user_key[0], /*seq=*/0, kTypeValue); + } + } + RunTest(input_keys, input_values, expected_keys_copy, + expected_values_copy, + /*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(), + 
/*compaction_filter=*/nullptr, bottommost_level, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + key_not_exists_beyond_output_level, &full_history_ts_low); + } + ClearSnapshots(); + } + + // No snapshots + { + const std::vector expected_keys = { + test::KeyStr(/*ts=*/25000, user_key[0], /*seq=*/2500, kTypeValue), + test::KeyStr(/*ts=*/19000, user_key[1], /*seq=*/2000, kTypeValue)}; + const std::vector expected_values = {"16,18,19,25", "17,19"}; + for (const auto& param : params) { + const bool bottommost_level = param.first; + const bool key_not_exists_beyond_output_level = param.second; + auto expected_keys_copy = expected_keys; + auto expected_values_copy = expected_values; + if (bottommost_level) { + expected_keys_copy[1] = + test::KeyStr(/*ts=*/0, user_key[1], /*seq=*/0, kTypeValue); + } + RunTest(input_keys, input_values, expected_keys_copy, + expected_values_copy, + /*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(), + /*compaction_filter=*/nullptr, bottommost_level, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + key_not_exists_beyond_output_level, &full_history_ts_low); + } + } +} + TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) { constexpr char user_key[] = "a"; const std::vector input_keys = { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index f9aef72f6..725e77c18 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -204,6 +204,9 @@ class DBImpl : public DB { using DB::Merge; Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) override; + Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& value) override; + using DB::Delete; Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key) override; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 635f16f01..a597c168d 100644 --- 
a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -62,6 +62,15 @@ Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, } } +Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& val) { + const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false); + if (!s.ok()) { + return s; + } + return DB::Merge(o, column_family, key, ts, val); +} + Status DBImpl::Delete(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& key) { const Status s = FailIfCfHasTs(column_family); @@ -2406,4 +2415,21 @@ Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, } return Write(opt, &batch); } + +Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& value) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); + Status s = batch.Merge(column_family, key, ts, value); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_iter.cc b/db/db_iter.cc index 5bc9bb8d0..342323331 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -523,7 +523,8 @@ bool DBIter::MergeValuesNewToOld() { return false; } - if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) { + if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key, + saved_key_.GetUserKey())) { // hit the next user key, stop right here break; } @@ -1159,7 +1160,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (!ParseKey(&ikey)) { return false; } - if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) { + if 
(!user_comparator_.EqualWithoutTimestamp(ikey.user_key, + saved_key_.GetUserKey())) { break; } if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) { diff --git a/db/db_iter.h b/db/db_iter.h index d81d1f0ef..c1c7dd404 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -317,6 +317,7 @@ class DBIter final : public Iterator { wide_columns_.clear(); } + // If user-defined timestamp is enabled, `user_key` includes timestamp. Status Merge(const Slice* val, const Slice& user_key); const SliceTransform* prefix_extractor_; diff --git a/db/db_test.cc b/db/db_test.cc index 5a574c900..f8bd52768 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3075,6 +3075,11 @@ class ModelDB : public DB { } return Write(o, &batch); } + Status Merge(const WriteOptions& /*o*/, ColumnFamilyHandle* /*cf*/, + const Slice& /*k*/, const Slice& /*ts*/, + const Slice& /*value*/) override { + return Status::NotSupported(); + } using DB::Get; Status Get(const ReadOptions& /*options*/, ColumnFamilyHandle* /*cf*/, const Slice& key, PinnableSlice* /*value*/) override { diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 347f22951..83fa3f0de 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -18,6 +18,7 @@ #endif #include "test_util/testutil.h" #include "utilities/fault_injection_env.h" +#include "utilities/merge_operators/string_append/stringappend2.h" namespace ROCKSDB_NAMESPACE { class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase { @@ -50,7 +51,7 @@ TEST_F(DBBasicTestWithTimestamp, SanityChecks) { db_->Put(WriteOptions(), "key", dummy_ts, "value").IsInvalidArgument()); ASSERT_TRUE(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), "key", dummy_ts, "value") - .IsNotSupported()); + .IsInvalidArgument()); ASSERT_TRUE(db_->Delete(WriteOptions(), "key", dummy_ts).IsInvalidArgument()); ASSERT_TRUE( db_->SingleDelete(WriteOptions(), "key", dummy_ts).IsInvalidArgument()); @@ -96,7 +97,7 @@ 
TEST_F(DBBasicTestWithTimestamp, SanityChecks) { ASSERT_TRUE(db_->Put(WriteOptions(), handle, "key", wrong_ts, "value") .IsInvalidArgument()); ASSERT_TRUE(db_->Merge(WriteOptions(), handle, "key", wrong_ts, "value") - .IsNotSupported()); + .IsInvalidArgument()); ASSERT_TRUE( db_->Delete(WriteOptions(), handle, "key", wrong_ts).IsInvalidArgument()); ASSERT_TRUE(db_->SingleDelete(WriteOptions(), handle, "key", wrong_ts) @@ -3690,6 +3691,114 @@ TEST_F(DBBasicTestWithTimestamp, DeleteRangeGetIteratorWithSnapshot) { db_->ReleaseSnapshot(after_tombstone); Close(); } + +TEST_F(DBBasicTestWithTimestamp, MergeBasic) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.merge_operator = std::make_shared('.'); + DestroyAndReopen(options); + + const std::array write_ts_strs = { + Timestamp(100, 0), Timestamp(200, 0), Timestamp(300, 0)}; + constexpr size_t kNumOfUniqKeys = 100; + ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily(); + + for (size_t i = 0; i < write_ts_strs.size(); ++i) { + for (size_t j = 0; j < kNumOfUniqKeys; ++j) { + Status s; + if (i == 0) { + const std::string val = "v" + std::to_string(j) + "_0"; + s = db_->Put(WriteOptions(), Key1(j), write_ts_strs[i], val); + } else { + const std::string merge_op = std::to_string(i); + s = db_->Merge(WriteOptions(), default_cf, Key1(j), write_ts_strs[i], + merge_op); + } + ASSERT_OK(s); + } + } + + std::array read_ts_strs = { + Timestamp(150, 0), Timestamp(250, 0), Timestamp(350, 0)}; + + const auto verify_db_with_get = [&]() { + for (size_t i = 0; i < kNumOfUniqKeys; ++i) { + const std::string base_val = "v" + std::to_string(i) + "_0"; + const std::array expected_values = { + base_val, base_val + ".1", base_val + ".1.2"}; + const std::array& expected_ts = write_ts_strs; + ReadOptions read_opts; + for (size_t j = 0; j < read_ts_strs.size(); ++j) { 
+ Slice read_ts = read_ts_strs[j]; + read_opts.timestamp = &read_ts; + std::string value; + std::string ts; + const Status s = db_->Get(read_opts, Key1(i), &value, &ts); + ASSERT_OK(s); + ASSERT_EQ(expected_values[j], value); + ASSERT_EQ(expected_ts[j], ts); + + // Do Seek/SeekForPrev + std::unique_ptr it(db_->NewIterator(read_opts)); + it->Seek(Key1(i)); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(expected_values[j], it->value()); + ASSERT_EQ(expected_ts[j], it->timestamp()); + + it->SeekForPrev(Key1(i)); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ(expected_values[j], it->value()); + ASSERT_EQ(expected_ts[j], it->timestamp()); + } + } + }; + + const auto verify_db_with_iterator = [&]() { + std::string value_suffix; + for (size_t i = 0; i < read_ts_strs.size(); ++i) { + ReadOptions read_opts; + Slice read_ts = read_ts_strs[i]; + read_opts.timestamp = &read_ts; + std::unique_ptr it(db_->NewIterator(read_opts)); + size_t key_int_val = 0; + for (it->SeekToFirst(); it->Valid(); it->Next(), ++key_int_val) { + const std::string key = Key1(key_int_val); + const std::string value = + "v" + std::to_string(key_int_val) + "_0" + value_suffix; + ASSERT_EQ(key, it->key()); + ASSERT_EQ(value, it->value()); + ASSERT_EQ(write_ts_strs[i], it->timestamp()); + } + ASSERT_EQ(kNumOfUniqKeys, key_int_val); + + key_int_val = kNumOfUniqKeys - 1; + for (it->SeekToLast(); it->Valid(); it->Prev(), --key_int_val) { + const std::string key = Key1(key_int_val); + const std::string value = + "v" + std::to_string(key_int_val) + "_0" + value_suffix; + ASSERT_EQ(key, it->key()); + ASSERT_EQ(value, it->value()); + ASSERT_EQ(write_ts_strs[i], it->timestamp()); + } + ASSERT_EQ(std::numeric_limits::max(), key_int_val); + + value_suffix = value_suffix + "." 
+ std::to_string(i + 1); + } + }; + + verify_db_with_get(); + verify_db_with_iterator(); + + ASSERT_OK(db_->Flush(FlushOptions())); + + verify_db_with_get(); + verify_db_with_iterator(); + + Close(); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/dbformat.h b/db/dbformat.h index b3981fc74..8c1fc7055 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -129,6 +129,12 @@ struct ParsedInternalKey { const char* addr = user_key.data() + user_key.size() - ts.size(); memcpy(const_cast(addr), ts.data(), ts.size()); } + + Slice GetTimestamp(size_t ts_sz) { + assert(ts_sz <= user_key.size()); + const char* addr = user_key.data() + user_key.size() - ts_sz; + return Slice(const_cast(addr), ts_sz); + } }; // Return the length of the encoding of "key". @@ -439,6 +445,8 @@ class IterKey { void SetIsUserKey(bool is_user_key) { is_user_key_ = is_user_key; } // Returns the key in whichever format that was provided to KeyIter + // If user-defined timestamp is enabled, then timestamp is included in the + // return result. Slice GetKey() const { return Slice(key_, key_size_); } Slice GetInternalKey() const { @@ -446,6 +454,8 @@ class IterKey { return Slice(key_, key_size_); } + // If user-defined timestamp is enabled, then timestamp is included in the + // return result of GetUserKey(); Slice GetUserKey() const { if (IsUserKey()) { return Slice(key_, key_size_); @@ -495,6 +505,9 @@ class IterKey { return SetKeyImpl(key, copy); } + // If user-defined timestamp is enabled, then `key` includes timestamp. + // TODO(yanqin) this is also used to set prefix, which do not include + // timestamp. Should be handled. Slice SetUserKey(const Slice& key, bool copy = true) { is_user_key_ = true; return SetKeyImpl(key, copy); @@ -689,6 +702,8 @@ extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, // slice they point to. // Tag is defined as ValueType. // input will be advanced to after the record. 
+// If user-defined timestamp is enabled for a column family, then the `key` +// resulting from this call will include timestamp. extern Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, Slice* value, Slice* blob, Slice* xid); diff --git a/db/lookup_key.h b/db/lookup_key.h index 609d08daf..75686cc52 100644 --- a/db/lookup_key.h +++ b/db/lookup_key.h @@ -35,7 +35,9 @@ class LookupKey { return Slice(kstart_, static_cast(end_ - kstart_)); } - // Return the user key + // Return the user key. + // If user-defined timestamp is enabled, then timestamp is included in the + // result. Slice user_key() const { return Slice(kstart_, static_cast(end_ - kstart_ - 8)); } diff --git a/db/memtable.cc b/db/memtable.cc index 369627a33..300f38eff 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -1058,6 +1058,8 @@ static bool SaveValue(void* arg, const char* entry) { if (!s->do_merge) { // Preserve the value with the goal of returning it as part of // raw merge operands to the user + // TODO(yanqin) update MergeContext so that timestamps information + // can also be retained. 
merge_context->PushOperand( v, s->inplace_update_support == false /* operand_pinned */); diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 680365b98..86a4d11f6 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -63,7 +63,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, bool update_num_ops_stats) { assert(merge_operator != nullptr); - if (operands.size() == 0) { + if (operands.empty()) { assert(value != nullptr && result != nullptr); result->assign(value->data(), value->size()); return Status::OK(); @@ -74,7 +74,7 @@ Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, static_cast(operands.size())); } - bool success; + bool success = false; Slice tmp_result_operand(nullptr, 0); const MergeOperator::MergeOperationInput merge_in(key, value, operands, logger); @@ -155,6 +155,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, const bool at_bottom, const bool allow_data_in_errors, const BlobFetcher* blob_fetcher, + const std::string* const full_history_ts_low, PrefetchBufferCollection* prefetch_buffers, CompactionIterationStats* c_iter_stats) { // Get a copy of the internal key, before it's invalidated by iter->Next() @@ -164,6 +165,12 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, merge_context_.Clear(); has_compaction_filter_skip_until_ = false; assert(user_merge_operator_); + assert(user_comparator_); + const size_t ts_sz = user_comparator_->timestamp_size(); + if (full_history_ts_low) { + assert(ts_sz > 0); + assert(ts_sz == full_history_ts_low->size()); + } bool first_key = true; // We need to parse the internal key again as the parsed key is @@ -184,6 +191,7 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, if (!s.ok()) return s; bool hit_the_next_user_key = false; + int cmp_with_full_history_ts_low = 0; for (; iter->Valid(); iter->Next(), original_key_is_iter = false) { if (IsShuttingDown()) { s = Status::ShutdownInProgress(); @@ -195,6 +203,14 @@ Status 
MergeHelper::MergeUntil(InternalIterator* iter, Status pik_status = ParseInternalKey(iter->key(), &ikey, allow_data_in_errors); + Slice ts; + if (pik_status.ok()) { + ts = ExtractTimestampFromUserKey(ikey.user_key, ts_sz); + if (full_history_ts_low) { + cmp_with_full_history_ts_low = + user_comparator_->CompareTimestamp(ts, *full_history_ts_low); + } + } if (!pik_status.ok()) { // stop at corrupted key if (assert_valid_internal_key_) { @@ -202,10 +218,18 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, } break; } else if (first_key) { + // If user-defined timestamp is enabled, we expect both user key and + // timestamps are equal, as a sanity check. assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)); first_key = false; - } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) { - // hit a different user key, stop right here + } else if (!user_comparator_->EqualWithoutTimestamp(ikey.user_key, + orig_ikey.user_key) || + (ts_sz > 0 && + !user_comparator_->Equal(ikey.user_key, orig_ikey.user_key) && + cmp_with_full_history_ts_low >= 0)) { + // 1) hit a different user key, or + // 2) user-defined timestamp is enabled, and hit a version of user key NOT + // eligible for GC, then stop right here. 
hit_the_next_user_key = true; break; } else if (stop_before > 0 && ikey.sequence <= stop_before && @@ -338,9 +362,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, filter == CompactionFilter::Decision::kChangeValue) { if (original_key_is_iter) { // this is just an optimization that saves us one memcpy - keys_.push_front(std::move(original_key)); + keys_.emplace_front(original_key); } else { - keys_.push_front(iter->key().ToString()); + keys_.emplace_front(iter->key().ToString()); } if (keys_.size() == 1) { // we need to re-anchor the orig_ikey because it was anchored by @@ -353,7 +377,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, if (filter == CompactionFilter::Decision::kKeep) { merge_context_.PushOperand( value_slice, iter->IsValuePinned() /* operand_pinned */); - } else { // kChangeValue + } else { + assert(filter == CompactionFilter::Decision::kChangeValue); // Compaction filter asked us to change the operand from value_slice // to compaction_filter_value_. merge_context_.PushOperand(compaction_filter_value_, false); @@ -369,6 +394,13 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, } } + if (cmp_with_full_history_ts_low >= 0) { + // If we reach here, and ts_sz == 0, it means compaction cannot perform + // merge with an earlier internal key, thus merge_context_.GetNumOperands() + // is 1. + assert(ts_sz == 0 || merge_context_.GetNumOperands() == 1); + } + if (merge_context_.GetNumOperands() == 0) { // we filtered out all the merge operands return s; @@ -382,6 +414,10 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // AND // we have either encountered another key or end of key history on this // layer. + // Note that if user-defined timestamp is enabled, we need some extra caution + // here: if full_history_ts_low is nullptr, or it's not null but the key's + // timestamp is greater than or equal to full_history_ts_low, it means this + // key cannot be dropped. We may not have seen the beginning of the key. 
// // When these conditions are true we are able to merge all the keys // using full merge. @@ -391,7 +427,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, // sure that all merge-operands on the same level get compacted together, // this will simply lead to these merge operands moving to the next level. bool surely_seen_the_beginning = - (hit_the_next_user_key || !iter->Valid()) && at_bottom; + (hit_the_next_user_key || !iter->Valid()) && at_bottom && + (ts_sz == 0 || cmp_with_full_history_ts_low < 0); if (surely_seen_the_beginning) { // do a final merge with nullptr as the existing value and say // bye to the merge type (it's now converted to a Put) diff --git a/db/merge_helper.h b/db/merge_helper.h index a1af1c002..1def78e6d 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -65,7 +65,7 @@ class MergeHelper { Slice* result_operand = nullptr, bool update_num_ops_stats = false); - // Merge entries until we hit + // During compaction, merge entries until we hit // - a corrupted key // - a Put/Delete, // - a different user key, @@ -101,6 +101,7 @@ class MergeHelper { const SequenceNumber stop_before, const bool at_bottom, const bool allow_data_in_errors, const BlobFetcher* blob_fetcher, + const std::string* const full_history_ts_low, PrefetchBufferCollection* prefetch_buffers, CompactionIterationStats* c_iter_stats); @@ -108,6 +109,7 @@ class MergeHelper { // in the constructor. Returns the decision that the filter made. // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the // optional outputs of compaction filter. + // user_key includes timestamp if user-defined timestamp is enabled. 
CompactionFilter::Decision FilterMerge(const Slice& user_key, const Slice& value_slice); diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc index b3fd9a074..05408d5b9 100644 --- a/db/merge_helper_test.cc +++ b/db/merge_helper_test.cc @@ -35,7 +35,8 @@ class MergeHelperTest : public testing::Test { return merge_helper_->MergeUntil( iter_.get(), nullptr /* range_del_agg */, stop_before, at_bottom, false /* allow_data_in_errors */, nullptr /* blob_fetcher */, - nullptr /* prefetch_buffers */, nullptr /* c_iter_stats */); + nullptr /* full_history_ts_low */, nullptr /* prefetch_buffers */, + nullptr /* c_iter_stats */); } void AddKeyVal(const std::string& user_key, const SequenceNumber& seq, diff --git a/db/write_batch.cc b/db/write_batch.cc index ae015693f..7b63e59b2 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1481,8 +1481,27 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, return WriteBatchInternal::Merge(this, cf_id, key, value); } - return Status::InvalidArgument( - "Cannot call this method on column family enabling timestamp"); + needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array key_with_ts{{key, dummy_ts}}; + + return WriteBatchInternal::Merge( + this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1)); +} + +Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts, const Slice& value) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + has_key_with_ts_ = true; + assert(column_family); + uint32_t cf_id = column_family->GetID(); + std::array key_with_ts{{key, ts}}; + return WriteBatchInternal::Merge( + this, cf_id, SliceParts(key_with_ts.data(), 2), SliceParts(&value, 1)); } Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, @@ -2036,6 +2055,7 @@ class MemTableInserter : public WriteBatch::Handler { if (cf_handle == 
nullptr) { cf_handle = db_->DefaultColumnFamily(); } + // TODO (yanqin): fix when user-defined timestamp is enabled. get_status = db_->Get(ropts, cf_handle, key, &prev_value); } // Intentionally overwrites the `NotFound` in `ret_status`. diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 9e436cf50..2714d7a01 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -961,14 +961,14 @@ TEST_F(WriteBatchTest, SanityChecks) { ASSERT_TRUE(wb.Put(nullptr, "key", "ts", "value").IsInvalidArgument()); ASSERT_TRUE(wb.Delete(nullptr, "key", "ts").IsInvalidArgument()); ASSERT_TRUE(wb.SingleDelete(nullptr, "key", "ts").IsInvalidArgument()); - ASSERT_TRUE(wb.Merge(nullptr, "key", "ts", "value").IsNotSupported()); + ASSERT_TRUE(wb.Merge(nullptr, "key", "ts", "value").IsInvalidArgument()); ASSERT_TRUE(wb.DeleteRange(nullptr, "begin_key", "end_key", "ts") .IsInvalidArgument()); ASSERT_TRUE(wb.Put(&cf4, "key", "ts", "value").IsInvalidArgument()); ASSERT_TRUE(wb.Delete(&cf4, "key", "ts").IsInvalidArgument()); ASSERT_TRUE(wb.SingleDelete(&cf4, "key", "ts").IsInvalidArgument()); - ASSERT_TRUE(wb.Merge(&cf4, "key", "ts", "value").IsNotSupported()); + ASSERT_TRUE(wb.Merge(&cf4, "key", "ts", "value").IsInvalidArgument()); ASSERT_TRUE( wb.DeleteRange(&cf4, "begin_key", "end_key", "ts").IsInvalidArgument()); @@ -978,7 +978,7 @@ TEST_F(WriteBatchTest, SanityChecks) { ASSERT_TRUE(wb.Put(&cf0, "key", ts, "value").IsInvalidArgument()); ASSERT_TRUE(wb.Delete(&cf0, "key", ts).IsInvalidArgument()); ASSERT_TRUE(wb.SingleDelete(&cf0, "key", ts).IsInvalidArgument()); - ASSERT_TRUE(wb.Merge(&cf0, "key", ts, "value").IsNotSupported()); + ASSERT_TRUE(wb.Merge(&cf0, "key", ts, "value").IsInvalidArgument()); ASSERT_TRUE( wb.DeleteRange(&cf0, "begin_key", "end_key", ts).IsInvalidArgument()); diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 57668a24e..9c6a9c30d 100644 --- a/include/rocksdb/compaction_filter.h +++ 
b/include/rocksdb/compaction_filter.h @@ -163,6 +163,7 @@ class CompactionFilter : public Customizable { // is a write conflict and may allow a Transaction to Commit that should have // failed. Instead, it is better to implement any Merge filtering inside the // MergeOperator. + // key includes timestamp if user-defined timestamp is enabled. virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, const Slice& existing_value, std::string* new_value, std::string* /*skip_until*/) const { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 18e43fa65..26c07c19f 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -500,10 +500,7 @@ class DB { virtual Status Merge(const WriteOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, const Slice& /*ts*/, - const Slice& /*value*/) { - return Status::NotSupported( - "Merge does not support user-defined timestamp yet"); - } + const Slice& /*value*/); // Apply the specified updates to the database. // If `updates` contains no update, WAL will still be synced if diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index e1e88bbdf..ae795220b 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -82,6 +82,7 @@ class MergeOperator : public Customizable { } struct MergeOperationInput { + // If user-defined timestamp is enabled, `_key` includes timestamp. 
explicit MergeOperationInput(const Slice& _key, const Slice* _existing_value, const std::vector<Slice>& _operand_list, diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 74b0fe000..9b13c3bdf 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -215,6 +215,10 @@ class StackableDB : public DB { const Slice& value) override { return db_->Merge(options, column_family, key, value); } + Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& value) override { + return db_->Merge(options, column_family, key, ts, value); + } virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override { return db_->Write(opts, updates); diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index dba80a76e..61ba5a739 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -172,10 +172,7 @@ class WriteBatch : public WriteBatchBase { return Merge(nullptr, key, value); } Status Merge(ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, - const Slice& /*ts*/, const Slice& /*value*/) override { - return Status::NotSupported( - "Merge does not support user-defined timestamp"); - } + const Slice& /*ts*/, const Slice& /*value*/) override; // variant that takes SliceParts Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key, @@ -219,6 +216,7 @@ class WriteBatch : public WriteBatchBase { Status PopSavePoint() override; // Support for iterating over the contents of a batch. + // Objects of subclasses of Handler will be used by WriteBatch::Iterate(). class Handler { public: virtual ~Handler(); @@ -229,6 +227,7 @@ class WriteBatch : public WriteBatchBase { // default implementation will just call Put without column family for // backwards compatibility. 
If the column family is not default, // the function is noop + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { if (column_family_id == 0) { @@ -241,14 +240,17 @@ class WriteBatch : public WriteBatchBase { return Status::InvalidArgument( "non-default column family and PutCF not implemented"); } + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {} + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual Status PutEntityCF(uint32_t /* column_family_id */, const Slice& /* key */, const Slice& /* entity */) { return Status::NotSupported("PutEntityCF not implemented"); } + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { if (column_family_id == 0) { Delete(key); @@ -257,8 +259,10 @@ class WriteBatch : public WriteBatchBase { return Status::InvalidArgument( "non-default column family and DeleteCF not implemented"); } + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual void Delete(const Slice& /*key*/) {} + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) { if (column_family_id == 0) { SingleDelete(key); @@ -267,14 +271,18 @@ class WriteBatch : public WriteBatchBase { return Status::InvalidArgument( "non-default column family and SingleDeleteCF not implemented"); } + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual void SingleDelete(const Slice& /*key*/) {} + // If user-defined timestamp is enabled, then `begin_key` and `end_key` + // both include timestamp. 
virtual Status DeleteRangeCF(uint32_t /*column_family_id*/, const Slice& /*begin_key*/, const Slice& /*end_key*/) { return Status::InvalidArgument("DeleteRangeCF not implemented"); } + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { if (column_family_id == 0) { @@ -284,8 +292,10 @@ class WriteBatch : public WriteBatchBase { return Status::InvalidArgument( "non-default column family and MergeCF not implemented"); } + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {} + // If user-defined timestamp is enabled, then `key` includes timestamp. virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/, const Slice& /*key*/, const Slice& /*value*/) { diff --git a/table/get_context.cc b/table/get_context.cc index 435e3e14f..dc72a79fb 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -478,6 +478,7 @@ bool GetContext::GetBlobValue(const Slice& blob_index, } void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) { + // TODO(yanqin) preserve timestamps information in merge_context if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && value_pinner != nullptr) { value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); diff --git a/utilities/transactions/write_committed_transaction_ts_test.cc b/utilities/transactions/write_committed_transaction_ts_test.cc index 2bae5db12..94b8201f7 100644 --- a/utilities/transactions/write_committed_transaction_ts_test.cc +++ b/utilities/transactions/write_committed_transaction_ts_test.cc @@ -320,6 +320,7 @@ TEST_P(WriteCommittedTxnWithTsTest, Merge) { ColumnFamilyOptions cf_options; cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + cf_options.merge_operator = MergeOperators::CreateStringAppendOperator(); const std::string test_cf_name = "test_cf"; ColumnFamilyHandle* cfh = nullptr; 
assert(db); @@ -338,8 +339,17 @@ TEST_P(WriteCommittedTxnWithTsTest, Merge) { NewTxn(WriteOptions(), TransactionOptions())); assert(txn); ASSERT_OK(txn->Put(handles_[1], "foo", "bar")); - ASSERT_TRUE(txn->Merge(handles_[1], "foo", "1").IsInvalidArgument()); + ASSERT_OK(txn->Merge(handles_[1], "foo", "1")); + ASSERT_OK(txn->SetCommitTimestamp(24)); + ASSERT_OK(txn->Commit()); txn.reset(); + { + std::string value; + const Status s = + GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/24, &value); + ASSERT_OK(s); + ASSERT_EQ("bar,1", value); + } } TEST_P(WriteCommittedTxnWithTsTest, GetForUpdate) {