diff --git a/db/db_iter.cc b/db/db_iter.cc
index 342323331..d0a6698a5 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -528,7 +528,8 @@ bool DBIter::MergeValuesNewToOld() {
       // hit the next user key, stop right here
       break;
     }
-    if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
+    if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
+        kTypeDeletionWithTimestamp == ikey.type) {
       // hit a delete with the same user key, stop right here
       // iter_ is positioned after delete
       iter_.Next();
@@ -957,7 +958,8 @@ bool DBIter::FindValueForCurrentKey() {
       case kTypeMerge:
         current_entry_is_merged_ = true;
         if (last_not_merge_type == kTypeDeletion ||
-            last_not_merge_type == kTypeSingleDeletion) {
+            last_not_merge_type == kTypeSingleDeletion ||
+            last_not_merge_type == kTypeDeletionWithTimestamp) {
           s = Merge(nullptr, saved_key_.GetUserKey());
           if (!s.ok()) {
             return false;
@@ -1164,7 +1166,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
                                                 saved_key_.GetUserKey())) {
       break;
     }
-    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
+    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
+        ikey.type == kTypeDeletionWithTimestamp) {
       break;
     }
     if (!iter_.PrepareValue()) {
diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc
index 83fa3f0de..6ea1aaf46 100644
--- a/db/db_with_timestamp_basic_test.cc
+++ b/db/db_with_timestamp_basic_test.cc
@@ -3799,6 +3799,77 @@ TEST_F(DBBasicTestWithTimestamp, MergeBasic) {
   Close();
 }
 
+
+TEST_F(DBBasicTestWithTimestamp, MergeAfterDeletion) {
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.merge_operator = std::make_shared<StringAppendTESTOperator>('.');
+  DestroyAndReopen(options);
+
+  ColumnFamilyHandle* const column_family = db_->DefaultColumnFamily();
+
+  const size_t num_keys_per_file = 10;
+  const size_t num_merges_per_key = 2;
+  for (size_t i = 0; i < num_keys_per_file; ++i) {
+    std::string ts = Timestamp(i + 10000, 0);
+    Status s = db_->Delete(WriteOptions(), Key1(i), ts);
+    ASSERT_OK(s);
+    for (size_t j = 1; j <= num_merges_per_key; ++j) {
+      ts = Timestamp(i + 10000 + j, 0);
+      s = db_->Merge(WriteOptions(), column_family, Key1(i), ts,
+                     std::to_string(j));
+      ASSERT_OK(s);
+    }
+  }
+
+  const auto verify_db = [&]() {
+    ReadOptions read_opts;
+    std::string read_ts_str = Timestamp(20000, 0);
+    Slice ts = read_ts_str;
+    read_opts.timestamp = &ts;
+    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
+    size_t count = 0;
+    for (it->SeekToFirst(); it->Valid(); it->Next(), ++count) {
+      std::string key = Key1(count);
+      ASSERT_EQ(key, it->key());
+      std::string value;
+      for (size_t j = 1; j <= num_merges_per_key; ++j) {
+        value.append(std::to_string(j));
+        if (j < num_merges_per_key) {
+          value.push_back('.');
+        }
+      }
+      ASSERT_EQ(value, it->value());
+      std::string ts1 = Timestamp(count + 10000 + num_merges_per_key, 0);
+      ASSERT_EQ(ts1, it->timestamp());
+    }
+    ASSERT_OK(it->status());
+    ASSERT_EQ(num_keys_per_file, count);
+    for (it->SeekToLast(); it->Valid(); it->Prev(), --count) {
+      std::string key = Key1(count - 1);
+      ASSERT_EQ(key, it->key());
+      std::string value;
+      for (size_t j = 1; j <= num_merges_per_key; ++j) {
+        value.append(std::to_string(j));
+        if (j < num_merges_per_key) {
+          value.push_back('.');
+        }
+      }
+      ASSERT_EQ(value, it->value());
+      std::string ts1 = Timestamp(count - 1 + 10000 + num_merges_per_key, 0);
+      ASSERT_EQ(ts1, it->timestamp());
+    }
+    ASSERT_OK(it->status());
+    ASSERT_EQ(0, count);
+  };
+
+  verify_db();
+
+  Close();
+}
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index ab758876f..5ece49616 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -13,6 +13,7 @@
 #include "db/compaction/compaction_iteration_stats.h"
 #include "db/dbformat.h"
 #include "db/wide/wide_column_serialization.h"
+#include "logging/logging.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/statistics.h"
 #include "port/likely.h"
@@ -255,6 +256,8 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
   assert(s.ok());
   if (!s.ok()) return s;
 
+  assert(kTypeMerge == orig_ikey.type);
+
   bool hit_the_next_user_key = false;
   int cmp_with_full_history_ts_low = 0;
   for (; iter->Valid(); iter->Next(), original_key_is_iter = false) {
@@ -460,10 +463,15 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
   }
 
   if (cmp_with_full_history_ts_low >= 0) {
-    // If we reach here, and ts_sz == 0, it means compaction cannot perform
-    // merge with an earlier internal key, thus merge_context_.GetNumOperands()
-    // is 1.
-    assert(ts_sz == 0 || merge_context_.GetNumOperands() == 1);
+    size_t num_merge_operands = merge_context_.GetNumOperands();
+    if (ts_sz && num_merge_operands > 1) {
+      // We do not merge merge operands with different timestamps if they are
+      // not eligible for GC.
+      ROCKS_LOG_ERROR(logger_, "ts_sz=%d, %d merge operands",
+                      static_cast<int>(ts_sz),
+                      static_cast<int>(num_merge_operands));
+      assert(false);
+    }
   }
 
   if (merge_context_.GetNumOperands() == 0) {