diff --git a/CMakeLists.txt b/CMakeLists.txt index 7d93e5ab6..b4cad7e29 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -703,6 +703,7 @@ if(WITH_TESTS) db/corruption_test.cc db/cuckoo_table_db_test.cc db/db_basic_test.cc + db/db_blob_index_test.cc db/db_block_cache_test.cc db/db_bloom_filter_test.cc db/db_compaction_filter_test.cc diff --git a/Makefile b/Makefile index 4c464a15c..f413c7982 100644 --- a/Makefile +++ b/Makefile @@ -378,6 +378,7 @@ TESTS = \ db_wal_test \ db_block_cache_test \ db_test \ + db_blob_index_test \ db_bloom_filter_test \ db_iter_test \ db_log_iter_test \ @@ -1104,6 +1105,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_blob_index_test: db/db_blob_index_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_block_cache_test: db/db_block_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 5dcdabad9..1bea19bce 100644 --- a/TARGETS +++ b/TARGETS @@ -366,6 +366,7 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'], ['cuckoo_table_reader_test', 'table/cuckoo_table_reader_test.cc', 'serial'], ['date_tiered_test', 'utilities/date_tiered/date_tiered_test.cc', 'serial'], ['db_basic_test', 'db/db_basic_test.cc', 'serial'], + ['db_blob_index_test', 'db/db_blob_index_test.cc', 'serial'], ['db_block_cache_test', 'db/db_block_cache_test.cc', 'serial'], ['db_bloom_filter_test', 'db/db_bloom_filter_test.cc', 'serial'], ['db_compaction_filter_test', 'db/db_compaction_filter_test.cc', 'parallel'], diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 547260292..8eac637c4 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -25,6 +25,8 @@ CompactionEventListener::CompactionListenerValueType fromInternalValueType( kSingleDelete; case kTypeRangeDeletion: return CompactionEventListener::CompactionListenerValueType::kRangeDelete; + case kTypeBlobIndex: + return CompactionEventListener::CompactionListenerValueType::kBlobIndex; default: assert(false); return CompactionEventListener::CompactionListenerValueType::kInvalid; diff --git a/db/db_blob_index_test.cc b/db/db_blob_index_test.cc new file mode 100644 index 000000000..9d5f07411 --- /dev/null +++ b/db/db_blob_index_test.cc @@ -0,0 +1,408 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include +#include +#include +#include + +#include "db/column_family.h" +#include "db/db_iter.h" +#include "db/db_test_util.h" +#include "db/dbformat.h" +#include "db/write_batch_internal.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "util/string_util.h" +#include "utilities/merge_operators.h" + +namespace rocksdb { + +// kTypeBlobIndex is a value type used by BlobDB only. The base rocksdb +// should accept the value type on write, and report not supported value +// for reads, unless caller request for it explicitly. The base rocksdb +// doesn't understand format of actual blob index (the value). +class DBBlobIndexTest : public DBTestBase { + public: + enum Tier { + kMemtable = 0, + kImmutableMemtables = 1, + kL0SstFile = 2, + kLnSstFile = 3, + }; + const std::vector kAllTiers = {Tier::kMemtable, + Tier::kImmutableMemtables, + Tier::kL0SstFile, Tier::kLnSstFile}; + + DBBlobIndexTest() : DBTestBase("/db_blob_index_test") {} + + ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); } + + ColumnFamilyData* cfd() { + return reinterpret_cast(cfh())->cfd(); + } + + Status PutBlobIndex(WriteBatch* batch, const Slice& key, + const Slice& blob_index) { + return WriteBatchInternal::PutBlobIndex(batch, cfd()->GetID(), key, + blob_index); + } + + Status Write(WriteBatch* batch) { + return dbfull()->Write(WriteOptions(), batch); + } + + std::string GetImpl(const Slice& key, bool* is_blob_index = nullptr, + const Snapshot* snapshot = nullptr) { + ReadOptions read_options; + read_options.snapshot = snapshot; + PinnableSlice value; + auto s = dbfull()->GetImpl(read_options, cfh(), key, &value, + nullptr /*value_found*/, nullptr /*callback*/, + is_blob_index); + if (s.IsNotFound()) { + return "NOT_FOUND"; + } + if (s.IsNotSupported()) { + return "NOT_SUPPORTED"; + } + if (!s.ok()) { + return s.ToString(); + } + return value.ToString(); + } + + std::string GetBlobIndex(const Slice& key, + const Snapshot* snapshot = nullptr) { + bool is_blob_index = false; + std::string value = GetImpl(key, &is_blob_index, snapshot); + if (!is_blob_index) { + return "NOT_BLOB"; + } + return value; + } + + ArenaWrappedDBIter* GetBlobIterator() { + return dbfull()->NewIteratorImpl(ReadOptions(), cfd(), + dbfull()->GetLatestSequenceNumber(), + true /*allow_blob*/); + } + + Options GetTestOptions() { + Options options; + options.create_if_missing = true; + options.num_levels = 2; + options.disable_auto_compactions = true; + // Disable auto flushes. + options.max_write_buffer_number = 10; + options.min_write_buffer_number_to_merge = 10; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + return options; + } + + void MoveDataTo(Tier tier) { + switch (tier) { + case Tier::kMemtable: + break; + case Tier::kImmutableMemtables: + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + break; + case Tier::kL0SstFile: + ASSERT_OK(Flush()); + break; + case Tier::kLnSstFile: + ASSERT_OK(Flush()); + ASSERT_OK(Put("a", "dummy")); + ASSERT_OK(Put("z", "dummy")); + ASSERT_OK(Flush()); + ASSERT_OK( + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,1", FilesPerLevel()); + break; + } + } +}; + +// Should be able to write kTypeBlobIndex to memtables and SST files. +TEST_F(DBBlobIndexTest, Write) { + for (auto tier : kAllTiers) { + DestroyAndReopen(GetTestOptions()); + for (int i = 1; i <= 5; i++) { + std::string index = ToString(i); + WriteBatch batch; + ASSERT_OK(PutBlobIndex(&batch, "key" + index, "blob" + index)); + ASSERT_OK(Write(&batch)); + } + MoveDataTo(tier); + for (int i = 1; i <= 5; i++) { + std::string index = ToString(i); + ASSERT_EQ("blob" + index, GetBlobIndex("key" + index)); + } + } +} + +// Get should be able to return blob index if is_blob_index is provided, +// otherwise return Status::NotSupported status. +TEST_F(DBBlobIndexTest, Get) { + for (auto tier : kAllTiers) { + DestroyAndReopen(GetTestOptions()); + WriteBatch batch; + ASSERT_OK(batch.Put("key", "value")); + ASSERT_OK(PutBlobIndex(&batch, "blob_key", "blob_index")); + ASSERT_OK(Write(&batch)); + MoveDataTo(tier); + // Verify normal value + bool is_blob_index = false; + PinnableSlice value; + ASSERT_EQ("value", Get("key")); + ASSERT_EQ("value", GetImpl("key")); + ASSERT_EQ("value", GetImpl("key", &is_blob_index)); + ASSERT_FALSE(is_blob_index); + // Verify blob index + ASSERT_TRUE(Get("blob_key", &value).IsNotSupported()); + ASSERT_EQ("NOT_SUPPORTED", GetImpl("blob_key")); + ASSERT_EQ("blob_index", GetImpl("blob_key", &is_blob_index)); + ASSERT_TRUE(is_blob_index); + } +} + +// Get should NOT return Status::NotSupported if blob index is updated with +// a normal value. +TEST_F(DBBlobIndexTest, Updated) { + for (auto tier : kAllTiers) { + DestroyAndReopen(GetTestOptions()); + WriteBatch batch; + for (int i = 0; i < 10; i++) { + ASSERT_OK(PutBlobIndex(&batch, "key" + ToString(i), "blob_index")); + } + ASSERT_OK(Write(&batch)); + // Avoid blob values from being purged. + const Snapshot* snapshot = dbfull()->GetSnapshot(); + ASSERT_OK(Put("key1", "new_value")); + ASSERT_OK(Merge("key2", "a")); + ASSERT_OK(Merge("key2", "b")); + ASSERT_OK(Merge("key2", "c")); + ASSERT_OK(Delete("key3")); + ASSERT_OK(SingleDelete("key4")); + ASSERT_OK(Delete("key5")); + ASSERT_OK(Merge("key5", "a")); + ASSERT_OK(Merge("key5", "b")); + ASSERT_OK(Merge("key5", "c")); + ASSERT_OK(dbfull()->DeleteRange(WriteOptions(), cfh(), "key6", "key9")); + MoveDataTo(tier); + for (int i = 0; i < 10; i++) { + ASSERT_EQ("blob_index", GetBlobIndex("key" + ToString(i), snapshot)); + } + ASSERT_EQ("new_value", Get("key1")); + ASSERT_EQ("NOT_SUPPORTED", GetImpl("key2")); + ASSERT_EQ("NOT_FOUND", Get("key3")); + ASSERT_EQ("NOT_FOUND", Get("key4")); + ASSERT_EQ("a,b,c", GetImpl("key5")); + for (int i = 6; i < 9; i++) { + ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i))); + } + ASSERT_EQ("blob_index", GetBlobIndex("key9")); + dbfull()->ReleaseSnapshot(snapshot); + } +} + +// Iterator should get blob value if allow_blob flag is set, +// otherwise return Status::NotSupported status. +TEST_F(DBBlobIndexTest, Iterate) { + const std::vector> data = { + /*00*/ {kTypeValue}, + /*01*/ {kTypeBlobIndex}, + /*02*/ {kTypeValue}, + /*03*/ {kTypeBlobIndex, kTypeValue}, + /*04*/ {kTypeValue}, + /*05*/ {kTypeValue, kTypeBlobIndex}, + /*06*/ {kTypeValue}, + /*07*/ {kTypeDeletion, kTypeBlobIndex}, + /*08*/ {kTypeValue}, + /*09*/ {kTypeSingleDeletion, kTypeBlobIndex}, + /*10*/ {kTypeValue}, + /*11*/ {kTypeMerge, kTypeMerge, kTypeMerge, kTypeBlobIndex}, + /*12*/ {kTypeValue}, + /*13*/ + {kTypeMerge, kTypeMerge, kTypeMerge, kTypeDeletion, kTypeBlobIndex}, + /*14*/ {kTypeValue}, + /*15*/ {kTypeBlobIndex}, + /*16*/ {kTypeValue}, + }; + + auto get_key = [](int index) { + char buf[20]; + snprintf(buf, sizeof(buf), "%02d", index); + return "key" + std::string(buf); + }; + + auto get_value = [&](int index, int version) { + return get_key(index) + "_value" + ToString(version); + }; + + auto check_iterator = [&](Iterator* iterator, Status::Code expected_status, + const Slice& expected_value) { + ASSERT_EQ(expected_status, iterator->status().code()); + if (expected_status == Status::kOk) { + ASSERT_TRUE(iterator->Valid()); + ASSERT_EQ(expected_value, iterator->value()); + } else { + ASSERT_FALSE(iterator->Valid()); + } + }; + + auto create_normal_iterator = [&]() -> Iterator* { + return dbfull()->NewIterator(ReadOptions()); + }; + + auto create_blob_iterator = [&]() -> Iterator* { return GetBlobIterator(); }; + + auto check_is_blob = [&](bool is_blob) { + return [is_blob](Iterator* iterator) { + ASSERT_EQ(is_blob, + reinterpret_cast(iterator)->IsBlob()); + }; + }; + + auto verify = [&](int index, Status::Code expected_status, + const Slice& forward_value, const Slice& backward_value, + std::function create_iterator, + std::function extra_check = nullptr) { + // Seek + auto* iterator = create_iterator(); + ASSERT_OK(iterator->Refresh()); + iterator->Seek(get_key(index)); + check_iterator(iterator, expected_status, forward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + + // Next + iterator = create_iterator(); + ASSERT_OK(iterator->Refresh()); + iterator->Seek(get_key(index - 1)); + ASSERT_TRUE(iterator->Valid()); + iterator->Next(); + check_iterator(iterator, expected_status, forward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + + // SeekForPrev + iterator = create_iterator(); + ASSERT_OK(iterator->Refresh()); + iterator->SeekForPrev(get_key(index)); + check_iterator(iterator, expected_status, backward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + + // Prev + iterator = create_iterator(); + iterator->Seek(get_key(index + 1)); + ASSERT_TRUE(iterator->Valid()); + iterator->Prev(); + check_iterator(iterator, expected_status, backward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + }; + + for (auto tier : {Tier::kMemtable} /*kAllTiers*/) { + // Avoid values from being purged. + std::vector snapshots; + DestroyAndReopen(GetTestOptions()); + + // fill data + for (int i = 0; i < static_cast(data.size()); i++) { + for (int j = static_cast(data[i].size()) - 1; j >= 0; j--) { + std::string key = get_key(i); + std::string value = get_value(i, j); + WriteBatch batch; + switch (data[i][j]) { + case kTypeValue: + ASSERT_OK(Put(key, value)); + break; + case kTypeDeletion: + ASSERT_OK(Delete(key)); + break; + case kTypeSingleDeletion: + ASSERT_OK(SingleDelete(key)); + break; + case kTypeMerge: + ASSERT_OK(Merge(key, value)); + break; + case kTypeBlobIndex: + ASSERT_OK(PutBlobIndex(&batch, key, value)); + ASSERT_OK(Write(&batch)); + break; + default: + assert(false); + }; + } + snapshots.push_back(dbfull()->GetSnapshot()); + } + ASSERT_OK( + dbfull()->DeleteRange(WriteOptions(), cfh(), get_key(15), get_key(16))); + snapshots.push_back(dbfull()->GetSnapshot()); + MoveDataTo(tier); + + // Normal iterator + verify(1, Status::kNotSupported, "", "", create_normal_iterator); + verify(3, Status::kNotSupported, "", "", create_normal_iterator); + verify(5, Status::kOk, get_value(5, 0), get_value(5, 0), + create_normal_iterator); + verify(7, Status::kOk, get_value(8, 0), get_value(6, 0), + create_normal_iterator); + verify(9, Status::kOk, get_value(10, 0), get_value(8, 0), + create_normal_iterator); + verify(11, Status::kNotSupported, "", "", create_normal_iterator); + verify(13, Status::kOk, + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + create_normal_iterator); + verify(15, Status::kOk, get_value(16, 0), get_value(14, 0), + create_normal_iterator); + + // Iterator with blob support + verify(1, Status::kOk, get_value(1, 0), get_value(1, 0), + create_blob_iterator, check_is_blob(true)); + verify(3, Status::kOk, get_value(3, 0), get_value(3, 0), + create_blob_iterator, check_is_blob(true)); + verify(5, Status::kOk, get_value(5, 0), get_value(5, 0), + create_blob_iterator, check_is_blob(false)); + verify(7, Status::kOk, get_value(8, 0), get_value(6, 0), + create_blob_iterator, check_is_blob(false)); + verify(9, Status::kOk, get_value(10, 0), get_value(8, 0), + create_blob_iterator, check_is_blob(false)); + verify(11, Status::kNotSupported, "", "", create_blob_iterator); + verify(13, Status::kOk, + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + create_blob_iterator, check_is_blob(false)); + verify(15, Status::kOk, get_value(16, 0), get_value(14, 0), + create_blob_iterator, check_is_blob(false)); + + for (auto* snapshot : snapshots) { + dbfull()->ReleaseSnapshot(snapshot); + } + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_impl.cc b/db/db_impl.cc index 09b59fad3..fce79eacc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -924,7 +924,7 @@ Status DBImpl::Get(const ReadOptions& read_options, Status DBImpl::GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val, bool* value_found, - ReadCallback* callback) { + ReadCallback* callback, bool* is_blob_index) { assert(pinnable_val != nullptr); StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); @@ -978,13 +978,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, bool done = false; if (!skip_memtable) { if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &range_del_agg, read_options, callback)) { + &range_del_agg, read_options, callback, is_blob_index)) { done = true; pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } else if ((s.ok() || s.IsMergeInProgress()) && sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &range_del_agg, read_options, callback)) { + &range_del_agg, read_options, callback, + is_blob_index)) { done = true; pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); @@ -997,7 +998,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, - &range_del_agg, value_found, nullptr, nullptr, callback); + &range_del_agg, value_found, nullptr, nullptr, callback, + is_blob_index); RecordTick(stats_, MEMTABLE_MISS); } @@ -1437,73 +1439,79 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); - auto snapshot = read_options.snapshot != nullptr - ? reinterpret_cast( - read_options.snapshot)->number_ + ? reinterpret_cast(read_options.snapshot) + ->number_ : latest_snapshot; - - // Try to generate a DB iterator tree in continuous memory area to be - // cache friendly. Here is an example of result: - // +-------------------------------+ - // | | - // | ArenaWrappedDBIter | - // | + | - // | +---> Inner Iterator ------------+ - // | | | | - // | | +-- -- -- -- -- -- -- --+ | - // | +--- | Arena | | - // | | | | - // | Allocated Memory: | | - // | | +-------------------+ | - // | | | DBIter | <---+ - // | | + | - // | | | +-> iter_ ------------+ - // | | | | | - // | | +-------------------+ | - // | | | MergingIterator | <---+ - // | | + | - // | | | +->child iter1 ------------+ - // | | | | | | - // | | +->child iter2 ----------+ | - // | | | | | | | - // | | | +->child iter3 --------+ | | - // | | | | | | - // | | +-------------------+ | | | - // | | | Iterator1 | <--------+ - // | | +-------------------+ | | - // | | | Iterator2 | <------+ - // | | +-------------------+ | - // | | | Iterator3 | <----+ - // | | +-------------------+ - // | | | - // +-------+-----------------------+ - // - // ArenaWrappedDBIter inlines an arena area where all the iterators in - // the iterator tree are allocated in the order of being accessed when - // querying. - // Laying out the iterators in the order of being accessed makes it more - // likely that any iterator pointer is close to the iterator it points to so - // that they are likely to be in the same cache line and/or page. - ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, read_options, *cfd->ioptions(), snapshot, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, - ((read_options.snapshot != nullptr) ? nullptr : this), cfd); - - InternalIterator* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator()); - db_iter->SetIterUnderDBIter(internal_iter); - - return db_iter; + return NewIteratorImpl(read_options, cfd, snapshot); } // To stop compiler from complaining return nullptr; } +ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, + ColumnFamilyData* cfd, + SequenceNumber snapshot, + bool allow_blob) { + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + + // Try to generate a DB iterator tree in continuous memory area to be + // cache friendly. Here is an example of result: + // +-------------------------------+ + // | | + // | ArenaWrappedDBIter | + // | + | + // | +---> Inner Iterator ------------+ + // | | | | + // | | +-- -- -- -- -- -- -- --+ | + // | +--- | Arena | | + // | | | | + // | Allocated Memory: | | + // | | +-------------------+ | + // | | | DBIter | <---+ + // | | + | + // | | | +-> iter_ ------------+ + // | | | | | + // | | +-------------------+ | + // | | | MergingIterator | <---+ + // | | + | + // | | | +->child iter1 ------------+ + // | | | | | | + // | | +->child iter2 ----------+ | + // | | | | | | | + // | | | +->child iter3 --------+ | | + // | | | | | | + // | | +-------------------+ | | | + // | | | Iterator1 | <--------+ + // | | +-------------------+ | | + // | | | Iterator2 | <------+ + // | | +-------------------+ | + // | | | Iterator3 | <----+ + // | | +-------------------+ + // | | | + // +-------+-----------------------+ + // + // ArenaWrappedDBIter inlines an arena area where all the iterators in + // the iterator tree are allocated in the order of being accessed when + // querying. + // Laying out the iterators in the order of being accessed makes it more + // likely that any iterator pointer is close to the iterator it points to so + // that they are likely to be in the same cache line and/or page. + ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( + env_, read_options, *cfd->ioptions(), snapshot, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + sv->version_number, ((read_options.snapshot != nullptr) ? nullptr : this), + cfd, allow_blob); + + InternalIterator* internal_iter = + NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator()); + db_iter->SetIterUnderDBIter(internal_iter); + + return db_iter; +} + Status DBImpl::NewIterators( const ReadOptions& read_options, const std::vector& column_families, @@ -1547,28 +1555,16 @@ Status DBImpl::NewIterators( #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); + auto snapshot = + read_options.snapshot != nullptr + ? reinterpret_cast(read_options.snapshot) + ->number_ + : latest_snapshot; for (size_t i = 0; i < column_families.size(); ++i) { auto* cfd = reinterpret_cast( column_families[i])->cfd(); - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); - - auto snapshot = - read_options.snapshot != nullptr - ? reinterpret_cast( - read_options.snapshot)->number_ - : latest_snapshot; - - ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, read_options, *cfd->ioptions(), snapshot, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, - ((read_options.snapshot != nullptr) ? nullptr : this), cfd); - InternalIterator* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator()); - db_iter->SetIterUnderDBIter(internal_iter); - iterators->push_back(db_iter); + iterators->push_back(NewIteratorImpl(read_options, cfd, snapshot)); } } diff --git a/db/db_impl.h b/db/db_impl.h index 718638fc5..2253010aa 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -53,6 +53,7 @@ namespace rocksdb { +class ArenaWrappedDBIter; class MemTable; class TableCache; class Version; @@ -124,6 +125,7 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found = nullptr) override; + using DB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) override; @@ -131,6 +133,11 @@ class DBImpl : public DB { const ReadOptions& options, const std::vector& column_families, std::vector* iterators) override; + ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, + ColumnFamilyData* cfd, + SequenceNumber snapshot, + bool allow_blob = false); + virtual const Snapshot* GetSnapshot() override; virtual void ReleaseSnapshot(const Snapshot* snapshot) override; using DB::GetProperty; @@ -342,6 +349,8 @@ class DBImpl : public DB { return alive_log_files_.begin()->getting_flushed; } + Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr); + // Force current memtable contents to be flushed. Status TEST_FlushMemTable(bool wait = true, ColumnFamilyHandle* cfh = nullptr); @@ -638,7 +647,6 @@ class DBImpl : public DB { private: friend class DB; - friend class DBTest2_ReadCallbackTest_Test; friend class InternalStats; friend class PessimisticTransaction; friend class WriteCommittedTxn; @@ -651,7 +659,9 @@ class DBImpl : public DB { friend struct SuperVersion; friend class CompactedDBImpl; #ifndef NDEBUG + friend class DBTest2_ReadCallbackTest_Test; friend class XFTransactionWriteHandler; + friend class DBBlobIndexTest; #endif struct CompactionState; @@ -1254,7 +1264,8 @@ class DBImpl : public DB { // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value, - bool* value_found = nullptr, ReadCallback* callback = nullptr); + bool* value_found = nullptr, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr); bool GetIntPropertyInternal(ColumnFamilyData* cfd, const DBPropertyInfo& property_info, diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 2ed72a395..e7f689121 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -80,6 +80,15 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, disallow_trivial_move); } +Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) { + WriteContext write_context; + InstrumentedMutexLock l(&mutex_); + if (cfd == nullptr) { + cfd = default_cf_handle_->cfd(); + } + return SwitchMemtable(cfd, &write_context); +} + Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) { FlushOptions fo; fo.wait = wait; diff --git a/db/db_iter.cc b/db/db_iter.cc index 3280b4d74..d9a1eaee7 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -8,8 +8,6 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_iter.h" -#include -#include #include #include @@ -18,7 +16,6 @@ #include "db/merge_helper.h" #include "db/pinned_iterators_manager.h" #include "monitoring/perf_context_imp.h" -#include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" @@ -105,7 +102,7 @@ class DBIter final: public Iterator { DBIter(Env* _env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const Comparator* cmp, InternalIterator* iter, SequenceNumber s, bool arena_mode, - uint64_t max_sequential_skip_in_iterations) + uint64_t max_sequential_skip_in_iterations, bool allow_blob) : arena_mode_(arena_mode), env_(_env), logger_(cf_options.info_log), @@ -122,7 +119,8 @@ class DBIter final: public Iterator { pin_thru_lifetime_(read_options.pin_data), total_order_seek_(read_options.total_order_seek), range_del_agg_(cf_options.internal_comparator, s, - true /* collapse_deletions */) { + true /* collapse_deletions */), + allow_blob_(allow_blob) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = cf_options.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; @@ -180,6 +178,10 @@ class DBIter final: public Iterator { return status_; } } + bool IsBlob() const { + assert(valid_ && (allow_blob_ || !is_blob_)); + return is_blob_; + } virtual Status GetProperty(std::string prop_name, std::string* prop) override { @@ -291,6 +293,8 @@ class DBIter final: public Iterator { RangeDelAggregator range_del_agg_; LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; + bool allow_blob_; + bool is_blob_; // No copying allowed DBIter(const DBIter&); @@ -380,6 +384,8 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // - none of the above : saved_key_ can contain anything, it doesn't matter. uint64_t num_skipped = 0; + is_blob_ = false; + do { if (!ParseKey(&ikey_)) { // Skip corrupted keys. @@ -421,6 +427,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeValue: + case kTypeBlobIndex: saved_key_.SetUserKey( ikey_.user_key, !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); @@ -432,6 +439,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { skipping = true; num_skipped = 0; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + } else if (ikey_.type == kTypeBlobIndex) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + } else { + is_blob_ = true; + valid_ = true; + } + return; } else { valid_ = true; return; @@ -573,6 +592,18 @@ void DBIter::MergeValuesNewToOld() { merge_context_.PushOperand(iter_->value(), iter_->IsValuePinned() /* operand_pinned */); PERF_COUNTER_ADD(internal_merge_count, 1); + } else if (kTypeBlobIndex == ikey.type) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else { + status_ = + Status::NotSupported("Blob DB does not support merge operator."); + } + valid_ = false; + return; } else { assert(false); } @@ -679,7 +710,6 @@ void DBIter::PrevInternal() { !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); if (FindValueForCurrentKey()) { - valid_ = true; if (!iter_->Valid()) { return; } @@ -746,6 +776,7 @@ bool DBIter::FindValueForCurrentKey() { last_key_entry_type = ikey.type; switch (last_key_entry_type) { case kTypeValue: + case kTypeBlobIndex: if (range_del_agg_.ShouldDelete( ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { @@ -791,6 +822,7 @@ bool DBIter::FindValueForCurrentKey() { } Status s; + is_blob_ = false; switch (last_key_entry_type) { case kTypeDeletion: case kTypeSingleDeletion: @@ -806,6 +838,18 @@ bool DBIter::FindValueForCurrentKey() { merge_operator_, saved_key_.GetUserKey(), nullptr, merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_, &pinned_value_, true); + } else if (last_not_merge_type == kTypeBlobIndex) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else { + status_ = + Status::NotSupported("Blob DB does not support merge operator."); + } + valid_ = false; + return true; } else { assert(last_not_merge_type == kTypeValue); s = MergeHelper::TimedFullMerge( @@ -817,6 +861,17 @@ bool DBIter::FindValueForCurrentKey() { case kTypeValue: // do nothing - we've already has value in saved_value_ break; + case kTypeBlobIndex: + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + return true; + } + is_blob_ = true; + break; default: assert(false); break; @@ -850,7 +905,15 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { valid_ = false; return false; } - if (ikey.type == kTypeValue) { + if (ikey.type == kTypeBlobIndex && !allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + return true; + } + if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { assert(iter_->IsValuePinned()); pinned_value_ = iter_->value(); valid_ = true; @@ -1161,10 +1224,11 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, const Comparator* user_key_comparator, InternalIterator* internal_iter, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations) { - DBIter* db_iter = new DBIter(env, read_options, cf_options, - user_key_comparator, internal_iter, sequence, - false, max_sequential_skip_in_iterations); + uint64_t max_sequential_skip_in_iterations, + bool allow_blob) { + DBIter* db_iter = new DBIter( + env, read_options, cf_options, user_key_comparator, internal_iter, + sequence, false, max_sequential_skip_in_iterations, allow_blob); return db_iter; } @@ -1192,6 +1256,7 @@ inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); } inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } +bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); } inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name, std::string* prop) { if (prop_name == "rocksdb.iterator.super-version-number") { @@ -1208,11 +1273,11 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration, - uint64_t version_number) { + uint64_t version_number, bool allow_blob) { auto mem = arena_.AllocateAligned(sizeof(DBIter)); db_iter_ = new (mem) DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr, - sequence, true, max_sequential_skip_in_iteration); + sequence, true, max_sequential_skip_in_iteration, allow_blob); sv_number_ = version_number; } @@ -1232,7 +1297,7 @@ Status ArenaWrappedDBIter::Refresh() { SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); Init(env, read_options_, *(cfd_->ioptions()), latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, - cur_sv_number); + cur_sv_number, allow_blob_); InternalIterator* internal_iter = db_impl_->NewInternalIterator( read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator()); @@ -1248,12 +1313,12 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - DBImpl* db_impl, ColumnFamilyData* cfd) { + DBImpl* db_impl, ColumnFamilyData* cfd, bool allow_blob) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); iter->Init(env, read_options, cf_options, sequence, - max_sequential_skip_in_iterations, version_number); + max_sequential_skip_in_iterations, version_number, allow_blob); if (db_impl != nullptr && cfd != nullptr) { - iter->StoreRefreshInfo(read_options, db_impl, cfd); + iter->StoreRefreshInfo(read_options, db_impl, cfd, allow_blob); } return iter; diff --git a/db/db_iter.h b/db/db_iter.h index ea98ff433..26fcd44cb 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -33,7 +33,8 @@ extern Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, const Comparator* user_key_comparator, InternalIterator* internal_iter, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations); + uint64_t max_sequential_skip_in_iterations, + bool allow_blob = false); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -63,20 +64,22 @@ class ArenaWrappedDBIter : public Iterator { virtual Slice value() const override; virtual Status status() const override; virtual Status Refresh() override; + bool IsBlob() const; virtual Status GetProperty(std::string prop_name, std::string* prop) override; void Init(Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, - uint64_t version_number); + uint64_t max_sequential_skip_in_iterations, uint64_t version_number, + bool allow_blob); void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, - ColumnFamilyData* cfd) { + ColumnFamilyData* cfd, bool allow_blob) { read_options_ = read_options; db_impl_ = db_impl; cfd_ = cfd; + allow_blob_ = allow_blob; } private: @@ -86,6 +89,7 @@ class ArenaWrappedDBIter : public Iterator { ColumnFamilyData* cfd_ = nullptr; DBImpl* db_impl_ = nullptr; ReadOptions read_options_; + bool allow_blob_ = false; }; // Generate the arena wrapped iterator class. @@ -95,6 +99,7 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr); + DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr, + bool allow_blob = false); } // namespace rocksdb diff --git a/db/dbformat.cc b/db/dbformat.cc index d76b28e60..19e45f800 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -27,7 +27,7 @@ namespace rocksdb { // and the value type is embedded as the low 8 bits in the sequence // number in internal keys, we need to use the highest-numbered // ValueType, not the lowest). -const ValueType kValueTypeForSeek = kTypeSingleDeletion; +const ValueType kValueTypeForSeek = kTypeBlobIndex; const ValueType kValueTypeForSeekForPrev = kTypeDeletion; uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { diff --git a/db/dbformat.h b/db/dbformat.h index 8e176807d..d3fa71c7d 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -47,6 +47,8 @@ enum ValueType : unsigned char { kTypeNoop = 0xD, // WAL only. kTypeColumnFamilyRangeDeletion = 0xE, // WAL only. kTypeRangeDeletion = 0xF, // meta block + kTypeColumnFamilyBlobIndex = 0x10, // Blob DB only + kTypeBlobIndex = 0x11, // Blob DB only kMaxValue = 0x7F // Not used for storing records. }; @@ -57,7 +59,7 @@ extern const ValueType kValueTypeForSeekForPrev; // Checks whether a type is an inline value type // (i.e. a type used in memtable skiplist and sst file datablock). inline bool IsValueType(ValueType t) { - return t <= kTypeMerge || t == kTypeSingleDeletion; + return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex; } // Checks whether a type is from user operation diff --git a/db/memtable.cc b/db/memtable.cc index 19232f9d7..989a526c8 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -539,6 +539,8 @@ struct Saver { bool inplace_update_support; Env* env_; ReadCallback* callback_; + bool* is_blob_index; + bool CheckCallback(SequenceNumber _seq) { if (callback_) { return callback_->IsCommitted(_seq); @@ -581,11 +583,26 @@ static bool SaveValue(void* arg, const char* entry) { s->seq = seq; - if ((type == kTypeValue || type == kTypeMerge) && + if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) { type = kTypeRangeDeletion; } switch (type) { + case kTypeBlobIndex: + if (s->is_blob_index == nullptr) { + ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index."); + *(s->status) = Status::NotSupported( + "Encounter unsupported blob value. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else if (*(s->merge_in_progress)) { + *(s->status) = + Status::NotSupported("Blob DB does not support merge operator."); + } + if (!s->status->ok()) { + *(s->found_final_value) = true; + return false; + } + // intentional fallthrough case kTypeValue: { if (s->inplace_update_support) { s->mem->GetLock(s->key->user_key())->ReadLock(); @@ -606,6 +623,9 @@ static bool SaveValue(void* arg, const char* entry) { s->mem->GetLock(s->key->user_key())->ReadUnlock(); } *(s->found_final_value) = true; + if (s->is_blob_index != nullptr) { + *(s->is_blob_index) = (type == kTypeBlobIndex); + } return false; } case kTypeDeletion: @@ -662,7 +682,8 @@ static bool SaveValue(void* arg, const char* entry) { bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, - const ReadOptions& read_opts, ReadCallback* callback) { + const ReadOptions& read_opts, ReadCallback* callback, + bool* is_blob_index) { // The sequence number is updated synchronously in version_set.h if (IsEmpty()) { // Avoiding recording stats for speed. @@ -709,6 +730,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.statistics = moptions_.statistics; saver.env_ = env_; saver.callback_ = callback; + saver.is_blob_index = is_blob_index; table_->Get(key, &saver, SaveValue); *seq = saver.seq; diff --git a/db/memtable.h b/db/memtable.h index 9c98705bc..fb01b5a82 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -189,14 +189,15 @@ class MemTable { bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts, ReadCallback* callback = nullptr) { + const ReadOptions& read_opts, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr) { SequenceNumber seq; return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts, - callback); + callback, is_blob_index); } // Attempts to update the new_value inplace, else does normal Add diff --git a/db/memtable_list.cc b/db/memtable_list.cc index f0fb4843b..93bc19281 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -104,9 +104,9 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback) { + ReadCallback* callback, bool* is_blob_index) { return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg, - seq, read_opts, callback); + seq, read_opts, callback, is_blob_index); } bool MemTableListVersion::GetFromHistory(const LookupKey& key, @@ -122,14 +122,15 @@ bool MemTableListVersion::GetFromHistory(const LookupKey& key, bool MemTableListVersion::GetFromList( std::list* list, const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback) { + SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback, + bool* is_blob_index) { *seq = kMaxSequenceNumber; for (auto& memtable : *list) { SequenceNumber current_seq = kMaxSequenceNumber; bool done = memtable->Get(key, value, s, merge_context, range_del_agg, - ¤t_seq, read_opts, callback); + ¤t_seq, read_opts, callback, is_blob_index); if (*seq == kMaxSequenceNumber) { // Store the most recent sequence number of any operation on this key. // Since we only care about the most recent change, we only need to diff --git a/db/memtable_list.h b/db/memtable_list.h index 1bec0debe..52833a245 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -55,14 +55,15 @@ class MemTableListVersion { bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts, ReadCallback* callback = nullptr) { + const ReadOptions& read_opts, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr) { SequenceNumber seq; return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts, - callback); + callback, is_blob_index); } // Similar to Get(), but searches the Memtable history of memtables that @@ -120,7 +121,8 @@ class MemTableListVersion { std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr); void AddMemTable(MemTable* m); diff --git a/db/version_set.cc b/db/version_set.cc index 25f873edb..281c6fc71 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -964,8 +964,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, PinnableSlice* value, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found, - bool* key_exists, SequenceNumber* seq, - ReadCallback* callback) { + bool* key_exists, SequenceNumber* seq, ReadCallback* callback, + bool* is_blob) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); @@ -981,7 +981,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, value, value_found, merge_context, range_del_agg, this->env_, seq, - merge_operator_ ? &pinned_iters_mgr : nullptr, callback); + merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob); // Pin blocks that we read to hold merge operands if (merge_operator_) { @@ -1030,6 +1030,12 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, return; case GetContext::kMerge: break; + case GetContext::kBlobIndex: + ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); + *status = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + return; } f = fp.GetNextFile(); } diff --git a/db/version_set.h b/db/version_set.h index 8c1324698..0ad1bf76f 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -487,7 +487,7 @@ class Version { Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found = nullptr, bool* key_exists = nullptr, SequenceNumber* seq = nullptr, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, bool* is_blob = nullptr); // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. diff --git a/db/write_batch.cc b/db/write_batch.cc index c35499e0b..d3107f551 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -67,6 +67,7 @@ enum ContentFlags : uint32_t { HAS_COMMIT = 1 << 7, HAS_ROLLBACK = 1 << 8, HAS_DELETE_RANGE = 1 << 9, + HAS_BLOB_INDEX = 1 << 10, }; struct BatchContentClassifier : public WriteBatch::Handler { @@ -97,6 +98,11 @@ struct BatchContentClassifier : public WriteBatch::Handler { return Status::OK(); } + Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_BLOB_INDEX; + return Status::OK(); + } + Status MarkBeginPrepare() override { content_flags |= ContentFlags::HAS_BEGIN_PREPARE; return Status::OK(); @@ -328,6 +334,17 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, return Status::Corruption("bad WriteBatch Merge"); } break; + case kTypeColumnFamilyBlobIndex: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch BlobIndex"); + } + // intentional fallthrough + case kTypeBlobIndex: + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value)) { + return Status::Corruption("bad WriteBatch BlobIndex"); + } + break; case kTypeLogData: assert(blob != nullptr); if (!GetLengthPrefixedSlice(input, blob)) { @@ -424,6 +441,13 @@ Status WriteBatch::Iterate(Handler* handler) const { empty_batch = false; found++; break; + case kTypeColumnFamilyBlobIndex: + case kTypeBlobIndex: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX)); + s = handler->PutBlobIndexCF(column_family, key, value); + found++; + break; case kTypeLogData: handler->LogData(blob); empty_batch = true; @@ -776,6 +800,25 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family, value); } +Status WriteBatchInternal::PutBlobIndex(WriteBatch* b, + uint32_t column_family_id, + const Slice& key, const Slice& value) { + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast(kTypeBlobIndex)); + } else { + b->rep_.push_back(static_cast(kTypeColumnFamilyBlobIndex)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_BLOB_INDEX, + std::memory_order_relaxed); + return save.commit(); +} + Status WriteBatch::PutLogData(const Slice& blob) { LocalSavePoint save(this); rep_.push_back(static_cast(kTypeLogData)); @@ -967,8 +1010,8 @@ class MemTableInserter : public WriteBatch::Handler { return true; } - virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { + Status PutCFImpl(uint32_t column_family_id, const Slice& key, + const Slice& value, ValueType value_type) { if (rebuilding_trx_ != nullptr) { WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); if (write_after_commit_) { @@ -986,7 +1029,7 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { - mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_, + mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_, get_post_process_info(mem)); } else if (moptions->inplace_callback == nullptr) { assert(!concurrent_memtable_writes_); @@ -1024,11 +1067,11 @@ class MemTableInserter : public WriteBatch::Handler { value, &merged_value); if (status == UpdateStatus::UPDATED_INPLACE) { // prev_value is updated in-place with final value. - mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size)); + mem->Add(sequence_, value_type, key, Slice(prev_buffer, prev_size)); RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); } else if (status == UpdateStatus::UPDATED) { // merged_value contains the final value. - mem->Add(sequence_, kTypeValue, key, Slice(merged_value)); + mem->Add(sequence_, value_type, key, Slice(merged_value)); RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); } } @@ -1041,6 +1084,11 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + return PutCFImpl(column_family_id, key, value, kTypeValue); + } + Status DeleteImpl(uint32_t column_family_id, const Slice& key, const Slice& value, ValueType delete_type) { MemTable* mem = cf_mems_->GetMemTable(); @@ -1209,6 +1257,12 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } + virtual Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + // Same as PutCF except for value type. + return PutCFImpl(column_family_id, key, value, kTypeBlobIndex); + } + void CheckMemtableFull() { if (flush_scheduler_ != nullptr) { auto* cfd = cf_mems_->current(); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index a74d8ea0e..abe70751f 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -99,6 +99,9 @@ class WriteBatchInternal { static Status Merge(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key, const SliceParts& value); + static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); + static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid); static Status MarkRollback(WriteBatch* batch, const Slice& xid); diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 40d318e09..e132033db 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -206,6 +206,7 @@ class CompactionEventListener { kDelete, kSingleDelete, kRangeDelete, + kBlobIndex, kInvalid, }; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 0dc844773..52c26d09a 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -233,6 +233,12 @@ class WriteBatch : public WriteBatchBase { } virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {} + virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/, + const Slice& /*key*/, + const Slice& /*value*/) { + return Status::InvalidArgument("PutBlobIndexCF not implemented"); + } + // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); diff --git a/table/get_context.cc b/table/get_context.cc index b17b7dfbd..355f6e062 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -34,12 +34,15 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) { } // namespace -GetContext::GetContext( - const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, - Statistics* statistics, GetState init_state, const Slice& user_key, - PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, - RangeDelAggregator* _range_del_agg, Env* env, SequenceNumber* seq, - PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback) +GetContext::GetContext(const Comparator* ucmp, + const MergeOperator* merge_operator, Logger* logger, + Statistics* statistics, GetState init_state, + const Slice& user_key, PinnableSlice* pinnable_val, + bool* value_found, MergeContext* merge_context, + RangeDelAggregator* _range_del_agg, Env* env, + SequenceNumber* seq, + PinnedIteratorsManager* _pinned_iters_mgr, + ReadCallback* callback, bool* is_blob_index) : ucmp_(ucmp), merge_operator_(merge_operator), logger_(logger), @@ -54,7 +57,8 @@ GetContext::GetContext( seq_(seq), replay_log_(nullptr), pinned_iters_mgr_(_pinned_iters_mgr), - callback_(callback) { + callback_(callback), + is_blob_index_(is_blob_index) { if (seq_) { *seq_ = kMaxSequenceNumber; } @@ -104,13 +108,19 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, auto type = parsed_key.type; // Key matches. Process it - if ((type == kTypeValue || type == kTypeMerge) && + if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && range_del_agg_ != nullptr && range_del_agg_->ShouldDelete(parsed_key)) { type = kTypeRangeDeletion; } switch (type) { case kTypeValue: + case kTypeBlobIndex: assert(state_ == kNotFound || state_ == kMerge); + if (type == kTypeBlobIndex && is_blob_index_ == nullptr) { + // Blob value not supported. Stop. + state_ = kBlobIndex; + return false; + } if (kNotFound == state_) { state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { @@ -136,6 +146,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } } } + if (is_blob_index_ != nullptr) { + *is_blob_index_ = (type == kTypeBlobIndex); + } return false; case kTypeDeletion: diff --git a/table/get_context.h b/table/get_context.h index efea68b0a..b0c8da134 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -23,7 +23,8 @@ class GetContext { kFound, kDeleted, kCorrupt, - kMerge // saver contains the current merge result (the operands) + kMerge, // saver contains the current merge result (the operands) + kBlobIndex, }; GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, @@ -32,7 +33,7 @@ class GetContext { MergeContext* merge_context, RangeDelAggregator* range_del_agg, Env* env, SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); void MarkKeyMayExist(); @@ -93,6 +94,7 @@ class GetContext { PinnedIteratorsManager* pinned_iters_mgr_; ReadCallback* callback_; bool sample_; + bool* is_blob_index_; }; void replayGetContextLog(const Slice& replay_log, const Slice& user_key,