User-defined timestamp support for `DeleteRange()` (#10661)

Summary:
Add user-defined timestamp support for range deletion. The new API is `DeleteRange(opt, cf, begin_key, end_key, ts)`. Most of the change is to update the comparator to compare without timestamp. Other than that, major changes are
- internal range tombstone data structures (`FragmentedRangeTombstoneList`, `RangeTombstone`, etc.) to store timestamps.
- Garbage collection of range tombstones and range tombstone covered keys during compaction.
- Get()/MultiGet() to return the timestamp of a range tombstone when needed.
- Get/Iterator with range tombstones bounded by readoptions.timestamp.
- timestamp crash test now issues DeleteRange by default.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10661

Test Plan:
- Added unit test: `make check`
- Stress test: `python3 tools/db_crashtest.py --enable_ts whitebox --readpercent=57 --prefixpercent=4 --writepercent=25 -delpercent=5 --iterpercent=5 --delrangepercent=4`
- Ran `db_bench` to measure regression when timestamp is not enabled. The tests are for write (with some range deletion) and iterate with DB fitting in memory: `./db_bench--benchmarks=fillrandom,seekrandom --writes_per_range_tombstone=200 --max_write_buffer_number=100 --min_write_buffer_number_to_merge=100 --writes=500000 --reads=500000 --seek_nexts=10 --disable_auto_compactions -disable_wal=true --max_num_range_tombstones=1000`.  Did not see consistent regression in no timestamp case.

| micros/op | fillrandom | seekrandom |
| --- | --- | --- |
|main| 2.58 |10.96|
|PR 10661| 2.68 |10.63|

Reviewed By: riversand963

Differential Revision: D39441192

Pulled By: cbi42

fbshipit-source-id: f05aca3c41605caf110daf0ff405919f300ddec2
main
Changyu Bi 2 years ago committed by Facebook GitHub Bot
parent 3b8164912e
commit 9f2363f4c4
  1. 3
      HISTORY.md
  2. 2
      db/builder.cc
  3. 13
      db/compaction/compaction_iterator.cc
  4. 5
      db/compaction/compaction_job.cc
  5. 80
      db/compaction/compaction_outputs.cc
  6. 9
      db/compaction/compaction_outputs.h
  7. 3
      db/db_impl/db_impl.h
  8. 29
      db/db_impl/db_impl_write.cc
  9. 1
      db/db_range_del_test.cc
  10. 447
      db/db_with_timestamp_basic_test.cc
  11. 13
      db/dbformat.cc
  12. 63
      db/dbformat.h
  13. 27
      db/external_sst_file_ingestion_job.cc
  14. 7
      db/flush_job.cc
  15. 56
      db/memtable.cc
  16. 2
      db/memtable.h
  17. 61
      db/range_del_aggregator.cc
  18. 33
      db/range_del_aggregator.h
  19. 123
      db/range_tombstone_fragmenter.cc
  20. 90
      db/range_tombstone_fragmenter.h
  21. 24
      db/table_cache.cc
  22. 1
      db/version_set.cc
  23. 37
      db/write_batch.cc
  24. 37
      db/write_batch_internal.h
  25. 8
      db/write_batch_test.cc
  26. 15
      db_stress_tool/db_stress_test_base.cc
  27. 3
      db_stress_tool/db_stress_test_base.h
  28. 17
      db_stress_tool/no_batched_ops_stress.cc
  29. 11
      include/rocksdb/db.h
  30. 4
      include/rocksdb/sst_file_reader.h
  31. 7
      include/rocksdb/sst_file_writer.h
  32. 9
      include/rocksdb/write_batch.h
  33. 5
      table/block_based/block_based_table_reader.cc
  34. 20
      table/get_context.cc
  35. 9
      table/get_context.h
  36. 21
      table/merging_iterator.cc
  37. 6
      table/sst_file_reader.cc
  38. 33
      table/sst_file_reader_test.cc
  39. 51
      table/sst_file_writer.cc
  40. 14
      tools/db_crashtest.py

@ -1,5 +1,8 @@
# Rocksdb Change Log
## Unreleased
### New Features
* `DeleteRange()` now supports user-defined timestamp.
### Bug Fixes
* Fix a bug in io_uring_prep_cancel in AbortIO API for posix which expects sqe->addr to match with read request submitted and wrong paramter was being passed.
* Fixed a regression in iterator performance when the entire DB is a single memtable introduced in #10449. The fix is in #10705 and #10716.

@ -90,7 +90,7 @@ Status BuildTable(
iter->SeekToFirst();
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
new CompactionRangeDelAggregator(&tboptions.internal_comparator,
snapshots));
snapshots, full_history_ts_low));
uint64_t num_unfragmented_tombstones = 0;
uint64_t total_tombstone_payload_bytes = 0;
for (auto& range_del_iter : range_del_iters) {

@ -914,8 +914,17 @@ void CompactionIterator::NextFromInput() {
} else {
// 1. new user key -OR-
// 2. different snapshot stripe
bool should_delete = range_del_agg_->ShouldDelete(
key_, RangeDelPositioningMode::kForwardTraversal);
// If user-defined timestamp is enabled, we consider keys for GC if they
// are below history_ts_low_. CompactionRangeDelAggregator::ShouldDelete()
// only considers range deletions that are at or below history_ts_low_ and
// trim_ts_. We drop keys here that are below history_ts_low_ and are
// covered by a range tombstone that is at or below history_ts_low_ and
// trim_ts.
bool should_delete = false;
if (!timestamp_size_ || cmp_with_history_ts_low_ < 0) {
should_delete = range_del_agg_->ShouldDelete(
key_, RangeDelPositioningMode::kForwardTraversal);
}
if (should_delete) {
++iter_stats_.num_record_drop_hidden;
++iter_stats_.num_record_drop_range_del;

@ -1035,7 +1035,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
NotifyOnSubcompactionBegin(sub_compact);
auto range_del_agg = std::make_unique<CompactionRangeDelAggregator>(
&cfd->internal_comparator(), existing_snapshots_);
&cfd->internal_comparator(), existing_snapshots_, &full_history_ts_low_,
&trim_ts_);
// TODO: since we already use C++17, should use
// std::optional<const Slice> instead.
@ -1455,7 +1456,7 @@ Status CompactionJob::FinishCompactionOutputFile(
: nullptr,
sub_compact->end.has_value() ? &(sub_compact->end.value()) : nullptr,
range_del_out_stats, bottommost_level_, cfd->internal_comparator(),
earliest_snapshot, next_table_min_key);
earliest_snapshot, next_table_min_key, full_history_ts_low_);
}
RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");

@ -346,10 +346,10 @@ Status CompactionOutputs::AddToOutput(
}
Status CompactionOutputs::AddRangeDels(
const Slice* comp_start, const Slice* comp_end,
const Slice* comp_start_user_key, const Slice* comp_end_user_key,
CompactionIterationStats& range_del_out_stats, bool bottommost_level,
const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot,
const Slice& next_table_min_key) {
const Slice& next_table_min_key, const std::string& full_history_ts_low) {
assert(HasRangeDel());
FileMetaData& meta = current_output().meta;
const Comparator* ucmp = icmp.user_comparator();
@ -363,7 +363,7 @@ Status CompactionOutputs::AddRangeDels(
if (output_size == 1) {
// For the first output table, include range tombstones before the min
// key but after the subcompaction boundary.
lower_bound = comp_start;
lower_bound = comp_start_user_key;
lower_bound_from_sub_compact = true;
} else if (meta.smallest.size() > 0) {
// For subsequent output tables, only include range tombstones from min
@ -383,21 +383,22 @@ Status CompactionOutputs::AddRangeDels(
// use the smaller key as the upper bound of the output file, to ensure
// that there is no overlapping between different output files.
upper_bound_guard = ExtractUserKey(next_table_min_key);
if (comp_end != nullptr &&
ucmp->Compare(upper_bound_guard, *comp_end) >= 0) {
upper_bound = comp_end;
if (comp_end_user_key != nullptr &&
ucmp->CompareWithoutTimestamp(upper_bound_guard, *comp_end_user_key) >=
0) {
upper_bound = comp_end_user_key;
} else {
upper_bound = &upper_bound_guard;
}
} else {
// This is the last file in the subcompaction, so extend until the
// subcompaction ends.
upper_bound = comp_end;
upper_bound = comp_end_user_key;
}
bool has_overlapping_endpoints;
if (upper_bound != nullptr && meta.largest.size() > 0) {
has_overlapping_endpoints =
ucmp->Compare(meta.largest.user_key(), *upper_bound) == 0;
has_overlapping_endpoints = ucmp->CompareWithoutTimestamp(
meta.largest.user_key(), *upper_bound) == 0;
} else {
has_overlapping_endpoints = false;
}
@ -406,8 +407,8 @@ Status CompactionOutputs::AddRangeDels(
// bound. If the end of subcompaction is null or the upper bound is null,
// it means that this file is the last file in the compaction. So there
// will be no overlapping between this file and others.
assert(comp_end == nullptr || upper_bound == nullptr ||
ucmp->Compare(*upper_bound, *comp_end) <= 0);
assert(comp_end_user_key == nullptr || upper_bound == nullptr ||
ucmp->CompareWithoutTimestamp(*upper_bound, *comp_end_user_key) <= 0);
auto it = range_del_agg_->NewIterator(lower_bound, upper_bound,
has_overlapping_endpoints);
// Position the range tombstone output iterator. There may be tombstone
@ -421,7 +422,8 @@ Status CompactionOutputs::AddRangeDels(
for (; it->Valid(); it->Next()) {
auto tombstone = it->Tombstone();
if (upper_bound != nullptr) {
int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
int cmp =
ucmp->CompareWithoutTimestamp(*upper_bound, tombstone.start_key_);
if ((has_overlapping_endpoints && cmp < 0) ||
(!has_overlapping_endpoints && cmp <= 0)) {
// Tombstones starting after upper_bound only need to be included in
@ -434,7 +436,17 @@ Status CompactionOutputs::AddRangeDels(
}
}
if (bottommost_level && tombstone.seq_ <= earliest_snapshot) {
const size_t ts_sz = ucmp->timestamp_size();
// Garbage collection for range tombstones.
// If user-defined timestamp is enabled, range tombstones are dropped if
// they are at bottommost_level, below full_history_ts_low and not visible
// in any snapshot. trim_ts_ is passed to the constructor for
// range_del_agg_, and range_del_agg_ internally drops tombstones above
// trim_ts_.
if (bottommost_level && tombstone.seq_ <= earliest_snapshot &&
(ts_sz == 0 ||
(!full_history_ts_low.empty() &&
ucmp->CompareTimestamp(tombstone.ts_, full_history_ts_low) < 0))) {
// TODO(andrewkr): tombstones that span multiple output files are
// counted for each compaction output file, so lots of double
// counting.
@ -445,12 +457,13 @@ Status CompactionOutputs::AddRangeDels(
auto kv = tombstone.Serialize();
assert(lower_bound == nullptr ||
ucmp->Compare(*lower_bound, kv.second) < 0);
ucmp->CompareWithoutTimestamp(*lower_bound, kv.second) < 0);
// Range tombstone is not supported by output validator yet.
builder_->Add(kv.first.Encode(), kv.second);
InternalKey smallest_candidate = std::move(kv.first);
if (lower_bound != nullptr &&
ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
ucmp->CompareWithoutTimestamp(smallest_candidate.user_key(),
*lower_bound) <= 0) {
// Pretend the smallest key has the same user key as lower_bound
// (the max key in the previous table or subcompaction) in order for
// files to appear key-space partitioned.
@ -470,13 +483,23 @@ Status CompactionOutputs::AddRangeDels(
// choose lowest seqnum so this file's smallest internal key comes
// after the previous file's largest. The fake seqnum is OK because
// the read path's file-picking code only considers user key.
smallest_candidate = InternalKey(
*lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
kTypeRangeDeletion);
if (lower_bound_from_sub_compact) {
if (ts_sz) {
assert(tombstone.ts_.size() == ts_sz);
smallest_candidate = InternalKey(*lower_bound, tombstone.seq_,
kTypeRangeDeletion, tombstone.ts_);
} else {
smallest_candidate =
InternalKey(*lower_bound, tombstone.seq_, kTypeRangeDeletion);
}
} else {
smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion);
}
}
InternalKey largest_candidate = tombstone.SerializeEndKey();
if (upper_bound != nullptr &&
ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
ucmp->CompareWithoutTimestamp(*upper_bound,
largest_candidate.user_key()) <= 0) {
// Pretend the largest key has the same user key as upper_bound (the
// min key in the following table or subcompaction) in order for files
// to appear key-space partitioned.
@ -490,9 +513,22 @@ Status CompactionOutputs::AddRangeDels(
// kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
// kTypeRangeDeletion (0xF), so the range tombstone comes before the
// Seek() key in InternalKey's ordering. So Seek() will look in the
// next file for the user key.
largest_candidate =
InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
// next file for the user key
if (ts_sz) {
static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
if (ts_sz <= strlen(kTsMax)) {
largest_candidate =
InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion,
Slice(kTsMax, ts_sz));
} else {
largest_candidate =
InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion,
std::string(ts_sz, '\xff'));
}
} else {
largest_candidate =
InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
}
}
#ifndef NDEBUG
SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;

@ -168,11 +168,16 @@ class CompactionOutputs {
}
// Add range-dels from the aggregator to the current output file
Status AddRangeDels(const Slice* comp_start, const Slice* comp_end,
// @param comp_start_user_key and comp_end_user_key include timestamp if
// user-defined timestamp is enabled.
// @param full_history_ts_low used for range tombstone garbage collection.
Status AddRangeDels(const Slice* comp_start_user_key,
const Slice* comp_end_user_key,
CompactionIterationStats& range_del_out_stats,
bool bottommost_level, const InternalKeyComparator& icmp,
SequenceNumber earliest_snapshot,
const Slice& next_table_min_key);
const Slice& next_table_min_key,
const std::string& full_history_ts_low);
// if the outputs have range delete, range delete is also data
bool HasRangeDel() const {

@ -222,6 +222,9 @@ class DBImpl : public DB {
Status DeleteRange(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& begin_key,
const Slice& end_key) override;
Status DeleteRange(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& begin_key,
const Slice& end_key, const Slice& ts) override;
using DB::Write;
virtual Status Write(const WriteOptions& options,

@ -111,6 +111,17 @@ Status DBImpl::DeleteRange(const WriteOptions& write_options,
return DB::DeleteRange(write_options, column_family, begin_key, end_key);
}
Status DBImpl::DeleteRange(const WriteOptions& write_options,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key,
const Slice& ts) {
const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
if (!s.ok()) {
return s;
}
return DB::DeleteRange(write_options, column_family, begin_key, end_key, ts);
}
void DBImpl::SetRecoverableStatePreReleaseCallback(
PreReleaseCallback* callback) {
recoverable_state_pre_release_callback_.reset(callback);
@ -2361,6 +2372,24 @@ Status DB::DeleteRange(const WriteOptions& opt,
return Write(opt, &batch);
}
Status DB::DeleteRange(const WriteOptions& opt,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key,
const Slice& ts) {
ColumnFamilyHandle* default_cf = DefaultColumnFamily();
assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);
WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
opt.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
Status s = batch.DeleteRange(column_family, begin_key, end_key, ts);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,

@ -13,6 +13,7 @@
namespace ROCKSDB_NAMESPACE {
// TODO(cbi): parameterize the test to cover user-defined timestamp cases
class DBRangeDelTest : public DBTestBase {
public:
DBRangeDelTest() : DBTestBase("db_range_del_test", /*env_do_fsync=*/false) {}

@ -56,7 +56,7 @@ TEST_F(DBBasicTestWithTimestamp, SanityChecks) {
db_->SingleDelete(WriteOptions(), "key", dummy_ts).IsInvalidArgument());
ASSERT_TRUE(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
"begin_key", "end_key", dummy_ts)
.IsNotSupported());
.IsInvalidArgument());
// Perform non-timestamp operations on "data" cf.
ASSERT_TRUE(
@ -85,6 +85,11 @@ TEST_F(DBBasicTestWithTimestamp, SanityChecks) {
ASSERT_OK(wb.SingleDelete(handle, "key"));
ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
}
{
WriteBatch wb;
ASSERT_OK(wb.DeleteRange(handle, "begin_key", "end_key"));
ASSERT_TRUE(db_->Write(WriteOptions(), &wb).IsInvalidArgument());
}
// Perform timestamp operations with timestamps of incorrect size.
const std::string wrong_ts(sizeof(uint32_t), '\0');
@ -98,7 +103,7 @@ TEST_F(DBBasicTestWithTimestamp, SanityChecks) {
.IsInvalidArgument());
ASSERT_TRUE(
db_->DeleteRange(WriteOptions(), handle, "begin_key", "end_key", wrong_ts)
.IsNotSupported());
.IsInvalidArgument());
delete handle;
}
@ -215,6 +220,10 @@ TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) {
ts_str = Timestamp(4, 0);
ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v5"));
ts_str = Timestamp(5, 0);
ASSERT_OK(
db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k0", "k9", ts_str));
ts_str = Timestamp(3, 0);
Slice ts = ts_str;
CompactRangeOptions cro;
@ -234,6 +243,13 @@ TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) {
ASSERT_TRUE(db_->Get(ropts, "k3", &value, &key_ts).IsNotFound());
ASSERT_EQ(Timestamp(2, 0), key_ts);
ts_str = Timestamp(5, 0);
ts = ts_str;
ropts.timestamp = &ts;
ASSERT_TRUE(db_->Get(ropts, "k2", &value, &key_ts).IsNotFound());
ASSERT_EQ(Timestamp(5, 0), key_ts);
ASSERT_TRUE(db_->Get(ropts, "k2", &value).IsNotFound());
Close();
}
@ -590,6 +606,19 @@ TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
check_value_by_ts(db_, "k1", Timestamp(7, 0), Status::OK(), "v2",
Timestamp(4, 0));
Close();
Reopen(options);
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "k1",
"k3", Timestamp(7, 0)));
check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::NotFound(), "",
Timestamp(7, 0));
Close();
// Trim data whose timestamp > Timestamp(6, 0), read(k1, ts(8)) <- v2
ASSERT_OK(DB::OpenAndTrimHistory(db_options, dbname_, column_families,
&handles_, &db_, Timestamp(6, 0)));
check_value_by_ts(db_, "k1", Timestamp(8, 0), Status::OK(), "v2",
Timestamp(4, 0));
Close();
}
TEST_F(DBBasicTestWithTimestamp, OpenAndTrimHistoryInvalidOptionTest) {
@ -2014,7 +2043,7 @@ constexpr int DataVisibilityTest::kTestDataSize;
// seq'=11
// write finishes
// GetImpl(ts,seq)
// It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. If ts>=1t1 but seq<s1,
// It is OK to return <k, t1, s1> if ts>=t1 AND seq>=s1. If ts>=t1 but seq<s1,
// the key should not be returned.
TEST_F(DataVisibilityTest, PointLookupWithoutSnapshot1) {
Options options = CurrentOptions();
@ -3249,6 +3278,418 @@ TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) {
Close();
}
TEST_F(DBBasicTestWithTimestamp,
GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
std::string ts_str = Timestamp(1, 0);
WriteOptions wopts;
ASSERT_OK(db_->Put(wopts, "k1", ts_str, "v1"));
ASSERT_OK(db_->Put(wopts, "k2", ts_str, "v2"));
ASSERT_OK(db_->Put(wopts, "k3", ts_str, "v3"));
ts_str = Timestamp(2, 0);
ASSERT_OK(
db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3", ts_str));
ts_str = Timestamp(3, 0);
Slice ts = ts_str;
ReadOptions ropts;
ropts.timestamp = &ts;
CompactRangeOptions cro;
cro.full_history_ts_low = nullptr;
std::string value, key_ts;
Status s;
auto verify = [&] {
s = db_->Get(ropts, "k1", &value);
ASSERT_TRUE(s.IsNotFound());
s = db_->Get(ropts, "k2", &value, &key_ts);
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(key_ts, Timestamp(2, 0));
ASSERT_OK(db_->Get(ropts, "k3", &value, &key_ts));
ASSERT_EQ(value, "v3");
ASSERT_EQ(Timestamp(1, 0), key_ts);
size_t batch_size = 3;
std::vector<std::string> key_strs = {"k1", "k2", "k3"};
std::vector<Slice> keys{key_strs.begin(), key_strs.end()};
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(),
values.data(), statuses.data(), true /* sorted_input */);
ASSERT_TRUE(statuses[0].IsNotFound());
ASSERT_TRUE(statuses[1].IsNotFound());
ASSERT_OK(statuses[2]);
;
ASSERT_EQ(values[2], "v3");
};
verify();
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
verify();
std::string lb = Timestamp(0, 0);
Slice lb_slice = lb;
cro.full_history_ts_low = &lb_slice;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
verify();
Close();
}
TEST_F(DBBasicTestWithTimestamp,
GCRangeTombstonesAndCoveredKeysRespectingTslow) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
BlockBasedTableOptions bbto;
bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
bbto.cache_index_and_filter_blocks = true;
bbto.whole_key_filtering = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.num_levels = 2;
DestroyAndReopen(options);
WriteOptions wopts;
ASSERT_OK(db_->Put(wopts, "k1", Timestamp(1, 0), "v1"));
ASSERT_OK(db_->Delete(wopts, "k2", Timestamp(2, 0)));
ASSERT_OK(db_->DeleteRange(wopts, db_->DefaultColumnFamily(), "k1", "k3",
Timestamp(3, 0)));
ASSERT_OK(db_->Put(wopts, "k3", Timestamp(4, 0), "v3"));
ReadOptions ropts;
std::string read_ts = Timestamp(5, 0);
Slice read_ts_slice = read_ts;
ropts.timestamp = &read_ts_slice;
size_t batch_size = 3;
std::vector<std::string> key_strs = {"k1", "k2", "k3"};
std::vector<Slice> keys = {key_strs.begin(), key_strs.end()};
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
std::vector<std::string> timestamps(batch_size);
db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(),
values.data(), timestamps.data(), statuses.data(),
true /* sorted_input */);
ASSERT_TRUE(statuses[0].IsNotFound());
ASSERT_EQ(timestamps[0], Timestamp(3, 0));
ASSERT_TRUE(statuses[1].IsNotFound());
// DeleteRange has a higher timestamp than Delete for "k2"
ASSERT_EQ(timestamps[1], Timestamp(3, 0));
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], "v3");
ASSERT_EQ(timestamps[2], Timestamp(4, 0));
CompactRangeOptions cro;
// Range tombstone has timestamp >= full_history_ts_low, covered keys
// are not dropped.
std::string compaction_ts_str = Timestamp(2, 0);
Slice compaction_ts = compaction_ts_str;
cro.full_history_ts_low = &compaction_ts;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ropts.timestamp = &compaction_ts;
std::string value, ts;
ASSERT_OK(db_->Get(ropts, "k1", &value, &ts));
ASSERT_EQ(value, "v1");
// timestamp is below full_history_ts_low, zeroed out as the key goes into
// bottommost level
ASSERT_EQ(ts, Timestamp(0, 0));
ASSERT_TRUE(db_->Get(ropts, "k2", &value, &ts).IsNotFound());
ASSERT_EQ(ts, Timestamp(2, 0));
compaction_ts_str = Timestamp(4, 0);
compaction_ts = compaction_ts_str;
cro.full_history_ts_low = &compaction_ts;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ropts.timestamp = &read_ts_slice;
// k1, k2 and the range tombstone should be dropped
// k3 should still exist
db_->MultiGet(ropts, db_->DefaultColumnFamily(), batch_size, keys.data(),
values.data(), timestamps.data(), statuses.data(),
true /* sorted_input */);
ASSERT_TRUE(statuses[0].IsNotFound());
ASSERT_TRUE(timestamps[0].empty());
ASSERT_TRUE(statuses[1].IsNotFound());
ASSERT_TRUE(timestamps[1].empty());
ASSERT_OK(statuses[2]);
ASSERT_EQ(values[2], "v3");
ASSERT_EQ(timestamps[2], Timestamp(4, 0));
Close();
}
TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25;
Options options = CurrentOptions();
options.prefix_extractor.reset(NewFixedPrefixTransform(3));
options.compression = kNoCompression;
BlockBasedTableOptions bbto;
bbto.index_type = GetParam();
bbto.block_size = 100;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
DestroyAndReopen(options);
// Write half of the keys before the tombstone and half after the tombstone.
// Only covered keys (i.e., within the range and older than the tombstone)
// should be deleted.
for (int i = 0; i < kNum; ++i) {
if (i == kNum / 2) {
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key1(kRangeBegin), Key1(kRangeEnd),
Timestamp(i, 0)));
}
ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0),
"val" + std::to_string(i)));
if (i == kNum - kNumPerFile) {
ASSERT_OK(Flush());
}
}
ReadOptions read_opts;
read_opts.total_order_seek = true;
std::string read_ts = Timestamp(kNum, 0);
Slice read_ts_slice = read_ts;
read_opts.timestamp = &read_ts_slice;
{
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
ASSERT_OK(iter->status());
int expected = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(Key1(expected), iter->key());
if (expected == kRangeBegin - 1) {
expected = kNum / 2;
} else {
++expected;
}
}
ASSERT_EQ(kNum, expected);
expected = kNum / 2;
for (iter->Seek(Key1(kNum / 2)); iter->Valid(); iter->Next()) {
ASSERT_EQ(Key1(expected), iter->key());
++expected;
}
ASSERT_EQ(kNum, expected);
expected = kRangeBegin - 1;
for (iter->SeekForPrev(Key1(kNum / 2 - 1)); iter->Valid(); iter->Prev()) {
ASSERT_EQ(Key1(expected), iter->key());
--expected;
}
ASSERT_EQ(-1, expected);
read_ts = Timestamp(0, 0);
read_ts_slice = read_ts;
read_opts.timestamp = &read_ts_slice;
iter.reset(db_->NewIterator(read_opts));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key1(0));
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}
read_ts = Timestamp(kNum, 0);
read_ts_slice = read_ts;
read_opts.timestamp = &read_ts_slice;
std::string value, timestamp;
Status s;
for (int i = 0; i < kNum; ++i) {
s = db_->Get(read_opts, Key1(i), &value, &timestamp);
if (i >= kRangeBegin && i < kNum / 2) {
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(timestamp, Timestamp(kNum / 2, 0));
} else {
ASSERT_OK(s);
ASSERT_EQ(value, "val" + std::to_string(i));
ASSERT_EQ(timestamp, Timestamp(i, 0));
}
}
size_t batch_size = kNum;
std::vector<std::string> key_strs(batch_size);
std::vector<Slice> keys(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
std::vector<std::string> timestamps(batch_size);
for (int i = 0; i < kNum; ++i) {
key_strs[i] = Key1(i);
keys[i] = key_strs[i];
}
db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size, keys.data(),
values.data(), timestamps.data(), statuses.data(),
true /* sorted_input */);
for (int i = 0; i < kNum; ++i) {
if (i >= kRangeBegin && i < kNum / 2) {
ASSERT_TRUE(statuses[i].IsNotFound());
ASSERT_EQ(timestamps[i], Timestamp(kNum / 2, 0));
} else {
ASSERT_OK(statuses[i]);
ASSERT_EQ(values[i], "val" + std::to_string(i));
ASSERT_EQ(timestamps[i], Timestamp(i, 0));
}
}
Close();
}
TEST_F(DBBasicTestWithTimestamp, DeleteRangeGetIteratorWithSnapshot) {
// 4 keys 0, 1, 2, 3 at timestamps 0, 1, 2, 3 respectively.
// A range tombstone [1, 3) at timestamp 1 and has a sequence number between
// key 1 and 2.
Options options = CurrentOptions();
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);
WriteOptions write_opts;
std::string put_ts = Timestamp(0, 0);
const int kNum = 4, kNumPerFile = 1, kRangeBegin = 1, kRangeEnd = 3;
options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
const Snapshot* before_tombstone = nullptr;
const Snapshot* after_tombstone = nullptr;
for (int i = 0; i < kNum; ++i) {
ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0),
"val" + std::to_string(i)));
if (i == kRangeBegin) {
before_tombstone = db_->GetSnapshot();
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key1(kRangeBegin), Key1(kRangeEnd),
Timestamp(kRangeBegin, 0)));
}
if (i == kNum / 2) {
ASSERT_OK(Flush());
}
}
assert(before_tombstone);
after_tombstone = db_->GetSnapshot();
// snapshot and ts before tombstone
std::string read_ts_str = Timestamp(kRangeBegin - 1, 0); // (0, 0)
Slice read_ts = read_ts_str;
ReadOptions read_opts;
read_opts.timestamp = &read_ts;
read_opts.snapshot = before_tombstone;
std::vector<Status> expected_status = {
Status::OK(), Status::NotFound(), Status::NotFound(), Status::NotFound()};
std::vector<std::string> expected_values(kNum);
expected_values[0] = "val" + std::to_string(0);
std::vector<std::string> expected_timestamps(kNum);
expected_timestamps[0] = Timestamp(0, 0);
size_t batch_size = kNum;
std::vector<std::string> key_strs(batch_size);
std::vector<Slice> keys(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
std::vector<std::string> timestamps(batch_size);
for (int i = 0; i < kNum; ++i) {
key_strs[i] = Key1(i);
keys[i] = key_strs[i];
}
auto verify = [&] {
db_->MultiGet(read_opts, db_->DefaultColumnFamily(), batch_size,
keys.data(), values.data(), timestamps.data(),
statuses.data(), true /* sorted_input */);
std::string value, timestamp;
Status s;
for (int i = 0; i < kNum; ++i) {
s = db_->Get(read_opts, Key1(i), &value, &timestamp);
ASSERT_EQ(s, expected_status[i]);
ASSERT_EQ(statuses[i], expected_status[i]);
if (s.ok()) {
ASSERT_EQ(value, expected_values[i]);
ASSERT_EQ(values[i], expected_values[i]);
}
if (!timestamp.empty()) {
ASSERT_EQ(timestamp, expected_timestamps[i]);
ASSERT_EQ(timestamps[i], expected_timestamps[i]);
} else {
ASSERT_TRUE(timestamps[i].empty());
}
}
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
std::unique_ptr<Iterator> iter_for_seek(db_->NewIterator(read_opts));
iter->SeekToFirst();
for (int i = 0; i < kNum; ++i) {
if (expected_status[i].ok()) {
auto verify_iter = [&](Iterator* iter_ptr) {
ASSERT_TRUE(iter_ptr->Valid());
ASSERT_EQ(iter_ptr->key(), keys[i]);
ASSERT_EQ(iter_ptr->value(), expected_values[i]);
ASSERT_EQ(iter_ptr->timestamp(), expected_timestamps[i]);
};
verify_iter(iter.get());
iter->Next();
iter_for_seek->Seek(keys[i]);
verify_iter(iter_for_seek.get());
iter_for_seek->SeekForPrev(keys[i]);
verify_iter(iter_for_seek.get());
}
}
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
};
verify();
// snapshot before tombstone and ts after tombstone
read_ts_str = Timestamp(kNum, 0); // (4, 0)
read_ts = read_ts_str;
read_opts.timestamp = &read_ts;
read_opts.snapshot = before_tombstone;
expected_status[1] = Status::OK();
expected_timestamps[1] = Timestamp(1, 0);
expected_values[1] = "val" + std::to_string(1);
verify();
// snapshot after tombstone and ts before tombstone
read_ts_str = Timestamp(kRangeBegin - 1, 0); // (0, 0)
read_ts = read_ts_str;
read_opts.timestamp = &read_ts;
read_opts.snapshot = after_tombstone;
expected_status[1] = Status::NotFound();
expected_timestamps[1].clear();
expected_values[1].clear();
verify();
// snapshot and ts after tombstone
read_ts_str = Timestamp(kNum, 0); // (4, 0)
read_ts = read_ts_str;
read_opts.timestamp = &read_ts;
read_opts.snapshot = after_tombstone;
for (int i = 0; i < kNum; ++i) {
if (i == kRangeBegin) {
expected_status[i] = Status::NotFound();
expected_values[i].clear();
} else {
expected_status[i] = Status::OK();
expected_values[i] = "val" + std::to_string(i);
}
expected_timestamps[i] = Timestamp(i, 0);
}
verify();
db_->ReleaseSnapshot(before_tombstone);
db_->ReleaseSnapshot(after_tombstone);
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -88,6 +88,19 @@ void AppendKeyWithMaxTimestamp(std::string* result, const Slice& key,
result->append(kTsMax.data(), ts_sz);
}
void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key,
size_t ts_sz) {
assert(ts_sz > 0);
result->append(key.data(), key.size() - ts_sz);
static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
if (ts_sz < strlen(kTsMax)) {
result->append(kTsMax, ts_sz);
} else {
result->append(std::string(ts_sz, '\xff'));
}
}
std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex) const {
std::string result = "'";
if (log_err_key) {

@ -182,6 +182,11 @@ extern void AppendKeyWithMinTimestamp(std::string* result, const Slice& key,
extern void AppendKeyWithMaxTimestamp(std::string* result, const Slice& key,
size_t ts_sz);
// `key` is a user key with timestamp. Append the user key without timestamp
// and the maximal timestamp to *result.
extern void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key,
size_t ts_sz);
// Attempt to parse an internal key from "internal_key". On success,
// stores the parsed data in "*result", and returns true.
//
@ -290,6 +295,10 @@ class InternalKey {
InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t) {
AppendInternalKey(&rep_, ParsedInternalKey(_user_key, s, t));
}
InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t, Slice ts) {
AppendInternalKeyWithDifferentTimestamp(
&rep_, ParsedInternalKey(_user_key, s, t), ts);
}
// sets the internal key to be bigger or equal to all internal keys with this
// user key
@ -324,11 +333,24 @@ class InternalKey {
SetFrom(ParsedInternalKey(_user_key, s, t));
}
void Set(const Slice& _user_key_with_ts, SequenceNumber s, ValueType t,
const Slice& ts) {
ParsedInternalKey pik = ParsedInternalKey(_user_key_with_ts, s, t);
// Should not call pik.SetTimestamp() directly as it overwrites the buffer
// containing _user_key.
SetFrom(pik, ts);
}
void SetFrom(const ParsedInternalKey& p) {
rep_.clear();
AppendInternalKey(&rep_, p);
}
void SetFrom(const ParsedInternalKey& p, const Slice& ts) {
rep_.clear();
AppendInternalKeyWithDifferentTimestamp(&rep_, p, ts);
}
void Clear() { rep_.clear(); }
// The underlying representation.
@ -518,7 +540,9 @@ class IterKey {
bool IsKeyPinned() const { return (key_ != buf_); }
// user_key does not have timestamp.
// If `ts` is provided, user_key should not contain timestamp,
// and `ts` is appended after user_key.
// TODO: more efficient storage for timestamp.
void SetInternalKey(const Slice& key_prefix, const Slice& user_key,
SequenceNumber s,
ValueType value_type = kValueTypeForSeek,
@ -671,16 +695,38 @@ extern Status ReadRecordFromWriteBatch(Slice* input, char* tag,
// When user call DeleteRange() to delete a range of keys,
// we will store a serialized RangeTombstone in MemTable and SST.
// the struct here is a easy-understood form
// the struct here is an easy-understood form
// start/end_key_ is the start/end user key of the range to be deleted
struct RangeTombstone {
Slice start_key_;
Slice end_key_;
SequenceNumber seq_;
// TODO: we should optimize the storage here when user-defined timestamp
// is NOT enabled: they currently take up (16 + 32 + 32) bytes per tombstone.
Slice ts_;
std::string pinned_start_key_;
std::string pinned_end_key_;
RangeTombstone() = default;
RangeTombstone(Slice sk, Slice ek, SequenceNumber sn)
: start_key_(sk), end_key_(ek), seq_(sn) {}
// User-defined timestamp is enabled, `sk` and `ek` should be user key
// with timestamp, `ts` will replace the timestamps in `sk` and
// `ek`.
RangeTombstone(Slice sk, Slice ek, SequenceNumber sn, Slice ts)
: seq_(sn), ts_(ts) {
assert(!ts.empty());
pinned_start_key_.reserve(sk.size());
pinned_start_key_.append(sk.data(), sk.size() - ts.size());
pinned_start_key_.append(ts.data(), ts.size());
pinned_end_key_.reserve(ek.size());
pinned_end_key_.append(ek.data(), ek.size() - ts.size());
pinned_end_key_.append(ts.data(), ts.size());
start_key_ = pinned_start_key_;
end_key_ = pinned_end_key_;
}
RangeTombstone(ParsedInternalKey parsed_key, Slice value) {
start_key_ = parsed_key.user_key;
seq_ = parsed_key.sequence;
@ -690,8 +736,7 @@ struct RangeTombstone {
// be careful to use Serialize(), allocates new memory
std::pair<InternalKey, Slice> Serialize() const {
auto key = InternalKey(start_key_, seq_, kTypeRangeDeletion);
Slice value = end_key_;
return std::make_pair(std::move(key), std::move(value));
return std::make_pair(std::move(key), end_key_);
}
// be careful to use SerializeKey(), allocates new memory
@ -707,6 +752,16 @@ struct RangeTombstone {
//
// be careful to use SerializeEndKey(), allocates new memory
InternalKey SerializeEndKey() const {
if (!ts_.empty()) {
static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
if (ts_.size() <= strlen(kTsMax)) {
return InternalKey(end_key_, kMaxSequenceNumber, kTypeRangeDeletion,
Slice(kTsMax, ts_.size()));
} else {
return InternalKey(end_key_, kMaxSequenceNumber, kTypeRangeDeletion,
std::string(ts_.size(), '\xff'));
}
}
return InternalKey(end_key_, kMaxSequenceNumber, kTypeRangeDeletion);
}
};

@ -339,9 +339,30 @@ Status ExternalSstFileIngestionJob::Prepare(
Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
SuperVersion* super_version) {
autovector<Range> ranges;
for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(),
file_to_ingest.largest_internal_key.user_key());
autovector<std::string> keys;
size_t ts_sz = cfd_->user_comparator()->timestamp_size();
if (ts_sz) {
// Check all ranges [begin, end] inclusively. Add maximum
// timestamp to include all `begin` keys, and add minimal timestamp to
// include all `end` keys.
for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
std::string begin_str;
std::string end_str;
AppendUserKeyWithMaxTimestamp(
&begin_str, file_to_ingest.smallest_internal_key.user_key(), ts_sz);
AppendKeyWithMinTimestamp(
&end_str, file_to_ingest.largest_internal_key.user_key(), ts_sz);
keys.emplace_back(std::move(begin_str));
keys.emplace_back(std::move(end_str));
}
for (size_t i = 0; i < files_to_ingest_.size(); ++i) {
ranges.emplace_back(keys[2 * i], keys[2 * i + 1]);
}
} else {
for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(),
file_to_ingest.largest_internal_key.user_key());
}
}
Status status = cfd_->RangesOverlapWithMemtables(
ranges, super_version, db_options_.allow_data_in_errors, flush_needed);

@ -420,9 +420,11 @@ Status FlushJob::MemPurge() {
// Place iterator at the First (meaning most recent) key node.
iter->SeekToFirst();
const std::string* const full_history_ts_low = &(cfd_->GetFullHistoryTsLow());
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
new CompactionRangeDelAggregator(&(cfd_->internal_comparator()),
existing_snapshots_));
existing_snapshots_,
full_history_ts_low));
for (auto& rd_iter : range_del_iters) {
range_del_agg->AddTombstones(std::move(rd_iter));
}
@ -479,8 +481,7 @@ Status FlushJob::MemPurge() {
ioptions->enforce_single_del_contracts,
/*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr, ioptions->info_log,
&(cfd_->GetFullHistoryTsLow()));
/*shutting_down=*/nullptr, ioptions->info_log, full_history_ts_low);
// Set earliest sequence number in the new memtable
// to be equal to the earliest sequence number of the

@ -573,7 +573,7 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
assert(IsFragmentedRangeTombstonesConstructed());
return new FragmentedRangeTombstoneIterator(
fragmented_range_tombstone_list_.get(), comparator_.comparator,
read_seq);
read_seq, read_options.timestamp);
}
// takes current cache
@ -596,8 +596,9 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
cache->reader_mutex.unlock();
}
return new FragmentedRangeTombstoneIterator(cache, comparator_.comparator,
read_seq);
auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
cache, comparator_.comparator, read_seq, read_options.timestamp);
return fragmented_iter;
}
void MemTable::ConstructFragmentedRangeTombstones() {
@ -946,6 +947,10 @@ static bool SaveValue(void* arg, const char* entry) {
const Comparator* user_comparator =
s->mem->GetInternalKeyComparator().user_comparator();
size_t ts_sz = user_comparator->timestamp_size();
if (ts_sz && s->timestamp && max_covering_tombstone_seq > 0) {
// timestamp should already be set to range tombstone timestamp
assert(s->timestamp->size() == ts_sz);
}
if (user_comparator->EqualWithoutTimestamp(user_key_slice,
s->key->user_key())) {
// Correct user key
@ -960,10 +965,20 @@ static bool SaveValue(void* arg, const char* entry) {
if (s->seq == kMaxSequenceNumber) {
s->seq = seq;
if (s->seq > max_covering_tombstone_seq) {
if (ts_sz && s->timestamp != nullptr) {
// `timestamp` was set to range tombstone's timestamp before
// `SaveValue` is ever called. This key has a higher sequence number
// than range tombstone, and is the key with the highest seqno across
// all keys with this user_key, so we update timestamp here.
Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
s->timestamp->assign(ts.data(), ts_sz);
}
} else {
s->seq = max_covering_tombstone_seq;
}
}
s->seq = std::max(s->seq, max_covering_tombstone_seq);
if (ts_sz > 0 && s->timestamp != nullptr) {
if (!s->timestamp->empty()) {
assert(ts_sz == s->timestamp->size());
@ -978,7 +993,8 @@ static bool SaveValue(void* arg, const char* entry) {
}
if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
type == kTypeWideColumnEntity) &&
type == kTypeWideColumnEntity || type == kTypeDeletion ||
type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp) &&
max_covering_tombstone_seq > seq) {
type = kTypeRangeDeletion;
}
@ -1139,9 +1155,17 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
GetInternalKeySeqno(key.internal_key()),
immutable_memtable));
if (range_del_iter != nullptr) {
*max_covering_tombstone_seq =
std::max(*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
SequenceNumber covering_seq =
range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key());
if (covering_seq > *max_covering_tombstone_seq) {
*max_covering_tombstone_seq = covering_seq;
if (timestamp) {
// Will be overwritten in SaveValue() if there is a point key with
// a higher seqno.
timestamp->assign(range_del_iter->timestamp().data(),
range_del_iter->timestamp().size());
}
}
}
bool found_final_value = false;
@ -1272,9 +1296,17 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
NewRangeTombstoneIteratorInternal(
read_options, GetInternalKeySeqno(iter->lkey->internal_key()),
immutable_memtable));
iter->max_covering_tombstone_seq = std::max(
iter->max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
SequenceNumber covering_seq =
range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key());
if (covering_seq > iter->max_covering_tombstone_seq) {
iter->max_covering_tombstone_seq = covering_seq;
if (iter->timestamp) {
// Will be overwritten in SaveValue() if there is a point key with
// a higher seqno.
iter->timestamp->assign(range_del_iter->timestamp().data(),
range_del_iter->timestamp().size());
}
}
}
SequenceNumber dummy_seq;
GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,

@ -639,6 +639,8 @@ class MemTable {
// Always returns non-null and assumes certain pre-checks (e.g.,
// is_range_del_table_empty_) are done. This is only valid during the lifetime
// of the underlying memtable.
// read_seq and read_options.timestamp will be used as the upper bound
// for range tombstones.
FragmentedRangeTombstoneIterator* NewRangeTombstoneIteratorInternal(
const ReadOptions& read_options, SequenceNumber read_seq,
bool immutable_memtable);

@ -85,7 +85,7 @@ bool TruncatedRangeDelIterator::Valid() const {
icmp_->Compare(iter_->parsed_start_key(), *largest_) < 0);
}
// NOTE: target is a user key
// NOTE: target is a user key, with timestamp if enabled.
void TruncatedRangeDelIterator::Seek(const Slice& target) {
if (largest_ != nullptr &&
icmp_->Compare(*largest_, ParsedInternalKey(target, kMaxSequenceNumber,
@ -101,7 +101,7 @@ void TruncatedRangeDelIterator::Seek(const Slice& target) {
iter_->Seek(target);
}
// NOTE: target is a user key
// NOTE: target is a user key, with timestamp if enabled.
void TruncatedRangeDelIterator::SeekForPrev(const Slice& target) {
if (smallest_ != nullptr &&
icmp_->Compare(ParsedInternalKey(target, 0, kTypeRangeDeletion),
@ -339,11 +339,22 @@ void CompactionRangeDelAggregator::AddTombstones(
if (input_iter == nullptr || input_iter->empty()) {
return;
}
// This bounds output of CompactionRangeDelAggregator::NewIterator.
if (!trim_ts_.empty()) {
assert(icmp_->user_comparator()->timestamp_size() > 0);
input_iter->SetTimestampUpperBound(&trim_ts_);
}
assert(input_iter->lower_bound() == 0);
assert(input_iter->upper_bound() == kMaxSequenceNumber);
parent_iters_.emplace_back(new TruncatedRangeDelIterator(
std::move(input_iter), icmp_, smallest, largest));
Slice* ts_upper_bound = nullptr;
if (!ts_upper_bound_.empty()) {
assert(icmp_->user_comparator()->timestamp_size() > 0);
ts_upper_bound = &ts_upper_bound_;
}
auto split_iters = parent_iters_.back()->SplitBySnapshot(*snapshots_);
for (auto& split_iter : split_iters) {
auto it = reps_.find(split_iter.first);
@ -356,6 +367,16 @@ void CompactionRangeDelAggregator::AddTombstones(
assert(inserted);
}
assert(it != reps_.end());
// ts_upper_bound is used to bound ShouldDelete() to only consider
// range tombstones under full_history_ts_low_ and trim_ts_. Keys covered by
// range tombstones that are above full_history_ts_low_ should not be
// dropped prematurely: user may read with a timestamp between the range
// tombstone and the covered key. Note that we cannot set timestamp
// upperbound on the original `input_iter` since `input_iter`s are later
// used in CompactionRangeDelAggregator::NewIterator to output range
// tombstones for persistence. We do not want to only persist range
// tombstones with timestamp lower than ts_upper_bound.
split_iter.second->SetTimestampUpperBound(ts_upper_bound);
it->second.AddTombstones(std::move(split_iter.second));
}
}
@ -371,6 +392,12 @@ bool CompactionRangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed,
namespace {
// Produce a sorted (by start internal key) stream of range tombstones from
// `children`. lower_bound and upper_bound on user key can be
// optionally specified. Range tombstones that ends before lower_bound or starts
// after upper_bound are excluded.
// If user-defined timestamp is enabled, lower_bound and upper_bound should
// contain timestamp, but comparison is done ignoring timestamps.
class TruncatedRangeDelMergingIter : public InternalIterator {
public:
TruncatedRangeDelMergingIter(
@ -381,7 +408,8 @@ class TruncatedRangeDelMergingIter : public InternalIterator {
lower_bound_(lower_bound),
upper_bound_(upper_bound),
upper_bound_inclusive_(upper_bound_inclusive),
heap_(StartKeyMinComparator(icmp)) {
heap_(StartKeyMinComparator(icmp)),
ts_sz_(icmp_->user_comparator()->timestamp_size()) {
for (auto& child : children) {
if (child != nullptr) {
assert(child->lower_bound() == 0);
@ -422,15 +450,28 @@ class TruncatedRangeDelMergingIter : public InternalIterator {
Slice key() const override {
auto* top = heap_.top();
cur_start_key_.Set(top->start_key().user_key, top->seq(),
kTypeRangeDeletion);
if (ts_sz_) {
cur_start_key_.Set(top->start_key().user_key, top->seq(),
kTypeRangeDeletion, top->timestamp());
} else {
cur_start_key_.Set(top->start_key().user_key, top->seq(),
kTypeRangeDeletion);
}
assert(top->start_key().user_key.size() >= ts_sz_);
return cur_start_key_.Encode();
}
Slice value() const override {
auto* top = heap_.top();
assert(top->end_key().sequence == kMaxSequenceNumber);
return top->end_key().user_key;
if (!ts_sz_) {
return top->end_key().user_key;
}
assert(top->timestamp().size() == ts_sz_);
cur_end_key_.clear();
cur_end_key_.append(top->end_key().user_key.data(),
top->end_key().user_key.size() - ts_sz_);
cur_end_key_.append(top->timestamp().data(), ts_sz_);
return cur_end_key_;
}
// Unused InternalIterator methods
@ -444,8 +485,8 @@ class TruncatedRangeDelMergingIter : public InternalIterator {
if (upper_bound_ == nullptr) {
return true;
}
int cmp = icmp_->user_comparator()->Compare(iter->start_key().user_key,
*upper_bound_);
int cmp = icmp_->user_comparator()->CompareWithoutTimestamp(
iter->start_key().user_key, *upper_bound_);
return upper_bound_inclusive_ ? cmp <= 0 : cmp < 0;
}
@ -457,6 +498,8 @@ class TruncatedRangeDelMergingIter : public InternalIterator {
std::vector<TruncatedRangeDelIterator*> children_;
mutable InternalKey cur_start_key_;
mutable std::string cur_end_key_;
size_t ts_sz_;
};
} // namespace

@ -72,6 +72,13 @@ class TruncatedRangeDelIterator {
}
SequenceNumber seq() const { return iter_->seq(); }
Slice timestamp() const {
assert(icmp_->user_comparator()->timestamp_size());
return iter_->timestamp();
}
void SetTimestampUpperBound(const Slice* ts_upper_bound) {
iter_->SetTimestampUpperBound(ts_upper_bound);
}
std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
SplitBySnapshot(const std::vector<SequenceNumber>& snapshots);
@ -332,6 +339,8 @@ class RangeDelAggregator {
}
}
// If user-defined timestamp is enabled, `start` and `end` are user keys
// with timestamp.
bool IsRangeOverlapped(const Slice& start, const Slice& end);
private:
@ -395,8 +404,25 @@ class ReadRangeDelAggregator final : public RangeDelAggregator {
class CompactionRangeDelAggregator : public RangeDelAggregator {
public:
CompactionRangeDelAggregator(const InternalKeyComparator* icmp,
const std::vector<SequenceNumber>& snapshots)
: RangeDelAggregator(icmp), snapshots_(&snapshots) {}
const std::vector<SequenceNumber>& snapshots,
const std::string* full_history_ts_low = nullptr,
const std::string* trim_ts = nullptr)
: RangeDelAggregator(icmp), snapshots_(&snapshots) {
if (full_history_ts_low) {
ts_upper_bound_ = *full_history_ts_low;
}
if (trim_ts) {
trim_ts_ = *trim_ts;
// Range tombstone newer than `trim_ts` or `full_history_ts_low` should
// not be considered in ShouldDelete().
if (ts_upper_bound_.empty()) {
ts_upper_bound_ = trim_ts_;
} else if (!trim_ts_.empty() && icmp->user_comparator()->CompareTimestamp(
trim_ts_, ts_upper_bound_) < 0) {
ts_upper_bound_ = trim_ts_;
}
}
}
~CompactionRangeDelAggregator() override {}
void AddTombstones(
@ -442,6 +468,9 @@ class CompactionRangeDelAggregator : public RangeDelAggregator {
std::map<SequenceNumber, StripeRep> reps_;
const std::vector<SequenceNumber>* snapshots_;
// min over full_history_ts_low and trim_ts_
Slice ts_upper_bound_{};
Slice trim_ts_{};
};
} // namespace ROCKSDB_NAMESPACE

@ -84,6 +84,7 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
// for use in flush_current_tombstones.
std::set<ParsedInternalKey, ParsedInternalKeyComparator> cur_end_keys(cmp);
size_t ts_sz = icmp.user_comparator()->timestamp_size();
// Given the next start key in unfragmented_tombstones,
// flush_current_tombstones writes every tombstone fragment that starts
// and ends with a key before next_start_key, and starts with a key greater
@ -93,12 +94,14 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
bool reached_next_start_key = false;
for (; it != cur_end_keys.end() && !reached_next_start_key; ++it) {
Slice cur_end_key = it->user_key;
if (icmp.user_comparator()->Compare(cur_start_key, cur_end_key) == 0) {
if (icmp.user_comparator()->CompareWithoutTimestamp(cur_start_key,
cur_end_key) == 0) {
// Empty tombstone.
continue;
}
if (icmp.user_comparator()->Compare(next_start_key, cur_end_key) <= 0) {
// All of the end keys in [it, cur_end_keys.end()) are after
if (icmp.user_comparator()->CompareWithoutTimestamp(next_start_key,
cur_end_key) <= 0) {
// All the end keys in [it, cur_end_keys.end()) are after
// next_start_key, so the tombstones they represent can be used in
// fragments that start with keys greater than or equal to
// next_start_key. However, the end keys we already passed will not be
@ -115,22 +118,38 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
// Flush a range tombstone fragment [cur_start_key, cur_end_key), which
// should not overlap with the last-flushed tombstone fragment.
assert(tombstones_.empty() ||
icmp.user_comparator()->Compare(tombstones_.back().end_key,
cur_start_key) <= 0);
icmp.user_comparator()->CompareWithoutTimestamp(
tombstones_.back().end_key, cur_start_key) <= 0);
// Sort the sequence numbers of the tombstones being fragmented in
// descending order, and then flush them in that order.
autovector<SequenceNumber> seqnums_to_flush;
autovector<Slice> timestamps_to_flush;
for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) {
seqnums_to_flush.push_back(flush_it->sequence);
if (ts_sz) {
timestamps_to_flush.push_back(
ExtractTimestampFromUserKey(flush_it->user_key, ts_sz));
}
}
// TODO: bind the two sorting together to be more efficient
std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(),
std::greater<SequenceNumber>());
if (ts_sz) {
std::sort(timestamps_to_flush.begin(), timestamps_to_flush.end(),
[icmp](const Slice& ts1, const Slice& ts2) {
return icmp.user_comparator()->CompareTimestamp(ts1, ts2) >
0;
});
}
size_t start_idx = tombstone_seqs_.size();
size_t end_idx = start_idx + seqnums_to_flush.size();
if (for_compaction) {
// If user-defined timestamp is enabled, we should not drop tombstones
// from any snapshot stripe. Garbage collection of range tombstones
// happens in CompactionOutputs::AddRangeDels().
if (for_compaction && ts_sz == 0) {
// Drop all tombstone seqnums that are not preserved by a snapshot.
SequenceNumber next_snapshot = kMaxSequenceNumber;
for (auto seq : seqnums_to_flush) {
@ -155,10 +174,33 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
seqnums_to_flush.end());
seq_set_.insert(seqnums_to_flush.begin(), seqnums_to_flush.end());
if (ts_sz) {
tombstone_timestamps_.insert(tombstone_timestamps_.end(),
timestamps_to_flush.begin(),
timestamps_to_flush.end());
}
}
assert(start_idx < end_idx);
tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx, end_idx);
if (ts_sz) {
std::string start_key_with_max_ts;
AppendUserKeyWithMaxTimestamp(&start_key_with_max_ts, cur_start_key,
ts_sz);
pinned_slices_.emplace_back(std::move(start_key_with_max_ts));
Slice start_key = pinned_slices_.back();
std::string end_key_with_max_ts;
AppendUserKeyWithMaxTimestamp(&end_key_with_max_ts, cur_end_key, ts_sz);
pinned_slices_.emplace_back(std::move(end_key_with_max_ts));
Slice end_key = pinned_slices_.back();
// RangeTombstoneStack expects start_key and end_key to have max
// timestamp.
tombstones_.emplace_back(start_key, end_key, start_idx, end_idx);
} else {
tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx,
end_idx);
}
cur_start_key = cur_end_key;
}
@ -193,8 +235,9 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
tombstone_end_key.size());
tombstone_end_key = pinned_slices_.back();
}
if (!cur_end_keys.empty() && icmp.user_comparator()->Compare(
cur_start_key, tombstone_start_key) != 0) {
if (!cur_end_keys.empty() &&
icmp.user_comparator()->CompareWithoutTimestamp(
cur_start_key, tombstone_start_key) != 0) {
// The start key has changed. Flush all tombstones that start before
// this new start key.
flush_current_tombstones(tombstone_start_key);
@ -223,14 +266,15 @@ bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower,
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
const FragmentedRangeTombstoneList* tombstones,
const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
SequenceNumber _lower_bound)
const Slice* ts_upper_bound, SequenceNumber _lower_bound)
: tombstone_start_cmp_(icmp.user_comparator()),
tombstone_end_cmp_(icmp.user_comparator()),
icmp_(&icmp),
ucmp_(icmp.user_comparator()),
tombstones_(tombstones),
upper_bound_(_upper_bound),
lower_bound_(_lower_bound) {
lower_bound_(_lower_bound),
ts_upper_bound_(ts_upper_bound) {
assert(tombstones_ != nullptr);
Invalidate();
}
@ -238,7 +282,7 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
SequenceNumber _lower_bound)
const Slice* ts_upper_bound, SequenceNumber _lower_bound)
: tombstone_start_cmp_(icmp.user_comparator()),
tombstone_end_cmp_(icmp.user_comparator()),
icmp_(&icmp),
@ -246,7 +290,8 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
tombstones_ref_(tombstones),
tombstones_(tombstones_ref_.get()),
upper_bound_(_upper_bound),
lower_bound_(_lower_bound) {
lower_bound_(_lower_bound),
ts_upper_bound_(ts_upper_bound) {
assert(tombstones_ != nullptr);
Invalidate();
}
@ -254,7 +299,7 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
const std::shared_ptr<FragmentedRangeTombstoneListCache>& tombstones_cache,
const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
SequenceNumber _lower_bound)
const Slice* ts_upper_bound, SequenceNumber _lower_bound)
: tombstone_start_cmp_(icmp.user_comparator()),
tombstone_end_cmp_(icmp.user_comparator()),
icmp_(&icmp),
@ -264,6 +309,11 @@ FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
upper_bound_(_upper_bound),
lower_bound_(_lower_bound) {
assert(tombstones_ != nullptr);
if (!ts_upper_bound || ts_upper_bound->empty()) {
ts_upper_bound_ = nullptr;
} else {
ts_upper_bound_ = ts_upper_bound;
}
Invalidate();
}
@ -278,9 +328,7 @@ void FragmentedRangeTombstoneIterator::SeekToTopFirst() {
return;
}
pos_ = tombstones_->begin();
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
upper_bound_, std::greater<SequenceNumber>());
SetMaxVisibleSeqAndTimestamp();
ScanForwardToVisibleTombstone();
}
@ -295,12 +343,12 @@ void FragmentedRangeTombstoneIterator::SeekToTopLast() {
return;
}
pos_ = std::prev(tombstones_->end());
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
upper_bound_, std::greater<SequenceNumber>());
SetMaxVisibleSeqAndTimestamp();
ScanBackwardToVisibleTombstone();
}
// @param `target` is a user key, with timestamp if user-defined timestamp is
// enabled.
void FragmentedRangeTombstoneIterator::Seek(const Slice& target) {
if (tombstones_->empty()) {
Invalidate();
@ -328,9 +376,7 @@ void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone(
seq_pos_ = tombstones_->seq_end();
return;
}
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
upper_bound_, std::greater<SequenceNumber>());
SetMaxVisibleSeqAndTimestamp();
}
void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
@ -347,9 +393,7 @@ void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
return;
}
--pos_;
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
upper_bound_, std::greater<SequenceNumber>());
SetMaxVisibleSeqAndTimestamp();
}
void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() {
@ -361,9 +405,7 @@ void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() {
Invalidate();
return;
}
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
upper_bound_, std::greater<SequenceNumber>());
SetMaxVisibleSeqAndTimestamp();
}
}
@ -376,9 +418,7 @@ void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() {
return;
}
--pos_;
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
upper_bound_, std::greater<SequenceNumber>());
SetMaxVisibleSeqAndTimestamp();
}
}
@ -394,9 +434,7 @@ void FragmentedRangeTombstoneIterator::TopNext() {
if (pos_ == tombstones_->end()) {
return;
}
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
upper_bound_, std::greater<SequenceNumber>());
SetMaxVisibleSeqAndTimestamp();
ScanForwardToVisibleTombstone();
}
@ -418,9 +456,7 @@ void FragmentedRangeTombstoneIterator::TopPrev() {
return;
}
--pos_;
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
upper_bound_, std::greater<SequenceNumber>());
SetMaxVisibleSeqAndTimestamp();
ScanBackwardToVisibleTombstone();
}
@ -431,8 +467,10 @@ bool FragmentedRangeTombstoneIterator::Valid() const {
SequenceNumber FragmentedRangeTombstoneIterator::MaxCoveringTombstoneSeqnum(
const Slice& target_user_key) {
SeekToCoveringTombstone(target_user_key);
return ValidPos() && ucmp_->Compare(start_key(), target_user_key) <= 0 ? seq()
: 0;
return ValidPos() && ucmp_->CompareWithoutTimestamp(start_key(),
target_user_key) <= 0
? seq()
: 0;
}
std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
@ -449,8 +487,9 @@ FragmentedRangeTombstoneIterator::SplitBySnapshot(
upper = snapshots[i];
}
if (tombstones_->ContainsRange(lower, upper)) {
splits.emplace(upper, std::make_unique<FragmentedRangeTombstoneIterator>(
tombstones_, *icmp_, upper, lower));
splits.emplace(upper,
std::make_unique<FragmentedRangeTombstoneIterator>(
tombstones_, *icmp_, upper, ts_upper_bound_, lower));
}
lower = upper + 1;
}

@ -33,6 +33,10 @@ struct FragmentedRangeTombstoneList {
// start and end at the same user keys but have different sequence numbers.
// The members seq_start_idx and seq_end_idx are intended to be parameters to
// seq_iter().
// If user-defined timestamp is enabled, `start` and `end` should be user keys
// with timestamp, and the timestamps are set to max timestamp to be returned
// by parsed_start_key()/parsed_end_key(). seq_start_idx and seq_end_idx will
// also be used as parameters to ts_iter().
struct RangeTombstoneStack {
RangeTombstoneStack(const Slice& start, const Slice& end, size_t start_idx,
size_t end_idx)
@ -40,12 +44,13 @@ struct FragmentedRangeTombstoneList {
end_key(end),
seq_start_idx(start_idx),
seq_end_idx(end_idx) {}
Slice start_key;
Slice end_key;
size_t seq_start_idx;
size_t seq_end_idx;
};
// Assumes unfragmented_tombstones->key() and unfragmented_tombstones->value()
// both contain timestamp if enabled.
FragmentedRangeTombstoneList(
std::unique_ptr<InternalIterator> unfragmented_tombstones,
const InternalKeyComparator& icmp, bool for_compaction = false,
@ -63,6 +68,10 @@ struct FragmentedRangeTombstoneList {
return std::next(tombstone_seqs_.begin(), idx);
}
std::vector<Slice>::const_iterator ts_iter(size_t idx) const {
return std::next(tombstone_timestamps_.begin(), idx);
}
std::vector<SequenceNumber>::const_iterator seq_begin() const {
return tombstone_seqs_.begin();
}
@ -87,8 +96,15 @@ struct FragmentedRangeTombstoneList {
private:
// Given an ordered range tombstone iterator unfragmented_tombstones,
// "fragment" the tombstones into non-overlapping pieces, and store them in
// tombstones_ and tombstone_seqs_.
// "fragment" the tombstones into non-overlapping pieces. Each
// "non-overlapping piece" is a RangeTombstoneStack in tombstones_, which
// contains start_key, end_key, and indices that points to sequence numbers
// (in tombstone_seqs_) and timestamps (in tombstone_timestamps_). If
// for_compaction is true, then `snapshots` should be provided. Range
// tombstone fragments are dropped if they are not visible in any snapshot and
// user-defined timestamp is not enabled. That is, for each snapshot stripe
// [lower, upper], the range tombstone fragment with largest seqno in [lower,
// upper] is preserved, and all the other range tombstones are dropped.
void FragmentTombstones(
std::unique_ptr<InternalIterator> unfragmented_tombstones,
const InternalKeyComparator& icmp, bool for_compaction,
@ -96,6 +112,7 @@ struct FragmentedRangeTombstoneList {
std::vector<RangeTombstoneStack> tombstones_;
std::vector<SequenceNumber> tombstone_seqs_;
std::vector<Slice> tombstone_timestamps_;
std::set<SequenceNumber> seq_set_;
std::list<std::string> pinned_slices_;
PinnedIteratorsManager pinned_iters_mgr_;
@ -117,15 +134,15 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
FragmentedRangeTombstoneIterator(
const FragmentedRangeTombstoneList* tombstones,
const InternalKeyComparator& icmp, SequenceNumber upper_bound,
SequenceNumber lower_bound = 0);
const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0);
FragmentedRangeTombstoneIterator(
const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
const InternalKeyComparator& icmp, SequenceNumber upper_bound,
SequenceNumber lower_bound = 0);
const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0);
FragmentedRangeTombstoneIterator(
const std::shared_ptr<FragmentedRangeTombstoneListCache>& tombstones,
const InternalKeyComparator& icmp, SequenceNumber upper_bound,
SequenceNumber lower_bound = 0);
const Slice* ts_upper_bound = nullptr, SequenceNumber lower_bound = 0);
void SeekToFirst() override;
void SeekToLast() override;
@ -154,6 +171,8 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
void TopPrev();
bool Valid() const override;
// Note that key() and value() do not return correct timestamp.
// Caller should call timestamp() to get the current timestamp.
Slice key() const override {
MaybePinKey();
return current_start_key_.Encode();
@ -172,11 +191,28 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
}
RangeTombstone Tombstone() const {
assert(Valid());
if (icmp_->user_comparator()->timestamp_size()) {
return RangeTombstone(start_key(), end_key(), seq(), timestamp());
}
return RangeTombstone(start_key(), end_key(), seq());
}
// Note that start_key() and end_key() are not guaranteed to have the
// correct timestamp. User can call timestamp() to get the correct
// timestamp().
Slice start_key() const { return pos_->start_key; }
Slice end_key() const { return pos_->end_key; }
SequenceNumber seq() const { return *seq_pos_; }
Slice timestamp() const {
// seqno and timestamp are stored in the same order.
return *tombstones_->ts_iter(seq_pos_ - tombstones_->seq_begin());
}
// Current use case is by CompactionRangeDelAggregator to set
// full_history_ts_low_.
void SetTimestampUpperBound(const Slice* ts_upper_bound) {
ts_upper_bound_ = ts_upper_bound;
}
ParsedInternalKey parsed_start_key() const {
return ParsedInternalKey(pos_->start_key, kMaxSequenceNumber,
kTypeRangeDeletion);
@ -186,6 +222,9 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
kTypeRangeDeletion);
}
// Return the max sequence number of a range tombstone that covers
// the given user key.
// If there is no covering tombstone, then 0 is returned.
SequenceNumber MaxCoveringTombstoneSeqnum(const Slice& user_key);
// Splits the iterator into n+1 iterators (where n is the number of
@ -218,15 +257,15 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
bool operator()(const RangeTombstoneStack& a,
const RangeTombstoneStack& b) const {
return cmp->Compare(a.start_key, b.start_key) < 0;
return cmp->CompareWithoutTimestamp(a.start_key, b.start_key) < 0;
}
bool operator()(const RangeTombstoneStack& a, const Slice& b) const {
return cmp->Compare(a.start_key, b) < 0;
return cmp->CompareWithoutTimestamp(a.start_key, b) < 0;
}
bool operator()(const Slice& a, const RangeTombstoneStack& b) const {
return cmp->Compare(a, b.start_key) < 0;
return cmp->CompareWithoutTimestamp(a, b.start_key) < 0;
}
const Comparator* cmp;
@ -237,15 +276,15 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
bool operator()(const RangeTombstoneStack& a,
const RangeTombstoneStack& b) const {
return cmp->Compare(a.end_key, b.end_key) < 0;
return cmp->CompareWithoutTimestamp(a.end_key, b.end_key) < 0;
}
bool operator()(const RangeTombstoneStack& a, const Slice& b) const {
return cmp->Compare(a.end_key, b) < 0;
return cmp->CompareWithoutTimestamp(a.end_key, b) < 0;
}
bool operator()(const Slice& a, const RangeTombstoneStack& b) const {
return cmp->Compare(a, b.end_key) < 0;
return cmp->CompareWithoutTimestamp(a, b.end_key) < 0;
}
const Comparator* cmp;
@ -277,11 +316,38 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
const FragmentedRangeTombstoneList* tombstones_;
SequenceNumber upper_bound_;
SequenceNumber lower_bound_;
// Only consider timestamps <= ts_upper_bound_.
const Slice* ts_upper_bound_;
std::vector<RangeTombstoneStack>::const_iterator pos_;
std::vector<SequenceNumber>::const_iterator seq_pos_;
mutable std::vector<RangeTombstoneStack>::const_iterator pinned_pos_;
mutable std::vector<SequenceNumber>::const_iterator pinned_seq_pos_;
mutable InternalKey current_start_key_;
// Check the current RangeTombstoneStack `pos_` against timestamp
// upper bound `ts_upper_bound_` and sequence number upper bound
// `upper_bound_`. Update the sequence number (and timestamp) pointer
// `seq_pos_` to the first valid position satisfying both bounds.
void SetMaxVisibleSeqAndTimestamp() {
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
upper_bound_, std::greater<SequenceNumber>());
if (ts_upper_bound_ && !ts_upper_bound_->empty()) {
auto ts_pos = std::lower_bound(
tombstones_->ts_iter(pos_->seq_start_idx),
tombstones_->ts_iter(pos_->seq_end_idx), *ts_upper_bound_,
[this](const Slice& s1, const Slice& s2) {
return ucmp_->CompareTimestamp(s1, s2) > 0;
});
auto ts_idx = ts_pos - tombstones_->ts_iter(pos_->seq_start_idx);
auto seq_idx = seq_pos_ - tombstones_->seq_iter(pos_->seq_start_idx);
if (seq_idx < ts_idx) {
// seq and ts are ordered in non-increasing order. Only updates seq_pos_
// to a larger index for smaller sequence number and timestamp.
seq_pos_ = tombstones_->seq_iter(pos_->seq_start_idx + ts_idx);
}
}
}
};
} // namespace ROCKSDB_NAMESPACE

@ -490,9 +490,15 @@ Status TableCache::Get(
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
t->NewRangeTombstoneIterator(options));
if (range_del_iter != nullptr) {
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
SequenceNumber seq =
range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k));
if (seq > *max_covering_tombstone_seq) {
*max_covering_tombstone_seq = seq;
if (get_context->NeedTimestamp()) {
get_context->SetTimestampFromRangeTombstone(
range_del_iter->timestamp());
}
}
}
}
if (s.ok()) {
@ -535,9 +541,15 @@ void TableCache::UpdateRangeTombstoneSeqnums(
for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
SequenceNumber* max_covering_tombstone_seq =
iter->get_context->max_covering_tombstone_seq();
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts));
SequenceNumber seq =
range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts);
if (seq > *max_covering_tombstone_seq) {
*max_covering_tombstone_seq = seq;
if (iter->get_context->NeedTimestamp()) {
iter->get_context->SetTimestampFromRangeTombstone(
range_del_iter->timestamp());
}
}
}
}
}

@ -1632,6 +1632,7 @@ Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
if (tombstone_iter) {
tombstone_iter->SeekToFirst();
// TODO: print timestamp
while (tombstone_iter->Valid() && num_entries_left > 0) {
ss << "start: " << tombstone_iter->start_key().ToString(true)
<< " end: " << tombstone_iter->end_key().ToString(true)

@ -1353,8 +1353,31 @@ Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
return WriteBatchInternal::DeleteRange(this, cf_id, begin_key, end_key);
}
return Status::InvalidArgument(
"Cannot call this method on column family enabling timestamp");
needs_in_place_update_ts_ = true;
has_key_with_ts_ = true;
std::string dummy_ts(ts_sz, '\0');
std::array<Slice, 2> begin_key_with_ts{{begin_key, dummy_ts}};
std::array<Slice, 2> end_key_with_ts{{end_key, dummy_ts}};
return WriteBatchInternal::DeleteRange(
this, cf_id, SliceParts(begin_key_with_ts.data(), 2),
SliceParts(end_key_with_ts.data(), 2));
}
Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key,
const Slice& ts) {
const Status s = CheckColumnFamilyTimestampSize(column_family, ts);
if (!s.ok()) {
return s;
}
assert(column_family);
has_key_with_ts_ = true;
uint32_t cf_id = column_family->GetID();
std::array<Slice, 2> key_with_ts{{begin_key, ts}};
std::array<Slice, 2> end_key_with_ts{{end_key, ts}};
return WriteBatchInternal::DeleteRange(this, cf_id,
SliceParts(key_with_ts.data(), 2),
SliceParts(end_key_with_ts.data(), 2));
}
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
@ -1928,10 +1951,9 @@ class MemTableInserter : public WriteBatch::Handler {
// always 0 in
// non-recovery, regular write code-path)
// * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that
// column
// family already contains updates from this log. We can't apply updates
// twice because of update-in-place or merge workloads -- ignore the
// update
// column family already contains updates from this log. We can't apply
// updates twice because of update-in-place or merge workloads -- ignore
// the update
*s = Status::OK();
return false;
}
@ -2331,7 +2353,8 @@ class MemTableInserter : public WriteBatch::Handler {
cfd->ioptions()->table_factory->Name() + " in CF " +
cfd->GetName());
}
int cmp = cfd->user_comparator()->Compare(begin_key, end_key);
int cmp =
cfd->user_comparator()->CompareWithoutTimestamp(begin_key, end_key);
if (cmp > 0) {
// TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`.
ret_status.PermitUncheckedError();

@ -314,8 +314,12 @@ class TimestampUpdater : public WriteBatch::Handler {
}
Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
const Slice&) override {
return UpdateTimestamp(cf, begin_key);
const Slice& end_key) override {
Status s = UpdateTimestamp(cf, begin_key, true /* is_key */);
if (s.ok()) {
s = UpdateTimestamp(cf, end_key, false /* is_key */);
}
return s;
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
@ -341,13 +345,15 @@ class TimestampUpdater : public WriteBatch::Handler {
Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
private:
Status UpdateTimestamp(uint32_t cf, const Slice& key) {
Status s = UpdateTimestampImpl(cf, key, idx_);
// @param is_key specifies whether the update is for key or value.
Status UpdateTimestamp(uint32_t cf, const Slice& buf, bool is_key = true) {
Status s = UpdateTimestampImpl(cf, buf, idx_, is_key);
++idx_;
return s;
}
Status UpdateTimestampImpl(uint32_t cf, const Slice& key, size_t /*idx*/) {
Status UpdateTimestampImpl(uint32_t cf, const Slice& buf, size_t /*idx*/,
bool is_key) {
if (timestamp_.empty()) {
return Status::InvalidArgument("Timestamp is empty");
}
@ -361,22 +367,27 @@ class TimestampUpdater : public WriteBatch::Handler {
} else if (cf_ts_sz != timestamp_.size()) {
return Status::InvalidArgument("timestamp size mismatch");
}
UpdateProtectionInformationIfNeeded(key, timestamp_);
UpdateProtectionInformationIfNeeded(buf, timestamp_, is_key);
char* ptr = const_cast<char*>(key.data() + key.size() - cf_ts_sz);
char* ptr = const_cast<char*>(buf.data() + buf.size() - cf_ts_sz);
assert(ptr);
memcpy(ptr, timestamp_.data(), timestamp_.size());
return Status::OK();
}
void UpdateProtectionInformationIfNeeded(const Slice& key, const Slice& ts) {
void UpdateProtectionInformationIfNeeded(const Slice& buf, const Slice& ts,
bool is_key) {
if (prot_info_ != nullptr) {
const size_t ts_sz = ts.size();
SliceParts old_key(&key, 1);
Slice key_no_ts(key.data(), key.size() - ts_sz);
std::array<Slice, 2> new_key_cmpts{{key_no_ts, ts}};
SliceParts new_key(new_key_cmpts.data(), 2);
prot_info_->entries_[idx_].UpdateK(old_key, new_key);
SliceParts old(&buf, 1);
Slice old_no_ts(buf.data(), buf.size() - ts_sz);
std::array<Slice, 2> new_key_cmpts{{old_no_ts, ts}};
SliceParts new_parts(new_key_cmpts.data(), 2);
if (is_key) {
prot_info_->entries_[idx_].UpdateK(old, new_parts);
} else {
prot_info_->entries_[idx_].UpdateV(old, new_parts);
}
}
}

@ -962,15 +962,15 @@ TEST_F(WriteBatchTest, SanityChecks) {
ASSERT_TRUE(wb.Delete(nullptr, "key", "ts").IsInvalidArgument());
ASSERT_TRUE(wb.SingleDelete(nullptr, "key", "ts").IsInvalidArgument());
ASSERT_TRUE(wb.Merge(nullptr, "key", "ts", "value").IsNotSupported());
ASSERT_TRUE(
wb.DeleteRange(nullptr, "begin_key", "end_key", "ts").IsNotSupported());
ASSERT_TRUE(wb.DeleteRange(nullptr, "begin_key", "end_key", "ts")
.IsInvalidArgument());
ASSERT_TRUE(wb.Put(&cf4, "key", "ts", "value").IsInvalidArgument());
ASSERT_TRUE(wb.Delete(&cf4, "key", "ts").IsInvalidArgument());
ASSERT_TRUE(wb.SingleDelete(&cf4, "key", "ts").IsInvalidArgument());
ASSERT_TRUE(wb.Merge(&cf4, "key", "ts", "value").IsNotSupported());
ASSERT_TRUE(
wb.DeleteRange(&cf4, "begin_key", "end_key", "ts").IsNotSupported());
wb.DeleteRange(&cf4, "begin_key", "end_key", "ts").IsInvalidArgument());
constexpr size_t wrong_ts_sz = 1 + sizeof(uint64_t);
std::string ts(wrong_ts_sz, '\0');
@ -980,7 +980,7 @@ TEST_F(WriteBatchTest, SanityChecks) {
ASSERT_TRUE(wb.SingleDelete(&cf0, "key", ts).IsInvalidArgument());
ASSERT_TRUE(wb.Merge(&cf0, "key", ts, "value").IsNotSupported());
ASSERT_TRUE(
wb.DeleteRange(&cf0, "begin_key", "end_key", ts).IsNotSupported());
wb.DeleteRange(&cf0, "begin_key", "end_key", ts).IsInvalidArgument());
// Sanity checks for the new WriteBatch APIs without extra 'ts' arg.
WriteBatch wb1(0, 0, 0, wrong_ts_sz);

@ -935,15 +935,11 @@ void StressTest::OperateDb(ThreadState* thread) {
// Assign timestamps if necessary.
std::string read_ts_str;
std::string write_ts_str;
Slice read_ts;
Slice write_ts;
if (FLAGS_user_timestamp_size > 0) {
read_ts_str = GetNowNanos();
read_ts = read_ts_str;
read_opts.timestamp = &read_ts;
write_ts_str = GetNowNanos();
write_ts = write_ts_str;
}
int prob_op = thread->rand.Uniform(100);
@ -2831,17 +2827,17 @@ void StressTest::Reopen(ThreadState* thread) {
}
}
void StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
bool StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
std::string& ts_str,
Slice& ts_slice,
ReadOptions& read_opts) {
if (FLAGS_user_timestamp_size == 0) {
return;
return false;
}
assert(thread);
if (!thread->rand.OneInOpt(3)) {
return;
return false;
}
const SharedState* const shared = thread->shared;
@ -2857,6 +2853,7 @@ void StressTest::MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
PutFixed64(&ts_str, ts);
ts_slice = ts_str;
read_opts.timestamp = &ts_slice;
return true;
}
void StressTest::MaybeUseOlderTimestampForRangeScan(ThreadState* thread,
@ -2914,10 +2911,6 @@ void CheckAndSetOptionsForUserTimestamp(Options& options) {
fprintf(stderr, "Merge does not support timestamp yet.\n");
exit(1);
}
if (FLAGS_delrangepercent > 0) {
fprintf(stderr, "DeleteRange does not support timestamp yet.\n");
exit(1);
}
if (FLAGS_use_txn) {
fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
exit(1);

@ -244,7 +244,8 @@ class StressTest {
TransactionDBOptions& /*txn_db_opts*/) {}
#endif
void MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
// Returns whether the timestamp of read_opts is updated.
bool MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
std::string& ts_str,
Slice& ts_slice,
ReadOptions& read_opts);

@ -390,8 +390,8 @@ class NonBatchedOpsStressTest : public StressTest {
ReadOptions read_opts_copy = read_opts;
std::string read_ts_str;
Slice read_ts_slice;
MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice,
read_opts_copy);
bool read_older_ts = MaybeUseOlderTimestampForPointLookup(
thread, read_ts_str, read_ts_slice, read_opts_copy);
Status s = db_->Get(read_opts_copy, cfh, key, &from_db);
if (fault_fs_guard) {
@ -424,7 +424,7 @@ class NonBatchedOpsStressTest : public StressTest {
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
if (!FLAGS_skip_verifydb && !read_opts_copy.timestamp) {
if (!FLAGS_skip_verifydb && !read_older_ts) {
auto expected =
thread->shared->Get(rand_column_families[0], rand_keys[0]);
if (expected != SharedState::DELETION_SENTINEL &&
@ -959,7 +959,16 @@ class NonBatchedOpsStressTest : public StressTest {
auto cfh = column_families_[rand_column_family];
std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
Slice end_key = end_keystr;
Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
std::string write_ts_str;
Slice write_ts;
Status s;
if (FLAGS_user_timestamp_size) {
write_ts_str = GetNowNanos();
write_ts = write_ts_str;
s = db_->DeleteRange(write_opts, cfh, key, end_key, write_ts);
} else {
s = db_->DeleteRange(write_opts, cfh, key, end_key);
}
if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {

@ -481,13 +481,10 @@ class DB {
virtual Status DeleteRange(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key);
virtual Status DeleteRange(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*begin_key*/,
const Slice& /*end_key*/, const Slice& /*ts*/) {
return Status::NotSupported(
"DeleteRange does not support user-defined timestamp yet");
}
virtual Status DeleteRange(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key,
const Slice& ts);
// Merge the database entry for "key" with "value". Returns OK on success,
// and a non-OK status on error. The semantics of this operation is

@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE
#include "db/range_tombstone_fragmenter.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
@ -30,6 +31,9 @@ class SstFileReader {
// If "snapshot" is nullptr, the iterator returns only the latest keys.
Iterator* NewIterator(const ReadOptions& options);
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
const ReadOptions& options);
std::shared_ptr<const TableProperties> GetTableProperties() const;
// Verifies whether there is corruption in this table.

@ -148,6 +148,13 @@ class SstFileWriter {
// REQUIRES: comparator is *not* timestamp-aware.
Status DeleteRange(const Slice& begin_key, const Slice& end_key);
// Add a range deletion tombstone to currently opened file.
// REQUIRES: begin_key and end_key are user keys without timestamp.
// REQUIRES: the timestamp's size is equal to what is expected by
// the comparator.
Status DeleteRange(const Slice& begin_key, const Slice& end_key,
const Slice& timestamp);
// Finalize writing to sst file and close file.
//
// An optional ExternalSstFileInfo pointer can be passed to the function

@ -150,12 +150,9 @@ class WriteBatch : public WriteBatchBase {
Status DeleteRange(const Slice& begin_key, const Slice& end_key) override {
return DeleteRange(nullptr, begin_key, end_key);
}
Status DeleteRange(ColumnFamilyHandle* /*column_family*/,
const Slice& /*begin_key*/, const Slice& /*end_key*/,
const Slice& /*ts*/) override {
return Status::NotSupported(
"DeleteRange does not support user-defined timestamp");
}
// begin_key and end_key should be user keys without timestamp.
Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key,
const Slice& end_key, const Slice& ts) override;
// variant that takes SliceParts
Status DeleteRange(ColumnFamilyHandle* column_family,

@ -2026,8 +2026,9 @@ FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
if (read_options.snapshot != nullptr) {
snapshot = read_options.snapshot->GetSequenceNumber();
}
return new FragmentedRangeTombstoneIterator(
rep_->fragmented_range_dels, rep_->internal_comparator, snapshot);
return new FragmentedRangeTombstoneIterator(rep_->fragmented_range_dels,
rep_->internal_comparator,
snapshot, read_options.timestamp);
}
bool BlockBasedTable::FullFilterKeyMayMatch(

@ -250,6 +250,20 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
if (ts_sz > 0 && timestamp_ != nullptr) {
if (!timestamp_->empty()) {
assert(ts_sz == timestamp_->size());
// `timestamp` can be set before `SaveValue` is ever called
// when max_covering_tombstone_seq_ was set.
// If this key has a higher sequence number than range tombstone,
// then timestamp should be updated. `ts_from_rangetombstone_` is
// set to false afterwards so that only the key with highest seqno
// updates the timestamp.
if (ts_from_rangetombstone_) {
assert(max_covering_tombstone_seq_);
if (parsed_key.sequence > *max_covering_tombstone_seq_) {
Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
timestamp_->assign(ts.data(), ts.size());
ts_from_rangetombstone_ = false;
}
}
}
// TODO optimize for small size ts
const std::string kMaxTs(ts_sz, '\xff');
@ -263,9 +277,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
auto type = parsed_key.type;
// Key matches. Process it
if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
type == kTypeWideColumnEntity) &&
type == kTypeWideColumnEntity || type == kTypeDeletion ||
type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) &&
max_covering_tombstone_seq_ != nullptr &&
*max_covering_tombstone_seq_ > parsed_key.sequence) {
// Note that deletion types are also considered, this is for the case
// when we need to return timestamp to user. If a range tombstone has a
// higher seqno than point tombstone, its timestamp should be returned.
type = kTypeRangeDeletion;
}
switch (type) {

@ -148,6 +148,14 @@ class GetContext {
return max_covering_tombstone_seq_;
}
bool NeedTimestamp() { return timestamp_ != nullptr; }
void SetTimestampFromRangeTombstone(const Slice& timestamp) {
assert(timestamp_);
timestamp_->assign(timestamp.data(), timestamp.size());
ts_from_rangetombstone_ = true;
}
PinnedIteratorsManager* pinned_iters_mgr() { return pinned_iters_mgr_; }
// If a non-null string is passed, all the SaveValue calls will be
@ -190,6 +198,7 @@ class GetContext {
PinnableSlice* pinnable_val_;
PinnableWideColumns* columns_;
std::string* timestamp_;
bool ts_from_rangetombstone_{false};
bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_;
SequenceNumber* max_covering_tombstone_seq_;

@ -697,6 +697,11 @@ void MergingIterator::SeekImpl(const Slice& target, size_t starting_level,
// is not the same as the original target, it should not affect
// correctness. Besides, in most cases, range tombstone start and
// end key should have the same prefix?
// If range_tombstone_iter->end_key() is truncated to its largest_
// boundary, the timestamp in user_key will not be max timestamp,
// but the timestamp of `range_tombstone_iter.largest_`. This should
// be fine here as current_search_key is used to Seek into lower
// levels.
current_search_key.SetInternalKey(
range_tombstone_iter->end_key().user_key, kMaxSequenceNumber);
}
@ -919,7 +924,6 @@ void MergingIterator::SeekForPrevImpl(const Slice& target,
current_search_key.GetUserKey(),
range_tombstone_iter->end_key().user_key) < 0) {
range_tombstone_reseek = true;
// covered by this range tombstone
current_search_key.SetInternalKey(
range_tombstone_iter->start_key().user_key, kMaxSequenceNumber,
kValueTypeForSeekForPrev);
@ -988,10 +992,6 @@ bool MergingIterator::SkipPrevDeleted() {
return true /* current key deleted */;
}
if (current->iter.IsDeleteRangeSentinelKey()) {
// Different from SkipNextDeleted(): range tombstone start key is before
// file boundary due to op_type set in SetTombstoneKey().
assert(ExtractValueType(current->iter.key()) != kTypeRangeDeletion ||
active_.count(current->level));
// LevelIterator enters a new SST file
current->iter.Prev();
if (current->iter.Valid()) {
@ -1025,12 +1025,11 @@ bool MergingIterator::SkipPrevDeleted() {
std::string target;
AppendInternalKey(&target, range_tombstone_iters_[i]->start_key());
// This is different from SkipNextDeleted() which does reseek at sorted
// runs
// >= level (instead of i+1 here). With min heap, if level L is at top of
// the heap, then levels <L all have internal keys > level L's current
// internal key, which means levels <L are already at a different user
// key. With max heap, if level L is at top of the heap, then levels <L
// all have internal keys smaller than level L's current internal key,
// runs >= level (instead of i+1 here). With min heap, if level L is at
// top of the heap, then levels <L all have internal keys > level L's
// current internal key, which means levels <L are already at a different
// user key. With max heap, if level L is at top of the heap, then levels
// <L all have internal keys smaller than level L's current internal key,
// which might still be the same user key.
SeekForPrevImpl(target, i + 1, true);
return true /* current key deleted */;

@ -86,6 +86,12 @@ Iterator* SstFileReader::NewIterator(const ReadOptions& roptions) {
return res;
}
FragmentedRangeTombstoneIterator* SstFileReader::NewRangeTombstoneIterator(
const ReadOptions& options) {
auto r = rep_.get();
return r->table_reader->NewRangeTombstoneIterator(options);
}
std::shared_ptr<const TableProperties> SstFileReader::GetTableProperties()
const {
return rep_->table_reader->GetTableProperties();

@ -385,6 +385,39 @@ TEST_F(SstFileReaderTimestampTest, Basic) {
}
}
TEST_F(SstFileReaderTimestampTest, BasicDeleteRange) {
SstFileWriter writer(soptions_, options_);
ASSERT_OK(writer.Open(sst_name_));
ASSERT_OK(writer.DeleteRange("key1", "key2", EncodeAsUint64(1)));
ASSERT_OK(writer.Finish());
SstFileReader reader(options_);
ASSERT_OK(reader.Open(sst_name_));
ASSERT_OK(reader.VerifyChecksum());
ReadOptions read_options;
std::string ts = EncodeAsUint64(2);
Slice ts_slice = ts;
read_options.timestamp = &ts_slice;
FragmentedRangeTombstoneIterator* iter =
reader.NewRangeTombstoneIterator(read_options);
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(
StripTimestampFromUserKey(iter->start_key(), EncodeAsUint64(1).size()),
"key1");
ASSERT_EQ(
StripTimestampFromUserKey(iter->end_key(), EncodeAsUint64(1).size()),
"key2");
ASSERT_EQ(iter->timestamp(), EncodeAsUint64(1));
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
delete iter;
}
TEST_F(SstFileReaderTimestampTest, TimestampsOutOfOrder) {
SstFileWriter writer(soptions_, options_);

@ -131,15 +131,10 @@ struct SstFileWriter::Rep {
return AddImpl(user_key_with_ts, value, value_type);
}
Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
if (internal_comparator.user_comparator()->timestamp_size() != 0) {
return Status::InvalidArgument("Timestamp size mismatch");
}
Status DeleteRangeImpl(const Slice& begin_key, const Slice& end_key) {
if (!builder) {
return Status::InvalidArgument("File is not opened");
}
RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
if (file_info.num_range_del_entries == 0) {
file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
@ -170,6 +165,45 @@ struct SstFileWriter::Rep {
return Status::OK();
}
Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
if (internal_comparator.user_comparator()->timestamp_size() != 0) {
return Status::InvalidArgument("Timestamp size mismatch");
}
return DeleteRangeImpl(begin_key, end_key);
}
// begin_key and end_key should be users keys without timestamp.
Status DeleteRange(const Slice& begin_key, const Slice& end_key,
const Slice& timestamp) {
const size_t timestamp_size = timestamp.size();
if (internal_comparator.user_comparator()->timestamp_size() !=
timestamp_size) {
return Status::InvalidArgument("Timestamp size mismatch");
}
const size_t begin_key_size = begin_key.size();
const size_t end_key_size = end_key.size();
if (begin_key.data() + begin_key_size == timestamp.data() ||
end_key.data() + begin_key_size == timestamp.data()) {
assert(memcmp(begin_key.data() + begin_key_size,
end_key.data() + end_key_size, timestamp_size) == 0);
Slice begin_key_with_ts(begin_key.data(),
begin_key_size + timestamp_size);
Slice end_key_with_ts(end_key.data(), end_key.size() + timestamp_size);
return DeleteRangeImpl(begin_key_with_ts, end_key_with_ts);
}
std::string begin_key_with_ts;
begin_key_with_ts.reserve(begin_key_size + timestamp_size);
begin_key_with_ts.append(begin_key.data(), begin_key_size);
begin_key_with_ts.append(timestamp.data(), timestamp_size);
std::string end_key_with_ts;
end_key_with_ts.reserve(end_key_size + timestamp_size);
end_key_with_ts.append(end_key.data(), end_key_size);
end_key_with_ts.append(timestamp.data(), timestamp_size);
return DeleteRangeImpl(begin_key_with_ts, end_key_with_ts);
}
Status InvalidatePageCache(bool closing) {
Status s = Status::OK();
if (invalidate_page_cache == false) {
@ -346,6 +380,11 @@ Status SstFileWriter::DeleteRange(const Slice& begin_key,
return rep_->DeleteRange(begin_key, end_key);
}
Status SstFileWriter::DeleteRange(const Slice& begin_key, const Slice& end_key,
const Slice& timestamp) {
return rep_->DeleteRange(begin_key, end_key, timestamp);
}
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
Rep* r = rep_.get();
if (!r->builder) {

@ -389,8 +389,6 @@ ts_params = {
"test_cf_consistency": 0,
"test_batches_snapshots": 0,
"user_timestamp_size": 8,
"delrangepercent": 0,
"delpercent": 5,
"use_merge": 0,
"use_full_merge_v1": 0,
"use_txn": 0,
@ -515,14 +513,14 @@ def finalize_and_sanitize(src_params):
# Multi-key operations are not currently compatible with transactions or
# timestamp.
if (
dest_params.get("test_batches_snapshots") == 1
or dest_params.get("use_txn") == 1
or dest_params.get("user_timestamp_size") > 0
):
if (dest_params.get("test_batches_snapshots") == 1 or
dest_params.get("use_txn") == 1 or
dest_params.get("user_timestamp_size") > 0):
dest_params["ingest_external_file_one_in"] = 0
if (dest_params.get("test_batches_snapshots") == 1 or
dest_params.get("use_txn") == 1):
dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0
dest_params["ingest_external_file_one_in"] = 0
if (
dest_params.get("disable_wal") == 1
or dest_params.get("sync_fault_injection") == 1

Loading…
Cancel
Save