From 24bcab7d5d898253f3d514d3e3a180f5cefaed95 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Mon, 8 Aug 2022 16:10:08 -0700 Subject: [PATCH] Make queries return the value of the default column for wide-column entities (#10483) Summary: The patch adds support for wide-column entities to the existing query APIs (`Get`, `MultiGet`, and iterator). Namely, when during a query a wide-column entity is encountered, we will return the value of the default (anonymous) column as the result. Later, we plan to add wide-column specific query APIs which will enable retrieving entire wide-column entities or a subset of their columns. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10483 Test Plan: `make check` Reviewed By: riversand963 Differential Revision: D38441881 Pulled By: ltamasi fbshipit-source-id: 6444e79a31aff2470e866698e3a97985bc2b3543 --- db/db_iter.cc | 52 +++++++- db/db_iter.h | 10 ++ db/memtable.cc | 19 +-- db/wide/db_wide_basic_test.cc | 179 ++++++++++++++++++++------- db/wide/wide_column_serialization.cc | 21 ++++ db/wide/wide_column_serialization.h | 1 + include/rocksdb/wide_columns.h | 2 + table/get_context.cc | 22 +++- 8 files changed, 246 insertions(+), 60 deletions(-) diff --git a/db/db_iter.cc b/db/db_iter.cc index 371c20d32..4f5c5b08b 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -17,6 +17,7 @@ #include "db/merge_context.h" #include "db/merge_helper.h" #include "db/pinned_iterators_manager.h" +#include "db/wide/wide_column_serialization.h" #include "file/filename.h" #include "logging/logging.h" #include "memory/arena.h" @@ -75,6 +76,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, verify_checksums_(read_options.verify_checksums), expose_blob_index_(expose_blob_index), is_blob_(false), + is_wide_(false), arena_mode_(arena_mode), range_del_agg_(&ioptions.internal_comparator, s), db_impl_(db_impl), @@ -132,6 +134,7 @@ void DBIter::Next() { PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, clock_); // Release temporarily pinned 
blocks from last operation ReleaseTempPinnedData(); + ResetWideColumnValue(); local_stats_.skip_count_ += num_internal_keys_skipped_; local_stats_.skip_count_--; num_internal_keys_skipped_ = 0; @@ -174,6 +177,8 @@ void DBIter::Next() { bool DBIter::SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index) { assert(!is_blob_); + assert(!is_wide_); + assert(value_of_default_column_.empty()); if (expose_blob_index_) { // Stacked BlobDB implementation is_blob_ = true; @@ -209,13 +214,24 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key, return true; } -bool DBIter::SetWideColumnValueIfNeeded(const Slice& /* wide_columns_slice */) { +bool DBIter::SetWideColumnValueIfNeeded(const Slice& wide_columns_slice) { assert(!is_blob_); + assert(!is_wide_); + assert(value_of_default_column_.empty()); - // TODO: support wide-column entities - status_ = Status::NotSupported("Encountered unexpected wide-column entity"); - valid_ = false; - return false; + Slice wide_columns_copy = wide_columns_slice; + + const Status s = WideColumnSerialization::GetValueOfDefaultColumn( + wide_columns_copy, value_of_default_column_); + + if (!s.ok()) { + status_ = s; + valid_ = false; + return false; + } + + is_wide_ = true; + return true; } // PRE: saved_key_ has the current user key if skipping_saved_key @@ -265,6 +281,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, bool reseek_done = false; is_blob_ = false; + assert(!is_wide_); + assert(value_of_default_column_.empty()); do { // Will update is_key_seqnum_zero_ as soon as we parsed the current key @@ -592,7 +610,11 @@ bool DBIter::MergeValuesNewToOld() { if (!s.ok()) { return false; } + is_blob_ = false; + assert(!is_wide_); + assert(value_of_default_column_.empty()); + // iter_ is positioned after put iter_.Next(); if (!iter_.status().ok()) { @@ -638,6 +660,7 @@ void DBIter::Prev() { PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, clock_); ReleaseTempPinnedData(); + ResetWideColumnValue(); 
ResetInternalKeysSkippedCounter(); bool ok = true; if (direction_ == kForward) { @@ -968,6 +991,9 @@ bool DBIter::FindValueForCurrentKey() { Status s; s.PermitUncheckedError(); is_blob_ = false; + assert(!is_wide_); + assert(value_of_default_column_.empty()); + switch (last_key_entry_type) { case kTypeDeletion: case kTypeDeletionWithTimestamp: @@ -1006,7 +1032,11 @@ bool DBIter::FindValueForCurrentKey() { if (!s.ok()) { return false; } + is_blob_ = false; + assert(!is_wide_); + assert(value_of_default_column_.empty()); + return true; } else if (last_not_merge_type == kTypeWideColumnEntity) { // TODO: support wide-column entities @@ -1082,6 +1112,9 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { // Find the next value that's visible. ParsedInternalKey ikey; is_blob_ = false; + assert(!is_wide_); + assert(value_of_default_column_.empty()); + while (true) { if (!iter_.Valid()) { valid_ = false; @@ -1216,7 +1249,11 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (!s.ok()) { return false; } + is_blob_ = false; + assert(!is_wide_); + assert(value_of_default_column_.empty()); + return true; } else if (ikey.type == kTypeWideColumnEntity) { // TODO: support wide-column entities @@ -1441,6 +1478,7 @@ void DBIter::Seek(const Slice& target) { status_ = Status::OK(); ReleaseTempPinnedData(); + ResetWideColumnValue(); ResetInternalKeysSkippedCounter(); // Seek the inner iterator based on the target key. @@ -1517,6 +1555,7 @@ void DBIter::SeekForPrev(const Slice& target) { status_ = Status::OK(); ReleaseTempPinnedData(); + ResetWideColumnValue(); ResetInternalKeysSkippedCounter(); // Seek the inner iterator based on the target key. 
@@ -1574,6 +1613,7 @@ void DBIter::SeekToFirst() { status_ = Status::OK(); direction_ = kForward; ReleaseTempPinnedData(); + ResetWideColumnValue(); ResetInternalKeysSkippedCounter(); ClearSavedValue(); is_key_seqnum_zero_ = false; @@ -1621,6 +1661,7 @@ void DBIter::SeekToLast() { *iterate_upper_bound_, /*a_has_ts=*/false, k, /*b_has_ts=*/false)) { ReleaseTempPinnedData(); + ResetWideColumnValue(); PrevInternal(nullptr); k = key(); @@ -1640,6 +1681,7 @@ void DBIter::SeekToLast() { status_ = Status::OK(); direction_ = kReverse; ReleaseTempPinnedData(); + ResetWideColumnValue(); ResetInternalKeysSkippedCounter(); ClearSavedValue(); is_key_seqnum_zero_ = false; diff --git a/db/db_iter.h b/db/db_iter.h index 1847060dd..5e0ff2031 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -160,9 +160,12 @@ class DBIter final : public Iterator { } Slice value() const override { assert(valid_); + assert(!is_blob_ || !is_wide_); if (!expose_blob_index_ && is_blob_) { return blob_value_; + } else if (is_wide_) { + return value_of_default_column_; } else if (current_entry_is_merged_) { // If pinned_value_ is set then the result of merge operator is one of // the merge operands and we should return it. @@ -302,6 +305,11 @@ class DBIter final : public Iterator { bool SetWideColumnValueIfNeeded(const Slice& wide_columns_slice); + void ResetWideColumnValue() { + is_wide_ = false; + value_of_default_column_.clear(); + } + Status Merge(const Slice* val, const Slice& user_key); const SliceTransform* prefix_extractor_; @@ -326,6 +334,7 @@ class DBIter final : public Iterator { Slice pinned_value_; // for prefix seek mode to support prev() PinnableSlice blob_value_; + Slice value_of_default_column_; Statistics* statistics_; uint64_t max_skip_; uint64_t max_skippable_internal_keys_; @@ -362,6 +371,7 @@ class DBIter final : public Iterator { // the stacked BlobDB implementation is used, false otherwise. 
bool expose_blob_index_; bool is_blob_; + bool is_wide_; bool arena_mode_; // List of operands for merge operator. MergeContext merge_context_; diff --git a/db/memtable.cc b/db/memtable.cc index 645dd065f..3998df837 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -21,6 +21,7 @@ #include "db/pinned_iterators_manager.h" #include "db/range_tombstone_fragmenter.h" #include "db/read_callback.h" +#include "db/wide/wide_column_serialization.h" #include "logging/logging.h" #include "memory/arena.h" #include "memory/memory_usage.h" @@ -786,12 +787,6 @@ static bool SaveValue(void* arg, const char* entry) { "Encounter unsupported blob value. Please open DB with " "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); } - } else { - assert(type == kTypeWideColumnEntity); - - // TODO: support wide-column entities - *(s->status) = - Status::NotSupported("Encountered unexpected wide-column entity"); } if (!s->status->ok()) { @@ -825,7 +820,17 @@ static bool SaveValue(void* arg, const char* entry) { merge_context->PushOperand( v, s->inplace_update_support == false /* operand_pinned */); } else if (s->value != nullptr) { - s->value->assign(v.data(), v.size()); + if (type != kTypeWideColumnEntity) { + assert(type == kTypeValue || type == kTypeBlobIndex); + s->value->assign(v.data(), v.size()); + } else { + Slice value; + *(s->status) = + WideColumnSerialization::GetValueOfDefaultColumn(v, value); + if (s->status->ok()) { + s->value->assign(value.data(), value.size()); + } + } } if (s->inplace_update_support) { s->mem->GetLock(s->key->user_key())->ReadUnlock(); diff --git a/db/wide/db_wide_basic_test.cc b/db/wide/db_wide_basic_test.cc index 89fe5c237..2c5e8f424 100644 --- a/db/wide/db_wide_basic_test.cc +++ b/db/wide/db_wide_basic_test.cc @@ -22,15 +22,90 @@ class DBWideBasicTest : public DBTestBase { TEST_F(DBWideBasicTest, PutEntity) { Options options = GetDefaultOptions(); - // Use the DB::PutEntity API constexpr char first_key[] = "first"; - WideColumns first_columns{{"attr_name1",
"foo"}, {"attr_name2", "bar"}}; + constexpr char second_key[] = "second"; + + constexpr char first_value_of_default_column[] = "hello"; + + auto verify = [&]() { + { + PinnableSlice result; + ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key, + &result)); + ASSERT_EQ(result, first_value_of_default_column); + } + + { + PinnableSlice result; + ASSERT_OK(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), second_key, + &result)); + ASSERT_TRUE(result.empty()); + } + + { + constexpr size_t num_keys = 2; + + std::array<Slice, num_keys> keys{{first_key, second_key}}; + std::array<PinnableSlice, num_keys> values; + std::array<Status, num_keys> statuses; + + db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, + &keys[0], &values[0], &statuses[0]); + + ASSERT_OK(statuses[0]); + ASSERT_EQ(values[0], first_value_of_default_column); + + ASSERT_OK(statuses[1]); + ASSERT_TRUE(values[1].empty()); + } + + { + std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); + + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), first_value_of_default_column); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_TRUE(iter->value().empty()); + + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + + iter->SeekToLast(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), second_key); + ASSERT_TRUE(iter->value().empty()); + + iter->Prev(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), first_key); + ASSERT_EQ(iter->value(), first_value_of_default_column); + + iter->Prev(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + }; + + // Use the DB::PutEntity API + WideColumns first_columns{ + {kDefaultWideColumnName, first_value_of_default_column}, + {"attr_name1", "foo"}, + {"attr_name2", "bar"}}; ASSERT_OK(db_->PutEntity(WriteOptions(),
db_->DefaultColumnFamily(), first_key, first_columns)); // Use WriteBatch - constexpr char second_key[] = "second"; WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; WriteBatch batch; @@ -38,7 +113,51 @@ TEST_F(DBWideBasicTest, PutEntity) { batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); - // Note: currently, read APIs are supposed to return NotSupported + // Try reading from memtable + verify(); + + // Try reading after recovery + Close(); + options.avoid_flush_during_recovery = true; + Reopen(options); + + verify(); + + // Try reading from storage + ASSERT_OK(Flush()); + + verify(); +} + +TEST_F(DBWideBasicTest, PutEntityColumnFamily) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"corinthian"}, options); + + // Use the DB::PutEntity API + constexpr char first_key[] = "first"; + WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; + + ASSERT_OK( + db_->PutEntity(WriteOptions(), handles_[1], first_key, first_columns)); + + // Use WriteBatch + constexpr char second_key[] = "second"; + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; + + WriteBatch batch; + ASSERT_OK(batch.PutEntity(handles_[1], second_key, second_columns)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); +} + +TEST_F(DBWideBasicTest, PutEntityMergeNotSupported) { + Options options = GetDefaultOptions(); + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + Reopen(options); + + constexpr char first_key[] = "first"; + constexpr char second_key[] = "second"; + + // Note: Merge is currently not supported for wide-column entities auto verify = [&]() { { PinnableSlice result; @@ -84,26 +203,23 @@ TEST_F(DBWideBasicTest, PutEntity) { } }; - // Try reading from memtable - verify(); + // Use the DB::PutEntity API + WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; - // Try reading after recovery - 
Close(); - options.avoid_flush_during_recovery = true; - Reopen(options); + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns)); - verify(); + // Use WriteBatch + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; - // Try reading from storage - ASSERT_OK(Flush()); + WriteBatch batch; + ASSERT_OK( + batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); - verify(); + ASSERT_OK(Flush()); // Add a couple of merge operands - Close(); - options.merge_operator = MergeOperators::CreateStringAppendOperator(); - Reopen(options); - constexpr char merge_operand[] = "bla"; ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key, @@ -111,15 +227,15 @@ TEST_F(DBWideBasicTest, PutEntity) { ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, merge_operand)); - // Try reading from memtable + // Try reading when PutEntity is in storage, Merge is in memtable verify(); - // Try reading from storage + // Try reading when PutEntity and Merge are both in storage ASSERT_OK(Flush()); verify(); - // Do it again, with the Put and the Merge in the same memtable + // Try reading when PutEntity and Merge are both in memtable ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), first_key, first_columns)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); @@ -128,30 +244,9 @@ TEST_F(DBWideBasicTest, PutEntity) { ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, merge_operand)); - // Try reading from memtable verify(); } -TEST_F(DBWideBasicTest, PutEntityColumnFamily) { - Options options = GetDefaultOptions(); - CreateAndReopenWithCF({"corinthian"}, options); - - // Use the DB::PutEntity API - constexpr char first_key[] = "first"; - WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; - - ASSERT_OK( - db_->PutEntity(WriteOptions(), handles_[1], first_key, 
first_columns)); - - // Use WriteBatch - constexpr char second_key[] = "second"; - WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; - - WriteBatch batch; - ASSERT_OK(batch.PutEntity(handles_[1], second_key, second_columns)); - ASSERT_OK(db_->Write(WriteOptions(), &batch)); -} - TEST_F(DBWideBasicTest, PutEntityTimestampError) { // Note: timestamps are currently not supported diff --git a/db/wide/wide_column_serialization.cc b/db/wide/wide_column_serialization.cc index fb002366b..6c11fdcab 100644 --- a/db/wide/wide_column_serialization.cc +++ b/db/wide/wide_column_serialization.cc @@ -15,6 +15,8 @@ namespace ROCKSDB_NAMESPACE { +const Slice kDefaultWideColumnName; + Status WideColumnSerialization::Serialize(const WideColumns& columns, std::string& output) { if (columns.size() > @@ -137,4 +139,23 @@ WideColumns::const_iterator WideColumnSerialization::Find( return it; } +Status WideColumnSerialization::GetValueOfDefaultColumn(Slice& input, + Slice& value) { + WideColumns columns; + + const Status s = Deserialize(input, columns); + if (!s.ok()) { + return s; + } + + if (columns.empty() || columns[0].name() != kDefaultWideColumnName) { + value.clear(); + return Status::OK(); + } + + value = columns[0].value(); + + return Status::OK(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/wide/wide_column_serialization.h b/db/wide/wide_column_serialization.h index 67b771e76..7eeccdee3 100644 --- a/db/wide/wide_column_serialization.h +++ b/db/wide/wide_column_serialization.h @@ -48,6 +48,7 @@ class WideColumnSerialization { static WideColumns::const_iterator Find(const WideColumns& columns, const Slice& column_name); + static Status GetValueOfDefaultColumn(Slice& input, Slice& value); static constexpr uint32_t kCurrentVersion = 1; }; diff --git a/include/rocksdb/wide_columns.h b/include/rocksdb/wide_columns.h index 1974a6951..41f185e62 100644 --- a/include/rocksdb/wide_columns.h +++ b/include/rocksdb/wide_columns.h @@ -71,4 +71,6 @@ inline bool 
operator!=(const WideColumn& lhs, const WideColumn& rhs) { using WideColumns = std::vector<WideColumn>; +extern const Slice kDefaultWideColumnName; + } // namespace ROCKSDB_NAMESPACE diff --git a/table/get_context.cc b/table/get_context.cc index d2c6ab83a..a1f947dad 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -9,6 +9,7 @@ #include "db/merge_helper.h" #include "db/pinned_iterators_manager.h" #include "db/read_callback.h" +#include "db/wide/wide_column_serialization.h" #include "monitoring/file_read_sample.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" @@ -258,10 +259,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, state_ = kUnexpectedBlobIndex; return false; } - } else if (type == kTypeWideColumnEntity) { - // TODO: support wide-column entities - state_ = kUnexpectedWideColumnEntity; - return false; } if (is_blob_index_ != nullptr) { @@ -272,14 +269,27 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, state_ = kFound; if (do_merge_) { if (LIKELY(pinnable_val_ != nullptr)) { + Slice value_to_use = value; + + if (type == kTypeWideColumnEntity) { + Slice value_copy = value; + + if (!WideColumnSerialization::GetValueOfDefaultColumn( + value_copy, value_to_use) + .ok()) { + state_ = kCorrupt; + return false; + } + } + if (LIKELY(value_pinner != nullptr)) { // If the backing resources for the value are provided, pin them - pinnable_val_->PinSlice(value, value_pinner); + pinnable_val_->PinSlice(value_to_use, value_pinner); } else { TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf", this); // Otherwise copy the value - pinnable_val_->PinSelf(value); + pinnable_val_->PinSelf(value_to_use); } } } else {