From d1cab2b64ea9d92fb4b27f3f31a0d45267d7fbc5 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 3 Oct 2017 09:08:07 -0700 Subject: [PATCH] Add ValueType::kTypeBlobIndex Summary: Add the kTypeBlobIndex value type, which will be used by blob db only, to insert a (key, blob_offset) KV pair. The purpose is to 1. Make it possible to open an existing rocksdb instance as a blob db. Existing values will be of kTypeValue type, while values inserted by blob db will be of kTypeBlobIndex type. 2. Make rocksdb able to detect whether the db contains values written by blob db, and if so return an error. 3. Make it possible to have blob db optionally store a value in an SST file (with kTypeValue type) or as a blob value (with kTypeBlobIndex type). The root db (DBImpl) basically treats kTypeBlobIndex entries as normal values on write. On Get, if is_blob_index is provided, return whether the value read is of kTypeBlobIndex type; if is_blob_index is not provided, return a Status::NotSupported() status for such a value. On scan, an allow_blob flag is passed; if the flag is true, whether the current value is of kTypeBlobIndex type is returned via iter->IsBlob(), and if it is false, encountering such a value yields a Status::NotSupported() status. Changes on the blob db side will be in a separate patch. 
Closes https://github.com/facebook/rocksdb/pull/2886 Differential Revision: D5838431 Pulled By: yiwu-arbug fbshipit-source-id: 3c5306c62bc13bb11abc03422ec5cbcea1203cca --- CMakeLists.txt | 1 + Makefile | 4 + TARGETS | 1 + db/compaction_iterator.cc | 2 + db/db_blob_index_test.cc | 408 ++++++++++++++++++++++++++++++++++ db/db_impl.cc | 158 +++++++------ db/db_impl.h | 15 +- db/db_impl_debug.cc | 9 + db/db_iter.cc | 99 +++++++-- db/db_iter.h | 15 +- db/dbformat.cc | 2 +- db/dbformat.h | 4 +- db/memtable.cc | 26 ++- db/memtable.h | 7 +- db/memtable_list.cc | 9 +- db/memtable_list.h | 10 +- db/version_set.cc | 12 +- db/version_set.h | 2 +- db/write_batch.cc | 64 +++++- db/write_batch_internal.h | 3 + include/rocksdb/listener.h | 1 + include/rocksdb/write_batch.h | 6 + table/get_context.cc | 29 ++- table/get_context.h | 6 +- 24 files changed, 754 insertions(+), 139 deletions(-) create mode 100644 db/db_blob_index_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 7d93e5ab6..b4cad7e29 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -703,6 +703,7 @@ if(WITH_TESTS) db/corruption_test.cc db/cuckoo_table_db_test.cc db/db_basic_test.cc + db/db_blob_index_test.cc db/db_block_cache_test.cc db/db_bloom_filter_test.cc db/db_compaction_filter_test.cc diff --git a/Makefile b/Makefile index 4c464a15c..f413c7982 100644 --- a/Makefile +++ b/Makefile @@ -378,6 +378,7 @@ TESTS = \ db_wal_test \ db_block_cache_test \ db_test \ + db_blob_index_test \ db_bloom_filter_test \ db_iter_test \ db_log_iter_test \ @@ -1104,6 +1105,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_blob_index_test: db/db_blob_index_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_block_cache_test: db/db_block_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 5dcdabad9..1bea19bce 100644 --- a/TARGETS +++ b/TARGETS @@ 
-366,6 +366,7 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'], ['cuckoo_table_reader_test', 'table/cuckoo_table_reader_test.cc', 'serial'], ['date_tiered_test', 'utilities/date_tiered/date_tiered_test.cc', 'serial'], ['db_basic_test', 'db/db_basic_test.cc', 'serial'], + ['db_blob_index_test', 'db/db_blob_index_test.cc', 'serial'], ['db_block_cache_test', 'db/db_block_cache_test.cc', 'serial'], ['db_bloom_filter_test', 'db/db_bloom_filter_test.cc', 'serial'], ['db_compaction_filter_test', 'db/db_compaction_filter_test.cc', 'parallel'], diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 547260292..8eac637c4 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -25,6 +25,8 @@ CompactionEventListener::CompactionListenerValueType fromInternalValueType( kSingleDelete; case kTypeRangeDeletion: return CompactionEventListener::CompactionListenerValueType::kRangeDelete; + case kTypeBlobIndex: + return CompactionEventListener::CompactionListenerValueType::kBlobIndex; default: assert(false); return CompactionEventListener::CompactionListenerValueType::kInvalid; diff --git a/db/db_blob_index_test.cc b/db/db_blob_index_test.cc new file mode 100644 index 000000000..9d5f07411 --- /dev/null +++ b/db/db_blob_index_test.cc @@ -0,0 +1,408 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include +#include +#include +#include + +#include "db/column_family.h" +#include "db/db_iter.h" +#include "db/db_test_util.h" +#include "db/dbformat.h" +#include "db/write_batch_internal.h" +#include "port/port.h" +#include "port/stack_trace.h" +#include "util/string_util.h" +#include "utilities/merge_operators.h" + +namespace rocksdb { + +// kTypeBlobIndex is a value type used by BlobDB only. The base rocksdb +// should accept the value type on write, and report not supported value +// for reads, unless caller request for it explicitly. The base rocksdb +// doesn't understand format of actual blob index (the value). +class DBBlobIndexTest : public DBTestBase { + public: + enum Tier { + kMemtable = 0, + kImmutableMemtables = 1, + kL0SstFile = 2, + kLnSstFile = 3, + }; + const std::vector kAllTiers = {Tier::kMemtable, + Tier::kImmutableMemtables, + Tier::kL0SstFile, Tier::kLnSstFile}; + + DBBlobIndexTest() : DBTestBase("/db_blob_index_test") {} + + ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); } + + ColumnFamilyData* cfd() { + return reinterpret_cast(cfh())->cfd(); + } + + Status PutBlobIndex(WriteBatch* batch, const Slice& key, + const Slice& blob_index) { + return WriteBatchInternal::PutBlobIndex(batch, cfd()->GetID(), key, + blob_index); + } + + Status Write(WriteBatch* batch) { + return dbfull()->Write(WriteOptions(), batch); + } + + std::string GetImpl(const Slice& key, bool* is_blob_index = nullptr, + const Snapshot* snapshot = nullptr) { + ReadOptions read_options; + read_options.snapshot = snapshot; + PinnableSlice value; + auto s = dbfull()->GetImpl(read_options, cfh(), key, &value, + nullptr /*value_found*/, nullptr /*callback*/, + is_blob_index); + if (s.IsNotFound()) { + return "NOT_FOUND"; + } + if (s.IsNotSupported()) { + return "NOT_SUPPORTED"; + } + if (!s.ok()) { + return s.ToString(); + } + return value.ToString(); + } + + std::string GetBlobIndex(const Slice& key, + const Snapshot* snapshot = nullptr) { + bool 
is_blob_index = false; + std::string value = GetImpl(key, &is_blob_index, snapshot); + if (!is_blob_index) { + return "NOT_BLOB"; + } + return value; + } + + ArenaWrappedDBIter* GetBlobIterator() { + return dbfull()->NewIteratorImpl(ReadOptions(), cfd(), + dbfull()->GetLatestSequenceNumber(), + true /*allow_blob*/); + } + + Options GetTestOptions() { + Options options; + options.create_if_missing = true; + options.num_levels = 2; + options.disable_auto_compactions = true; + // Disable auto flushes. + options.max_write_buffer_number = 10; + options.min_write_buffer_number_to_merge = 10; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + return options; + } + + void MoveDataTo(Tier tier) { + switch (tier) { + case Tier::kMemtable: + break; + case Tier::kImmutableMemtables: + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + break; + case Tier::kL0SstFile: + ASSERT_OK(Flush()); + break; + case Tier::kLnSstFile: + ASSERT_OK(Flush()); + ASSERT_OK(Put("a", "dummy")); + ASSERT_OK(Put("z", "dummy")); + ASSERT_OK(Flush()); + ASSERT_OK( + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,1", FilesPerLevel()); + break; + } + } +}; + +// Should be able to write kTypeBlobIndex to memtables and SST files. +TEST_F(DBBlobIndexTest, Write) { + for (auto tier : kAllTiers) { + DestroyAndReopen(GetTestOptions()); + for (int i = 1; i <= 5; i++) { + std::string index = ToString(i); + WriteBatch batch; + ASSERT_OK(PutBlobIndex(&batch, "key" + index, "blob" + index)); + ASSERT_OK(Write(&batch)); + } + MoveDataTo(tier); + for (int i = 1; i <= 5; i++) { + std::string index = ToString(i); + ASSERT_EQ("blob" + index, GetBlobIndex("key" + index)); + } + } +} + +// Get should be able to return blob index if is_blob_index is provided, +// otherwise return Status::NotSupported status. 
+TEST_F(DBBlobIndexTest, Get) { + for (auto tier : kAllTiers) { + DestroyAndReopen(GetTestOptions()); + WriteBatch batch; + ASSERT_OK(batch.Put("key", "value")); + ASSERT_OK(PutBlobIndex(&batch, "blob_key", "blob_index")); + ASSERT_OK(Write(&batch)); + MoveDataTo(tier); + // Verify normal value + bool is_blob_index = false; + PinnableSlice value; + ASSERT_EQ("value", Get("key")); + ASSERT_EQ("value", GetImpl("key")); + ASSERT_EQ("value", GetImpl("key", &is_blob_index)); + ASSERT_FALSE(is_blob_index); + // Verify blob index + ASSERT_TRUE(Get("blob_key", &value).IsNotSupported()); + ASSERT_EQ("NOT_SUPPORTED", GetImpl("blob_key")); + ASSERT_EQ("blob_index", GetImpl("blob_key", &is_blob_index)); + ASSERT_TRUE(is_blob_index); + } +} + +// Get should NOT return Status::NotSupported if blob index is updated with +// a normal value. +TEST_F(DBBlobIndexTest, Updated) { + for (auto tier : kAllTiers) { + DestroyAndReopen(GetTestOptions()); + WriteBatch batch; + for (int i = 0; i < 10; i++) { + ASSERT_OK(PutBlobIndex(&batch, "key" + ToString(i), "blob_index")); + } + ASSERT_OK(Write(&batch)); + // Avoid blob values from being purged. 
+ const Snapshot* snapshot = dbfull()->GetSnapshot(); + ASSERT_OK(Put("key1", "new_value")); + ASSERT_OK(Merge("key2", "a")); + ASSERT_OK(Merge("key2", "b")); + ASSERT_OK(Merge("key2", "c")); + ASSERT_OK(Delete("key3")); + ASSERT_OK(SingleDelete("key4")); + ASSERT_OK(Delete("key5")); + ASSERT_OK(Merge("key5", "a")); + ASSERT_OK(Merge("key5", "b")); + ASSERT_OK(Merge("key5", "c")); + ASSERT_OK(dbfull()->DeleteRange(WriteOptions(), cfh(), "key6", "key9")); + MoveDataTo(tier); + for (int i = 0; i < 10; i++) { + ASSERT_EQ("blob_index", GetBlobIndex("key" + ToString(i), snapshot)); + } + ASSERT_EQ("new_value", Get("key1")); + ASSERT_EQ("NOT_SUPPORTED", GetImpl("key2")); + ASSERT_EQ("NOT_FOUND", Get("key3")); + ASSERT_EQ("NOT_FOUND", Get("key4")); + ASSERT_EQ("a,b,c", GetImpl("key5")); + for (int i = 6; i < 9; i++) { + ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i))); + } + ASSERT_EQ("blob_index", GetBlobIndex("key9")); + dbfull()->ReleaseSnapshot(snapshot); + } +} + +// Iterator should get blob value if allow_blob flag is set, +// otherwise return Status::NotSupported status. 
+TEST_F(DBBlobIndexTest, Iterate) { + const std::vector> data = { + /*00*/ {kTypeValue}, + /*01*/ {kTypeBlobIndex}, + /*02*/ {kTypeValue}, + /*03*/ {kTypeBlobIndex, kTypeValue}, + /*04*/ {kTypeValue}, + /*05*/ {kTypeValue, kTypeBlobIndex}, + /*06*/ {kTypeValue}, + /*07*/ {kTypeDeletion, kTypeBlobIndex}, + /*08*/ {kTypeValue}, + /*09*/ {kTypeSingleDeletion, kTypeBlobIndex}, + /*10*/ {kTypeValue}, + /*11*/ {kTypeMerge, kTypeMerge, kTypeMerge, kTypeBlobIndex}, + /*12*/ {kTypeValue}, + /*13*/ + {kTypeMerge, kTypeMerge, kTypeMerge, kTypeDeletion, kTypeBlobIndex}, + /*14*/ {kTypeValue}, + /*15*/ {kTypeBlobIndex}, + /*16*/ {kTypeValue}, + }; + + auto get_key = [](int index) { + char buf[20]; + snprintf(buf, sizeof(buf), "%02d", index); + return "key" + std::string(buf); + }; + + auto get_value = [&](int index, int version) { + return get_key(index) + "_value" + ToString(version); + }; + + auto check_iterator = [&](Iterator* iterator, Status::Code expected_status, + const Slice& expected_value) { + ASSERT_EQ(expected_status, iterator->status().code()); + if (expected_status == Status::kOk) { + ASSERT_TRUE(iterator->Valid()); + ASSERT_EQ(expected_value, iterator->value()); + } else { + ASSERT_FALSE(iterator->Valid()); + } + }; + + auto create_normal_iterator = [&]() -> Iterator* { + return dbfull()->NewIterator(ReadOptions()); + }; + + auto create_blob_iterator = [&]() -> Iterator* { return GetBlobIterator(); }; + + auto check_is_blob = [&](bool is_blob) { + return [is_blob](Iterator* iterator) { + ASSERT_EQ(is_blob, + reinterpret_cast(iterator)->IsBlob()); + }; + }; + + auto verify = [&](int index, Status::Code expected_status, + const Slice& forward_value, const Slice& backward_value, + std::function create_iterator, + std::function extra_check = nullptr) { + // Seek + auto* iterator = create_iterator(); + ASSERT_OK(iterator->Refresh()); + iterator->Seek(get_key(index)); + check_iterator(iterator, expected_status, forward_value); + if (extra_check) { + 
extra_check(iterator); + } + delete iterator; + + // Next + iterator = create_iterator(); + ASSERT_OK(iterator->Refresh()); + iterator->Seek(get_key(index - 1)); + ASSERT_TRUE(iterator->Valid()); + iterator->Next(); + check_iterator(iterator, expected_status, forward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + + // SeekForPrev + iterator = create_iterator(); + ASSERT_OK(iterator->Refresh()); + iterator->SeekForPrev(get_key(index)); + check_iterator(iterator, expected_status, backward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + + // Prev + iterator = create_iterator(); + iterator->Seek(get_key(index + 1)); + ASSERT_TRUE(iterator->Valid()); + iterator->Prev(); + check_iterator(iterator, expected_status, backward_value); + if (extra_check) { + extra_check(iterator); + } + delete iterator; + }; + + for (auto tier : {Tier::kMemtable} /*kAllTiers*/) { + // Avoid values from being purged. + std::vector snapshots; + DestroyAndReopen(GetTestOptions()); + + // fill data + for (int i = 0; i < static_cast(data.size()); i++) { + for (int j = static_cast(data[i].size()) - 1; j >= 0; j--) { + std::string key = get_key(i); + std::string value = get_value(i, j); + WriteBatch batch; + switch (data[i][j]) { + case kTypeValue: + ASSERT_OK(Put(key, value)); + break; + case kTypeDeletion: + ASSERT_OK(Delete(key)); + break; + case kTypeSingleDeletion: + ASSERT_OK(SingleDelete(key)); + break; + case kTypeMerge: + ASSERT_OK(Merge(key, value)); + break; + case kTypeBlobIndex: + ASSERT_OK(PutBlobIndex(&batch, key, value)); + ASSERT_OK(Write(&batch)); + break; + default: + assert(false); + }; + } + snapshots.push_back(dbfull()->GetSnapshot()); + } + ASSERT_OK( + dbfull()->DeleteRange(WriteOptions(), cfh(), get_key(15), get_key(16))); + snapshots.push_back(dbfull()->GetSnapshot()); + MoveDataTo(tier); + + // Normal iterator + verify(1, Status::kNotSupported, "", "", create_normal_iterator); + verify(3, Status::kNotSupported, 
"", "", create_normal_iterator); + verify(5, Status::kOk, get_value(5, 0), get_value(5, 0), + create_normal_iterator); + verify(7, Status::kOk, get_value(8, 0), get_value(6, 0), + create_normal_iterator); + verify(9, Status::kOk, get_value(10, 0), get_value(8, 0), + create_normal_iterator); + verify(11, Status::kNotSupported, "", "", create_normal_iterator); + verify(13, Status::kOk, + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + create_normal_iterator); + verify(15, Status::kOk, get_value(16, 0), get_value(14, 0), + create_normal_iterator); + + // Iterator with blob support + verify(1, Status::kOk, get_value(1, 0), get_value(1, 0), + create_blob_iterator, check_is_blob(true)); + verify(3, Status::kOk, get_value(3, 0), get_value(3, 0), + create_blob_iterator, check_is_blob(true)); + verify(5, Status::kOk, get_value(5, 0), get_value(5, 0), + create_blob_iterator, check_is_blob(false)); + verify(7, Status::kOk, get_value(8, 0), get_value(6, 0), + create_blob_iterator, check_is_blob(false)); + verify(9, Status::kOk, get_value(10, 0), get_value(8, 0), + create_blob_iterator, check_is_blob(false)); + verify(11, Status::kNotSupported, "", "", create_blob_iterator); + verify(13, Status::kOk, + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0), + create_blob_iterator, check_is_blob(false)); + verify(15, Status::kOk, get_value(16, 0), get_value(14, 0), + create_blob_iterator, check_is_blob(false)); + + for (auto* snapshot : snapshots) { + dbfull()->ReleaseSnapshot(snapshot); + } + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_impl.cc b/db/db_impl.cc index 09b59fad3..fce79eacc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -924,7 +924,7 @@ 
Status DBImpl::Get(const ReadOptions& read_options, Status DBImpl::GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val, bool* value_found, - ReadCallback* callback) { + ReadCallback* callback, bool* is_blob_index) { assert(pinnable_val != nullptr); StopWatch sw(env_, stats_, DB_GET); PERF_TIMER_GUARD(get_snapshot_time); @@ -978,13 +978,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, bool done = false; if (!skip_memtable) { if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &range_del_agg, read_options, callback)) { + &range_del_agg, read_options, callback, is_blob_index)) { done = true; pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); } else if ((s.ok() || s.IsMergeInProgress()) && sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, - &range_del_agg, read_options, callback)) { + &range_del_agg, read_options, callback, + is_blob_index)) { done = true; pinnable_val->PinSelf(); RecordTick(stats_, MEMTABLE_HIT); @@ -997,7 +998,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, - &range_del_agg, value_found, nullptr, nullptr, callback); + &range_del_agg, value_found, nullptr, nullptr, callback, + is_blob_index); RecordTick(stats_, MEMTABLE_MISS); } @@ -1437,73 +1439,79 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); - auto snapshot = read_options.snapshot != nullptr - ? reinterpret_cast( - read_options.snapshot)->number_ + ? reinterpret_cast(read_options.snapshot) + ->number_ : latest_snapshot; - - // Try to generate a DB iterator tree in continuous memory area to be - // cache friendly. 
Here is an example of result: - // +-------------------------------+ - // | | - // | ArenaWrappedDBIter | - // | + | - // | +---> Inner Iterator ------------+ - // | | | | - // | | +-- -- -- -- -- -- -- --+ | - // | +--- | Arena | | - // | | | | - // | Allocated Memory: | | - // | | +-------------------+ | - // | | | DBIter | <---+ - // | | + | - // | | | +-> iter_ ------------+ - // | | | | | - // | | +-------------------+ | - // | | | MergingIterator | <---+ - // | | + | - // | | | +->child iter1 ------------+ - // | | | | | | - // | | +->child iter2 ----------+ | - // | | | | | | | - // | | | +->child iter3 --------+ | | - // | | | | | | - // | | +-------------------+ | | | - // | | | Iterator1 | <--------+ - // | | +-------------------+ | | - // | | | Iterator2 | <------+ - // | | +-------------------+ | - // | | | Iterator3 | <----+ - // | | +-------------------+ - // | | | - // +-------+-----------------------+ - // - // ArenaWrappedDBIter inlines an arena area where all the iterators in - // the iterator tree are allocated in the order of being accessed when - // querying. - // Laying out the iterators in the order of being accessed makes it more - // likely that any iterator pointer is close to the iterator it points to so - // that they are likely to be in the same cache line and/or page. - ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, read_options, *cfd->ioptions(), snapshot, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, - ((read_options.snapshot != nullptr) ? 
nullptr : this), cfd); - - InternalIterator* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator()); - db_iter->SetIterUnderDBIter(internal_iter); - - return db_iter; + return NewIteratorImpl(read_options, cfd, snapshot); } // To stop compiler from complaining return nullptr; } +ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, + ColumnFamilyData* cfd, + SequenceNumber snapshot, + bool allow_blob) { + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + + // Try to generate a DB iterator tree in continuous memory area to be + // cache friendly. Here is an example of result: + // +-------------------------------+ + // | | + // | ArenaWrappedDBIter | + // | + | + // | +---> Inner Iterator ------------+ + // | | | | + // | | +-- -- -- -- -- -- -- --+ | + // | +--- | Arena | | + // | | | | + // | Allocated Memory: | | + // | | +-------------------+ | + // | | | DBIter | <---+ + // | | + | + // | | | +-> iter_ ------------+ + // | | | | | + // | | +-------------------+ | + // | | | MergingIterator | <---+ + // | | + | + // | | | +->child iter1 ------------+ + // | | | | | | + // | | +->child iter2 ----------+ | + // | | | | | | | + // | | | +->child iter3 --------+ | | + // | | | | | | + // | | +-------------------+ | | | + // | | | Iterator1 | <--------+ + // | | +-------------------+ | | + // | | | Iterator2 | <------+ + // | | +-------------------+ | + // | | | Iterator3 | <----+ + // | | +-------------------+ + // | | | + // +-------+-----------------------+ + // + // ArenaWrappedDBIter inlines an arena area where all the iterators in + // the iterator tree are allocated in the order of being accessed when + // querying. + // Laying out the iterators in the order of being accessed makes it more + // likely that any iterator pointer is close to the iterator it points to so + // that they are likely to be in the same cache line and/or page. 
+ ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( + env_, read_options, *cfd->ioptions(), snapshot, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + sv->version_number, ((read_options.snapshot != nullptr) ? nullptr : this), + cfd, allow_blob); + + InternalIterator* internal_iter = + NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator()); + db_iter->SetIterUnderDBIter(internal_iter); + + return db_iter; +} + Status DBImpl::NewIterators( const ReadOptions& read_options, const std::vector& column_families, @@ -1547,28 +1555,16 @@ Status DBImpl::NewIterators( #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); + auto snapshot = + read_options.snapshot != nullptr + ? reinterpret_cast(read_options.snapshot) + ->number_ + : latest_snapshot; for (size_t i = 0; i < column_families.size(); ++i) { auto* cfd = reinterpret_cast( column_families[i])->cfd(); - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); - - auto snapshot = - read_options.snapshot != nullptr - ? reinterpret_cast( - read_options.snapshot)->number_ - : latest_snapshot; - - ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, read_options, *cfd->ioptions(), snapshot, - sv->mutable_cf_options.max_sequential_skip_in_iterations, - sv->version_number, - ((read_options.snapshot != nullptr) ? 
nullptr : this), cfd); - InternalIterator* internal_iter = - NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), - db_iter->GetRangeDelAggregator()); - db_iter->SetIterUnderDBIter(internal_iter); - iterators->push_back(db_iter); + iterators->push_back(NewIteratorImpl(read_options, cfd, snapshot)); } } diff --git a/db/db_impl.h b/db/db_impl.h index 718638fc5..2253010aa 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -53,6 +53,7 @@ namespace rocksdb { +class ArenaWrappedDBIter; class MemTable; class TableCache; class Version; @@ -124,6 +125,7 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found = nullptr) override; + using DB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) override; @@ -131,6 +133,11 @@ class DBImpl : public DB { const ReadOptions& options, const std::vector& column_families, std::vector* iterators) override; + ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, + ColumnFamilyData* cfd, + SequenceNumber snapshot, + bool allow_blob = false); + virtual const Snapshot* GetSnapshot() override; virtual void ReleaseSnapshot(const Snapshot* snapshot) override; using DB::GetProperty; @@ -342,6 +349,8 @@ class DBImpl : public DB { return alive_log_files_.begin()->getting_flushed; } + Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr); + // Force current memtable contents to be flushed. 
Status TEST_FlushMemTable(bool wait = true, ColumnFamilyHandle* cfh = nullptr); @@ -638,7 +647,6 @@ class DBImpl : public DB { private: friend class DB; - friend class DBTest2_ReadCallbackTest_Test; friend class InternalStats; friend class PessimisticTransaction; friend class WriteCommittedTxn; @@ -651,7 +659,9 @@ class DBImpl : public DB { friend struct SuperVersion; friend class CompactedDBImpl; #ifndef NDEBUG + friend class DBTest2_ReadCallbackTest_Test; friend class XFTransactionWriteHandler; + friend class DBBlobIndexTest; #endif struct CompactionState; @@ -1254,7 +1264,8 @@ class DBImpl : public DB { // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value, - bool* value_found = nullptr, ReadCallback* callback = nullptr); + bool* value_found = nullptr, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr); bool GetIntPropertyInternal(ColumnFamilyData* cfd, const DBPropertyInfo& property_info, diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 2ed72a395..e7f689121 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -80,6 +80,15 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, disallow_trivial_move); } +Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) { + WriteContext write_context; + InstrumentedMutexLock l(&mutex_); + if (cfd == nullptr) { + cfd = default_cf_handle_->cfd(); + } + return SwitchMemtable(cfd, &write_context); +} + Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) { FlushOptions fo; fo.wait = wait; diff --git a/db/db_iter.cc b/db/db_iter.cc index 3280b4d74..d9a1eaee7 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -8,8 +8,6 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include "db/db_iter.h" -#include -#include #include #include @@ -18,7 +16,6 @@ #include "db/merge_helper.h" #include "db/pinned_iterators_manager.h" #include "monitoring/perf_context_imp.h" -#include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/merge_operator.h" @@ -105,7 +102,7 @@ class DBIter final: public Iterator { DBIter(Env* _env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const Comparator* cmp, InternalIterator* iter, SequenceNumber s, bool arena_mode, - uint64_t max_sequential_skip_in_iterations) + uint64_t max_sequential_skip_in_iterations, bool allow_blob) : arena_mode_(arena_mode), env_(_env), logger_(cf_options.info_log), @@ -122,7 +119,8 @@ class DBIter final: public Iterator { pin_thru_lifetime_(read_options.pin_data), total_order_seek_(read_options.total_order_seek), range_del_agg_(cf_options.internal_comparator, s, - true /* collapse_deletions */) { + true /* collapse_deletions */), + allow_blob_(allow_blob) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = cf_options.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; @@ -180,6 +178,10 @@ class DBIter final: public Iterator { return status_; } } + bool IsBlob() const { + assert(valid_ && (allow_blob_ || !is_blob_)); + return is_blob_; + } virtual Status GetProperty(std::string prop_name, std::string* prop) override { @@ -291,6 +293,8 @@ class DBIter final: public Iterator { RangeDelAggregator range_del_agg_; LocalStatistics local_stats_; PinnedIteratorsManager pinned_iters_mgr_; + bool allow_blob_; + bool is_blob_; // No copying allowed DBIter(const DBIter&); @@ -380,6 +384,8 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // - none of the above : saved_key_ can contain anything, it doesn't matter. uint64_t num_skipped = 0; + is_blob_ = false; + do { if (!ParseKey(&ikey_)) { // Skip corrupted keys. 
@@ -421,6 +427,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeValue: + case kTypeBlobIndex: saved_key_.SetUserKey( ikey_.user_key, !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); @@ -432,6 +439,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { skipping = true; num_skipped = 0; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + } else if (ikey_.type == kTypeBlobIndex) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + } else { + is_blob_ = true; + valid_ = true; + } + return; } else { valid_ = true; return; @@ -573,6 +592,18 @@ void DBIter::MergeValuesNewToOld() { merge_context_.PushOperand(iter_->value(), iter_->IsValuePinned() /* operand_pinned */); PERF_COUNTER_ADD(internal_merge_count, 1); + } else if (kTypeBlobIndex == ikey.type) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. 
Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else { + status_ = + Status::NotSupported("Blob DB does not support merge operator."); + } + valid_ = false; + return; } else { assert(false); } @@ -679,7 +710,6 @@ void DBIter::PrevInternal() { !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); if (FindValueForCurrentKey()) { - valid_ = true; if (!iter_->Valid()) { return; } @@ -746,6 +776,7 @@ bool DBIter::FindValueForCurrentKey() { last_key_entry_type = ikey.type; switch (last_key_entry_type) { case kTypeValue: + case kTypeBlobIndex: if (range_del_agg_.ShouldDelete( ikey, RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) { @@ -791,6 +822,7 @@ bool DBIter::FindValueForCurrentKey() { } Status s; + is_blob_ = false; switch (last_key_entry_type) { case kTypeDeletion: case kTypeSingleDeletion: @@ -806,6 +838,18 @@ bool DBIter::FindValueForCurrentKey() { merge_operator_, saved_key_.GetUserKey(), nullptr, merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_, &pinned_value_, true); + } else if (last_not_merge_type == kTypeBlobIndex) { + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else { + status_ = + Status::NotSupported("Blob DB does not support merge operator."); + } + valid_ = false; + return true; } else { assert(last_not_merge_type == kTypeValue); s = MergeHelper::TimedFullMerge( @@ -817,6 +861,17 @@ bool DBIter::FindValueForCurrentKey() { case kTypeValue: // do nothing - we've already has value in saved_value_ break; + case kTypeBlobIndex: + if (!allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. 
Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + return true; + } + is_blob_ = true; + break; default: assert(false); break; @@ -850,7 +905,15 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { valid_ = false; return false; } - if (ikey.type == kTypeValue) { + if (ikey.type == kTypeBlobIndex && !allow_blob_) { + ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); + status_ = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + valid_ = false; + return true; + } + if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { assert(iter_->IsValuePinned()); pinned_value_ = iter_->value(); valid_ = true; @@ -1161,10 +1224,11 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, const Comparator* user_key_comparator, InternalIterator* internal_iter, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations) { - DBIter* db_iter = new DBIter(env, read_options, cf_options, - user_key_comparator, internal_iter, sequence, - false, max_sequential_skip_in_iterations); + uint64_t max_sequential_skip_in_iterations, + bool allow_blob) { + DBIter* db_iter = new DBIter( + env, read_options, cf_options, user_key_comparator, internal_iter, + sequence, false, max_sequential_skip_in_iterations, allow_blob); return db_iter; } @@ -1192,6 +1256,7 @@ inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); } inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } +bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); } inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name, std::string* prop) { if (prop_name == "rocksdb.iterator.super-version-number") { @@ -1208,11 +1273,11 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& 
read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration, - uint64_t version_number) { + uint64_t version_number, bool allow_blob) { auto mem = arena_.AllocateAligned(sizeof(DBIter)); db_iter_ = new (mem) DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr, - sequence, true, max_sequential_skip_in_iteration); + sequence, true, max_sequential_skip_in_iteration, allow_blob); sv_number_ = version_number; } @@ -1232,7 +1297,7 @@ Status ArenaWrappedDBIter::Refresh() { SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); Init(env, read_options_, *(cfd_->ioptions()), latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, - cur_sv_number); + cur_sv_number, allow_blob_); InternalIterator* internal_iter = db_impl_->NewInternalIterator( read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator()); @@ -1248,12 +1313,12 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - DBImpl* db_impl, ColumnFamilyData* cfd) { + DBImpl* db_impl, ColumnFamilyData* cfd, bool allow_blob) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); iter->Init(env, read_options, cf_options, sequence, - max_sequential_skip_in_iterations, version_number); + max_sequential_skip_in_iterations, version_number, allow_blob); if (db_impl != nullptr && cfd != nullptr) { - iter->StoreRefreshInfo(read_options, db_impl, cfd); + iter->StoreRefreshInfo(read_options, db_impl, cfd, allow_blob); } return iter; diff --git a/db/db_iter.h b/db/db_iter.h index ea98ff433..26fcd44cb 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -33,7 +33,8 @@ extern Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, const Comparator* user_key_comparator, InternalIterator* internal_iter, const SequenceNumber& 
sequence, - uint64_t max_sequential_skip_in_iterations); + uint64_t max_sequential_skip_in_iterations, + bool allow_blob = false); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -63,20 +64,22 @@ class ArenaWrappedDBIter : public Iterator { virtual Slice value() const override; virtual Status status() const override; virtual Status Refresh() override; + bool IsBlob() const; virtual Status GetProperty(std::string prop_name, std::string* prop) override; void Init(Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, - uint64_t version_number); + uint64_t max_sequential_skip_in_iterations, uint64_t version_number, + bool allow_blob); void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl, - ColumnFamilyData* cfd) { + ColumnFamilyData* cfd, bool allow_blob) { read_options_ = read_options; db_impl_ = db_impl; cfd_ = cfd; + allow_blob_ = allow_blob; } private: @@ -86,6 +89,7 @@ class ArenaWrappedDBIter : public Iterator { ColumnFamilyData* cfd_ = nullptr; DBImpl* db_impl_ = nullptr; ReadOptions read_options_; + bool allow_blob_ = false; }; // Generate the arena wrapped iterator class. 
@@ -95,6 +99,7 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ReadOptions& read_options, const ImmutableCFOptions& cf_options, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr); + DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr, + bool allow_blob = false); } // namespace rocksdb diff --git a/db/dbformat.cc b/db/dbformat.cc index d76b28e60..19e45f800 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -27,7 +27,7 @@ namespace rocksdb { // and the value type is embedded as the low 8 bits in the sequence // number in internal keys, we need to use the highest-numbered // ValueType, not the lowest). -const ValueType kValueTypeForSeek = kTypeSingleDeletion; +const ValueType kValueTypeForSeek = kTypeBlobIndex; const ValueType kValueTypeForSeekForPrev = kTypeDeletion; uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { diff --git a/db/dbformat.h b/db/dbformat.h index 8e176807d..d3fa71c7d 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -47,6 +47,8 @@ enum ValueType : unsigned char { kTypeNoop = 0xD, // WAL only. kTypeColumnFamilyRangeDeletion = 0xE, // WAL only. kTypeRangeDeletion = 0xF, // meta block + kTypeColumnFamilyBlobIndex = 0x10, // Blob DB only + kTypeBlobIndex = 0x11, // Blob DB only kMaxValue = 0x7F // Not used for storing records. }; @@ -57,7 +59,7 @@ extern const ValueType kValueTypeForSeekForPrev; // Checks whether a type is an inline value type // (i.e. a type used in memtable skiplist and sst file datablock). 
inline bool IsValueType(ValueType t) { - return t <= kTypeMerge || t == kTypeSingleDeletion; + return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex; } // Checks whether a type is from user operation diff --git a/db/memtable.cc b/db/memtable.cc index 19232f9d7..989a526c8 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -539,6 +539,8 @@ struct Saver { bool inplace_update_support; Env* env_; ReadCallback* callback_; + bool* is_blob_index; + bool CheckCallback(SequenceNumber _seq) { if (callback_) { return callback_->IsCommitted(_seq); @@ -581,11 +583,26 @@ static bool SaveValue(void* arg, const char* entry) { s->seq = seq; - if ((type == kTypeValue || type == kTypeMerge) && + if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) { type = kTypeRangeDeletion; } switch (type) { + case kTypeBlobIndex: + if (s->is_blob_index == nullptr) { + ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index."); + *(s->status) = Status::NotSupported( + "Encounter unsupported blob value. 
Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + } else if (*(s->merge_in_progress)) { + *(s->status) = + Status::NotSupported("Blob DB does not support merge operator."); + } + if (!s->status->ok()) { + *(s->found_final_value) = true; + return false; + } + // intentional fallthrough case kTypeValue: { if (s->inplace_update_support) { s->mem->GetLock(s->key->user_key())->ReadLock(); @@ -606,6 +623,9 @@ static bool SaveValue(void* arg, const char* entry) { s->mem->GetLock(s->key->user_key())->ReadUnlock(); } *(s->found_final_value) = true; + if (s->is_blob_index != nullptr) { + *(s->is_blob_index) = (type == kTypeBlobIndex); + } return false; } case kTypeDeletion: @@ -662,7 +682,8 @@ static bool SaveValue(void* arg, const char* entry) { bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, - const ReadOptions& read_opts, ReadCallback* callback) { + const ReadOptions& read_opts, ReadCallback* callback, + bool* is_blob_index) { // The sequence number is updated synchronously in version_set.h if (IsEmpty()) { // Avoiding recording stats for speed. 
@@ -709,6 +730,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, saver.statistics = moptions_.statistics; saver.env_ = env_; saver.callback_ = callback; + saver.is_blob_index = is_blob_index; table_->Get(key, &saver, SaveValue); *seq = saver.seq; diff --git a/db/memtable.h b/db/memtable.h index 9c98705bc..fb01b5a82 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -189,14 +189,15 @@ class MemTable { bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts, ReadCallback* callback = nullptr) { + const ReadOptions& read_opts, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr) { SequenceNumber seq; return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts, - callback); + callback, is_blob_index); } // Attempts to update the new_value inplace, else does normal Add diff --git a/db/memtable_list.cc b/db/memtable_list.cc index f0fb4843b..93bc19281 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -104,9 +104,9 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback) { + ReadCallback* callback, bool* is_blob_index) { return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg, - seq, read_opts, callback); + seq, read_opts, callback, is_blob_index); } bool MemTableListVersion::GetFromHistory(const LookupKey& key, @@ -122,14 +122,15 @@ bool MemTableListVersion::GetFromHistory(const LookupKey& key, bool MemTableListVersion::GetFromList( std::list<MemTable*>* list,
const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback) { + SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback, + bool* is_blob_index) { *seq = kMaxSequenceNumber; for (auto& memtable : *list) { SequenceNumber current_seq = kMaxSequenceNumber; bool done = memtable->Get(key, value, s, merge_context, range_del_agg, - &current_seq, read_opts, callback); + &current_seq, read_opts, callback, is_blob_index); if (*seq == kMaxSequenceNumber) { // Store the most recent sequence number of any operation on this key. // Since we only care about the most recent change, we only need to diff --git a/db/memtable_list.h b/db/memtable_list.h index 1bec0debe..52833a245 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -55,14 +55,15 @@ class MemTableListVersion { bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, - const ReadOptions& read_opts, ReadCallback* callback = nullptr) { + const ReadOptions& read_opts, ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr) { SequenceNumber seq; return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts, - callback); + callback, is_blob_index); } // Similar to Get(), but searches the Memtable history of memtables that @@ -120,7 +121,8 @@ class MemTableListVersion { std::string* value, Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg, SequenceNumber* seq, const ReadOptions& read_opts, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, + bool* is_blob_index = nullptr);
void AddMemTable(MemTable* m); diff --git a/db/version_set.cc b/db/version_set.cc index 25f873edb..281c6fc71 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -964,8 +964,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, PinnableSlice* value, Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found, - bool* key_exists, SequenceNumber* seq, - ReadCallback* callback) { + bool* key_exists, SequenceNumber* seq, ReadCallback* callback, + bool* is_blob) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); @@ -981,7 +981,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, value, value_found, merge_context, range_del_agg, this->env_, seq, - merge_operator_ ? &pinned_iters_mgr : nullptr, callback); + merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob); // Pin blocks that we read to hold merge operands if (merge_operator_) { @@ -1030,6 +1030,12 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, return; case GetContext::kMerge: break; + case GetContext::kBlobIndex: + ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); + *status = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "rocksdb::blob_db::BlobDB instead."); + return; } f = fp.GetNextFile(); } diff --git a/db/version_set.h b/db/version_set.h index 8c1324698..0ad1bf76f 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -487,7 +487,7 @@ class Version { Status* status, MergeContext* merge_context, RangeDelAggregator* range_del_agg, bool* value_found = nullptr, bool* key_exists = nullptr, SequenceNumber* seq = nullptr, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, bool* is_blob = nullptr); // Loads some stats information from files. Call without mutex held. 
It needs // to be called before applying the version to the version set. diff --git a/db/write_batch.cc b/db/write_batch.cc index c35499e0b..d3107f551 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -67,6 +67,7 @@ enum ContentFlags : uint32_t { HAS_COMMIT = 1 << 7, HAS_ROLLBACK = 1 << 8, HAS_DELETE_RANGE = 1 << 9, + HAS_BLOB_INDEX = 1 << 10, }; struct BatchContentClassifier : public WriteBatch::Handler { @@ -97,6 +98,11 @@ struct BatchContentClassifier : public WriteBatch::Handler { return Status::OK(); } + Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_BLOB_INDEX; + return Status::OK(); + } + Status MarkBeginPrepare() override { content_flags |= ContentFlags::HAS_BEGIN_PREPARE; return Status::OK(); @@ -328,6 +334,17 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, return Status::Corruption("bad WriteBatch Merge"); } break; + case kTypeColumnFamilyBlobIndex: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch BlobIndex"); + } + // intentional fallthrough + case kTypeBlobIndex: + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value)) { + return Status::Corruption("bad WriteBatch BlobIndex"); + } + break; case kTypeLogData: assert(blob != nullptr); if (!GetLengthPrefixedSlice(input, blob)) { @@ -424,6 +441,13 @@ Status WriteBatch::Iterate(Handler* handler) const { empty_batch = false; found++; break; + case kTypeColumnFamilyBlobIndex: + case kTypeBlobIndex: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX)); + s = handler->PutBlobIndexCF(column_family, key, value); + found++; + break; case kTypeLogData: handler->LogData(blob); empty_batch = true; @@ -776,6 +800,25 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family, value); } +Status WriteBatchInternal::PutBlobIndex(WriteBatch* b, + uint32_t column_family_id, + const Slice& key, const Slice& value) 
{ + LocalSavePoint save(b); + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeBlobIndex)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_BLOB_INDEX, + std::memory_order_relaxed); + return save.commit(); +} + Status WriteBatch::PutLogData(const Slice& blob) { LocalSavePoint save(this); rep_.push_back(static_cast<char>(kTypeLogData)); @@ -967,8 +1010,8 @@ class MemTableInserter : public WriteBatch::Handler { return true; } - virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { + Status PutCFImpl(uint32_t column_family_id, const Slice& key, + const Slice& value, ValueType value_type) { if (rebuilding_trx_ != nullptr) { WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); if (write_after_commit_) { @@ -986,7 +1029,7 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { - mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_, + mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_, get_post_process_info(mem)); } else if (moptions->inplace_callback == nullptr) { assert(!concurrent_memtable_writes_); @@ -1024,11 +1067,11 @@ class MemTableInserter : public WriteBatch::Handler { value, &merged_value); if (status == UpdateStatus::UPDATED_INPLACE) { // prev_value is updated in-place with final value.
- mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size)); + mem->Add(sequence_, value_type, key, Slice(prev_buffer, prev_size)); RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); } else if (status == UpdateStatus::UPDATED) { // merged_value contains the final value. - mem->Add(sequence_, kTypeValue, key, Slice(merged_value)); + mem->Add(sequence_, value_type, key, Slice(merged_value)); RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); } } @@ -1041,6 +1084,11 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + return PutCFImpl(column_family_id, key, value, kTypeValue); + } + Status DeleteImpl(uint32_t column_family_id, const Slice& key, const Slice& value, ValueType delete_type) { MemTable* mem = cf_mems_->GetMemTable(); @@ -1209,6 +1257,12 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } + virtual Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + // Same as PutCF except for value type. 
+ return PutCFImpl(column_family_id, key, value, kTypeBlobIndex); + } + void CheckMemtableFull() { if (flush_scheduler_ != nullptr) { auto* cfd = cf_mems_->current(); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index a74d8ea0e..abe70751f 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -99,6 +99,9 @@ class WriteBatchInternal { static Status Merge(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key, const SliceParts& value); + static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); + static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid); static Status MarkRollback(WriteBatch* batch, const Slice& xid); diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 40d318e09..e132033db 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -206,6 +206,7 @@ class CompactionEventListener { kDelete, kSingleDelete, kRangeDelete, + kBlobIndex, kInvalid, }; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 0dc844773..52c26d09a 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -233,6 +233,12 @@ class WriteBatch : public WriteBatchBase { } virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {} + virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/, + const Slice& /*key*/, + const Slice& /*value*/) { + return Status::InvalidArgument("PutBlobIndexCF not implemented"); + } + // The default implementation of LogData does nothing. 
virtual void LogData(const Slice& blob); diff --git a/table/get_context.cc b/table/get_context.cc index b17b7dfbd..355f6e062 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -34,12 +34,15 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) { } // namespace -GetContext::GetContext( - const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, - Statistics* statistics, GetState init_state, const Slice& user_key, - PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context, - RangeDelAggregator* _range_del_agg, Env* env, SequenceNumber* seq, - PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback) +GetContext::GetContext(const Comparator* ucmp, + const MergeOperator* merge_operator, Logger* logger, + Statistics* statistics, GetState init_state, + const Slice& user_key, PinnableSlice* pinnable_val, + bool* value_found, MergeContext* merge_context, + RangeDelAggregator* _range_del_agg, Env* env, + SequenceNumber* seq, + PinnedIteratorsManager* _pinned_iters_mgr, + ReadCallback* callback, bool* is_blob_index) : ucmp_(ucmp), merge_operator_(merge_operator), logger_(logger), @@ -54,7 +57,8 @@ GetContext::GetContext( seq_(seq), replay_log_(nullptr), pinned_iters_mgr_(_pinned_iters_mgr), - callback_(callback) { + callback_(callback), + is_blob_index_(is_blob_index) { if (seq_) { *seq_ = kMaxSequenceNumber; } @@ -104,13 +108,19 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, auto type = parsed_key.type; // Key matches. Process it - if ((type == kTypeValue || type == kTypeMerge) && + if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && range_del_agg_ != nullptr && range_del_agg_->ShouldDelete(parsed_key)) { type = kTypeRangeDeletion; } switch (type) { case kTypeValue: + case kTypeBlobIndex: assert(state_ == kNotFound || state_ == kMerge); + if (type == kTypeBlobIndex && is_blob_index_ == nullptr) { + // Blob value not supported. Stop. 
+ state_ = kBlobIndex; + return false; + } if (kNotFound == state_) { state_ = kFound; if (LIKELY(pinnable_val_ != nullptr)) { @@ -136,6 +146,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } } } + if (is_blob_index_ != nullptr) { + *is_blob_index_ = (type == kTypeBlobIndex); + } return false; case kTypeDeletion: diff --git a/table/get_context.h b/table/get_context.h index efea68b0a..b0c8da134 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -23,7 +23,8 @@ class GetContext { kFound, kDeleted, kCorrupt, - kMerge // saver contains the current merge result (the operands) + kMerge, // saver contains the current merge result (the operands) + kBlobIndex, }; GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, @@ -32,7 +33,7 @@ class GetContext { MergeContext* merge_context, RangeDelAggregator* range_del_agg, Env* env, SequenceNumber* seq = nullptr, PinnedIteratorsManager* _pinned_iters_mgr = nullptr, - ReadCallback* callback = nullptr); + ReadCallback* callback = nullptr, bool* is_blob_index = nullptr); void MarkKeyMayExist(); @@ -93,6 +94,7 @@ class GetContext { PinnedIteratorsManager* pinned_iters_mgr_; ReadCallback* callback_; bool sample_; + bool* is_blob_index_; }; void replayGetContextLog(const Slice& replay_log, const Slice& user_key,