From 7e78d7c54020d65c6beb54d0ef950df70b6446a3 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Thu, 9 Sep 2021 18:57:01 -0700 Subject: [PATCH] Support timestamps in SstFileWriter (#8899) Summary: As a first step of supporting user-defined timestamps with ingestion, the patch adds timestamp support to `SstFileWriter`; namely, it adds new versions of the `Put` and `Delete` APIs that take timestamps. (`Merge` and `DeleteRange` are currently not supported with user-defined timestamps in general but once those features are implemented, we can handle them in `SstFileWriter` in a similar fashion.) The new APIs validate the size of the timestamp provided by the client. Similarly, calls to the pre-existing timestamp-less APIs are now disallowed when user-defined timestamps are in use according to the comparator. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8899 Test Plan: `make check` Reviewed By: riversand963 Differential Revision: D30850699 Pulled By: ltamasi fbshipit-source-id: 779154373618f19b8f0797976bb7286783c57b67 --- include/rocksdb/sst_file_writer.h | 19 +++ table/block_based/block.cc | 4 +- table/sst_file_reader_test.cc | 218 ++++++++++++++++++++++++++++++ table/sst_file_writer.cc | 74 +++++++--- 4 files changed, 295 insertions(+), 20 deletions(-) diff --git a/include/rocksdb/sst_file_writer.h b/include/rocksdb/sst_file_writer.h index 3bcc8a693..a6430eaa9 100644 --- a/include/rocksdb/sst_file_writer.h +++ b/include/rocksdb/sst_file_writer.h @@ -112,21 +112,40 @@ class SstFileWriter { // Add a Put key with value to currently opened file (deprecated) // REQUIRES: key is after any previously added key according to comparator. + // REQUIRES: comparator is *not* timestamp-aware. ROCKSDB_DEPRECATED_FUNC Status Add(const Slice& user_key, const Slice& value); // Add a Put key with value to currently opened file // REQUIRES: key is after any previously added key according to comparator. + // REQUIRES: comparator is *not* timestamp-aware. Status Put(const Slice& user_key, const Slice& value); + // Add a Put (key with timestamp, value) to the currently opened file + // REQUIRES: key is after any previously added key according to the + // comparator. + // REQUIRES: the timestamp's size is equal to what is expected by + // the comparator. + Status Put(const Slice& user_key, const Slice& timestamp, const Slice& value); + // Add a Merge key with value to currently opened file // REQUIRES: key is after any previously added key according to comparator. + // REQUIRES: comparator is *not* timestamp-aware. Status Merge(const Slice& user_key, const Slice& value); // Add a deletion key to currently opened file // REQUIRES: key is after any previously added key according to comparator. + // REQUIRES: comparator is *not* timestamp-aware. Status Delete(const Slice& user_key); + // Add a deletion key with timestamp to the currently opened file + // REQUIRES: key is after any previously added key according to the + // comparator. + // REQUIRES: the timestamp's size is equal to what is expected by + // the comparator. + Status Delete(const Slice& user_key, const Slice& timestamp); + // Add a range deletion tombstone to currently opened file + // REQUIRES: comparator is *not* timestamp-aware. Status DeleteRange(const Slice& begin_key, const Slice& end_key); // Finalize writing to sst file and close file. diff --git a/table/block_based/block.cc b/table/block_based/block.cc index 2d32ebcb4..9c2a60844 100644 --- a/table/block_based/block.cc +++ b/table/block_based/block.cc @@ -522,7 +522,8 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) { if (global_seqno_ != kDisableGlobalSequenceNumber) { // If we are reading a file with a global sequence number we should // expect that all encoded sequence numbers are zeros and any value - // type is kTypeValue, kTypeMerge, kTypeDeletion, or kTypeRangeDeletion. + // type is kTypeValue, kTypeMerge, kTypeDeletion, + // kTypeDeletionWithTimestamp, or kTypeRangeDeletion. uint64_t packed = ExtractInternalKeyFooter(raw_key_.GetKey()); SequenceNumber seqno; ValueType value_type; @@ -530,6 +531,7 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) { assert(value_type == ValueType::kTypeValue || value_type == ValueType::kTypeMerge || value_type == ValueType::kTypeDeletion || + value_type == ValueType::kTypeDeletionWithTimestamp || value_type == ValueType::kTypeRangeDeletion); assert(seqno == 0); } diff --git a/table/sst_file_reader_test.cc b/table/sst_file_reader_test.cc index 52cab2ab3..d1394b938 100644 --- a/table/sst_file_reader_test.cc +++ b/table/sst_file_reader_test.cc @@ -195,6 +195,224 @@ TEST_F(SstFileReaderTest, ReadFileWithGlobalSeqno) { ASSERT_OK(DestroyDB(db_name, options)); } +TEST_F(SstFileReaderTest, TimestampSizeMismatch) { + SstFileWriter writer(soptions_, options_); + + ASSERT_OK(writer.Open(sst_name_)); + + // Comparator is not timestamp-aware; calls to APIs taking timestamps should + // fail. + ASSERT_NOK(writer.Put("key", EncodeAsUint64(100), "value")); + ASSERT_NOK(writer.Delete("another_key", EncodeAsUint64(200))); +} + +class SstFileReaderTimestampTest : public testing::Test { + public: + SstFileReaderTimestampTest() { + Env* env = Env::Default(); + EXPECT_OK(test::CreateEnvFromSystem(ConfigOptions(), &env, &env_guard_)); + EXPECT_NE(nullptr, env); + + options_.env = env; + + options_.comparator = test::ComparatorWithU64Ts(); + + sst_name_ = test::PerThreadDBPath("sst_file_ts"); + } + + ~SstFileReaderTimestampTest() { + EXPECT_OK(options_.env->DeleteFile(sst_name_)); + } + + struct KeyValueDesc { + KeyValueDesc(std::string k, std::string ts, std::string v) + : key(std::move(k)), timestamp(std::move(ts)), value(std::move(v)) {} + + std::string key; + std::string timestamp; + std::string value; + }; + + struct InputKeyValueDesc : public KeyValueDesc { + InputKeyValueDesc(std::string k, std::string ts, std::string v, bool is_del, + bool use_contig_buf) + : KeyValueDesc(std::move(k), std::move(ts), std::move(v)), + is_delete(is_del), + use_contiguous_buffer(use_contig_buf) {} + + bool is_delete = false; + bool use_contiguous_buffer = false; + }; + + struct OutputKeyValueDesc : public KeyValueDesc { + OutputKeyValueDesc(std::string k, std::string ts, std::string v) + : KeyValueDesc(std::move(k), std::string(ts), std::string(v)) {} + }; + + void CreateFile(const std::vector& descs) { + SstFileWriter writer(soptions_, options_); + + ASSERT_OK(writer.Open(sst_name_)); + + for (const auto& desc : descs) { + if (desc.is_delete) { + if (desc.use_contiguous_buffer) { + std::string key_with_ts(desc.key + desc.timestamp); + ASSERT_OK(writer.Delete(Slice(key_with_ts.data(), desc.key.size()), + Slice(key_with_ts.data() + desc.key.size(), + desc.timestamp.size()))); + } else { + ASSERT_OK(writer.Delete(desc.key, desc.timestamp)); + } + } else { + if (desc.use_contiguous_buffer) { + std::string key_with_ts(desc.key + desc.timestamp); + ASSERT_OK(writer.Put(Slice(key_with_ts.data(), desc.key.size()), + Slice(key_with_ts.data() + desc.key.size(), + desc.timestamp.size()), + desc.value)); + } else { + ASSERT_OK(writer.Put(desc.key, desc.timestamp, desc.value)); + } + } + } + + ASSERT_OK(writer.Finish()); + } + + void CheckFile(const std::string& timestamp, + const std::vector& descs) { + SstFileReader reader(options_); + + ASSERT_OK(reader.Open(sst_name_)); + ASSERT_OK(reader.VerifyChecksum()); + + Slice ts_slice(timestamp); + + ReadOptions read_options; + read_options.timestamp = &ts_slice; + + std::unique_ptr iter(reader.NewIterator(read_options)); + iter->SeekToFirst(); + + for (const auto& desc : descs) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key(), desc.key); + ASSERT_EQ(iter->timestamp(), desc.timestamp); + ASSERT_EQ(iter->value(), desc.value); + + iter->Next(); + } + + ASSERT_FALSE(iter->Valid()); + } + + protected: + std::shared_ptr env_guard_; + Options options_; + EnvOptions soptions_; + std::string sst_name_; +}; + +TEST_F(SstFileReaderTimestampTest, Basic) { + std::vector input_descs; + + for (uint64_t k = 0; k < kNumKeys; k += 4) { + // A Put with key k, timestamp k that gets overwritten by a subsequent Put + // with timestamp (k + 1). Note that the comparator uses descending order + // for the timestamp part, so we add the later Put first. + input_descs.emplace_back( + /* key */ EncodeAsString(k), /* timestamp */ EncodeAsUint64(k + 1), + /* value */ EncodeAsString(k * 2), /* is_delete */ false, + /* use_contiguous_buffer */ false); + input_descs.emplace_back( + /* key */ EncodeAsString(k), /* timestamp */ EncodeAsUint64(k), + /* value */ EncodeAsString(k * 3), /* is_delete */ false, + /* use_contiguous_buffer */ true); + + // A Put with key (k + 2), timestamp (k + 2) that gets cancelled out by a + // Delete with timestamp (k + 3). Note that the comparator uses descending + // order for the timestamp part, so we add the Delete first. + input_descs.emplace_back(/* key */ EncodeAsString(k + 2), + /* timestamp */ EncodeAsUint64(k + 3), + /* value */ std::string(), /* is_delete */ true, + /* use_contiguous_buffer */ (k % 8) == 0); + input_descs.emplace_back( + /* key */ EncodeAsString(k + 2), /* timestamp */ EncodeAsUint64(k + 2), + /* value */ EncodeAsString(k * 5), /* is_delete */ false, + /* use_contiguous_buffer */ (k % 8) != 0); + } + + CreateFile(input_descs); + + // Note: below, we check the results as of each timestamp in the range, + // updating the expected result as needed. + std::vector output_descs; + + for (uint64_t ts = 0; ts < kNumKeys; ++ts) { + const uint64_t k = ts - (ts % 4); + + switch (ts % 4) { + case 0: // Initial Put for key k + output_descs.emplace_back(/* key */ EncodeAsString(k), + /* timestamp */ EncodeAsUint64(ts), + /* value */ EncodeAsString(k * 3)); + break; + + case 1: // Second Put for key k + assert(output_descs.back().key == EncodeAsString(k)); + assert(output_descs.back().timestamp == EncodeAsUint64(ts - 1)); + assert(output_descs.back().value == EncodeAsString(k * 3)); + output_descs.back().timestamp = EncodeAsUint64(ts); + output_descs.back().value = EncodeAsString(k * 2); + break; + + case 2: // Put for key (k + 2) + output_descs.emplace_back(/* key */ EncodeAsString(k + 2), + /* timestamp */ EncodeAsUint64(ts), + /* value */ EncodeAsString(k * 5)); + break; + + case 3: // Delete for key (k + 2) + assert(output_descs.back().key == EncodeAsString(k + 2)); + assert(output_descs.back().timestamp == EncodeAsUint64(ts - 1)); + assert(output_descs.back().value == EncodeAsString(k * 5)); + output_descs.pop_back(); + break; + } + + CheckFile(EncodeAsUint64(ts), output_descs); + } +} + +TEST_F(SstFileReaderTimestampTest, TimestampsOutOfOrder) { + SstFileWriter writer(soptions_, options_); + + ASSERT_OK(writer.Open(sst_name_)); + + // Note: KVs that have the same user key disregarding timestamps should be in + // descending order of timestamps. + ASSERT_OK(writer.Put("key", EncodeAsUint64(1), "value1")); + ASSERT_NOK(writer.Put("key", EncodeAsUint64(2), "value2")); +} + +TEST_F(SstFileReaderTimestampTest, TimestampSizeMismatch) { + SstFileWriter writer(soptions_, options_); + + ASSERT_OK(writer.Open(sst_name_)); + + // Comparator expects 64-bit timestamps; timestamps with other sizes as well + // as calls to the timestamp-less APIs should be rejected. + ASSERT_NOK(writer.Put("key", "not_an_actual_64_bit_timestamp", "value")); + ASSERT_NOK(writer.Delete("another_key", "timestamp_of_unexpected_size")); + + ASSERT_NOK(writer.Put("key_without_timestamp", "value")); + ASSERT_NOK(writer.Merge("another_key_missing_a_timestamp", "merge_operand")); + ASSERT_NOK(writer.Delete("yet_another_key_still_no_timestamp")); + ASSERT_NOK(writer.DeleteRange("begin_key_timestamp_absent", + "end_key_with_a_complete_lack_of_timestamps")); +} + } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index fa367e69b..5a0574776 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -63,8 +63,8 @@ struct SstFileWriter::Rep { std::string db_session_id; uint64_t next_file_number = 1; - Status Add(const Slice& user_key, const Slice& value, - const ValueType value_type) { + Status AddImpl(const Slice& user_key, const Slice& value, + ValueType value_type) { if (!builder) { return Status::InvalidArgument("File is not opened"); } @@ -80,23 +80,14 @@ struct SstFileWriter::Rep { } } - // TODO(tec) : For external SST files we could omit the seqno and type. - switch (value_type) { - case ValueType::kTypeValue: - ikey.Set(user_key, 0 /* Sequence Number */, - ValueType::kTypeValue /* Put */); - break; - case ValueType::kTypeMerge: - ikey.Set(user_key, 0 /* Sequence Number */, - ValueType::kTypeMerge /* Merge */); - break; - case ValueType::kTypeDeletion: - ikey.Set(user_key, 0 /* Sequence Number */, - ValueType::kTypeDeletion /* Delete */); - break; - default: - return Status::InvalidArgument("Value type is not supported"); - } + assert(value_type == kTypeValue || value_type == kTypeMerge || + value_type == kTypeDeletion || + value_type == kTypeDeletionWithTimestamp); + + constexpr SequenceNumber sequence_number = 0; + + ikey.Set(user_key, sequence_number, value_type); + builder->Add(ikey.Encode(), value); // update file info @@ -108,7 +99,42 @@ struct SstFileWriter::Rep { return Status::OK(); } + Status Add(const Slice& user_key, const Slice& value, ValueType value_type) { + if (internal_comparator.timestamp_size() != 0) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + + return AddImpl(user_key, value, value_type); + } + + Status Add(const Slice& user_key, const Slice& timestamp, const Slice& value, + ValueType value_type) { + const size_t timestamp_size = timestamp.size(); + + if (internal_comparator.timestamp_size() != timestamp_size) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + + const size_t user_key_size = user_key.size(); + + if (user_key.data() + user_key_size == timestamp.data()) { + Slice user_key_with_ts(user_key.data(), user_key_size + timestamp_size); + return AddImpl(user_key_with_ts, value, value_type); + } + + std::string user_key_with_ts; + user_key_with_ts.reserve(user_key_size + timestamp_size); + user_key_with_ts.append(user_key.data(), user_key_size); + user_key_with_ts.append(timestamp.data(), timestamp_size); + + return AddImpl(user_key_with_ts, value, value_type); + } + Status DeleteRange(const Slice& begin_key, const Slice& end_key) { + if (internal_comparator.timestamp_size() != 0) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + if (!builder) { return Status::InvalidArgument("File is not opened"); } @@ -294,6 +320,11 @@ Status SstFileWriter::Put(const Slice& user_key, const Slice& value) { return rep_->Add(user_key, value, ValueType::kTypeValue); } +Status SstFileWriter::Put(const Slice& user_key, const Slice& timestamp, + const Slice& value) { + return rep_->Add(user_key, timestamp, value, ValueType::kTypeValue); +} + Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) { return rep_->Add(user_key, value, ValueType::kTypeMerge); } @@ -302,6 +333,11 @@ Status SstFileWriter::Delete(const Slice& user_key) { return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion); } +Status SstFileWriter::Delete(const Slice& user_key, const Slice& timestamp) { + return rep_->Add(user_key, timestamp, Slice(), + ValueType::kTypeDeletionWithTimestamp); +} + Status SstFileWriter::DeleteRange(const Slice& begin_key, const Slice& end_key) { return rep_->DeleteRange(begin_key, end_key);