diff --git a/HISTORY.md b/HISTORY.md index 180e8c435..b10c72958 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ ### Bug Fixes * Fixed a data race on `ColumnFamilyData::flush_reason` caused by concurrent flushes. +* Fixed an issue in `Get` and `MultiGet` when user-defined timestamps is enabled in combination with BlobDB. ### Feature Removal * Remove RocksDB Lite. diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc index 255e4552e..60d9fbdcd 100644 --- a/db/blob/db_blob_basic_test.cc +++ b/db/blob/db_blob_basic_test.cc @@ -11,6 +11,7 @@ #include "db/blob/blob_index.h" #include "db/blob/blob_log_format.h" #include "db/db_test_util.h" +#include "db/db_with_timestamp_test_util.h" #include "port/stack_trace.h" #include "test_util/sync_point.h" #include "utilities/fault_injection_env.h" @@ -1771,6 +1772,190 @@ TEST_F(DBBlobBasicTest, WarmCacheWithBlobsSecondary) { 1); } +class DBBlobWithTimestampTest : public DBBasicTestWithTimestampBase { + protected: + DBBlobWithTimestampTest() + : DBBasicTestWithTimestampBase("db_blob_with_timestamp_test") {} +}; + +TEST_F(DBBlobWithTimestampTest, GetBlob) { + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.enable_blob_files = true; + options.min_blob_size = 0; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + + DestroyAndReopen(options); + WriteOptions write_opts; + const std::string ts = Timestamp(1, 0); + constexpr char key[] = "key"; + constexpr char blob_value[] = "blob_value"; + + ASSERT_OK(db_->Put(write_opts, key, ts, blob_value)); + + ASSERT_OK(Flush()); + + const std::string read_ts = Timestamp(2, 0); + Slice read_ts_slice(read_ts); + ReadOptions read_opts; + read_opts.timestamp = &read_ts_slice; + std::string value; + ASSERT_OK(db_->Get(read_opts, key, &value)); + ASSERT_EQ(value, blob_value); +} + +TEST_F(DBBlobWithTimestampTest, MultiGetBlobs) { + constexpr size_t min_blob_size = 6; + + Options options = GetDefaultOptions(); + options.enable_blob_files = true; + options.min_blob_size = min_blob_size; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + + DestroyAndReopen(options); + + // Put then retrieve three key-values. The first value is below the size limit + // and is thus stored inline; the other two are stored separately as blobs. + constexpr size_t num_keys = 3; + + constexpr char first_key[] = "first_key"; + constexpr char first_value[] = "short"; + static_assert(sizeof(first_value) - 1 < min_blob_size, + "first_value too long to be inlined"); + + DestroyAndReopen(options); + WriteOptions write_opts; + const std::string ts = Timestamp(1, 0); + ASSERT_OK(db_->Put(write_opts, first_key, ts, first_value)); + + constexpr char second_key[] = "second_key"; + constexpr char second_value[] = "long_value"; + static_assert(sizeof(second_value) - 1 >= min_blob_size, + "second_value too short to be stored as blob"); + + ASSERT_OK(db_->Put(write_opts, second_key, ts, second_value)); + + constexpr char third_key[] = "third_key"; + constexpr char third_value[] = "other_long_value"; + static_assert(sizeof(third_value) - 1 >= min_blob_size, + "third_value too short to be stored as blob"); + + ASSERT_OK(db_->Put(write_opts, third_key, ts, third_value)); + + ASSERT_OK(Flush()); + + ReadOptions read_options; + const std::string read_ts = Timestamp(2, 0); + Slice read_ts_slice(read_ts); + read_options.timestamp = &read_ts_slice; + std::array keys{{first_key, second_key, third_key}}; + + { + std::array values; + std::array statuses; + + db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0], + &values[0], &statuses[0]); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(values[0], first_value); + + ASSERT_OK(statuses[1]); + ASSERT_EQ(values[1], second_value); + + ASSERT_OK(statuses[2]); + ASSERT_EQ(values[2], third_value); + } +} + +TEST_F(DBBlobWithTimestampTest, GetMergeBlobWithPut) { + Options options = GetDefaultOptions(); + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + + DestroyAndReopen(options); + + WriteOptions write_opts; + const std::string ts = Timestamp(1, 0); + ASSERT_OK(db_->Put(write_opts, "Key1", ts, "v1")); + ASSERT_OK(Flush()); + ASSERT_OK( + db_->Merge(write_opts, db_->DefaultColumnFamily(), "Key1", ts, "v2")); + ASSERT_OK(Flush()); + ASSERT_OK( + db_->Merge(write_opts, db_->DefaultColumnFamily(), "Key1", ts, "v3")); + ASSERT_OK(Flush()); + + std::string value; + const std::string read_ts = Timestamp(2, 0); + Slice read_ts_slice(read_ts); + ReadOptions read_opts; + read_opts.timestamp = &read_ts_slice; + ASSERT_OK(db_->Get(read_opts, "Key1", &value)); + ASSERT_EQ(value, "v1,v2,v3"); +} + +TEST_F(DBBlobWithTimestampTest, MultiGetMergeBlobWithPut) { + constexpr size_t num_keys = 3; + + Options options = GetDefaultOptions(); + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + options.enable_blob_files = true; + options.min_blob_size = 0; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + + DestroyAndReopen(options); + + WriteOptions write_opts; + const std::string ts = Timestamp(1, 0); + + ASSERT_OK(db_->Put(write_opts, "Key0", ts, "v0_0")); + ASSERT_OK(db_->Put(write_opts, "Key1", ts, "v1_0")); + ASSERT_OK(db_->Put(write_opts, "Key2", ts, "v2_0")); + ASSERT_OK(Flush()); + ASSERT_OK( + db_->Merge(write_opts, db_->DefaultColumnFamily(), "Key0", ts, "v0_1")); + ASSERT_OK( + db_->Merge(write_opts, db_->DefaultColumnFamily(), "Key1", ts, "v1_1")); + ASSERT_OK(Flush()); + ASSERT_OK( + db_->Merge(write_opts, db_->DefaultColumnFamily(), "Key0", ts, "v0_2")); + ASSERT_OK(Flush()); + + const std::string read_ts = Timestamp(2, 0); + Slice read_ts_slice(read_ts); + ReadOptions read_opts; + read_opts.timestamp = &read_ts_slice; + std::array keys{{"Key0", "Key1", "Key2"}}; + std::array values; + std::array statuses; + + db_->MultiGet(read_opts, db_->DefaultColumnFamily(), num_keys, &keys[0], + &values[0], &statuses[0]); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(values[0], "v0_0,v0_1,v0_2"); + + ASSERT_OK(statuses[1]); + ASSERT_EQ(values[1], "v1_0,v1_1"); + + ASSERT_OK(statuses[2]); + ASSERT_EQ(values[2], "v2_0"); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index 7bdd02650..81dbf1944 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2194,8 +2194,9 @@ void Version::MultiGetBlob( key_context.value->Reset(); blob_reqs_in_file.emplace_back( - key_context.ukey_with_ts, blob_index.offset(), blob_index.size(), - blob_index.compression(), key_context.value, key_context.s); + key_context.get_context->ukey_to_get_blob_value(), + blob_index.offset(), blob_index.size(), blob_index.compression(), + key_context.value, key_context.s); } if (blob_reqs_in_file.size() > 0) { const auto file_size = blob_file_meta->GetBlobFileSize(); @@ -2204,7 +2205,8 @@ void Version::MultiGetBlob( } if (blob_reqs.size() > 0) { - blob_source_->MultiGetBlob(read_options, blob_reqs, /*bytes_read=*/nullptr); + blob_source_->MultiGetBlob(read_options, blob_reqs, + /*bytes_read=*/nullptr); } for (auto& ctx : blob_ctxs) { @@ -2345,8 +2347,9 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; constexpr uint64_t* bytes_read = nullptr; - *status = GetBlob(read_options, user_key, *value, prefetch_buffer, - value, bytes_read); + *status = + GetBlob(read_options, get_context.ukey_to_get_blob_value(), + *value, prefetch_buffer, value, bytes_read); if (!status->ok()) { if (status->IsIncomplete()) { get_context.MarkKeyMayExist(); diff --git a/table/get_context.cc b/table/get_context.cc index d4b2e35d2..f6acb17a9 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -300,6 +300,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, if (kNotFound == state_) { state_ = kFound; if (do_merge_) { + if (type == kTypeBlobIndex && ucmp_->timestamp_size() != 0) { + ukey_with_ts_found_.PinSelf(parsed_key.user_key); + } if (LIKELY(pinnable_val_ != nullptr)) { Slice value_to_use = value; @@ -339,7 +342,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, // merge_context_->operand_list if (type == kTypeBlobIndex) { PinnableSlice pin_val; - if (GetBlobValue(value, &pin_val) == false) { + if (GetBlobValue(parsed_key.user_key, value, &pin_val) == false) { return false; } Slice blob_value(pin_val); @@ -365,7 +368,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(merge_operator_ != nullptr); if (type == kTypeBlobIndex) { PinnableSlice pin_val; - if (GetBlobValue(value, &pin_val) == false) { + if (GetBlobValue(parsed_key.user_key, value, &pin_val) == false) { return false; } Slice blob_value(pin_val); @@ -545,13 +548,13 @@ void GetContext::MergeWithEntity(Slice entity) { } } -bool GetContext::GetBlobValue(const Slice& blob_index, +bool GetContext::GetBlobValue(const Slice& user_key, const Slice& blob_index, PinnableSlice* blob_value) { constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; constexpr uint64_t* bytes_read = nullptr; Status status = blob_fetcher_->FetchBlob( - user_key_, blob_index, prefetch_buffer, blob_value, bytes_read); + user_key, blob_index, prefetch_buffer, blob_value, bytes_read); if (!status.ok()) { if (status.IsIncomplete()) { // FIXME: this code is not covered by unit tests diff --git a/table/get_context.h b/table/get_context.h index dcc7ab8d6..10371529b 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -177,6 +177,14 @@ class GetContext { bool has_callback() const { return callback_ != nullptr; } + const Slice& ukey_to_get_blob_value() const { + if (!ukey_with_ts_found_.empty()) { + return ukey_with_ts_found_; + } else { + return user_key_; + } + } + uint64_t get_tracing_get_id() const { return tracing_get_id_; } void push_operand(const Slice& value, Cleanable* value_pinner); @@ -184,7 +192,8 @@ class GetContext { private: void Merge(const Slice* value); void MergeWithEntity(Slice entity); - bool GetBlobValue(const Slice& blob_index, PinnableSlice* blob_value); + bool GetBlobValue(const Slice& user_key, const Slice& blob_index, + PinnableSlice* blob_value); const Comparator* ucmp_; const MergeOperator* merge_operator_; @@ -194,6 +203,10 @@ class GetContext { GetState state_; Slice user_key_; + // When a blob index is found with the user key containing timestamp, + // this copies the corresponding user key on record in the sst file + // and is later used for blob verification. + PinnableSlice ukey_with_ts_found_; PinnableSlice* pinnable_val_; PinnableWideColumns* columns_; std::string* timestamp_;