From 961c7590d6d2ce83ea0f19cabc32e0f1b0486ca1 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 28 May 2020 10:37:57 -0700 Subject: [PATCH] Add timestamp to delete (#6253) Summary: Preliminary user-timestamp support for delete. If ["a", ts=100] exists, you can delete it by calling `DB::Delete(write_options, key)` in which `write_options.timestamp` points to a `ts` higher than 100. Implementation A new ValueType, i.e. `kTypeDeletionWithTimestamp` is added for deletion marker with timestamp. The reason for a separate `kTypeDeletionWithTimestamp`: RocksDB may drop tombstones (keys with kTypeDeletion) when compacting them to the bottom level. This is OK and useful if timestamp is disabled. When timestamp is enabled, should we still reuse `kTypeDeletion`, we may drop the tombstone with a more recent timestamp, causing deleted keys to re-appear. Test plan (dev server) ``` make check ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/6253 Reviewed By: ltamasi Differential Revision: D20995328 Pulled By: riversand963 fbshipit-source-id: a9e5c22968ad76f98e3dc6ee0151265a3f0df619 --- db/db_impl/db_impl_write.cc | 26 ++- db/db_iter.cc | 3 +- db/db_with_timestamp_basic_test.cc | 250 ++++++++++++++++++++++++++++- db/dbformat.cc | 2 +- db/dbformat.h | 6 +- db/memtable.cc | 1 + db/write_batch.cc | 24 ++- table/get_context.cc | 1 + 8 files changed, 297 insertions(+), 16 deletions(-) diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 745d100b9..b0e21b6d7 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1837,8 +1837,30 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key) { - WriteBatch batch; - batch.Delete(column_family, key); + if (nullptr == opt.timestamp) { + WriteBatch batch; + Status s = batch.Delete(column_family, key); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); + } 
+ const Slice* ts = opt.timestamp; + assert(ts != nullptr); + const size_t ts_sz = ts->size(); + constexpr size_t kKeyAndValueLenSize = 11; + constexpr size_t kWriteBatchOverhead = + WriteBatchInternal::kHeader + sizeof(ValueType) + kKeyAndValueLenSize; + WriteBatch batch(key.size() + ts_sz + kWriteBatchOverhead, /*max_bytes=*/0, + ts_sz); + Status s = batch.Delete(column_family, key); + if (!s.ok()) { + return s; + } + s = batch.AssignTimestamp(*ts); + if (!s.ok()) { + return s; + } return Write(opt, &batch); } diff --git a/db/db_iter.cc b/db/db_iter.cc index ba179ade8..fd3ff78f7 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -258,7 +258,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, // prone to bugs causing the same user key with the same sequence number. // Note that with current timestamp implementation, the same user key can // have different timestamps and zero sequence number on the bottommost - // level. This will change in the future. + // level. This may change in the future. if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) && skipping_saved_key && CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) { @@ -276,6 +276,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, reseek_done = false; switch (ikey_.type) { case kTypeDeletion: + case kTypeDeletionWithTimestamp: case kTypeSingleDeletion: // Arrange to skip all upcoming entries for this key since // they are hidden by this deletion. diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 6e9b00565..a2a7313af 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -449,6 +449,65 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) { Close(); } +// Create two L0, and compact them to a new L1. In this test, L1 is L_bottom. +// Two L0s: +// f1 f2 +// ... +// Since f2.smallest < f1.largest < f2.largest +// f1 and f2 will be the inputs of a real compaction instead of trivial move. 
+TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.num_levels = 2; + options.level0_file_num_compaction_trigger = 2; + DestroyAndReopen(options); + WriteOptions write_opts; + std::string ts_str = Timestamp(1, 0); + Slice ts = ts_str; + write_opts.timestamp = &ts; + ASSERT_OK(db_->Put(write_opts, "a", "value0")); + ASSERT_OK(Flush()); + + ts_str = Timestamp(2, 0); + ts = ts_str; + write_opts.timestamp = &ts; + ASSERT_OK(db_->Put(write_opts, "b", "value0")); + ts_str = Timestamp(3, 0); + ts = ts_str; + write_opts.timestamp = &ts; + ASSERT_OK(db_->Delete(write_opts, "a")); + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ReadOptions read_opts; + ts_str = Timestamp(1, 0); + ts = ts_str; + read_opts.timestamp = &ts; + std::string value; + Status s = db_->Get(read_opts, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("value0", value); + + ts_str = Timestamp(3, 0); + ts = ts_str; + read_opts.timestamp = &ts; + s = db_->Get(read_opts, "a", &value); + ASSERT_TRUE(s.IsNotFound()); + + // Time-travel to the past before deletion + ts_str = Timestamp(2, 0); + ts = ts_str; + read_opts.timestamp = &ts; + s = db_->Get(read_opts, "a", &value); + ASSERT_OK(s); + ASSERT_EQ("value0", value); + Close(); +} + class DBBasicTestWithTimestampCompressionSettings : public DBBasicTestWithTimestampBase, public testing::WithParamInterface< @@ -536,6 +595,104 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) { Close(); } +TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator 
= &test_cmp; + const int kNumKeysPerFile = 1024; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + BlockBasedTableOptions bbto; + bbto.filter_policy = std::get<0>(GetParam()); + bbto.whole_key_filtering = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + + const CompressionType comp_type = std::get<1>(GetParam()); +#if LZ4_VERSION_NUMBER < 10400 // r124+ + if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) { + return; + } +#endif // LZ4_VERSION_NUMBER >= 10400 + if (!ZSTD_Supported() && comp_type == kZSTD) { + return; + } + if (!Zlib_Supported() && comp_type == kZlibCompression) { + return; + } + + options.compression = comp_type; + options.compression_opts.max_dict_bytes = std::get<2>(GetParam()); + if (comp_type == kZSTD) { + options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); + } + options.compression_opts.parallel_threads = std::get<3>(GetParam()); + options.target_file_size_base = 1 << 26; // 64MB + + DestroyAndReopen(options); + + const size_t kNumL0Files = + static_cast<size_t>(Options().level0_file_num_compaction_trigger); + { + // Generate enough L0 files with ts=1 to trigger compaction to L1 + std::string ts_str = Timestamp(1, 0); + Slice ts = ts_str; + WriteOptions wopts; + wopts.timestamp = &ts; + for (size_t i = 0; i != kNumL0Files; ++i) { + for (int j = 0; j != kNumKeysPerFile; ++j) { + ASSERT_OK(db_->Put(wopts, Key1(j), "value" + std::to_string(i))); + } + ASSERT_OK(db_->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + // Generate another L0 at ts=3 + ts_str = Timestamp(3, 0); + ts = ts_str; + wopts.timestamp = &ts; + for (int i = 0; i != kNumKeysPerFile; ++i) { + std::string key_str = Key1(i); + Slice key(key_str); + if ((i % 3) == 0) { + ASSERT_OK(db_->Delete(wopts, key)); + } else { + ASSERT_OK(db_->Put(wopts, key, "new_value")); + } + } + ASSERT_OK(db_->Flush(FlushOptions())); + // Populate memtable at ts=5 + ts_str = Timestamp(5, 0); + 
ts = ts_str; + wopts.timestamp = &ts; + for (int i = 0; i != kNumKeysPerFile; ++i) { + std::string key_str = Key1(i); + Slice key(key_str); + if ((i % 3) == 1) { + ASSERT_OK(db_->Delete(wopts, key)); + } else if ((i % 3) == 2) { + ASSERT_OK(db_->Put(wopts, key, "new_value_2")); + } + } + } + { + std::string ts_str = Timestamp(6, 0); + Slice ts = ts_str; + ReadOptions ropts; + ropts.timestamp = &ts; + for (uint64_t i = 0; i != static_cast<uint64_t>(kNumKeysPerFile); ++i) { + std::string value; + Status s = db_->Get(ropts, Key1(i), &value); + if ((i % 3) == 2) { + ASSERT_OK(s); + ASSERT_EQ("new_value_2", value); + } else { + ASSERT_TRUE(s.IsNotFound()); + } + } + } +} + #ifndef ROCKSDB_LITE // A class which remembers the name of each flushed file. class FlushedFileCollector : public EventListener { @@ -870,15 +1027,10 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) { for (size_t i = 0; i != write_ts_list.size(); ++i) { Slice write_ts = write_ts_list[i]; write_opts.timestamp = &write_ts; - uint64_t key = kMinKey; - do { + for (uint64_t key = kMaxKey; key >= kMinKey; --key) { Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i)); ASSERT_OK(s); - if (key == kMaxKey) { - break; - } - ++key; - } while (true); + } } } const std::vector<std::string> read_ts_list = {Timestamp(5, 0xffffffff), @@ -962,6 +1114,90 @@ INSTANTIATE_TEST_CASE_P( false))), ::testing::Bool())); +class DBBasicTestWithTsIterTombstones + : public DBBasicTestWithTimestampBase, + public testing::WithParamInterface< + std::tuple<std::shared_ptr<const SliceTransform>, + std::shared_ptr<const FilterPolicy>, int>> { + public: + DBBasicTestWithTsIterTombstones() + : DBBasicTestWithTimestampBase("/db_basic_ts_iter_tombstones") {} +}; + +TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) { + constexpr size_t kNumKeysPerFile = 128; + Options options = CurrentOptions(); + options.env = env_; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + 
options.prefix_extractor = std::get<0>(GetParam()); + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + BlockBasedTableOptions bbto; + bbto.filter_policy = std::get<1>(GetParam()); + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + options.num_levels = std::get<2>(GetParam()); + DestroyAndReopen(options); + std::vector<std::string> write_ts_strs = {Timestamp(2, 0), Timestamp(4, 0)}; + constexpr uint64_t kMaxKey = 0xffffffffffffffff; + constexpr uint64_t kMinKey = 0xfffffffffffff000; + // Insert kMinKey...kMaxKey + uint64_t key = kMinKey; + WriteOptions write_opts; + Slice ts = write_ts_strs[0]; + write_opts.timestamp = &ts; + do { + Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(key)); + ASSERT_OK(s); + if (kMaxKey == key) { + break; + } + ++key; + } while (true); + // Delete them all + ts = write_ts_strs[1]; + write_opts.timestamp = &ts; + for (key = kMaxKey; key >= kMinKey; --key) { + Status s; + if (0 != (key % 2)) { + s = db_->Put(write_opts, Key1(key), "value1" + std::to_string(key)); + } else { + s = db_->Delete(write_opts, Key1(key)); + } + ASSERT_OK(s); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + { + std::string read_ts = Timestamp(4, 0); + ts = read_ts; + ReadOptions read_opts; + read_opts.total_order_seek = true; + read_opts.timestamp = &ts; + std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts)); + size_t count = 0; + key = kMinKey + 1; + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++count, key += 2) { + ASSERT_EQ(Key1(key), iter->key()); + ASSERT_EQ("value1" + std::to_string(key), iter->value()); + } + ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count); + } + Close(); +} + +INSTANTIATE_TEST_CASE_P( + Timestamp, DBBasicTestWithTsIterTombstones, + ::testing::Combine( + ::testing::Values( + std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)), + std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))), + ::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr), + std::shared_ptr<const FilterPolicy>( + NewBloomFilterPolicy(10, false)), + 
std::shared_ptr<const FilterPolicy>( + NewBloomFilterPolicy(20, false))), + ::testing::Values(2, 6))); + } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS diff --git a/db/dbformat.cc b/db/dbformat.cc index 46137feb7..c72388c31 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -23,7 +23,7 @@ namespace ROCKSDB_NAMESPACE { // and the value type is embedded as the low 8 bits in the sequence // number in internal keys, we need to use the highest-numbered // ValueType, not the lowest). -const ValueType kValueTypeForSeek = kTypeBlobIndex; +const ValueType kValueTypeForSeek = kTypeDeletionWithTimestamp; const ValueType kValueTypeForSeekForPrev = kTypeDeletion; uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { diff --git a/db/dbformat.h b/db/dbformat.h index e664fa9b2..dac6f3d46 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -69,7 +69,8 @@ enum ValueType : unsigned char { // generated by WriteUnprepared write policy is not mistakenly read by // another. kTypeBeginUnprepareXID = 0x13, // WAL only. - kMaxValue = 0x7F // Not used for storing records. + kTypeDeletionWithTimestamp = 0x14, + kMaxValue = 0x7F // Not used for storing records. }; // Defined in dbformat.cc @@ -79,7 +80,8 @@ extern const ValueType kValueTypeForSeekForPrev; // Checks whether a type is an inline value type // (i.e. a type used in memtable skiplist and sst file datablock).
inline bool IsValueType(ValueType t) { - return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex; + return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex || + kTypeDeletionWithTimestamp == t; } // Checks whether a type is from user operation diff --git a/db/memtable.cc b/db/memtable.cc index 82b577ed8..32f8977df 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -724,6 +724,7 @@ static bool SaveValue(void* arg, const char* entry) { return false; } case kTypeDeletion: + case kTypeDeletionWithTimestamp: case kTypeSingleDeletion: case kTypeRangeDeletion: { if (*(s->merge_in_progress)) { diff --git a/db/write_batch.cc b/db/write_batch.cc index 77715a208..645bbd70a 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -908,7 +908,14 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion)); PutVarint32(&b->rep_, column_family_id); } - PutLengthPrefixedSlice(&b->rep_, key); + if (0 == b->timestamp_size_) { + PutLengthPrefixedSlice(&b->rep_, key); + } else { + PutVarint32(&b->rep_, + static_cast<uint32_t>(key.size() + b->timestamp_size_)); + b->rep_.append(key.data(), key.size()); + b->rep_.append(b->timestamp_size_, '\0'); + } b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_DELETE, std::memory_order_relaxed); @@ -930,7 +937,11 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion)); PutVarint32(&b->rep_, column_family_id); } - PutLengthPrefixedSliceParts(&b->rep_, key); + if (0 == b->timestamp_size_) { + PutLengthPrefixedSliceParts(&b->rep_, key); + } else { + PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_); + } b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_DELETE, std::memory_order_relaxed); @@ -1546,7 +1557,14 @@ class MemTableInserter : public
WriteBatch::Handler { return seek_status; } - auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion); + ColumnFamilyData* cfd = cf_mems_->current(); + assert(!cfd || cfd->user_comparator()); + const size_t ts_sz = (cfd && cfd->user_comparator()) + ? cfd->user_comparator()->timestamp_size() + : 0; + const ValueType delete_type = + (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp; + auto ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type); // optimize for non-recovery mode if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { assert(!write_after_commit_); diff --git a/table/get_context.cc b/table/get_context.cc index 686a54d1f..ecd59220a 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -303,6 +303,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, return false; case kTypeDeletion: + case kTypeDeletionWithTimestamp: case kTypeSingleDeletion: case kTypeRangeDeletion: // TODO(noetzli): Verify correctness once merge of single-deletes