From 18cb731f27746cc65742d5965da24fb8231e413a Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 3 Nov 2022 13:02:06 -0700 Subject: [PATCH] Fix a bug in range scan with merge and deletion with timestamp (#10915) Summary: When performing Merge during range scan, iterator should understand value types of kDeletionWithTimestamp. Also add an additional check in debug mode to MergeHelper, and account for the presence of compaction filter. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10915 Test Plan: make check Reviewed By: ltamasi Differential Revision: D40960039 Pulled By: riversand963 fbshipit-source-id: dd79d86d7c79d05755bb939a3d94e0c53ddd7f59 --- db/db_iter.cc | 9 ++-- db/db_with_timestamp_basic_test.cc | 71 ++++++++++++++++++++++++++++++ db/merge_helper.cc | 16 +++++-- 3 files changed, 89 insertions(+), 7 deletions(-) diff --git a/db/db_iter.cc b/db/db_iter.cc index 342323331..d0a6698a5 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -528,7 +528,8 @@ bool DBIter::MergeValuesNewToOld() { // hit the next user key, stop right here break; } - if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) { + if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type || + kTypeDeletionWithTimestamp == ikey.type) { // hit a delete with the same user key, stop right here // iter_ is positioned after delete iter_.Next(); @@ -957,7 +958,8 @@ bool DBIter::FindValueForCurrentKey() { case kTypeMerge: current_entry_is_merged_ = true; if (last_not_merge_type == kTypeDeletion || - last_not_merge_type == kTypeSingleDeletion) { + last_not_merge_type == kTypeSingleDeletion || + last_not_merge_type == kTypeDeletionWithTimestamp) { s = Merge(nullptr, saved_key_.GetUserKey()); if (!s.ok()) { return false; @@ -1164,7 +1166,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { saved_key_.GetUserKey())) { break; } - if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) { + if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion 
|| + ikey.type == kTypeDeletionWithTimestamp) { break; } if (!iter_.PrepareValue()) { diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 83fa3f0de..6ea1aaf46 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -3799,6 +3799,77 @@ TEST_F(DBBasicTestWithTimestamp, MergeBasic) { Close(); } + +TEST_F(DBBasicTestWithTimestamp, MergeAfterDeletion) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.merge_operator = std::make_shared<StringAppendTESTOperator>('.'); + DestroyAndReopen(options); + + ColumnFamilyHandle* const column_family = db_->DefaultColumnFamily(); + + const size_t num_keys_per_file = 10; + const size_t num_merges_per_key = 2; + for (size_t i = 0; i < num_keys_per_file; ++i) { + std::string ts = Timestamp(i + 10000, 0); + Status s = db_->Delete(WriteOptions(), Key1(i), ts); + ASSERT_OK(s); + for (size_t j = 1; j <= num_merges_per_key; ++j) { + ts = Timestamp(i + 10000 + j, 0); + s = db_->Merge(WriteOptions(), column_family, Key1(i), ts, + std::to_string(j)); + ASSERT_OK(s); + } + } + + const auto verify_db = [&]() { + ReadOptions read_opts; + std::string read_ts_str = Timestamp(20000, 0); + Slice ts = read_ts_str; + read_opts.timestamp = &ts; + std::unique_ptr<Iterator> it(db_->NewIterator(read_opts)); + size_t count = 0; + for (it->SeekToFirst(); it->Valid(); it->Next(), ++count) { + std::string key = Key1(count); + ASSERT_EQ(key, it->key()); + std::string value; + for (size_t j = 1; j <= num_merges_per_key; ++j) { + value.append(std::to_string(j)); + if (j < num_merges_per_key) { + value.push_back('.'); + } + } + ASSERT_EQ(value, it->value()); + std::string ts1 = Timestamp(count + 10000 + num_merges_per_key, 0); + ASSERT_EQ(ts1, it->timestamp()); + } + ASSERT_OK(it->status()); + ASSERT_EQ(num_keys_per_file, count); + for (it->SeekToLast(); 
it->Valid(); it->Prev(), --count) { + std::string key = Key1(count - 1); + ASSERT_EQ(key, it->key()); + std::string value; + for (size_t j = 1; j <= num_merges_per_key; ++j) { + value.append(std::to_string(j)); + if (j < num_merges_per_key) { + value.push_back('.'); + } + } + ASSERT_EQ(value, it->value()); + std::string ts1 = Timestamp(count - 1 + 10000 + num_merges_per_key, 0); + ASSERT_EQ(ts1, it->timestamp()); + } + ASSERT_OK(it->status()); + ASSERT_EQ(0, count); + }; + + verify_db(); + + Close(); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/merge_helper.cc b/db/merge_helper.cc index ab758876f..5ece49616 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -13,6 +13,7 @@ #include "db/compaction/compaction_iteration_stats.h" #include "db/dbformat.h" #include "db/wide/wide_column_serialization.h" +#include "logging/logging.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" #include "port/likely.h" @@ -255,6 +256,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, assert(s.ok()); if (!s.ok()) return s; + assert(kTypeMerge == orig_ikey.type); + bool hit_the_next_user_key = false; int cmp_with_full_history_ts_low = 0; for (; iter->Valid(); iter->Next(), original_key_is_iter = false) { @@ -460,10 +463,15 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, } if (cmp_with_full_history_ts_low >= 0) { - // If we reach here, and ts_sz == 0, it means compaction cannot perform - // merge with an earlier internal key, thus merge_context_.GetNumOperands() - // is 1. - assert(ts_sz == 0 || merge_context_.GetNumOperands() == 1); + size_t num_merge_operands = merge_context_.GetNumOperands(); + if (ts_sz && num_merge_operands > 1) { + // We do not merge merge operands with different timestamps if they are + // not eligible for GC. 
+ ROCKS_LOG_ERROR(logger_, "ts_sz=%d, %d merge oprands", + static_cast<int>(ts_sz), + static_cast<int>(num_merge_operands)); + assert(false); + } } if (merge_context_.GetNumOperands() == 0) {