diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 745d100b9..b0e21b6d7 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1837,8 +1837,34 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
 
 Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                   const Slice& key) {
-  WriteBatch batch;
-  batch.Delete(column_family, key);
+  // Without a timestamp, a plain single-key delete batch is sufficient.
+  if (nullptr == opt.timestamp) {
+    WriteBatch batch;
+    Status s = batch.Delete(column_family, key);
+    if (!s.ok()) {
+      return s;
+    }
+    return Write(opt, &batch);
+  }
+  // With a timestamp, pre-size the batch for key + timestamp. The batch
+  // stores a zero-filled placeholder after the key, which AssignTimestamp()
+  // is expected to overwrite with *ts.
+  const Slice* ts = opt.timestamp;
+  assert(ts != nullptr);
+  const size_t ts_sz = ts->size();
+  constexpr size_t kKeyAndValueLenSize = 11;
+  constexpr size_t kWriteBatchOverhead =
+      WriteBatchInternal::kHeader + sizeof(ValueType) + kKeyAndValueLenSize;
+  WriteBatch batch(key.size() + ts_sz + kWriteBatchOverhead, /*max_bytes=*/0,
+                   ts_sz);
+  Status s = batch.Delete(column_family, key);
+  if (!s.ok()) {
+    return s;
+  }
+  s = batch.AssignTimestamp(*ts);
+  if (!s.ok()) {
+    return s;
+  }
   return Write(opt, &batch);
 }
 
diff --git a/db/db_iter.cc b/db/db_iter.cc
index ba179ade8..fd3ff78f7 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -258,7 +258,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
       // prone to bugs causing the same user key with the same sequence number.
       // Note that with current timestamp implementation, the same user key can
       // have different timestamps and zero sequence number on the bottommost
-      // level. This will change in the future.
+      // level. This may change in the future.
       if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) &&
           skipping_saved_key &&
           CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
@@ -276,6 +276,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
         reseek_done = false;
         switch (ikey_.type) {
           case kTypeDeletion:
+          case kTypeDeletionWithTimestamp:
           case kTypeSingleDeletion:
             // Arrange to skip all upcoming entries for this key since
             // they are hidden by this deletion.
diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc
index 6e9b00565..a2a7313af 100644
--- a/db/db_with_timestamp_basic_test.cc
+++ b/db/db_with_timestamp_basic_test.cc
@@ -449,6 +449,65 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
   Close();
 }
 
+// Create two L0, and compact them to a new L1. In this test, L1 is L_bottom.
+// Two L0s:
+//       f1                                  f2
+// <a, 1, kTypeValue>    <a, 3, kTypeDeletionWithTimestamp>...<b, 2, kTypeValue>
+// Since f2.smallest < f1.largest < f2.largest
+// f1 and f2 will be the inputs of a real compaction instead of trivial move.
+TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.num_levels = 2;
+  options.level0_file_num_compaction_trigger = 2;
+  DestroyAndReopen(options);
+  WriteOptions write_opts;
+  std::string ts_str = Timestamp(1, 0);
+  Slice ts = ts_str;
+  write_opts.timestamp = &ts;
+  ASSERT_OK(db_->Put(write_opts, "a", "value0"));
+  ASSERT_OK(Flush());
+
+  ts_str = Timestamp(2, 0);
+  ts = ts_str;
+  write_opts.timestamp = &ts;
+  ASSERT_OK(db_->Put(write_opts, "b", "value0"));
+  ts_str = Timestamp(3, 0);
+  ts = ts_str;
+  write_opts.timestamp = &ts;
+  ASSERT_OK(db_->Delete(write_opts, "a"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  ReadOptions read_opts;
+  ts_str = Timestamp(1, 0);
+  ts = ts_str;
+  read_opts.timestamp = &ts;
+  std::string value;
+  Status s = db_->Get(read_opts, "a", &value);
+  ASSERT_OK(s);
+  ASSERT_EQ("value0", value);
+
+  ts_str = Timestamp(3, 0);
+  ts = ts_str;
+  read_opts.timestamp = &ts;
+  s = db_->Get(read_opts, "a", &value);
+  ASSERT_TRUE(s.IsNotFound());
+
+  // Time-travel to the past before deletion
+  ts_str = Timestamp(2, 0);
+  ts = ts_str;
+  read_opts.timestamp = &ts;
+  s = db_->Get(read_opts, "a", &value);
+  ASSERT_OK(s);
+  ASSERT_EQ("value0", value);
+  Close();
+}
+
 class DBBasicTestWithTimestampCompressionSettings
     : public DBBasicTestWithTimestampBase,
       public testing::WithParamInterface<
@@ -536,6 +595,104 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
   Close();
 }
 
+TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  const int kNumKeysPerFile = 1024;
+  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  BlockBasedTableOptions bbto;
+  bbto.filter_policy = std::get<0>(GetParam());
+  bbto.whole_key_filtering = true;
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+
+  const CompressionType comp_type = std::get<1>(GetParam());
+#if LZ4_VERSION_NUMBER < 10400  // r124+
+  if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
+    return;
+  }
+#endif  // LZ4_VERSION_NUMBER < 10400
+  if (!ZSTD_Supported() && comp_type == kZSTD) {
+    return;
+  }
+  if (!Zlib_Supported() && comp_type == kZlibCompression) {
+    return;
+  }
+
+  options.compression = comp_type;
+  options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
+  if (comp_type == kZSTD) {
+    options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
+  }
+  options.compression_opts.parallel_threads = std::get<3>(GetParam());
+  options.target_file_size_base = 1 << 26;  // 64MB
+
+  DestroyAndReopen(options);
+
+  const size_t kNumL0Files =
+      static_cast<size_t>(Options().level0_file_num_compaction_trigger);
+  {
+    // Generate enough L0 files with ts=1 to trigger compaction to L1
+    std::string ts_str = Timestamp(1, 0);
+    Slice ts = ts_str;
+    WriteOptions wopts;
+    wopts.timestamp = &ts;
+    for (size_t i = 0; i != kNumL0Files; ++i) {
+      for (int j = 0; j != kNumKeysPerFile; ++j) {
+        ASSERT_OK(db_->Put(wopts, Key1(j), "value" + std::to_string(i)));
+      }
+      ASSERT_OK(db_->Flush(FlushOptions()));
+    }
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    // Generate another L0 at ts=3
+    ts_str = Timestamp(3, 0);
+    ts = ts_str;
+    wopts.timestamp = &ts;
+    for (int i = 0; i != kNumKeysPerFile; ++i) {
+      std::string key_str = Key1(i);
+      Slice key(key_str);
+      if ((i % 3) == 0) {
+        ASSERT_OK(db_->Delete(wopts, key));
+      } else {
+        ASSERT_OK(db_->Put(wopts, key, "new_value"));
+      }
+    }
+    ASSERT_OK(db_->Flush(FlushOptions()));
+    // Populate memtable at ts=5
+    ts_str = Timestamp(5, 0);
+    ts = ts_str;
+    wopts.timestamp = &ts;
+    for (int i = 0; i != kNumKeysPerFile; ++i) {
+      std::string key_str = Key1(i);
+      Slice key(key_str);
+      if ((i % 3) == 1) {
+        ASSERT_OK(db_->Delete(wopts, key));
+      } else if ((i % 3) == 2) {
+        ASSERT_OK(db_->Put(wopts, key, "new_value_2"));
+      }
+    }
+  }
+  {
+    std::string ts_str = Timestamp(6, 0);
+    Slice ts = ts_str;
+    ReadOptions ropts;
+    ropts.timestamp = &ts;
+    for (uint64_t i = 0; i != static_cast<uint64_t>(kNumKeysPerFile); ++i) {
+      std::string value;
+      Status s = db_->Get(ropts, Key1(i), &value);
+      if ((i % 3) == 2) {
+        ASSERT_OK(s);
+        ASSERT_EQ("new_value_2", value);
+      } else {
+        ASSERT_TRUE(s.IsNotFound());
+      }
+    }
+  }
+}
+
 #ifndef ROCKSDB_LITE
 // A class which remembers the name of each flushed file.
 class FlushedFileCollector : public EventListener {
@@ -870,15 +1027,10 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
     for (size_t i = 0; i != write_ts_list.size(); ++i) {
       Slice write_ts = write_ts_list[i];
       write_opts.timestamp = &write_ts;
-      uint64_t key = kMinKey;
-      do {
+      for (uint64_t key = kMaxKey; key >= kMinKey; --key) {
         Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
         ASSERT_OK(s);
-        if (key == kMaxKey) {
-          break;
-        }
-        ++key;
-      } while (true);
+      }
     }
   }
   const std::vector<std::string> read_ts_list = {Timestamp(5, 0xffffffff),
@@ -962,6 +1114,90 @@ INSTANTIATE_TEST_CASE_P(
                                                                      false))),
         ::testing::Bool()));
 
+class DBBasicTestWithTsIterTombstones
+    : public DBBasicTestWithTimestampBase,
+      public testing::WithParamInterface<
+          std::tuple<std::shared_ptr<const SliceTransform>,
+                     std::shared_ptr<const FilterPolicy>, int>> {
+ public:
+  DBBasicTestWithTsIterTombstones()
+      : DBBasicTestWithTimestampBase("/db_basic_ts_iter_tombstones") {}
+};
+
+TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) {
+  constexpr size_t kNumKeysPerFile = 128;
+  Options options = CurrentOptions();
+  options.env = env_;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options.comparator = &test_cmp;
+  options.prefix_extractor = std::get<0>(GetParam());
+  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  BlockBasedTableOptions bbto;
+  bbto.filter_policy = std::get<1>(GetParam());
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  options.num_levels = std::get<2>(GetParam());
+  DestroyAndReopen(options);
+  std::vector<std::string> write_ts_strs = {Timestamp(2, 0), Timestamp(4, 0)};
+  constexpr uint64_t kMaxKey = 0xffffffffffffffff;
+  constexpr uint64_t kMinKey = 0xfffffffffffff000;
+  // Insert kMinKey...kMaxKey
+  uint64_t key = kMinKey;
+  WriteOptions write_opts;
+  Slice ts = write_ts_strs[0];
+  write_opts.timestamp = &ts;
+  do {
+    Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(key));
+    ASSERT_OK(s);
+    if (kMaxKey == key) {
+      break;
+    }
+    ++key;
+  } while (true);
+  // At the newer timestamp, overwrite odd keys and delete even keys
+  ts = write_ts_strs[1];
+  write_opts.timestamp = &ts;
+  for (key = kMaxKey; key >= kMinKey; --key) {
+    Status s;
+    if (0 != (key % 2)) {
+      s = db_->Put(write_opts, Key1(key), "value1" + std::to_string(key));
+    } else {
+      s = db_->Delete(write_opts, Key1(key));
+    }
+    ASSERT_OK(s);
+  }
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  {
+    std::string read_ts = Timestamp(4, 0);
+    ts = read_ts;
+    ReadOptions read_opts;
+    read_opts.total_order_seek = true;
+    read_opts.timestamp = &ts;
+    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
+    size_t count = 0;
+    key = kMinKey + 1;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++count, key += 2) {
+      ASSERT_EQ(Key1(key), iter->key());
+      ASSERT_EQ("value1" + std::to_string(key), iter->value());
+    }
+    ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count);
+  }
+  Close();
+}
+
+INSTANTIATE_TEST_CASE_P(
+    Timestamp, DBBasicTestWithTsIterTombstones,
+    ::testing::Combine(
+        ::testing::Values(
+            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
+            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
+        ::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
+                          std::shared_ptr<const FilterPolicy>(
+                              NewBloomFilterPolicy(10, false)),
+                          std::shared_ptr<const FilterPolicy>(
+                              NewBloomFilterPolicy(20, false))),
+        ::testing::Values(2, 6)));
+
 }  // namespace ROCKSDB_NAMESPACE
 
 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
diff --git a/db/dbformat.cc b/db/dbformat.cc
index 46137feb7..c72388c31 100644
--- a/db/dbformat.cc
+++ b/db/dbformat.cc
@@ -23,7 +23,7 @@ namespace ROCKSDB_NAMESPACE {
 // and the value type is embedded as the low 8 bits in the sequence
 // number in internal keys, we need to use the highest-numbered
 // ValueType, not the lowest).
-const ValueType kValueTypeForSeek = kTypeBlobIndex;
+const ValueType kValueTypeForSeek = kTypeDeletionWithTimestamp;
 const ValueType kValueTypeForSeekForPrev = kTypeDeletion;
 
 uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
diff --git a/db/dbformat.h b/db/dbformat.h
index e664fa9b2..dac6f3d46 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -69,7 +69,8 @@ enum ValueType : unsigned char {
   // generated by WriteUnprepared write policy is not mistakenly read by
   // another.
   kTypeBeginUnprepareXID = 0x13,  // WAL only.
-  kMaxValue = 0x7F                // Not used for storing records.
+  kTypeDeletionWithTimestamp = 0x14,  // Deletion of a key with a timestamp.
+  kMaxValue = 0x7F  // Not used for storing records.
 };
 
 // Defined in dbformat.cc
@@ -79,7 +80,8 @@ extern const ValueType kValueTypeForSeekForPrev;
 // Checks whether a type is an inline value type
 // (i.e. a type used in memtable skiplist and sst file datablock).
 inline bool IsValueType(ValueType t) {
-  return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex;
+  return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex ||
+         t == kTypeDeletionWithTimestamp;
 }
 
 // Checks whether a type is from user operation
diff --git a/db/memtable.cc b/db/memtable.cc
index 82b577ed8..32f8977df 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -724,6 +724,7 @@ static bool SaveValue(void* arg, const char* entry) {
         return false;
       }
       case kTypeDeletion:
+      case kTypeDeletionWithTimestamp:
       case kTypeSingleDeletion:
       case kTypeRangeDeletion: {
         if (*(s->merge_in_progress)) {
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 77715a208..645bbd70a 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -908,7 +908,14 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
     PutVarint32(&b->rep_, column_family_id);
   }
-  PutLengthPrefixedSlice(&b->rep_, key);
+  if (0 == b->timestamp_size_) {
+    PutLengthPrefixedSlice(&b->rep_, key);
+  } else {
+    PutVarint32(&b->rep_,
+                static_cast<uint32_t>(key.size() + b->timestamp_size_));
+    b->rep_.append(key.data(), key.size());
+    b->rep_.append(b->timestamp_size_, '\0');
+  }
   b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                               ContentFlags::HAS_DELETE,
                           std::memory_order_relaxed);
@@ -930,7 +937,11 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
     PutVarint32(&b->rep_, column_family_id);
   }
-  PutLengthPrefixedSliceParts(&b->rep_, key);
+  if (0 == b->timestamp_size_) {
+    PutLengthPrefixedSliceParts(&b->rep_, key);
+  } else {
+    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
+  }
   b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                               ContentFlags::HAS_DELETE,
                           std::memory_order_relaxed);
@@ -1546,7 +1557,14 @@ class MemTableInserter : public WriteBatch::Handler {
       return seek_status;
     }
 
-    auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
+    ColumnFamilyData* cfd = cf_mems_->current();
+    assert(!cfd || cfd->user_comparator());
+    const size_t ts_sz = (cfd && cfd->user_comparator())
+                             ? cfd->user_comparator()->timestamp_size()
+                             : 0;
+    const ValueType delete_type =
+        (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
+    auto ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type);
     // optimize for non-recovery mode
     if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
       assert(!write_after_commit_);
diff --git a/table/get_context.cc b/table/get_context.cc
index 686a54d1f..ecd59220a 100644
--- a/table/get_context.cc
+++ b/table/get_context.cc
@@ -303,6 +303,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
       return false;
 
     case kTypeDeletion:
+    case kTypeDeletionWithTimestamp:
     case kTypeSingleDeletion:
     case kTypeRangeDeletion:
       // TODO(noetzli): Verify correctness once merge of single-deletes