diff --git a/HISTORY.md b/HISTORY.md index 639a9074c..3ede0b544 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,8 @@ # Rocksdb Change Log ## Unreleased +### New Features +* `DeleteRange()` now supports user-defined timestamp. + ### Bug Fixes * Fix a bug in io_uring_prep_cancel in AbortIO API for posix which expects sqe->addr to match with read request submitted and wrong paramter was being passed. * Fixed a regression in iterator performance when the entire DB is a single memtable introduced in #10449. The fix is in #10705 and #10716. diff --git a/db/builder.cc b/db/builder.cc index 03760ec91..d4bb395b1 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -90,7 +90,7 @@ Status BuildTable( iter->SeekToFirst(); std::unique_ptr range_del_agg( new CompactionRangeDelAggregator(&tboptions.internal_comparator, - snapshots)); + snapshots, full_history_ts_low)); uint64_t num_unfragmented_tombstones = 0; uint64_t total_tombstone_payload_bytes = 0; for (auto& range_del_iter : range_del_iters) { diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 0ca9e75f6..a5a0d99df 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -914,8 +914,17 @@ void CompactionIterator::NextFromInput() { } else { // 1. new user key -OR- // 2. different snapshot stripe - bool should_delete = range_del_agg_->ShouldDelete( - key_, RangeDelPositioningMode::kForwardTraversal); + // If user-defined timestamp is enabled, we consider keys for GC if they + // are below history_ts_low_. CompactionRangeDelAggregator::ShouldDelete() + // only considers range deletions that are at or below history_ts_low_ and + // trim_ts_. We drop keys here that are below history_ts_low_ and are + // covered by a range tombstone that is at or below history_ts_low_ and + // trim_ts. + bool should_delete = false; + if (!timestamp_size_ || cmp_with_history_ts_low_ < 0) { + should_delete = range_del_agg_->ShouldDelete( + key_, RangeDelPositioningMode::kForwardTraversal); + } if (should_delete) { ++iter_stats_.num_record_drop_hidden; ++iter_stats_.num_record_drop_range_del; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 4e62c2f04..9ec21f03c 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1035,7 +1035,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { NotifyOnSubcompactionBegin(sub_compact); auto range_del_agg = std::make_unique( - &cfd->internal_comparator(), existing_snapshots_); + &cfd->internal_comparator(), existing_snapshots_, &full_history_ts_low_, + &trim_ts_); // TODO: since we already use C++17, should use // std::optional instead. @@ -1455,7 +1456,7 @@ Status CompactionJob::FinishCompactionOutputFile( : nullptr, sub_compact->end.has_value() ? &(sub_compact->end.value()) : nullptr, range_del_out_stats, bottommost_level_, cfd->internal_comparator(), - earliest_snapshot, next_table_min_key); + earliest_snapshot, next_table_min_key, full_history_ts_low_); } RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1"); diff --git a/db/compaction/compaction_outputs.cc b/db/compaction/compaction_outputs.cc index dfdaa20bf..bb077015c 100644 --- a/db/compaction/compaction_outputs.cc +++ b/db/compaction/compaction_outputs.cc @@ -346,10 +346,10 @@ Status CompactionOutputs::AddToOutput( } Status CompactionOutputs::AddRangeDels( - const Slice* comp_start, const Slice* comp_end, + const Slice* comp_start_user_key, const Slice* comp_end_user_key, CompactionIterationStats& range_del_out_stats, bool bottommost_level, const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot, - const Slice& next_table_min_key) { + const Slice& next_table_min_key, const std::string& full_history_ts_low) { assert(HasRangeDel()); FileMetaData& meta = current_output().meta; const Comparator* ucmp = icmp.user_comparator(); @@ -363,7 +363,7 @@ Status CompactionOutputs::AddRangeDels( if (output_size == 1) { // For the first output table, include range tombstones before the min // key but after the subcompaction boundary. - lower_bound = comp_start; + lower_bound = comp_start_user_key; lower_bound_from_sub_compact = true; } else if (meta.smallest.size() > 0) { // For subsequent output tables, only include range tombstones from min @@ -383,21 +383,22 @@ Status CompactionOutputs::AddRangeDels( // use the smaller key as the upper bound of the output file, to ensure // that there is no overlapping between different output files. upper_bound_guard = ExtractUserKey(next_table_min_key); - if (comp_end != nullptr && - ucmp->Compare(upper_bound_guard, *comp_end) >= 0) { - upper_bound = comp_end; + if (comp_end_user_key != nullptr && + ucmp->CompareWithoutTimestamp(upper_bound_guard, *comp_end_user_key) >= + 0) { + upper_bound = comp_end_user_key; } else { upper_bound = &upper_bound_guard; } } else { // This is the last file in the subcompaction, so extend until the // subcompaction ends. - upper_bound = comp_end; + upper_bound = comp_end_user_key; } bool has_overlapping_endpoints; if (upper_bound != nullptr && meta.largest.size() > 0) { - has_overlapping_endpoints = - ucmp->Compare(meta.largest.user_key(), *upper_bound) == 0; + has_overlapping_endpoints = ucmp->CompareWithoutTimestamp( + meta.largest.user_key(), *upper_bound) == 0; } else { has_overlapping_endpoints = false; } @@ -406,8 +407,8 @@ Status CompactionOutputs::AddRangeDels( // bound. If the end of subcompaction is null or the upper bound is null, // it means that this file is the last file in the compaction. So there // will be no overlapping between this file and others. - assert(comp_end == nullptr || upper_bound == nullptr || - ucmp->Compare(*upper_bound, *comp_end) <= 0); + assert(comp_end_user_key == nullptr || upper_bound == nullptr || + ucmp->CompareWithoutTimestamp(*upper_bound, *comp_end_user_key) <= 0); auto it = range_del_agg_->NewIterator(lower_bound, upper_bound, has_overlapping_endpoints); // Position the range tombstone output iterator. There may be tombstone @@ -421,7 +422,8 @@ Status CompactionOutputs::AddRangeDels( for (; it->Valid(); it->Next()) { auto tombstone = it->Tombstone(); if (upper_bound != nullptr) { - int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_); + int cmp = + ucmp->CompareWithoutTimestamp(*upper_bound, tombstone.start_key_); if ((has_overlapping_endpoints && cmp < 0) || (!has_overlapping_endpoints && cmp <= 0)) { // Tombstones starting after upper_bound only need to be included in @@ -434,7 +436,17 @@ Status CompactionOutputs::AddRangeDels( } } - if (bottommost_level && tombstone.seq_ <= earliest_snapshot) { + const size_t ts_sz = ucmp->timestamp_size(); + // Garbage collection for range tombstones. + // If user-defined timestamp is enabled, range tombstones are dropped if + // they are at bottommost_level, below full_history_ts_low and not visible + // in any snapshot. trim_ts_ is passed to the constructor for + // range_del_agg_, and range_del_agg_ internally drops tombstones above + // trim_ts_. + if (bottommost_level && tombstone.seq_ <= earliest_snapshot && + (ts_sz == 0 || + (!full_history_ts_low.empty() && + ucmp->CompareTimestamp(tombstone.ts_, full_history_ts_low) < 0))) { // TODO(andrewkr): tombstones that span multiple output files are // counted for each compaction output file, so lots of double // counting. @@ -445,12 +457,13 @@ Status CompactionOutputs::AddRangeDels( auto kv = tombstone.Serialize(); assert(lower_bound == nullptr || - ucmp->Compare(*lower_bound, kv.second) < 0); + ucmp->CompareWithoutTimestamp(*lower_bound, kv.second) < 0); // Range tombstone is not supported by output validator yet. builder_->Add(kv.first.Encode(), kv.second); InternalKey smallest_candidate = std::move(kv.first); if (lower_bound != nullptr && - ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) { + ucmp->CompareWithoutTimestamp(smallest_candidate.user_key(), + *lower_bound) <= 0) { // Pretend the smallest key has the same user key as lower_bound // (the max key in the previous table or subcompaction) in order for // files to appear key-space partitioned. @@ -470,13 +483,23 @@ Status CompactionOutputs::AddRangeDels( // choose lowest seqnum so this file's smallest internal key comes // after the previous file's largest. The fake seqnum is OK because // the read path's file-picking code only considers user key. - smallest_candidate = InternalKey( - *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0, - kTypeRangeDeletion); + if (lower_bound_from_sub_compact) { + if (ts_sz) { + assert(tombstone.ts_.size() == ts_sz); + smallest_candidate = InternalKey(*lower_bound, tombstone.seq_, + kTypeRangeDeletion, tombstone.ts_); + } else { + smallest_candidate = + InternalKey(*lower_bound, tombstone.seq_, kTypeRangeDeletion); + } + } else { + smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion); + } } InternalKey largest_candidate = tombstone.SerializeEndKey(); if (upper_bound != nullptr && - ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) { + ucmp->CompareWithoutTimestamp(*upper_bound, + largest_candidate.user_key()) <= 0) { // Pretend the largest key has the same user key as upper_bound (the // min key in the following table or subcompaction) in order for files // to appear key-space partitioned. @@ -490,9 +513,22 @@ Status CompactionOutputs::AddRangeDels( // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of // kTypeRangeDeletion (0xF), so the range tombstone comes before the // Seek() key in InternalKey's ordering. So Seek() will look in the - // next file for the user key. - largest_candidate = - InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion); + // next file for the user key + if (ts_sz) { + static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff"; + if (ts_sz <= strlen(kTsMax)) { + largest_candidate = + InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion, + Slice(kTsMax, ts_sz)); + } else { + largest_candidate = + InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion, + std::string(ts_sz, '\xff')); + } + } else { + largest_candidate = + InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion); + } } #ifndef NDEBUG SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber; diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h index dbf5f348a..081a87d48 100644 --- a/db/compaction/compaction_outputs.h +++ b/db/compaction/compaction_outputs.h @@ -168,11 +168,16 @@ class CompactionOutputs { } // Add range-dels from the aggregator to the current output file - Status AddRangeDels(const Slice* comp_start, const Slice* comp_end, + // @param comp_start_user_key and comp_end_user_key include timestamp if + // user-defined timestamp is enabled. + // @param full_history_ts_low used for range tombstone garbage collection. + Status AddRangeDels(const Slice* comp_start_user_key, + const Slice* comp_end_user_key, CompactionIterationStats& range_del_out_stats, bool bottommost_level, const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot, - const Slice& next_table_min_key); + const Slice& next_table_min_key, + const std::string& full_history_ts_low); // if the outputs have range delete, range delete is also data bool HasRangeDel() const { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 0a2cd12e9..7047bb58a 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -222,6 +222,9 @@ class DBImpl : public DB { Status DeleteRange(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key) override; + Status DeleteRange(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& begin_key, + const Slice& end_key, const Slice& ts) override; using DB::Write; virtual Status Write(const WriteOptions& options, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 7f400f5e3..e69a83b49 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -111,6 +111,17 @@ Status DBImpl::DeleteRange(const WriteOptions& write_options, return DB::DeleteRange(write_options, column_family, begin_key, end_key); } +Status DBImpl::DeleteRange(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key, + const Slice& ts) { + const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false); + if (!s.ok()) { + return s; + } + return DB::DeleteRange(write_options, column_family, begin_key, end_key, ts); +} + void DBImpl::SetRecoverableStatePreReleaseCallback( PreReleaseCallback* callback) { recoverable_state_pre_release_callback_.reset(callback); @@ -2361,6 +2372,24 @@ Status DB::DeleteRange(const WriteOptions& opt, return Write(opt, &batch); } +Status DB::DeleteRange(const WriteOptions& opt, + ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key, + const Slice& ts) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); + Status s = batch.DeleteRange(column_family, begin_key, end_key, ts); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index 7b4cc8dfd..3f982003c 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -13,6 +13,7 @@ namespace ROCKSDB_NAMESPACE { +// TODO(cbi): parameterize the test to cover user-defined timestamp cases class DBRangeDelTest : public DBTestBase { public: DBRangeDelTest() : DBTestBase("db_range_del_test", /*env_do_fsync=*/false) {} diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 593dda903..347f22951 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -56,7 +56,7 @@ TEST_F(DBBasicTestWithTimestamp, SanityChecks) { db_->SingleDelete(WriteOptions(), "key", dummy_ts).IsInvalidArgument()); ASSERT_TRUE(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "begin_key", "end_key", dummy_ts) - .IsNotSupported()); + .IsInvalidArgument()); // Perform non-timestamp operations on "data" cf. ASSERT_TRUE( @@ -85,6 +85,11 @@ TEST_F(DBBasicTestWithTimestamp, SanityChecks) { ASSERT_OK(wb.SingleDelete(handle, "key")); ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument()); } + { + WriteBatch wb; + ASSERT_OK(wb.DeleteRange(handle, "begin_key", "end_key")); + ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument()); + } // Perform timestamp operations with timestamps of incorrect size. const std::string wrong_ts(sizeof(uint32_t), '\0'); @@ -98,7 +103,7 @@ TEST_F(DBBasicTestWithTimestamp, SanityChecks) { .IsInvalidArgument()); ASSERT_TRUE( db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key", wrong_ts) - .IsNotSupported()); + .IsInvalidArgument()); delete handle; } @@ -215,6 +220,10 @@ TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) { ts_str = Timestamp(4, 0); ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v5")); + ts_str = Timestamp(5, 0); + ASSERT_OK( + db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k0", "k9", ts_str)); + ts_str = Timestamp(3, 0); Slice ts = ts_str; CompactRangeOptions cro; @@ -234,6 +243,13 @@ TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) { ASSERT_TRUE(db_->Get(ropts, "k3", &value, &key_ts).IsNotFound()); ASSERT_EQ(Timestamp(2, 0), key_ts); + ts_str = Timestamp(5, 0); + ts = ts_str; + ropts.timestamp = &ts; + ASSERT_TRUE(db_->Get(ropts, "k2", &value, &key_ts).IsNotFound()); + ASSERT_EQ(Timestamp(5, 0), key_ts); + ASSERT_TRUE(db_->Get(ropts, "k2", &value).IsNotFound()); + Close(); } @@ -590,6 +606,19 @@ TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) { check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2", Timestamp(4, 0)); Close(); + + Reopen(options); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "k1", + "k3", Timestamp(7, 0))); + check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::NotFound(), "", + Timestamp(7, 0)); + Close(); + // Trim data whose timestamp > Timestamp(6, 0), read(k1, ts(8)) <- v2 + ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families, + &handles_, &db_, Timestamp(6, 0))); + check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::OK(), "v2", + Timestamp(4, 0)); + Close(); } TEST_F(DBBasicTestWithTimestamp, OpenAndTrimHistoryInvalidOptionTest) { @@ -2014,7 +2043,7 @@ constexpr int DataVisibilityTest::kTestDataSize; // seq'=11 // write finishes // GetImpl(ts,seq) -// It is OK to return if ts>=t1 AND seq>=s1. If ts>=1t1 but seq if ts>=t1 AND seq>=s1. If ts>=t1 but seqPut(wopts, "k1", ts_str, "v1")); + ASSERT_OK(db_->Put(wopts, "k2", ts_str, "v2")); + ASSERT_OK(db_->Put(wopts, "k3", ts_str, "v3")); + ts_str = Timestamp(2, 0); + ASSERT_OK( + db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3", ts_str)); + + ts_str = Timestamp(3, 0); + Slice ts = ts_str; + ReadOptions ropts; + ropts.timestamp = &ts; + CompactRangeOptions cro; + cro.full_history_ts_low = nullptr; + std::string value, key_ts; + Status s; + auto verify = [&] { + s = db_->Get(ropts, "k1", &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db_->Get(ropts, "k2", &value, &key_ts); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(key_ts, Timestamp(2, 0)); + + ASSERT_OK(db_->Get(ropts, "k3", &value, &key_ts)); + ASSERT_EQ(value, "v3"); + ASSERT_EQ(Timestamp(1, 0), key_ts); + + size_t batch_size = 3; + std::vector key_strs = {"k1", "k2", "k3"}; + std::vector keys{key_strs.begin(), key_strs.end()}; + std::vector values(batch_size); + std::vector statuses(batch_size); + db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(), + values.data(), statuses.data(), true /* sorted_input */); + ASSERT_TRUE(statuses[0].IsNotFound()); + ASSERT_TRUE(statuses[1].IsNotFound()); + ASSERT_OK(statuses[2]); + ; + ASSERT_EQ(values[2], "v3"); + }; + verify(); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + verify(); + std::string lb = Timestamp(0, 0); + Slice lb_slice = lb; + cro.full_history_ts_low = &lb_slice; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + verify(); + Close(); +} + +TEST_F(DBBasicTestWithTimestamp, + GCRangeTombstonesAndCoveredKeysRespectingTslow) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + BlockBasedTableOptions bbto; + bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); + bbto.cache_index_and_filter_blocks = true; + bbto.whole_key_filtering = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.num_levels = 2; + DestroyAndReopen(options); + + WriteOptions wopts; + ASSERT_OK(db_->Put(wopts, "k1", Timestamp(1, 0), "v1")); + ASSERT_OK(db_->Delete(wopts, "k2", Timestamp(2, 0))); + ASSERT_OK(db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3", + Timestamp(3, 0))); + ASSERT_OK(db_->Put(wopts, "k3", Timestamp(4, 0), "v3")); + + ReadOptions ropts; + std::string read_ts = Timestamp(5, 0); + Slice read_ts_slice = read_ts; + ropts.timestamp = &read_ts_slice; + size_t batch_size = 3; + std::vector key_strs = {"k1", "k2", "k3"}; + std::vector keys = {key_strs.begin(), key_strs.end()}; + std::vector values(batch_size); + std::vector statuses(batch_size); + std::vector timestamps(batch_size); + db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(), + values.data(), timestamps.data(), statuses.data(), + true /* sorted_input */); + ASSERT_TRUE(statuses[0].IsNotFound()); + ASSERT_EQ(timestamps[0], Timestamp(3, 0)); + ASSERT_TRUE(statuses[1].IsNotFound()); + // DeleteRange has a higher timestamp than Delete for "k2" + ASSERT_EQ(timestamps[1], Timestamp(3, 0)); + ASSERT_OK(statuses[2]); + ASSERT_EQ(values[2], "v3"); + ASSERT_EQ(timestamps[2], Timestamp(4, 0)); + + CompactRangeOptions cro; + // Range tombstone has timestamp >= full_history_ts_low, covered keys + // are not dropped. + std::string compaction_ts_str = Timestamp(2, 0); + Slice compaction_ts = compaction_ts_str; + cro.full_history_ts_low = &compaction_ts; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ropts.timestamp = &compaction_ts; + std::string value, ts; + ASSERT_OK(db_->Get(ropts, "k1", &value, &ts)); + ASSERT_EQ(value, "v1"); + // timestamp is below full_history_ts_low, zeroed out as the key goes into + // bottommost level + ASSERT_EQ(ts, Timestamp(0, 0)); + ASSERT_TRUE(db_->Get(ropts, "k2", &value, &ts).IsNotFound()); + ASSERT_EQ(ts, Timestamp(2, 0)); + + compaction_ts_str = Timestamp(4, 0); + compaction_ts = compaction_ts_str; + cro.full_history_ts_low = &compaction_ts; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ropts.timestamp = &read_ts_slice; + // k1, k2 and the range tombstone should be dropped + // k3 should still exist + db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(), + values.data(), timestamps.data(), statuses.data(), + true /* sorted_input */); + ASSERT_TRUE(statuses[0].IsNotFound()); + ASSERT_TRUE(timestamps[0].empty()); + ASSERT_TRUE(statuses[1].IsNotFound()); + ASSERT_TRUE(timestamps[1].empty()); + ASSERT_OK(statuses[2]); + ASSERT_EQ(values[2], "v3"); + ASSERT_EQ(timestamps[2], Timestamp(4, 0)); + + Close(); +} + +TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) { + const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25; + Options options = CurrentOptions(); + options.prefix_extractor.reset(NewFixedPrefixTransform(3)); + options.compression = kNoCompression; + BlockBasedTableOptions bbto; + bbto.index_type = GetParam(); + bbto.block_size = 100; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile)); + DestroyAndReopen(options); + + // Write half of the keys before the tombstone and half after the tombstone. + // Only covered keys (i.e., within the range and older than the tombstone) + // should be deleted. + for (int i = 0; i < kNum; ++i) { + if (i == kNum / 2) { + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), + Key1(kRangeBegin), Key1(kRangeEnd), + Timestamp(i, 0))); + } + ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0), + "val" + std::to_string(i))); + if (i == kNum - kNumPerFile) { + ASSERT_OK(Flush()); + } + } + + ReadOptions read_opts; + read_opts.total_order_seek = true; + std::string read_ts = Timestamp(kNum, 0); + Slice read_ts_slice = read_ts; + read_opts.timestamp = &read_ts_slice; + { + std::unique_ptr iter(db_->NewIterator(read_opts)); + ASSERT_OK(iter->status()); + + int expected = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(Key1(expected), iter->key()); + if (expected == kRangeBegin - 1) { + expected = kNum / 2; + } else { + ++expected; + } + } + ASSERT_EQ(kNum, expected); + + expected = kNum / 2; + for (iter->Seek(Key1(kNum / 2)); iter->Valid(); iter->Next()) { + ASSERT_EQ(Key1(expected), iter->key()); + ++expected; + } + ASSERT_EQ(kNum, expected); + + expected = kRangeBegin - 1; + for (iter->SeekForPrev(Key1(kNum / 2 - 1)); iter->Valid(); iter->Prev()) { + ASSERT_EQ(Key1(expected), iter->key()); + --expected; + } + ASSERT_EQ(-1, expected); + + read_ts = Timestamp(0, 0); + read_ts_slice = read_ts; + read_opts.timestamp = &read_ts_slice; + iter.reset(db_->NewIterator(read_opts)); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key(), Key1(0)); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + + read_ts = Timestamp(kNum, 0); + read_ts_slice = read_ts; + read_opts.timestamp = &read_ts_slice; + std::string value, timestamp; + Status s; + for (int i = 0; i < kNum; ++i) { + s = db_->Get(read_opts, Key1(i), &value, ×tamp); + if (i >= kRangeBegin && i < kNum / 2) { + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(timestamp, Timestamp(kNum / 2, 0)); + } else { + ASSERT_OK(s); + ASSERT_EQ(value, "val" + std::to_string(i)); + ASSERT_EQ(timestamp, Timestamp(i, 0)); + } + } + + size_t batch_size = kNum; + std::vector key_strs(batch_size); + std::vector keys(batch_size); + std::vector values(batch_size); + std::vector statuses(batch_size); + std::vector timestamps(batch_size); + for (int i = 0; i < kNum; ++i) { + key_strs[i] = Key1(i); + keys[i] = key_strs[i]; + } + db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size, keys.data(), + values.data(), timestamps.data(), statuses.data(), + true /* sorted_input */); + for (int i = 0; i < kNum; ++i) { + if (i >= kRangeBegin && i < kNum / 2) { + ASSERT_TRUE(statuses[i].IsNotFound()); + ASSERT_EQ(timestamps[i], Timestamp(kNum / 2, 0)); + } else { + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], "val" + std::to_string(i)); + ASSERT_EQ(timestamps[i], Timestamp(i, 0)); + } + } + Close(); +} + +TEST_F(DBBasicTestWithTimestamp, DeleteRangeGetIteratorWithSnapshot) { + // 4 keys 0, 1, 2, 3 at timestamps 0, 1, 2, 3 respectively. + // A range tombstone [1, 3) at timestamp 1 and has a sequence number between + // key 1 and 2. + Options options = CurrentOptions(); + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + DestroyAndReopen(options); + WriteOptions write_opts; + std::string put_ts = Timestamp(0, 0); + const int kNum = 4, kNumPerFile = 1, kRangeBegin = 1, kRangeEnd = 3; + options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile)); + const Snapshot* before_tombstone = nullptr; + const Snapshot* after_tombstone = nullptr; + for (int i = 0; i < kNum; ++i) { + ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0), + "val" + std::to_string(i))); + if (i == kRangeBegin) { + before_tombstone = db_->GetSnapshot(); + ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), + Key1(kRangeBegin), Key1(kRangeEnd), + Timestamp(kRangeBegin, 0))); + } + if (i == kNum / 2) { + ASSERT_OK(Flush()); + } + } + assert(before_tombstone); + after_tombstone = db_->GetSnapshot(); + // snapshot and ts before tombstone + std::string read_ts_str = Timestamp(kRangeBegin - 1, 0); // (0, 0) + Slice read_ts = read_ts_str; + ReadOptions read_opts; + read_opts.timestamp = &read_ts; + read_opts.snapshot = before_tombstone; + std::vector expected_status = { + Status::OK(), Status::NotFound(), Status::NotFound(), Status::NotFound()}; + std::vector expected_values(kNum); + expected_values[0] = "val" + std::to_string(0); + std::vector expected_timestamps(kNum); + expected_timestamps[0] = Timestamp(0, 0); + + size_t batch_size = kNum; + std::vector key_strs(batch_size); + std::vector keys(batch_size); + std::vector values(batch_size); + std::vector statuses(batch_size); + std::vector timestamps(batch_size); + for (int i = 0; i < kNum; ++i) { + key_strs[i] = Key1(i); + keys[i] = key_strs[i]; + } + + auto verify = [&] { + db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size, + keys.data(), values.data(), timestamps.data(), + statuses.data(), true /* sorted_input */); + std::string value, timestamp; + Status s; + for (int i = 0; i < kNum; ++i) { + s = db_->Get(read_opts, Key1(i), &value, ×tamp); + ASSERT_EQ(s, expected_status[i]); + ASSERT_EQ(statuses[i], expected_status[i]); + if (s.ok()) { + ASSERT_EQ(value, expected_values[i]); + ASSERT_EQ(values[i], expected_values[i]); + } + if (!timestamp.empty()) { + ASSERT_EQ(timestamp, expected_timestamps[i]); + ASSERT_EQ(timestamps[i], expected_timestamps[i]); + } else { + ASSERT_TRUE(timestamps[i].empty()); + } + } + std::unique_ptr iter(db_->NewIterator(read_opts)); + std::unique_ptr iter_for_seek(db_->NewIterator(read_opts)); + iter->SeekToFirst(); + for (int i = 0; i < kNum; ++i) { + if (expected_status[i].ok()) { + auto verify_iter = [&](Iterator* iter_ptr) { + ASSERT_TRUE(iter_ptr->Valid()); + ASSERT_EQ(iter_ptr->key(), keys[i]); + ASSERT_EQ(iter_ptr->value(), expected_values[i]); + ASSERT_EQ(iter_ptr->timestamp(), expected_timestamps[i]); + }; + verify_iter(iter.get()); + iter->Next(); + + iter_for_seek->Seek(keys[i]); + verify_iter(iter_for_seek.get()); + + iter_for_seek->SeekForPrev(keys[i]); + verify_iter(iter_for_seek.get()); + } + } + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + }; + + verify(); + + // snapshot before tombstone and ts after tombstone + read_ts_str = Timestamp(kNum, 0); // (4, 0) + read_ts = read_ts_str; + read_opts.timestamp = &read_ts; + read_opts.snapshot = before_tombstone; + expected_status[1] = Status::OK(); + expected_timestamps[1] = Timestamp(1, 0); + expected_values[1] = "val" + std::to_string(1); + verify(); + + // snapshot after tombstone and ts before tombstone + read_ts_str = Timestamp(kRangeBegin - 1, 0); // (0, 0) + read_ts = read_ts_str; + read_opts.timestamp = &read_ts; + read_opts.snapshot = after_tombstone; + expected_status[1] = Status::NotFound(); + expected_timestamps[1].clear(); + expected_values[1].clear(); + verify(); + + // snapshot and ts after tombstone + read_ts_str = Timestamp(kNum, 0); // (4, 0) + read_ts = read_ts_str; + read_opts.timestamp = &read_ts; + read_opts.snapshot = after_tombstone; + for (int i = 0; i < kNum; ++i) { + if (i == kRangeBegin) { + expected_status[i] = Status::NotFound(); + expected_values[i].clear(); + } else { + expected_status[i] = Status::OK(); + expected_values[i] = "val" + std::to_string(i); + } + expected_timestamps[i] = Timestamp(i, 0); + } + verify(); + + db_->ReleaseSnapshot(before_tombstone); + db_->ReleaseSnapshot(after_tombstone); + Close(); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/dbformat.cc b/db/dbformat.cc index 8aa91e94d..b0ac6c339 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -88,6 +88,19 @@ void AppendKeyWithMaxTimestamp(std::string* result, const Slice& key, result->append(kTsMax.data(), ts_sz); } +void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key, + size_t ts_sz) { + assert(ts_sz > 0); + result->append(key.data(), key.size() - ts_sz); + + static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff"; + if (ts_sz < strlen(kTsMax)) { + result->append(kTsMax, ts_sz); + } else { + result->append(std::string(ts_sz, '\xff')); + } +} + std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex) const { std::string result = "'"; if (log_err_key) { diff --git a/db/dbformat.h b/db/dbformat.h index aa3d07438..b3981fc74 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -182,6 +182,11 @@ extern void AppendKeyWithMinTimestamp(std::string* result, const Slice& key, extern void AppendKeyWithMaxTimestamp(std::string* result, const Slice& key, size_t ts_sz); +// `key` is a user key with timestamp. Append the user key without timestamp +// and the maximal timestamp to *result. +extern void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key, + size_t ts_sz); + // Attempt to parse an internal key from "internal_key". On success, // stores the parsed data in "*result", and returns true. // @@ -290,6 +295,10 @@ class InternalKey { InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t) { AppendInternalKey(&rep_, ParsedInternalKey(_user_key, s, t)); } + InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t, Slice ts) { + AppendInternalKeyWithDifferentTimestamp( + &rep_, ParsedInternalKey(_user_key, s, t), ts); + } // sets the internal key to be bigger or equal to all internal keys with this // user key @@ -324,11 +333,24 @@ class InternalKey { SetFrom(ParsedInternalKey(_user_key, s, t)); } + void Set(const Slice& _user_key_with_ts, SequenceNumber s, ValueType t, + const Slice& ts) { + ParsedInternalKey pik = ParsedInternalKey(_user_key_with_ts, s, t); + // Should not call pik.SetTimestamp() directly as it overwrites the buffer + // containing _user_key. + SetFrom(pik, ts); + } + void SetFrom(const ParsedInternalKey& p) { rep_.clear(); AppendInternalKey(&rep_, p); } + void SetFrom(const ParsedInternalKey& p, const Slice& ts) { + rep_.clear(); + AppendInternalKeyWithDifferentTimestamp(&rep_, p, ts); + } + void Clear() { rep_.clear(); } // The underlying representation. @@ -518,7 +540,9 @@ class IterKey { bool IsKeyPinned() const { return (key_ != buf_); } - // user_key does not have timestamp. + // If `ts` is provided, user_key should not contain timestamp, + // and `ts` is appended after user_key. + // TODO: more efficient storage for timestamp. void SetInternalKey(const Slice& key_prefix, const Slice& user_key, SequenceNumber s, ValueType value_type = kValueTypeForSeek, @@ -671,16 +695,38 @@ extern Status ReadRecordFromWriteBatch(Slice* input, char* tag, // When user call DeleteRange() to delete a range of keys, // we will store a serialized RangeTombstone in MemTable and SST. -// the struct here is a easy-understood form +// the struct here is an easy-understood form // start/end_key_ is the start/end user key of the range to be deleted struct RangeTombstone { Slice start_key_; Slice end_key_; SequenceNumber seq_; + // TODO: we should optimize the storage here when user-defined timestamp + // is NOT enabled: they currently take up (16 + 32 + 32) bytes per tombstone. + Slice ts_; + std::string pinned_start_key_; + std::string pinned_end_key_; + RangeTombstone() = default; RangeTombstone(Slice sk, Slice ek, SequenceNumber sn) : start_key_(sk), end_key_(ek), seq_(sn) {} + // User-defined timestamp is enabled, `sk` and `ek` should be user key + // with timestamp, `ts` will replace the timestamps in `sk` and + // `ek`. + RangeTombstone(Slice sk, Slice ek, SequenceNumber sn, Slice ts) + : seq_(sn), ts_(ts) { + assert(!ts.empty()); + pinned_start_key_.reserve(sk.size()); + pinned_start_key_.append(sk.data(), sk.size() - ts.size()); + pinned_start_key_.append(ts.data(), ts.size()); + pinned_end_key_.reserve(ek.size()); + pinned_end_key_.append(ek.data(), ek.size() - ts.size()); + pinned_end_key_.append(ts.data(), ts.size()); + start_key_ = pinned_start_key_; + end_key_ = pinned_end_key_; + } + RangeTombstone(ParsedInternalKey parsed_key, Slice value) { start_key_ = parsed_key.user_key; seq_ = parsed_key.sequence; @@ -690,8 +736,7 @@ struct RangeTombstone { // be careful to use Serialize(), allocates new memory std::pair Serialize() const { auto key = InternalKey(start_key_, seq_, kTypeRangeDeletion); - Slice value = end_key_; - return std::make_pair(std::move(key), std::move(value)); + return std::make_pair(std::move(key), end_key_); } // be careful to use SerializeKey(), allocates new memory @@ -707,6 +752,16 @@ struct RangeTombstone { // // be careful to use SerializeEndKey(), allocates new memory InternalKey SerializeEndKey() const { + if (!ts_.empty()) { + static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff"; + if (ts_.size() <= strlen(kTsMax)) { + return InternalKey(end_key_, kMaxSequenceNumber, kTypeRangeDeletion, + Slice(kTsMax, ts_.size())); + } else { + return InternalKey(end_key_, kMaxSequenceNumber, kTypeRangeDeletion, + std::string(ts_.size(), '\xff')); + } + } return InternalKey(end_key_, kMaxSequenceNumber, kTypeRangeDeletion); } }; diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 0ca173ca6..6648a76d9 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -339,9 +339,30 @@ Status ExternalSstFileIngestionJob::Prepare( Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed, SuperVersion* super_version) { autovector ranges; - for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) { - ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(), - file_to_ingest.largest_internal_key.user_key()); + autovector keys; + size_t ts_sz = cfd_->user_comparator()->timestamp_size(); + if (ts_sz) { + // Check all ranges [begin, end] inclusively. Add maximum + // timestamp to include all `begin` keys, and add minimal timestamp to + // include all `end` keys. + for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) { + std::string begin_str; + std::string end_str; + AppendUserKeyWithMaxTimestamp( + &begin_str, file_to_ingest.smallest_internal_key.user_key(), ts_sz); + AppendKeyWithMinTimestamp( + &end_str, file_to_ingest.largest_internal_key.user_key(), ts_sz); + keys.emplace_back(std::move(begin_str)); + keys.emplace_back(std::move(end_str)); + } + for (size_t i = 0; i < files_to_ingest_.size(); ++i) { + ranges.emplace_back(keys[2 * i], keys[2 * i + 1]); + } + } else { + for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) { + ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(), + file_to_ingest.largest_internal_key.user_key()); + } } Status status = cfd_->RangesOverlapWithMemtables( ranges, super_version, db_options_.allow_data_in_errors, flush_needed); diff --git a/db/flush_job.cc b/db/flush_job.cc index d2099c337..1b28be08e 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -420,9 +420,11 @@ Status FlushJob::MemPurge() { // Place iterator at the First (meaning most recent) key node. iter->SeekToFirst(); + const std::string* const full_history_ts_low = &(cfd_->GetFullHistoryTsLow()); std::unique_ptr range_del_agg( new CompactionRangeDelAggregator(&(cfd_->internal_comparator()), - existing_snapshots_)); + existing_snapshots_, + full_history_ts_low)); for (auto& rd_iter : range_del_iters) { range_del_agg->AddTombstones(std::move(rd_iter)); } @@ -479,8 +481,7 @@ Status FlushJob::MemPurge() { ioptions->enforce_single_del_contracts, /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, /*compaction=*/nullptr, compaction_filter.get(), - /*shutting_down=*/nullptr, ioptions->info_log, - &(cfd_->GetFullHistoryTsLow())); + /*shutting_down=*/nullptr, ioptions->info_log, full_history_ts_low); // Set earliest sequence number in the new memtable // to be equal to the earliest sequence number of the diff --git a/db/memtable.cc b/db/memtable.cc index 029490d2b..a4bcaa322 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -573,7 +573,7 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal( assert(IsFragmentedRangeTombstonesConstructed()); return new FragmentedRangeTombstoneIterator( fragmented_range_tombstone_list_.get(), comparator_.comparator, - read_seq); + read_seq, read_options.timestamp); } // takes current cache @@ -596,8 +596,9 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal( cache->reader_mutex.unlock(); } - return new FragmentedRangeTombstoneIterator(cache, comparator_.comparator, - read_seq); + auto* fragmented_iter = new FragmentedRangeTombstoneIterator( + cache, comparator_.comparator, read_seq, read_options.timestamp); + return fragmented_iter; } void MemTable::ConstructFragmentedRangeTombstones() { @@ -946,6 +947,10 @@ static bool SaveValue(void* arg, const char* entry) { const Comparator* user_comparator = s->mem->GetInternalKeyComparator().user_comparator(); size_t ts_sz = user_comparator->timestamp_size(); + if (ts_sz && s->timestamp && max_covering_tombstone_seq > 0) { + // timestamp should already be set to range tombstone timestamp + assert(s->timestamp->size() == ts_sz); + } if (user_comparator->EqualWithoutTimestamp(user_key_slice, s->key->user_key())) { // Correct user key @@ -960,10 +965,20 @@ static bool SaveValue(void* arg, const char* entry) { if (s->seq == kMaxSequenceNumber) { s->seq = seq; + if (s->seq > max_covering_tombstone_seq) { + if (ts_sz && s->timestamp != nullptr) { + // `timestamp` was set to range tombstone's timestamp before + // `SaveValue` is ever called. This key has a higher sequence number + // than range tombstone, and is the key with the highest seqno across + // all keys with this user_key, so we update timestamp here. + Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); + s->timestamp->assign(ts.data(), ts_sz); + } + } else { + s->seq = max_covering_tombstone_seq; + } } - s->seq = std::max(s->seq, max_covering_tombstone_seq); - if (ts_sz > 0 && s->timestamp != nullptr) { if (!s->timestamp->empty()) { assert(ts_sz == s->timestamp->size()); @@ -978,7 +993,8 @@ static bool SaveValue(void* arg, const char* entry) { } if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || - type == kTypeWideColumnEntity) && + type == kTypeWideColumnEntity || type == kTypeDeletion || + type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp) && max_covering_tombstone_seq > seq) { type = kTypeRangeDeletion; } @@ -1139,9 +1155,17 @@ bool MemTable::Get(const LookupKey& key, std::string* value, GetInternalKeySeqno(key.internal_key()), immutable_memtable)); if (range_del_iter != nullptr) { - *max_covering_tombstone_seq = - std::max(*max_covering_tombstone_seq, - range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key())); + SequenceNumber covering_seq = + range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()); + if (covering_seq > *max_covering_tombstone_seq) { + *max_covering_tombstone_seq = covering_seq; + if (timestamp) { + // Will be overwritten in SaveValue() if there is a point key with + // a higher seqno. + timestamp->assign(range_del_iter->timestamp().data(), + range_del_iter->timestamp().size()); + } + } } bool found_final_value = false; @@ -1272,9 +1296,17 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, NewRangeTombstoneIteratorInternal( read_options, GetInternalKeySeqno(iter->lkey->internal_key()), immutable_memtable)); - iter->max_covering_tombstone_seq = std::max( - iter->max_covering_tombstone_seq, - range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key())); + SequenceNumber covering_seq = + range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()); + if (covering_seq > iter->max_covering_tombstone_seq) { + iter->max_covering_tombstone_seq = covering_seq; + if (iter->timestamp) { + // Will be overwritten in SaveValue() if there is a point key with + // a higher seqno. + iter->timestamp->assign(range_del_iter->timestamp().data(), + range_del_iter->timestamp().size()); + } + } } SequenceNumber dummy_seq; GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, diff --git a/db/memtable.h b/db/memtable.h index 141100228..546797e50 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -639,6 +639,8 @@ class MemTable { // Always returns non-null and assumes certain pre-checks (e.g., // is_range_del_table_empty_) are done. This is only valid during the lifetime // of the underlying memtable. + // read_seq and read_options.timestamp will be used as the upper bound + // for range tombstones. FragmentedRangeTombstoneIterator* NewRangeTombstoneIteratorInternal( const ReadOptions& read_options, SequenceNumber read_seq, bool immutable_memtable); diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc index 5a2fff0c2..b45d5b4d4 100644 --- a/db/range_del_aggregator.cc +++ b/db/range_del_aggregator.cc @@ -85,7 +85,7 @@ bool TruncatedRangeDelIterator::Valid() const { icmp_->Compare(iter_->parsed_start_key(), *largest_) < 0); } -// NOTE: target is a user key +// NOTE: target is a user key, with timestamp if enabled. void TruncatedRangeDelIterator::Seek(const Slice& target) { if (largest_ != nullptr && icmp_->Compare(*largest_, ParsedInternalKey(target, kMaxSequenceNumber, @@ -101,7 +101,7 @@ void TruncatedRangeDelIterator::Seek(const Slice& target) { iter_->Seek(target); } -// NOTE: target is a user key +// NOTE: target is a user key, with timestamp if enabled. void TruncatedRangeDelIterator::SeekForPrev(const Slice& target) { if (smallest_ != nullptr && icmp_->Compare(ParsedInternalKey(target, 0, kTypeRangeDeletion), @@ -339,11 +339,22 @@ void CompactionRangeDelAggregator::AddTombstones( if (input_iter == nullptr || input_iter->empty()) { return; } + // This bounds output of CompactionRangeDelAggregator::NewIterator. + if (!trim_ts_.empty()) { + assert(icmp_->user_comparator()->timestamp_size() > 0); + input_iter->SetTimestampUpperBound(&trim_ts_); + } + assert(input_iter->lower_bound() == 0); assert(input_iter->upper_bound() == kMaxSequenceNumber); parent_iters_.emplace_back(new TruncatedRangeDelIterator( std::move(input_iter), icmp_, smallest, largest)); + Slice* ts_upper_bound = nullptr; + if (!ts_upper_bound_.empty()) { + assert(icmp_->user_comparator()->timestamp_size() > 0); + ts_upper_bound = &ts_upper_bound_; + } auto split_iters = parent_iters_.back()->SplitBySnapshot(*snapshots_); for (auto& split_iter : split_iters) { auto it = reps_.find(split_iter.first); @@ -356,6 +367,16 @@ void CompactionRangeDelAggregator::AddTombstones( assert(inserted); } assert(it != reps_.end()); + // ts_upper_bound is used to bound ShouldDelete() to only consider + // range tombstones under full_history_ts_low_ and trim_ts_. Keys covered by + // range tombstones that are above full_history_ts_low_ should not be + // dropped prematurely: user may read with a timestamp between the range + // tombstone and the covered key. Note that we cannot set timestamp + // upperbound on the original `input_iter` since `input_iter`s are later + // used in CompactionRangeDelAggregator::NewIterator to output range + // tombstones for persistence. We do not want to only persist range + // tombstones with timestamp lower than ts_upper_bound. + split_iter.second->SetTimestampUpperBound(ts_upper_bound); it->second.AddTombstones(std::move(split_iter.second)); } } @@ -371,6 +392,12 @@ bool CompactionRangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed, namespace { +// Produce a sorted (by start internal key) stream of range tombstones from +// `children`. lower_bound and upper_bound on user key can be +// optionally specified. Range tombstones that ends before lower_bound or starts +// after upper_bound are excluded. +// If user-defined timestamp is enabled, lower_bound and upper_bound should +// contain timestamp, but comparison is done ignoring timestamps. class TruncatedRangeDelMergingIter : public InternalIterator { public: TruncatedRangeDelMergingIter( @@ -381,7 +408,8 @@ class TruncatedRangeDelMergingIter : public InternalIterator { lower_bound_(lower_bound), upper_bound_(upper_bound), upper_bound_inclusive_(upper_bound_inclusive), - heap_(StartKeyMinComparator(icmp)) { + heap_(StartKeyMinComparator(icmp)), + ts_sz_(icmp_->user_comparator()->timestamp_size()) { for (auto& child : children) { if (child != nullptr) { assert(child->lower_bound() == 0); @@ -422,15 +450,28 @@ class TruncatedRangeDelMergingIter : public InternalIterator { Slice key() const override { auto* top = heap_.top(); - cur_start_key_.Set(top->start_key().user_key, top->seq(), - kTypeRangeDeletion); + if (ts_sz_) { + cur_start_key_.Set(top->start_key().user_key, top->seq(), + kTypeRangeDeletion, top->timestamp()); + } else { + cur_start_key_.Set(top->start_key().user_key, top->seq(), + kTypeRangeDeletion); + } + assert(top->start_key().user_key.size() >= ts_sz_); return cur_start_key_.Encode(); } Slice value() const override { auto* top = heap_.top(); - assert(top->end_key().sequence == kMaxSequenceNumber); - return top->end_key().user_key; + if (!ts_sz_) { + return top->end_key().user_key; + } + assert(top->timestamp().size() == ts_sz_); + cur_end_key_.clear(); + cur_end_key_.append(top->end_key().user_key.data(), + top->end_key().user_key.size() - ts_sz_); + cur_end_key_.append(top->timestamp().data(), ts_sz_); + return cur_end_key_; } // Unused InternalIterator methods @@ -444,8 +485,8 @@ class TruncatedRangeDelMergingIter : public InternalIterator { if (upper_bound_ == nullptr) { return true; } - int cmp = icmp_->user_comparator()->Compare(iter->start_key().user_key, - *upper_bound_); + int cmp = icmp_->user_comparator()->CompareWithoutTimestamp( + iter->start_key().user_key, *upper_bound_); return upper_bound_inclusive_ ? cmp <= 0 : cmp < 0; } @@ -457,6 +498,8 @@ class TruncatedRangeDelMergingIter : public InternalIterator { std::vector children_; mutable InternalKey cur_start_key_; + mutable std::string cur_end_key_; + size_t ts_sz_; }; } // namespace diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h index 43e34565e..9bd40967d 100644 --- a/db/range_del_aggregator.h +++ b/db/range_del_aggregator.h @@ -72,6 +72,13 @@ class TruncatedRangeDelIterator { } SequenceNumber seq() const { return iter_->seq(); } + Slice timestamp() const { + assert(icmp_->user_comparator()->timestamp_size()); + return iter_->timestamp(); + } + void SetTimestampUpperBound(const Slice* ts_upper_bound) { + iter_->SetTimestampUpperBound(ts_upper_bound); + } std::map> SplitBySnapshot(const std::vector& snapshots); @@ -332,6 +339,8 @@ class RangeDelAggregator { } } + // If user-defined timestamp is enabled, `start` and `end` are user keys + // with timestamp. bool IsRangeOverlapped(const Slice& start, const Slice& end); private: @@ -395,8 +404,25 @@ class ReadRangeDelAggregator final : public RangeDelAggregator { class CompactionRangeDelAggregator : public RangeDelAggregator { public: CompactionRangeDelAggregator(const InternalKeyComparator* icmp, - const std::vector& snapshots) - : RangeDelAggregator(icmp), snapshots_(&snapshots) {} + const std::vector& snapshots, + const std::string* full_history_ts_low = nullptr, + const std::string* trim_ts = nullptr) + : RangeDelAggregator(icmp), snapshots_(&snapshots) { + if (full_history_ts_low) { + ts_upper_bound_ = *full_history_ts_low; + } + if (trim_ts) { + trim_ts_ = *trim_ts; + // Range tombstone newer than `trim_ts` or `full_history_ts_low` should + // not be considered in ShouldDelete(). + if (ts_upper_bound_.empty()) { + ts_upper_bound_ = trim_ts_; + } else if (!trim_ts_.empty() && icmp->user_comparator()->CompareTimestamp( + trim_ts_, ts_upper_bound_) < 0) { + ts_upper_bound_ = trim_ts_; + } + } + } ~CompactionRangeDelAggregator() override {} void AddTombstones( @@ -442,6 +468,9 @@ class CompactionRangeDelAggregator : public RangeDelAggregator { std::map reps_; const std::vector* snapshots_; + // min over full_history_ts_low and trim_ts_ + Slice ts_upper_bound_{}; + Slice trim_ts_{}; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/range_tombstone_fragmenter.cc b/db/range_tombstone_fragmenter.cc index 356eee181..925b4ed33 100644 --- a/db/range_tombstone_fragmenter.cc +++ b/db/range_tombstone_fragmenter.cc @@ -84,6 +84,7 @@ void FragmentedRangeTombstoneList::FragmentTombstones( // for use in flush_current_tombstones. std::set cur_end_keys(cmp); + size_t ts_sz = icmp.user_comparator()->timestamp_size(); // Given the next start key in unfragmented_tombstones, // flush_current_tombstones writes every tombstone fragment that starts // and ends with a key before next_start_key, and starts with a key greater @@ -93,12 +94,14 @@ void FragmentedRangeTombstoneList::FragmentTombstones( bool reached_next_start_key = false; for (; it != cur_end_keys.end() && !reached_next_start_key; ++it) { Slice cur_end_key = it->user_key; - if (icmp.user_comparator()->Compare(cur_start_key, cur_end_key) == 0) { + if (icmp.user_comparator()->CompareWithoutTimestamp(cur_start_key, + cur_end_key) == 0) { // Empty tombstone. continue; } - if (icmp.user_comparator()->Compare(next_start_key, cur_end_key) <= 0) { - // All of the end keys in [it, cur_end_keys.end()) are after + if (icmp.user_comparator()->CompareWithoutTimestamp(next_start_key, + cur_end_key) <= 0) { + // All the end keys in [it, cur_end_keys.end()) are after // next_start_key, so the tombstones they represent can be used in // fragments that start with keys greater than or equal to // next_start_key. However, the end keys we already passed will not be @@ -115,22 +118,38 @@ void FragmentedRangeTombstoneList::FragmentTombstones( // Flush a range tombstone fragment [cur_start_key, cur_end_key), which // should not overlap with the last-flushed tombstone fragment. assert(tombstones_.empty() || - icmp.user_comparator()->Compare(tombstones_.back().end_key, - cur_start_key) <= 0); + icmp.user_comparator()->CompareWithoutTimestamp( + tombstones_.back().end_key, cur_start_key) <= 0); // Sort the sequence numbers of the tombstones being fragmented in // descending order, and then flush them in that order. autovector seqnums_to_flush; + autovector timestamps_to_flush; for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) { seqnums_to_flush.push_back(flush_it->sequence); + if (ts_sz) { + timestamps_to_flush.push_back( + ExtractTimestampFromUserKey(flush_it->user_key, ts_sz)); + } } + // TODO: bind the two sorting together to be more efficient std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(), std::greater()); + if (ts_sz) { + std::sort(timestamps_to_flush.begin(), timestamps_to_flush.end(), + [icmp](const Slice& ts1, const Slice& ts2) { + return icmp.user_comparator()->CompareTimestamp(ts1, ts2) > + 0; + }); + } size_t start_idx = tombstone_seqs_.size(); size_t end_idx = start_idx + seqnums_to_flush.size(); - if (for_compaction) { + // If user-defined timestamp is enabled, we should not drop tombstones + // from any snapshot stripe. Garbage collection of range tombstones + // happens in CompactionOutputs::AddRangeDels(). + if (for_compaction && ts_sz == 0) { // Drop all tombstone seqnums that are not preserved by a snapshot. SequenceNumber next_snapshot = kMaxSequenceNumber; for (auto seq : seqnums_to_flush) { @@ -155,10 +174,33 @@ void FragmentedRangeTombstoneList::FragmentTombstones( tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(), seqnums_to_flush.end()); seq_set_.insert(seqnums_to_flush.begin(), seqnums_to_flush.end()); + if (ts_sz) { + tombstone_timestamps_.insert(tombstone_timestamps_.end(), + timestamps_to_flush.begin(), + timestamps_to_flush.end()); + } } assert(start_idx < end_idx); - tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx, end_idx); + if (ts_sz) { + std::string start_key_with_max_ts; + AppendUserKeyWithMaxTimestamp(&start_key_with_max_ts, cur_start_key, + ts_sz); + pinned_slices_.emplace_back(std::move(start_key_with_max_ts)); + Slice start_key = pinned_slices_.back(); + + std::string end_key_with_max_ts; + AppendUserKeyWithMaxTimestamp(&end_key_with_max_ts, cur_end_key, ts_sz); + pinned_slices_.emplace_back(std::move(end_key_with_max_ts)); + Slice end_key = pinned_slices_.back(); + + // RangeTombstoneStack expects start_key and end_key to have max + // timestamp. + tombstones_.emplace_back(start_key, end_key, start_idx, end_idx); + } else { + tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx, + end_idx); + } cur_start_key = cur_end_key; } @@ -193,8 +235,9 @@ void FragmentedRangeTombstoneList::FragmentTombstones( tombstone_end_key.size()); tombstone_end_key = pinned_slices_.back(); } - if (!cur_end_keys.empty() && icmp.user_comparator()->Compare( - cur_start_key, tombstone_start_key) != 0) { + if (!cur_end_keys.empty() && + icmp.user_comparator()->CompareWithoutTimestamp( + cur_start_key, tombstone_start_key) != 0) { // The start key has changed. Flush all tombstones that start before // this new start key. flush_current_tombstones(tombstone_start_key); @@ -223,14 +266,15 @@ bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower, FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( const FragmentedRangeTombstoneList* tombstones, const InternalKeyComparator& icmp, SequenceNumber _upper_bound, - SequenceNumber _lower_bound) + const Slice* ts_upper_bound, SequenceNumber _lower_bound) : tombstone_start_cmp_(icmp.user_comparator()), tombstone_end_cmp_(icmp.user_comparator()), icmp_(&icmp), ucmp_(icmp.user_comparator()), tombstones_(tombstones), upper_bound_(_upper_bound), - lower_bound_(_lower_bound) { + lower_bound_(_lower_bound), + ts_upper_bound_(ts_upper_bound) { assert(tombstones_ != nullptr); Invalidate(); } @@ -238,7 +282,7 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( const std::shared_ptr& tombstones, const InternalKeyComparator& icmp, SequenceNumber _upper_bound, - SequenceNumber _lower_bound) + const Slice* ts_upper_bound, SequenceNumber _lower_bound) : tombstone_start_cmp_(icmp.user_comparator()), tombstone_end_cmp_(icmp.user_comparator()), icmp_(&icmp), @@ -246,7 +290,8 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( tombstones_ref_(tombstones), tombstones_(tombstones_ref_.get()), upper_bound_(_upper_bound), - lower_bound_(_lower_bound) { + lower_bound_(_lower_bound), + ts_upper_bound_(ts_upper_bound) { assert(tombstones_ != nullptr); Invalidate(); } @@ -254,7 +299,7 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( const std::shared_ptr& tombstones_cache, const InternalKeyComparator& icmp, SequenceNumber _upper_bound, - SequenceNumber _lower_bound) + const Slice* ts_upper_bound, SequenceNumber _lower_bound) : tombstone_start_cmp_(icmp.user_comparator()), tombstone_end_cmp_(icmp.user_comparator()), icmp_(&icmp), @@ -264,6 +309,11 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator( upper_bound_(_upper_bound), lower_bound_(_lower_bound) { assert(tombstones_ != nullptr); + if (!ts_upper_bound || ts_upper_bound->empty()) { + ts_upper_bound_ = nullptr; + } else { + ts_upper_bound_ = ts_upper_bound; + } Invalidate(); } @@ -278,9 +328,7 @@ void FragmentedRangeTombstoneIterator::SeekToTopFirst() { return; } pos_ = tombstones_->begin(); - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - upper_bound_, std::greater()); + SetMaxVisibleSeqAndTimestamp(); ScanForwardToVisibleTombstone(); } @@ -295,12 +343,12 @@ void FragmentedRangeTombstoneIterator::SeekToTopLast() { return; } pos_ = std::prev(tombstones_->end()); - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - upper_bound_, std::greater()); + SetMaxVisibleSeqAndTimestamp(); ScanBackwardToVisibleTombstone(); } +// @param `target` is a user key, with timestamp if user-defined timestamp is +// enabled. void FragmentedRangeTombstoneIterator::Seek(const Slice& target) { if (tombstones_->empty()) { Invalidate(); @@ -328,9 +376,7 @@ void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone( seq_pos_ = tombstones_->seq_end(); return; } - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - upper_bound_, std::greater()); + SetMaxVisibleSeqAndTimestamp(); } void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone( @@ -347,9 +393,7 @@ void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone( return; } --pos_; - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - upper_bound_, std::greater()); + SetMaxVisibleSeqAndTimestamp(); } void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() { @@ -361,9 +405,7 @@ void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() { Invalidate(); return; } - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - upper_bound_, std::greater()); + SetMaxVisibleSeqAndTimestamp(); } } @@ -376,9 +418,7 @@ void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() { return; } --pos_; - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - upper_bound_, std::greater()); + SetMaxVisibleSeqAndTimestamp(); } } @@ -394,9 +434,7 @@ void FragmentedRangeTombstoneIterator::TopNext() { if (pos_ == tombstones_->end()) { return; } - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - upper_bound_, std::greater()); + SetMaxVisibleSeqAndTimestamp(); ScanForwardToVisibleTombstone(); } @@ -418,9 +456,7 @@ void FragmentedRangeTombstoneIterator::TopPrev() { return; } --pos_; - seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), - tombstones_->seq_iter(pos_->seq_end_idx), - upper_bound_, std::greater()); + SetMaxVisibleSeqAndTimestamp(); ScanBackwardToVisibleTombstone(); } @@ -431,8 +467,10 @@ bool FragmentedRangeTombstoneIterator::Valid() const { SequenceNumber FragmentedRangeTombstoneIterator::MaxCoveringTombstoneSeqnum( const Slice& target_user_key) { SeekToCoveringTombstone(target_user_key); - return ValidPos() && ucmp_->Compare(start_key(), target_user_key) <= 0 ? seq() - : 0; + return ValidPos() && ucmp_->CompareWithoutTimestamp(start_key(), + target_user_key) <= 0 + ? seq() + : 0; } std::map> @@ -449,8 +487,9 @@ FragmentedRangeTombstoneIterator::SplitBySnapshot( upper = snapshots[i]; } if (tombstones_->ContainsRange(lower, upper)) { - splits.emplace(upper, std::make_unique( - tombstones_, *icmp_, upper, lower)); + splits.emplace(upper, + std::make_unique( + tombstones_, *icmp_, upper, ts_upper_bound_, lower)); } lower = upper + 1; } diff --git a/db/range_tombstone_fragmenter.h b/db/range_tombstone_fragmenter.h index 0c8cbf181..f4b0eab42 100644 --- a/db/range_tombstone_fragmenter.h +++ b/db/range_tombstone_fragmenter.h @@ -33,6 +33,10 @@ struct FragmentedRangeTombstoneList { // start and end at the same user keys but have different sequence numbers. // The members seq_start_idx and seq_end_idx are intended to be parameters to // seq_iter(). + // If user-defined timestamp is enabled, `start` and `end` should be user keys + // with timestamp, and the timestamps are set to max timestamp to be returned + // by parsed_start_key()/parsed_end_key(). seq_start_idx and seq_end_idx will + // also be used as parameters to ts_iter(). struct RangeTombstoneStack { RangeTombstoneStack(const Slice& start, const Slice& end, size_t start_idx, size_t end_idx) @@ -40,12 +44,13 @@ struct FragmentedRangeTombstoneList { end_key(end), seq_start_idx(start_idx), seq_end_idx(end_idx) {} - Slice start_key; Slice end_key; size_t seq_start_idx; size_t seq_end_idx; }; + // Assumes unfragmented_tombstones->key() and unfragmented_tombstones->value() + // both contain timestamp if enabled. FragmentedRangeTombstoneList( std::unique_ptr unfragmented_tombstones, const InternalKeyComparator& icmp, bool for_compaction = false, @@ -63,6 +68,10 @@ struct FragmentedRangeTombstoneList { return std::next(tombstone_seqs_.begin(), idx); } + std::vector::const_iterator ts_iter(size_t idx) const { + return std::next(tombstone_timestamps_.begin(), idx); + } + std::vector::const_iterator seq_begin() const { return tombstone_seqs_.begin(); } @@ -87,8 +96,15 @@ struct FragmentedRangeTombstoneList { private: // Given an ordered range tombstone iterator unfragmented_tombstones, - // "fragment" the tombstones into non-overlapping pieces, and store them in - // tombstones_ and tombstone_seqs_. + // "fragment" the tombstones into non-overlapping pieces. Each + // "non-overlapping piece" is a RangeTombstoneStack in tombstones_, which + // contains start_key, end_key, and indices that points to sequence numbers + // (in tombstone_seqs_) and timestamps (in tombstone_timestamps_). If + // for_compaction is true, then `snapshots` should be provided. Range + // tombstone fragments are dropped if they are not visible in any snapshot and + // user-defined timestamp is not enabled. That is, for each snapshot stripe + // [lower, upper], the range tombstone fragment with largest seqno in [lower, + // upper] is preserved, and all the other range tombstones are dropped. void FragmentTombstones( std::unique_ptr unfragmented_tombstones, const InternalKeyComparator& icmp, bool for_compaction, @@ -96,6 +112,7 @@ struct FragmentedRangeTombstoneList { std::vector tombstones_; std::vector tombstone_seqs_; + std::vector tombstone_timestamps_; std::set seq_set_; std::list pinned_slices_; PinnedIteratorsManager pinned_iters_mgr_; @@ -117,15 +134,15 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { FragmentedRangeTombstoneIterator( const FragmentedRangeTombstoneList* tombstones, const InternalKeyComparator& icmp, SequenceNumber upper_bound, - SequenceNumber lower_bound = 0); + const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0); FragmentedRangeTombstoneIterator( const std::shared_ptr& tombstones, const InternalKeyComparator& icmp, SequenceNumber upper_bound, - SequenceNumber lower_bound = 0); + const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0); FragmentedRangeTombstoneIterator( const std::shared_ptr& tombstones, const InternalKeyComparator& icmp, SequenceNumber upper_bound, - SequenceNumber lower_bound = 0); + const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0); void SeekToFirst() override; void SeekToLast() override; @@ -154,6 +171,8 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { void TopPrev(); bool Valid() const override; + // Note that key() and value() do not return correct timestamp. + // Caller should call timestamp() to get the current timestamp. Slice key() const override { MaybePinKey(); return current_start_key_.Encode(); @@ -172,11 +191,28 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { } RangeTombstone Tombstone() const { + assert(Valid()); + if (icmp_->user_comparator()->timestamp_size()) { + return RangeTombstone(start_key(), end_key(), seq(), timestamp()); + } return RangeTombstone(start_key(), end_key(), seq()); } + // Note that start_key() and end_key() are not guaranteed to have the + // correct timestamp. User can call timestamp() to get the correct + // timestamp(). Slice start_key() const { return pos_->start_key; } Slice end_key() const { return pos_->end_key; } SequenceNumber seq() const { return *seq_pos_; } + Slice timestamp() const { + // seqno and timestamp are stored in the same order. + return *tombstones_->ts_iter(seq_pos_ - tombstones_->seq_begin()); + } + // Current use case is by CompactionRangeDelAggregator to set + // full_history_ts_low_. + void SetTimestampUpperBound(const Slice* ts_upper_bound) { + ts_upper_bound_ = ts_upper_bound; + } + ParsedInternalKey parsed_start_key() const { return ParsedInternalKey(pos_->start_key, kMaxSequenceNumber, kTypeRangeDeletion); @@ -186,6 +222,9 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { kTypeRangeDeletion); } + // Return the max sequence number of a range tombstone that covers + // the given user key. + // If there is no covering tombstone, then 0 is returned. SequenceNumber MaxCoveringTombstoneSeqnum(const Slice& user_key); // Splits the iterator into n+1 iterators (where n is the number of @@ -218,15 +257,15 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { bool operator()(const RangeTombstoneStack& a, const RangeTombstoneStack& b) const { - return cmp->Compare(a.start_key, b.start_key) < 0; + return cmp->CompareWithoutTimestamp(a.start_key, b.start_key) < 0; } bool operator()(const RangeTombstoneStack& a, const Slice& b) const { - return cmp->Compare(a.start_key, b) < 0; + return cmp->CompareWithoutTimestamp(a.start_key, b) < 0; } bool operator()(const Slice& a, const RangeTombstoneStack& b) const { - return cmp->Compare(a, b.start_key) < 0; + return cmp->CompareWithoutTimestamp(a, b.start_key) < 0; } const Comparator* cmp; @@ -237,15 +276,15 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { bool operator()(const RangeTombstoneStack& a, const RangeTombstoneStack& b) const { - return cmp->Compare(a.end_key, b.end_key) < 0; + return cmp->CompareWithoutTimestamp(a.end_key, b.end_key) < 0; } bool operator()(const RangeTombstoneStack& a, const Slice& b) const { - return cmp->Compare(a.end_key, b) < 0; + return cmp->CompareWithoutTimestamp(a.end_key, b) < 0; } bool operator()(const Slice& a, const RangeTombstoneStack& b) const { - return cmp->Compare(a, b.end_key) < 0; + return cmp->CompareWithoutTimestamp(a, b.end_key) < 0; } const Comparator* cmp; @@ -277,11 +316,38 @@ class FragmentedRangeTombstoneIterator : public InternalIterator { const FragmentedRangeTombstoneList* tombstones_; SequenceNumber upper_bound_; SequenceNumber lower_bound_; + // Only consider timestamps <= ts_upper_bound_. + const Slice* ts_upper_bound_; std::vector::const_iterator pos_; std::vector::const_iterator seq_pos_; mutable std::vector::const_iterator pinned_pos_; mutable std::vector::const_iterator pinned_seq_pos_; mutable InternalKey current_start_key_; + + // Check the current RangeTombstoneStack `pos_` against timestamp + // upper bound `ts_upper_bound_` and sequence number upper bound + // `upper_bound_`. Update the sequence number (and timestamp) pointer + // `seq_pos_` to the first valid position satisfying both bounds. + void SetMaxVisibleSeqAndTimestamp() { + seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx), + tombstones_->seq_iter(pos_->seq_end_idx), + upper_bound_, std::greater()); + if (ts_upper_bound_ && !ts_upper_bound_->empty()) { + auto ts_pos = std::lower_bound( + tombstones_->ts_iter(pos_->seq_start_idx), + tombstones_->ts_iter(pos_->seq_end_idx), *ts_upper_bound_, + [this](const Slice& s1, const Slice& s2) { + return ucmp_->CompareTimestamp(s1, s2) > 0; + }); + auto ts_idx = ts_pos - tombstones_->ts_iter(pos_->seq_start_idx); + auto seq_idx = seq_pos_ - tombstones_->seq_iter(pos_->seq_start_idx); + if (seq_idx < ts_idx) { + // seq and ts are ordered in non-increasing order. Only updates seq_pos_ + // to a larger index for smaller sequence number and timestamp. + seq_pos_ = tombstones_->seq_iter(pos_->seq_start_idx + ts_idx); + } + } + } }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/table_cache.cc b/db/table_cache.cc index 42c72e925..c44578f8b 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -490,9 +490,15 @@ Status TableCache::Get( std::unique_ptr range_del_iter( t->NewRangeTombstoneIterator(options)); if (range_del_iter != nullptr) { - *max_covering_tombstone_seq = std::max( - *max_covering_tombstone_seq, - range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k))); + SequenceNumber seq = + range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)); + if (seq > *max_covering_tombstone_seq) { + *max_covering_tombstone_seq = seq; + if (get_context->NeedTimestamp()) { + get_context->SetTimestampFromRangeTombstone( + range_del_iter->timestamp()); + } + } } } if (s.ok()) { @@ -535,9 +541,15 @@ void TableCache::UpdateRangeTombstoneSeqnums( for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) { SequenceNumber* max_covering_tombstone_seq = iter->get_context->max_covering_tombstone_seq(); - *max_covering_tombstone_seq = std::max( - *max_covering_tombstone_seq, - range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts)); + SequenceNumber seq = + range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts); + if (seq > *max_covering_tombstone_seq) { + *max_covering_tombstone_seq = seq; + if (iter->get_context->NeedTimestamp()) { + iter->get_context->SetTimestampFromRangeTombstone( + range_del_iter->timestamp()); + } + } } } } diff --git a/db/version_set.cc b/db/version_set.cc index 29ecec309..a3e8ac944 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1632,6 +1632,7 @@ Status Version::TablesRangeTombstoneSummary(int max_entries_to_print, if (tombstone_iter) { tombstone_iter->SeekToFirst(); + // TODO: print timestamp while (tombstone_iter->Valid() && num_entries_left > 0) { ss << "start: " << tombstone_iter->start_key().ToString(true) << " end: " << tombstone_iter->end_key().ToString(true) diff --git a/db/write_batch.cc b/db/write_batch.cc index 4301800d0..ae015693f 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1353,8 +1353,31 @@ Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key); } - return Status::InvalidArgument( - "Cannot call this method on column family enabling timestamp"); + needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; + std::string dummy_ts(ts_sz, '\0'); + std::array begin_key_with_ts{{begin_key, dummy_ts}}; + std::array end_key_with_ts{{end_key, dummy_ts}}; + return WriteBatchInternal::DeleteRange( + this, cf_id, SliceParts(begin_key_with_ts.data(), 2), + SliceParts(end_key_with_ts.data(), 2)); +} + +Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key, + const Slice& ts) { + const Status s = CheckColumnFamilyTimestampSize(column_family, ts); + if (!s.ok()) { + return s; + } + assert(column_family); + has_key_with_ts_ = true; + uint32_t cf_id = column_family->GetID(); + std::array key_with_ts{{begin_key, ts}}; + std::array end_key_with_ts{{end_key, ts}}; + return WriteBatchInternal::DeleteRange(this, cf_id, + SliceParts(key_with_ts.data(), 2), + SliceParts(end_key_with_ts.data(), 2)); } Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, @@ -1928,10 +1951,9 @@ class MemTableInserter : public WriteBatch::Handler { // always 0 in // non-recovery, regular write code-path) // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that - // column - // family already contains updates from this log. We can't apply updates - // twice because of update-in-place or merge workloads -- ignore the - // update + // column family already contains updates from this log. We can't apply + // updates twice because of update-in-place or merge workloads -- ignore + // the update *s = Status::OK(); return false; } @@ -2331,7 +2353,8 @@ class MemTableInserter : public WriteBatch::Handler { cfd->ioptions()->table_factory->Name() + " in CF " + cfd->GetName()); } - int cmp = cfd->user_comparator()->Compare(begin_key, end_key); + int cmp = + cfd->user_comparator()->CompareWithoutTimestamp(begin_key, end_key); if (cmp > 0) { // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`. ret_status.PermitUncheckedError(); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index ee8690c28..f4ea75893 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -314,8 +314,12 @@ class TimestampUpdater : public WriteBatch::Handler { } Status DeleteRangeCF(uint32_t cf, const Slice& begin_key, - const Slice&) override { - return UpdateTimestamp(cf, begin_key); + const Slice& end_key) override { + Status s = UpdateTimestamp(cf, begin_key, true /* is_key */); + if (s.ok()) { + s = UpdateTimestamp(cf, end_key, false /* is_key */); + } + return s; } Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { @@ -341,13 +345,15 @@ class TimestampUpdater : public WriteBatch::Handler { Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } private: - Status UpdateTimestamp(uint32_t cf, const Slice& key) { - Status s = UpdateTimestampImpl(cf, key, idx_); + // @param is_key specifies whether the update is for key or value. + Status UpdateTimestamp(uint32_t cf, const Slice& buf, bool is_key = true) { + Status s = UpdateTimestampImpl(cf, buf, idx_, is_key); ++idx_; return s; } - Status UpdateTimestampImpl(uint32_t cf, const Slice& key, size_t /*idx*/) { + Status UpdateTimestampImpl(uint32_t cf, const Slice& buf, size_t /*idx*/, + bool is_key) { if (timestamp_.empty()) { return Status::InvalidArgument("Timestamp is empty"); } @@ -361,22 +367,27 @@ class TimestampUpdater : public WriteBatch::Handler { } else if (cf_ts_sz != timestamp_.size()) { return Status::InvalidArgument("timestamp size mismatch"); } - UpdateProtectionInformationIfNeeded(key, timestamp_); + UpdateProtectionInformationIfNeeded(buf, timestamp_, is_key); - char* ptr = const_cast(key.data() + key.size() - cf_ts_sz); + char* ptr = const_cast(buf.data() + buf.size() - cf_ts_sz); assert(ptr); memcpy(ptr, timestamp_.data(), timestamp_.size()); return Status::OK(); } - void UpdateProtectionInformationIfNeeded(const Slice& key, const Slice& ts) { + void UpdateProtectionInformationIfNeeded(const Slice& buf, const Slice& ts, + bool is_key) { if (prot_info_ != nullptr) { const size_t ts_sz = ts.size(); - SliceParts old_key(&key, 1); - Slice key_no_ts(key.data(), key.size() - ts_sz); - std::array new_key_cmpts{{key_no_ts, ts}}; - SliceParts new_key(new_key_cmpts.data(), 2); - prot_info_->entries_[idx_].UpdateK(old_key, new_key); + SliceParts old(&buf, 1); + Slice old_no_ts(buf.data(), buf.size() - ts_sz); + std::array new_key_cmpts{{old_no_ts, ts}}; + SliceParts new_parts(new_key_cmpts.data(), 2); + if (is_key) { + prot_info_->entries_[idx_].UpdateK(old, new_parts); + } else { + prot_info_->entries_[idx_].UpdateV(old, new_parts); + } } } diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 40f23f54b..d3d9ba860 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -962,15 +962,15 @@ TEST_F(WriteBatchTest, SanityChecks) { ASSERT_TRUE(wb.Delete(nullptr, "key", "ts").IsInvalidArgument()); ASSERT_TRUE(wb.SingleDelete(nullptr, "key", "ts").IsInvalidArgument()); ASSERT_TRUE(wb.Merge(nullptr, "key", "ts", "value").IsNotSupported()); - ASSERT_TRUE( - wb.DeleteRange(nullptr, "begin_key", "end_key", "ts").IsNotSupported()); + ASSERT_TRUE(wb.DeleteRange(nullptr, "begin_key", "end_key", "ts") + .IsInvalidArgument()); ASSERT_TRUE(wb.Put(&cf4, "key", "ts", "value").IsInvalidArgument()); ASSERT_TRUE(wb.Delete(&cf4, "key", "ts").IsInvalidArgument()); ASSERT_TRUE(wb.SingleDelete(&cf4, "key", "ts").IsInvalidArgument()); ASSERT_TRUE(wb.Merge(&cf4, "key", "ts", "value").IsNotSupported()); ASSERT_TRUE( - wb.DeleteRange(&cf4, "begin_key", "end_key", "ts").IsNotSupported()); + wb.DeleteRange(&cf4, "begin_key", "end_key", "ts").IsInvalidArgument()); constexpr size_t wrong_ts_sz = 1 + sizeof(uint64_t); std::string ts(wrong_ts_sz, '\0'); @@ -980,7 +980,7 @@ TEST_F(WriteBatchTest, SanityChecks) { ASSERT_TRUE(wb.SingleDelete(&cf0, "key", ts).IsInvalidArgument()); ASSERT_TRUE(wb.Merge(&cf0, "key", ts, "value").IsNotSupported()); ASSERT_TRUE( - wb.DeleteRange(&cf0, "begin_key", "end_key", ts).IsNotSupported()); + wb.DeleteRange(&cf0, "begin_key", "end_key", ts).IsInvalidArgument()); // Sanity checks for the new WriteBatch APIs without extra 'ts' arg. WriteBatch wb1(0, 0, 0, wrong_ts_sz); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 896653b70..020380dc2 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -935,15 +935,11 @@ void StressTest::OperateDb(ThreadState* thread) { // Assign timestamps if necessary. std::string read_ts_str; - std::string write_ts_str; Slice read_ts; - Slice write_ts; if (FLAGS_user_timestamp_size > 0) { read_ts_str = GetNowNanos(); read_ts = read_ts_str; read_opts.timestamp = &read_ts; - write_ts_str = GetNowNanos(); - write_ts = write_ts_str; } int prob_op = thread->rand.Uniform(100); @@ -2831,17 +2827,17 @@ void StressTest::Reopen(ThreadState* thread) { } } -void StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread, +bool StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread, std::string& ts_str, Slice& ts_slice, ReadOptions& read_opts) { if (FLAGS_user_timestamp_size == 0) { - return; + return false; } assert(thread); if (!thread->rand.OneInOpt(3)) { - return; + return false; } const SharedState* const shared = thread->shared; @@ -2857,6 +2853,7 @@ void StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread, PutFixed64(&ts_str, ts); ts_slice = ts_str; read_opts.timestamp = &ts_slice; + return true; } void StressTest::MaybeUseOlderTimestampForRangeScan(ThreadState* thread, @@ -2914,10 +2911,6 @@ void CheckAndSetOptionsForUserTimestamp(Options& options) { fprintf(stderr, "Merge does not support timestamp yet.\n"); exit(1); } - if (FLAGS_delrangepercent > 0) { - fprintf(stderr, "DeleteRange does not support timestamp yet.\n"); - exit(1); - } if (FLAGS_use_txn) { fprintf(stderr, "TransactionDB does not support timestamp yet.\n"); exit(1); diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 374cb9349..e05bfe4f4 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -244,7 +244,8 @@ class StressTest { TransactionDBOptions& /*txn_db_opts*/) {} #endif - void MaybeUseOlderTimestampForPointLookup(ThreadState* thread, + // Returns whether the timestamp of read_opts is updated. + bool MaybeUseOlderTimestampForPointLookup(ThreadState* thread, std::string& ts_str, Slice& ts_slice, ReadOptions& read_opts); diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 004387944..b809902dc 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -390,8 +390,8 @@ class NonBatchedOpsStressTest : public StressTest { ReadOptions read_opts_copy = read_opts; std::string read_ts_str; Slice read_ts_slice; - MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice, - read_opts_copy); + bool read_older_ts = MaybeUseOlderTimestampForPointLookup( + thread, read_ts_str, read_ts_slice, read_opts_copy); Status s = db_->Get(read_opts_copy, cfh, key, &from_db); if (fault_fs_guard) { @@ -424,7 +424,7 @@ class NonBatchedOpsStressTest : public StressTest { } else if (s.IsNotFound()) { // not found case thread->stats.AddGets(1, 0); - if (!FLAGS_skip_verifydb && !read_opts_copy.timestamp) { + if (!FLAGS_skip_verifydb && !read_older_ts) { auto expected = thread->shared->Get(rand_column_families[0], rand_keys[0]); if (expected != SharedState::DELETION_SENTINEL && @@ -959,7 +959,16 @@ class NonBatchedOpsStressTest : public StressTest { auto cfh = column_families_[rand_column_family]; std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width); Slice end_key = end_keystr; - Status s = db_->DeleteRange(write_opts, cfh, key, end_key); + std::string write_ts_str; + Slice write_ts; + Status s; + if (FLAGS_user_timestamp_size) { + write_ts_str = GetNowNanos(); + write_ts = write_ts_str; + s = db_->DeleteRange(write_opts, cfh, key, end_key, write_ts); + } else { + s = db_->DeleteRange(write_opts, cfh, key, end_key); + } if (!s.ok()) { if (FLAGS_injest_error_severity >= 2) { if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 9359a45ba..6d8048d9d 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -481,13 +481,10 @@ class DB { virtual Status DeleteRange(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key); - virtual Status DeleteRange(const WriteOptions& /*options*/, - ColumnFamilyHandle* /*column_family*/, - const Slice& /*begin_key*/, - const Slice& /*end_key*/, const Slice& /*ts*/) { - return Status::NotSupported( - "DeleteRange does not support user-defined timestamp yet"); - } + virtual Status DeleteRange(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key, + const Slice& ts); // Merge the database entry for "key" with "value". Returns OK on success, // and a non-OK status on error. The semantics of this operation is diff --git a/include/rocksdb/sst_file_reader.h b/include/rocksdb/sst_file_reader.h index 4b8642480..2748e675b 100644 --- a/include/rocksdb/sst_file_reader.h +++ b/include/rocksdb/sst_file_reader.h @@ -7,6 +7,7 @@ #ifndef ROCKSDB_LITE +#include "db/range_tombstone_fragmenter.h" #include "rocksdb/iterator.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" @@ -30,6 +31,9 @@ class SstFileReader { // If "snapshot" is nullptr, the iterator returns only the latest keys. Iterator* NewIterator(const ReadOptions& options); + FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator( + const ReadOptions& options); + std::shared_ptr GetTableProperties() const; // Verifies whether there is corruption in this table. diff --git a/include/rocksdb/sst_file_writer.h b/include/rocksdb/sst_file_writer.h index a6430eaa9..e693bef9e 100644 --- a/include/rocksdb/sst_file_writer.h +++ b/include/rocksdb/sst_file_writer.h @@ -148,6 +148,13 @@ class SstFileWriter { // REQUIRES: comparator is *not* timestamp-aware. Status DeleteRange(const Slice& begin_key, const Slice& end_key); + // Add a range deletion tombstone to currently opened file. + // REQUIRES: begin_key and end_key are user keys without timestamp. + // REQUIRES: the timestamp's size is equal to what is expected by + // the comparator. + Status DeleteRange(const Slice& begin_key, const Slice& end_key, + const Slice& timestamp); + // Finalize writing to sst file and close file. // // An optional ExternalSstFileInfo pointer can be passed to the function diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index dd516eabd..dba80a76e 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -150,12 +150,9 @@ class WriteBatch : public WriteBatchBase { Status DeleteRange(const Slice& begin_key, const Slice& end_key) override { return DeleteRange(nullptr, begin_key, end_key); } - Status DeleteRange(ColumnFamilyHandle* /*column_family*/, - const Slice& /*begin_key*/, const Slice& /*end_key*/, - const Slice& /*ts*/) override { - return Status::NotSupported( - "DeleteRange does not support user-defined timestamp"); - } + // begin_key and end_key should be user keys without timestamp. + Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, + const Slice& end_key, const Slice& ts) override; // variant that takes SliceParts Status DeleteRange(ColumnFamilyHandle* column_family, diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index e2fc18150..40dcd6e1f 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -2026,8 +2026,9 @@ FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator( if (read_options.snapshot != nullptr) { snapshot = read_options.snapshot->GetSequenceNumber(); } - return new FragmentedRangeTombstoneIterator( - rep_->fragmented_range_dels, rep_->internal_comparator, snapshot); + return new FragmentedRangeTombstoneIterator(rep_->fragmented_range_dels, + rep_->internal_comparator, + snapshot, read_options.timestamp); } bool BlockBasedTable::FullFilterKeyMayMatch( diff --git a/table/get_context.cc b/table/get_context.cc index c86edbd7e..fca809cc3 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -250,6 +250,20 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, if (ts_sz > 0 && timestamp_ != nullptr) { if (!timestamp_->empty()) { assert(ts_sz == timestamp_->size()); + // `timestamp` can be set before `SaveValue` is ever called + // when max_covering_tombstone_seq_ was set. + // If this key has a higher sequence number than range tombstone, + // then timestamp should be updated. `ts_from_rangetombstone_` is + // set to false afterwards so that only the key with highest seqno + // updates the timestamp. + if (ts_from_rangetombstone_) { + assert(max_covering_tombstone_seq_); + if (parsed_key.sequence > *max_covering_tombstone_seq_) { + Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz); + timestamp_->assign(ts.data(), ts.size()); + ts_from_rangetombstone_ = false; + } + } } // TODO optimize for small size ts const std::string kMaxTs(ts_sz, '\xff'); @@ -263,9 +277,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, auto type = parsed_key.type; // Key matches. Process it if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || - type == kTypeWideColumnEntity) && + type == kTypeWideColumnEntity || type == kTypeDeletion || + type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) && max_covering_tombstone_seq_ != nullptr && *max_covering_tombstone_seq_ > parsed_key.sequence) { + // Note that deletion types are also considered, this is for the case + // when we need to return timestamp to user. If a range tombstone has a + // higher seqno than point tombstone, its timestamp should be returned. type = kTypeRangeDeletion; } switch (type) { diff --git a/table/get_context.h b/table/get_context.h index 5638f7e10..57f8b7eea 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -148,6 +148,14 @@ class GetContext { return max_covering_tombstone_seq_; } + bool NeedTimestamp() { return timestamp_ != nullptr; } + + void SetTimestampFromRangeTombstone(const Slice& timestamp) { + assert(timestamp_); + timestamp_->assign(timestamp.data(), timestamp.size()); + ts_from_rangetombstone_ = true; + } + PinnedIteratorsManager* pinned_iters_mgr() { return pinned_iters_mgr_; } // If a non-null string is passed, all the SaveValue calls will be @@ -190,6 +198,7 @@ class GetContext { PinnableSlice* pinnable_val_; PinnableWideColumns* columns_; std::string* timestamp_; + bool ts_from_rangetombstone_{false}; bool* value_found_; // Is value set correctly? Used by KeyMayExist MergeContext* merge_context_; SequenceNumber* max_covering_tombstone_seq_; diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index eaadf9889..b712e935e 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -697,6 +697,11 @@ void MergingIterator::SeekImpl(const Slice& target, size_t starting_level, // is not the same as the original target, it should not affect // correctness. Besides, in most cases, range tombstone start and // end key should have the same prefix? + // If range_tombstone_iter->end_key() is truncated to its largest_ + // boundary, the timestamp in user_key will not be max timestamp, + // but the timestamp of `range_tombstone_iter.largest_`. This should + // be fine here as current_search_key is used to Seek into lower + // levels. current_search_key.SetInternalKey( range_tombstone_iter->end_key().user_key, kMaxSequenceNumber); } @@ -919,7 +924,6 @@ void MergingIterator::SeekForPrevImpl(const Slice& target, current_search_key.GetUserKey(), range_tombstone_iter->end_key().user_key) < 0) { range_tombstone_reseek = true; - // covered by this range tombstone current_search_key.SetInternalKey( range_tombstone_iter->start_key().user_key, kMaxSequenceNumber, kValueTypeForSeekForPrev); @@ -988,10 +992,6 @@ bool MergingIterator::SkipPrevDeleted() { return true /* current key deleted */; } if (current->iter.IsDeleteRangeSentinelKey()) { - // Different from SkipNextDeleted(): range tombstone start key is before - // file boundary due to op_type set in SetTombstoneKey(). - assert(ExtractValueType(current->iter.key()) != kTypeRangeDeletion || - active_.count(current->level)); // LevelIterator enters a new SST file current->iter.Prev(); if (current->iter.Valid()) { @@ -1025,12 +1025,11 @@ bool MergingIterator::SkipPrevDeleted() { std::string target; AppendInternalKey(&target, range_tombstone_iters_[i]->start_key()); // This is different from SkipNextDeleted() which does reseek at sorted - // runs - // >= level (instead of i+1 here). With min heap, if level L is at top of - // the heap, then levels level L's current - // internal key, which means levels = level (instead of i+1 here). With min heap, if level L is at + // top of the heap, then levels level L's + // current internal key, which means levels table_reader->NewRangeTombstoneIterator(options); +} + std::shared_ptr SstFileReader::GetTableProperties() const { return rep_->table_reader->GetTableProperties(); diff --git a/table/sst_file_reader_test.cc b/table/sst_file_reader_test.cc index 4837d223b..9292732a3 100644 --- a/table/sst_file_reader_test.cc +++ b/table/sst_file_reader_test.cc @@ -385,6 +385,39 @@ TEST_F(SstFileReaderTimestampTest, Basic) { } } +TEST_F(SstFileReaderTimestampTest, BasicDeleteRange) { + SstFileWriter writer(soptions_, options_); + ASSERT_OK(writer.Open(sst_name_)); + ASSERT_OK(writer.DeleteRange("key1", "key2", EncodeAsUint64(1))); + ASSERT_OK(writer.Finish()); + + SstFileReader reader(options_); + ASSERT_OK(reader.Open(sst_name_)); + ASSERT_OK(reader.VerifyChecksum()); + + ReadOptions read_options; + std::string ts = EncodeAsUint64(2); + Slice ts_slice = ts; + read_options.timestamp = &ts_slice; + FragmentedRangeTombstoneIterator* iter = + reader.NewRangeTombstoneIterator(read_options); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ( + StripTimestampFromUserKey(iter->start_key(), EncodeAsUint64(1).size()), + "key1"); + ASSERT_EQ( + StripTimestampFromUserKey(iter->end_key(), EncodeAsUint64(1).size()), + "key2"); + ASSERT_EQ(iter->timestamp(), EncodeAsUint64(1)); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + delete iter; +} + TEST_F(SstFileReaderTimestampTest, TimestampsOutOfOrder) { SstFileWriter writer(soptions_, options_); diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 67cfd4d30..a6d5a6cd4 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -131,15 +131,10 @@ struct SstFileWriter::Rep { return AddImpl(user_key_with_ts, value, value_type); } - Status DeleteRange(const Slice& begin_key, const Slice& end_key) { - if (internal_comparator.user_comparator()->timestamp_size() != 0) { - return Status::InvalidArgument("Timestamp size mismatch"); - } - + Status DeleteRangeImpl(const Slice& begin_key, const Slice& end_key) { if (!builder) { return Status::InvalidArgument("File is not opened"); } - RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */); if (file_info.num_range_del_entries == 0) { file_info.smallest_range_del_key.assign(tombstone.start_key_.data(), @@ -170,6 +165,45 @@ struct SstFileWriter::Rep { return Status::OK(); } + Status DeleteRange(const Slice& begin_key, const Slice& end_key) { + if (internal_comparator.user_comparator()->timestamp_size() != 0) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + return DeleteRangeImpl(begin_key, end_key); + } + + // begin_key and end_key should be users keys without timestamp. + Status DeleteRange(const Slice& begin_key, const Slice& end_key, + const Slice& timestamp) { + const size_t timestamp_size = timestamp.size(); + + if (internal_comparator.user_comparator()->timestamp_size() != + timestamp_size) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + + const size_t begin_key_size = begin_key.size(); + const size_t end_key_size = end_key.size(); + if (begin_key.data() + begin_key_size == timestamp.data() || + end_key.data() + begin_key_size == timestamp.data()) { + assert(memcmp(begin_key.data() + begin_key_size, + end_key.data() + end_key_size, timestamp_size) == 0); + Slice begin_key_with_ts(begin_key.data(), + begin_key_size + timestamp_size); + Slice end_key_with_ts(end_key.data(), end_key.size() + timestamp_size); + return DeleteRangeImpl(begin_key_with_ts, end_key_with_ts); + } + std::string begin_key_with_ts; + begin_key_with_ts.reserve(begin_key_size + timestamp_size); + begin_key_with_ts.append(begin_key.data(), begin_key_size); + begin_key_with_ts.append(timestamp.data(), timestamp_size); + std::string end_key_with_ts; + end_key_with_ts.reserve(end_key_size + timestamp_size); + end_key_with_ts.append(end_key.data(), end_key_size); + end_key_with_ts.append(timestamp.data(), timestamp_size); + return DeleteRangeImpl(begin_key_with_ts, end_key_with_ts); + } + Status InvalidatePageCache(bool closing) { Status s = Status::OK(); if (invalidate_page_cache == false) { @@ -346,6 +380,11 @@ Status SstFileWriter::DeleteRange(const Slice& begin_key, return rep_->DeleteRange(begin_key, end_key); } +Status SstFileWriter::DeleteRange(const Slice& begin_key, const Slice& end_key, + const Slice& timestamp) { + return rep_->DeleteRange(begin_key, end_key, timestamp); +} + Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { Rep* r = rep_.get(); if (!r->builder) { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 07f754f3c..a2245ade1 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -389,8 +389,6 @@ ts_params = { "test_cf_consistency": 0, "test_batches_snapshots": 0, "user_timestamp_size": 8, - "delrangepercent": 0, - "delpercent": 5, "use_merge": 0, "use_full_merge_v1": 0, "use_txn": 0, @@ -515,14 +513,14 @@ def finalize_and_sanitize(src_params): # Multi-key operations are not currently compatible with transactions or # timestamp. - if ( - dest_params.get("test_batches_snapshots") == 1 - or dest_params.get("use_txn") == 1 - or dest_params.get("user_timestamp_size") > 0 - ): + if (dest_params.get("test_batches_snapshots") == 1 or + dest_params.get("use_txn") == 1 or + dest_params.get("user_timestamp_size") > 0): + dest_params["ingest_external_file_one_in"] = 0 + if (dest_params.get("test_batches_snapshots") == 1 or + dest_params.get("use_txn") == 1): dest_params["delpercent"] += dest_params["delrangepercent"] dest_params["delrangepercent"] = 0 - dest_params["ingest_external_file_one_in"] = 0 if ( dest_params.get("disable_wal") == 1 or dest_params.get("sync_fault_injection") == 1