Add ValueType::kTypeBlobIndex

Summary:
Add kTypeBlobIndex value type, which will be used by blob db only, to insert a (key, blob_offset) KV pair. The purpose is to
1. Make it possible to open existing rocksdb instance as blob db. Existing value will be of kTypeIndex type, while value inserted by blob db will be of kTypeBlobIndex.
2. Make rocksdb able to detect if the db contains value written by blob db, if so return error.
3. Make it possible to have blob db optionally store value in SST file (with kTypeValue type) or as a blob value (with kTypeBlobIndex type).

The root db (DBImpl) basically pretended kTypeBlobIndex are normal value on write. On Get if is_blob is provided, return whether the value read is of kTypeBlobIndex type, or return Status::NotSupported() status if is_blob is not provided. On scan allow_blob flag is pass and if the flag is true, return wether the value is of kTypeBlobIndex type via iter->IsBlob().

Changes on blob db side will be in a separate patch.
Closes https://github.com/facebook/rocksdb/pull/2886

Differential Revision: D5838431

Pulled By: yiwu-arbug

fbshipit-source-id: 3c5306c62bc13bb11abc03422ec5cbcea1203cca
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 880411f54c
commit d1cab2b64e
  1. 1
      CMakeLists.txt
  2. 4
      Makefile
  3. 1
      TARGETS
  4. 2
      db/compaction_iterator.cc
  5. 408
      db/db_blob_index_test.cc
  6. 58
      db/db_impl.cc
  7. 15
      db/db_impl.h
  8. 9
      db/db_impl_debug.cc
  9. 99
      db/db_iter.cc
  10. 15
      db/db_iter.h
  11. 2
      db/dbformat.cc
  12. 4
      db/dbformat.h
  13. 26
      db/memtable.cc
  14. 7
      db/memtable.h
  15. 9
      db/memtable_list.cc
  16. 10
      db/memtable_list.h
  17. 12
      db/version_set.cc
  18. 2
      db/version_set.h
  19. 64
      db/write_batch.cc
  20. 3
      db/write_batch_internal.h
  21. 1
      include/rocksdb/listener.h
  22. 6
      include/rocksdb/write_batch.h
  23. 29
      table/get_context.cc
  24. 6
      table/get_context.h

@ -703,6 +703,7 @@ if(WITH_TESTS)
db/corruption_test.cc
db/cuckoo_table_db_test.cc
db/db_basic_test.cc
db/db_blob_index_test.cc
db/db_block_cache_test.cc
db/db_bloom_filter_test.cc
db/db_compaction_filter_test.cc

@ -378,6 +378,7 @@ TESTS = \
db_wal_test \
db_block_cache_test \
db_test \
db_blob_index_test \
db_bloom_filter_test \
db_iter_test \
db_log_iter_test \
@ -1104,6 +1105,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_blob_index_test: db/db_blob_index_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_block_cache_test: db/db_block_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -366,6 +366,7 @@ ROCKS_TESTS = [['arena_test', 'util/arena_test.cc', 'serial'],
['cuckoo_table_reader_test', 'table/cuckoo_table_reader_test.cc', 'serial'],
['date_tiered_test', 'utilities/date_tiered/date_tiered_test.cc', 'serial'],
['db_basic_test', 'db/db_basic_test.cc', 'serial'],
['db_blob_index_test', 'db/db_blob_index_test.cc', 'serial'],
['db_block_cache_test', 'db/db_block_cache_test.cc', 'serial'],
['db_bloom_filter_test', 'db/db_bloom_filter_test.cc', 'serial'],
['db_compaction_filter_test', 'db/db_compaction_filter_test.cc', 'parallel'],

@ -25,6 +25,8 @@ CompactionEventListener::CompactionListenerValueType fromInternalValueType(
kSingleDelete;
case kTypeRangeDeletion:
return CompactionEventListener::CompactionListenerValueType::kRangeDelete;
case kTypeBlobIndex:
return CompactionEventListener::CompactionListenerValueType::kBlobIndex;
default:
assert(false);
return CompactionEventListener::CompactionListenerValueType::kInvalid;

@ -0,0 +1,408 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <functional>
#include <string>
#include <utility>
#include <vector>
#include "db/column_family.h"
#include "db/db_iter.h"
#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "db/write_batch_internal.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"
namespace rocksdb {
// kTypeBlobIndex is a value type used by BlobDB only. The base rocksdb
// should accept the value type on write, and report not supported value
// for reads, unless caller request for it explicitly. The base rocksdb
// doesn't understand format of actual blob index (the value).
class DBBlobIndexTest : public DBTestBase {
public:
enum Tier {
kMemtable = 0,
kImmutableMemtables = 1,
kL0SstFile = 2,
kLnSstFile = 3,
};
const std::vector<Tier> kAllTiers = {Tier::kMemtable,
Tier::kImmutableMemtables,
Tier::kL0SstFile, Tier::kLnSstFile};
DBBlobIndexTest() : DBTestBase("/db_blob_index_test") {}
ColumnFamilyHandle* cfh() { return dbfull()->DefaultColumnFamily(); }
ColumnFamilyData* cfd() {
return reinterpret_cast<ColumnFamilyHandleImpl*>(cfh())->cfd();
}
Status PutBlobIndex(WriteBatch* batch, const Slice& key,
const Slice& blob_index) {
return WriteBatchInternal::PutBlobIndex(batch, cfd()->GetID(), key,
blob_index);
}
Status Write(WriteBatch* batch) {
return dbfull()->Write(WriteOptions(), batch);
}
std::string GetImpl(const Slice& key, bool* is_blob_index = nullptr,
const Snapshot* snapshot = nullptr) {
ReadOptions read_options;
read_options.snapshot = snapshot;
PinnableSlice value;
auto s = dbfull()->GetImpl(read_options, cfh(), key, &value,
nullptr /*value_found*/, nullptr /*callback*/,
is_blob_index);
if (s.IsNotFound()) {
return "NOT_FOUND";
}
if (s.IsNotSupported()) {
return "NOT_SUPPORTED";
}
if (!s.ok()) {
return s.ToString();
}
return value.ToString();
}
std::string GetBlobIndex(const Slice& key,
const Snapshot* snapshot = nullptr) {
bool is_blob_index = false;
std::string value = GetImpl(key, &is_blob_index, snapshot);
if (!is_blob_index) {
return "NOT_BLOB";
}
return value;
}
ArenaWrappedDBIter* GetBlobIterator() {
return dbfull()->NewIteratorImpl(ReadOptions(), cfd(),
dbfull()->GetLatestSequenceNumber(),
true /*allow_blob*/);
}
Options GetTestOptions() {
Options options;
options.create_if_missing = true;
options.num_levels = 2;
options.disable_auto_compactions = true;
// Disable auto flushes.
options.max_write_buffer_number = 10;
options.min_write_buffer_number_to_merge = 10;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
return options;
}
void MoveDataTo(Tier tier) {
switch (tier) {
case Tier::kMemtable:
break;
case Tier::kImmutableMemtables:
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
break;
case Tier::kL0SstFile:
ASSERT_OK(Flush());
break;
case Tier::kLnSstFile:
ASSERT_OK(Flush());
ASSERT_OK(Put("a", "dummy"));
ASSERT_OK(Put("z", "dummy"));
ASSERT_OK(Flush());
ASSERT_OK(
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel());
break;
}
}
};
// Should be able to write kTypeBlobIndex to memtables and SST files.
TEST_F(DBBlobIndexTest, Write) {
for (auto tier : kAllTiers) {
DestroyAndReopen(GetTestOptions());
for (int i = 1; i <= 5; i++) {
std::string index = ToString(i);
WriteBatch batch;
ASSERT_OK(PutBlobIndex(&batch, "key" + index, "blob" + index));
ASSERT_OK(Write(&batch));
}
MoveDataTo(tier);
for (int i = 1; i <= 5; i++) {
std::string index = ToString(i);
ASSERT_EQ("blob" + index, GetBlobIndex("key" + index));
}
}
}
// Get should be able to return blob index if is_blob_index is provided,
// otherwise return Status::NotSupported status.
TEST_F(DBBlobIndexTest, Get) {
for (auto tier : kAllTiers) {
DestroyAndReopen(GetTestOptions());
WriteBatch batch;
ASSERT_OK(batch.Put("key", "value"));
ASSERT_OK(PutBlobIndex(&batch, "blob_key", "blob_index"));
ASSERT_OK(Write(&batch));
MoveDataTo(tier);
// Verify normal value
bool is_blob_index = false;
PinnableSlice value;
ASSERT_EQ("value", Get("key"));
ASSERT_EQ("value", GetImpl("key"));
ASSERT_EQ("value", GetImpl("key", &is_blob_index));
ASSERT_FALSE(is_blob_index);
// Verify blob index
ASSERT_TRUE(Get("blob_key", &value).IsNotSupported());
ASSERT_EQ("NOT_SUPPORTED", GetImpl("blob_key"));
ASSERT_EQ("blob_index", GetImpl("blob_key", &is_blob_index));
ASSERT_TRUE(is_blob_index);
}
}
// Get should NOT return Status::NotSupported if blob index is updated with
// a normal value.
TEST_F(DBBlobIndexTest, Updated) {
for (auto tier : kAllTiers) {
DestroyAndReopen(GetTestOptions());
WriteBatch batch;
for (int i = 0; i < 10; i++) {
ASSERT_OK(PutBlobIndex(&batch, "key" + ToString(i), "blob_index"));
}
ASSERT_OK(Write(&batch));
// Avoid blob values from being purged.
const Snapshot* snapshot = dbfull()->GetSnapshot();
ASSERT_OK(Put("key1", "new_value"));
ASSERT_OK(Merge("key2", "a"));
ASSERT_OK(Merge("key2", "b"));
ASSERT_OK(Merge("key2", "c"));
ASSERT_OK(Delete("key3"));
ASSERT_OK(SingleDelete("key4"));
ASSERT_OK(Delete("key5"));
ASSERT_OK(Merge("key5", "a"));
ASSERT_OK(Merge("key5", "b"));
ASSERT_OK(Merge("key5", "c"));
ASSERT_OK(dbfull()->DeleteRange(WriteOptions(), cfh(), "key6", "key9"));
MoveDataTo(tier);
for (int i = 0; i < 10; i++) {
ASSERT_EQ("blob_index", GetBlobIndex("key" + ToString(i), snapshot));
}
ASSERT_EQ("new_value", Get("key1"));
ASSERT_EQ("NOT_SUPPORTED", GetImpl("key2"));
ASSERT_EQ("NOT_FOUND", Get("key3"));
ASSERT_EQ("NOT_FOUND", Get("key4"));
ASSERT_EQ("a,b,c", GetImpl("key5"));
for (int i = 6; i < 9; i++) {
ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i)));
}
ASSERT_EQ("blob_index", GetBlobIndex("key9"));
dbfull()->ReleaseSnapshot(snapshot);
}
}
// Iterator should get blob value if allow_blob flag is set,
// otherwise return Status::NotSupported status.
TEST_F(DBBlobIndexTest, Iterate) {
const std::vector<std::vector<ValueType>> data = {
/*00*/ {kTypeValue},
/*01*/ {kTypeBlobIndex},
/*02*/ {kTypeValue},
/*03*/ {kTypeBlobIndex, kTypeValue},
/*04*/ {kTypeValue},
/*05*/ {kTypeValue, kTypeBlobIndex},
/*06*/ {kTypeValue},
/*07*/ {kTypeDeletion, kTypeBlobIndex},
/*08*/ {kTypeValue},
/*09*/ {kTypeSingleDeletion, kTypeBlobIndex},
/*10*/ {kTypeValue},
/*11*/ {kTypeMerge, kTypeMerge, kTypeMerge, kTypeBlobIndex},
/*12*/ {kTypeValue},
/*13*/
{kTypeMerge, kTypeMerge, kTypeMerge, kTypeDeletion, kTypeBlobIndex},
/*14*/ {kTypeValue},
/*15*/ {kTypeBlobIndex},
/*16*/ {kTypeValue},
};
auto get_key = [](int index) {
char buf[20];
snprintf(buf, sizeof(buf), "%02d", index);
return "key" + std::string(buf);
};
auto get_value = [&](int index, int version) {
return get_key(index) + "_value" + ToString(version);
};
auto check_iterator = [&](Iterator* iterator, Status::Code expected_status,
const Slice& expected_value) {
ASSERT_EQ(expected_status, iterator->status().code());
if (expected_status == Status::kOk) {
ASSERT_TRUE(iterator->Valid());
ASSERT_EQ(expected_value, iterator->value());
} else {
ASSERT_FALSE(iterator->Valid());
}
};
auto create_normal_iterator = [&]() -> Iterator* {
return dbfull()->NewIterator(ReadOptions());
};
auto create_blob_iterator = [&]() -> Iterator* { return GetBlobIterator(); };
auto check_is_blob = [&](bool is_blob) {
return [is_blob](Iterator* iterator) {
ASSERT_EQ(is_blob,
reinterpret_cast<ArenaWrappedDBIter*>(iterator)->IsBlob());
};
};
auto verify = [&](int index, Status::Code expected_status,
const Slice& forward_value, const Slice& backward_value,
std::function<Iterator*()> create_iterator,
std::function<void(Iterator*)> extra_check = nullptr) {
// Seek
auto* iterator = create_iterator();
ASSERT_OK(iterator->Refresh());
iterator->Seek(get_key(index));
check_iterator(iterator, expected_status, forward_value);
if (extra_check) {
extra_check(iterator);
}
delete iterator;
// Next
iterator = create_iterator();
ASSERT_OK(iterator->Refresh());
iterator->Seek(get_key(index - 1));
ASSERT_TRUE(iterator->Valid());
iterator->Next();
check_iterator(iterator, expected_status, forward_value);
if (extra_check) {
extra_check(iterator);
}
delete iterator;
// SeekForPrev
iterator = create_iterator();
ASSERT_OK(iterator->Refresh());
iterator->SeekForPrev(get_key(index));
check_iterator(iterator, expected_status, backward_value);
if (extra_check) {
extra_check(iterator);
}
delete iterator;
// Prev
iterator = create_iterator();
iterator->Seek(get_key(index + 1));
ASSERT_TRUE(iterator->Valid());
iterator->Prev();
check_iterator(iterator, expected_status, backward_value);
if (extra_check) {
extra_check(iterator);
}
delete iterator;
};
for (auto tier : {Tier::kMemtable} /*kAllTiers*/) {
// Avoid values from being purged.
std::vector<const Snapshot*> snapshots;
DestroyAndReopen(GetTestOptions());
// fill data
for (int i = 0; i < static_cast<int>(data.size()); i++) {
for (int j = static_cast<int>(data[i].size()) - 1; j >= 0; j--) {
std::string key = get_key(i);
std::string value = get_value(i, j);
WriteBatch batch;
switch (data[i][j]) {
case kTypeValue:
ASSERT_OK(Put(key, value));
break;
case kTypeDeletion:
ASSERT_OK(Delete(key));
break;
case kTypeSingleDeletion:
ASSERT_OK(SingleDelete(key));
break;
case kTypeMerge:
ASSERT_OK(Merge(key, value));
break;
case kTypeBlobIndex:
ASSERT_OK(PutBlobIndex(&batch, key, value));
ASSERT_OK(Write(&batch));
break;
default:
assert(false);
};
}
snapshots.push_back(dbfull()->GetSnapshot());
}
ASSERT_OK(
dbfull()->DeleteRange(WriteOptions(), cfh(), get_key(15), get_key(16)));
snapshots.push_back(dbfull()->GetSnapshot());
MoveDataTo(tier);
// Normal iterator
verify(1, Status::kNotSupported, "", "", create_normal_iterator);
verify(3, Status::kNotSupported, "", "", create_normal_iterator);
verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
create_normal_iterator);
verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),
create_normal_iterator);
verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
create_normal_iterator);
verify(11, Status::kNotSupported, "", "", create_normal_iterator);
verify(13, Status::kOk,
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
create_normal_iterator);
verify(15, Status::kOk, get_value(16, 0), get_value(14, 0),
create_normal_iterator);
// Iterator with blob support
verify(1, Status::kOk, get_value(1, 0), get_value(1, 0),
create_blob_iterator, check_is_blob(true));
verify(3, Status::kOk, get_value(3, 0), get_value(3, 0),
create_blob_iterator, check_is_blob(true));
verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
create_blob_iterator, check_is_blob(false));
verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),
create_blob_iterator, check_is_blob(false));
verify(9, Status::kOk, get_value(10, 0), get_value(8, 0),
create_blob_iterator, check_is_blob(false));
verify(11, Status::kNotSupported, "", "", create_blob_iterator);
verify(13, Status::kOk,
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
get_value(13, 2) + "," + get_value(13, 1) + "," + get_value(13, 0),
create_blob_iterator, check_is_blob(false));
verify(15, Status::kOk, get_value(16, 0), get_value(14, 0),
create_blob_iterator, check_is_blob(false));
for (auto* snapshot : snapshots) {
dbfull()->ReleaseSnapshot(snapshot);
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -924,7 +924,7 @@ Status DBImpl::Get(const ReadOptions& read_options,
Status DBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool* value_found,
ReadCallback* callback) {
ReadCallback* callback, bool* is_blob_index) {
assert(pinnable_val != nullptr);
StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
@ -978,13 +978,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
bool done = false;
if (!skip_memtable) {
if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&range_del_agg, read_options, callback)) {
&range_del_agg, read_options, callback, is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&range_del_agg, read_options, callback)) {
&range_del_agg, read_options, callback,
is_blob_index)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
@ -997,7 +998,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
&range_del_agg, value_found, nullptr, nullptr, callback);
&range_del_agg, value_found, nullptr, nullptr, callback,
is_blob_index);
RecordTick(stats_, MEMTABLE_MISS);
}
@ -1437,13 +1439,22 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
auto snapshot =
read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_
? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
->number_
: latest_snapshot;
return NewIteratorImpl(read_options, cfd, snapshot);
}
// To stop compiler from complaining
return nullptr;
}
ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
bool allow_blob) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
// Try to generate a DB iterator tree in continuous memory area to be
// cache friendly. Here is an example of result:
@ -1490,8 +1501,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number,
((read_options.snapshot != nullptr) ? nullptr : this), cfd);
sv->version_number, ((read_options.snapshot != nullptr) ? nullptr : this),
cfd, allow_blob);
InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
@ -1500,9 +1511,6 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
return db_iter;
}
// To stop compiler from complaining
return nullptr;
}
Status DBImpl::NewIterators(
const ReadOptions& read_options,
@ -1547,28 +1555,16 @@ Status DBImpl::NewIterators(
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
for (size_t i = 0; i < column_families.size(); ++i) {
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
column_families[i])->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
auto snapshot =
read_options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_
? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
->number_
: latest_snapshot;
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number,
((read_options.snapshot != nullptr) ? nullptr : this), cfd);
InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator());
db_iter->SetIterUnderDBIter(internal_iter);
iterators->push_back(db_iter);
for (size_t i = 0; i < column_families.size(); ++i) {
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
column_families[i])->cfd();
iterators->push_back(NewIteratorImpl(read_options, cfd, snapshot));
}
}

@ -53,6 +53,7 @@
namespace rocksdb {
class ArenaWrappedDBIter;
class MemTable;
class TableCache;
class Version;
@ -124,6 +125,7 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value,
bool* value_found = nullptr) override;
using DB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override;
@ -131,6 +133,11 @@ class DBImpl : public DB {
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
bool allow_blob = false);
virtual const Snapshot* GetSnapshot() override;
virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
using DB::GetProperty;
@ -342,6 +349,8 @@ class DBImpl : public DB {
return alive_log_files_.begin()->getting_flushed;
}
Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable(bool wait = true,
ColumnFamilyHandle* cfh = nullptr);
@ -638,7 +647,6 @@ class DBImpl : public DB {
private:
friend class DB;
friend class DBTest2_ReadCallbackTest_Test;
friend class InternalStats;
friend class PessimisticTransaction;
friend class WriteCommittedTxn;
@ -651,7 +659,9 @@ class DBImpl : public DB {
friend struct SuperVersion;
friend class CompactedDBImpl;
#ifndef NDEBUG
friend class DBTest2_ReadCallbackTest_Test;
friend class XFTransactionWriteHandler;
friend class DBBlobIndexTest;
#endif
struct CompactionState;
@ -1254,7 +1264,8 @@ class DBImpl : public DB {
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
bool* value_found = nullptr, ReadCallback* callback = nullptr);
bool* value_found = nullptr, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
bool GetIntPropertyInternal(ColumnFamilyData* cfd,
const DBPropertyInfo& property_info,

@ -80,6 +80,15 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
disallow_trivial_move);
}
Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {
WriteContext write_context;
InstrumentedMutexLock l(&mutex_);
if (cfd == nullptr) {
cfd = default_cf_handle_->cfd();
}
return SwitchMemtable(cfd, &write_context);
}
Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) {
FlushOptions fo;
fo.wait = wait;

@ -8,8 +8,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_iter.h"
#include <stdexcept>
#include <deque>
#include <string>
#include <limits>
@ -18,7 +16,6 @@
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
@ -105,7 +102,7 @@ class DBIter final: public Iterator {
DBIter(Env* _env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations)
uint64_t max_sequential_skip_in_iterations, bool allow_blob)
: arena_mode_(arena_mode),
env_(_env),
logger_(cf_options.info_log),
@ -122,7 +119,8 @@ class DBIter final: public Iterator {
pin_thru_lifetime_(read_options.pin_data),
total_order_seek_(read_options.total_order_seek),
range_del_agg_(cf_options.internal_comparator, s,
true /* collapse_deletions */) {
true /* collapse_deletions */),
allow_blob_(allow_blob) {
RecordTick(statistics_, NO_ITERATORS);
prefix_extractor_ = cf_options.prefix_extractor;
max_skip_ = max_sequential_skip_in_iterations;
@ -180,6 +178,10 @@ class DBIter final: public Iterator {
return status_;
}
}
bool IsBlob() const {
assert(valid_ && (allow_blob_ || !is_blob_));
return is_blob_;
}
virtual Status GetProperty(std::string prop_name,
std::string* prop) override {
@ -291,6 +293,8 @@ class DBIter final: public Iterator {
RangeDelAggregator range_del_agg_;
LocalStatistics local_stats_;
PinnedIteratorsManager pinned_iters_mgr_;
bool allow_blob_;
bool is_blob_;
// No copying allowed
DBIter(const DBIter&);
@ -380,6 +384,8 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// - none of the above : saved_key_ can contain anything, it doesn't matter.
uint64_t num_skipped = 0;
is_blob_ = false;
do {
if (!ParseKey(&ikey_)) {
// Skip corrupted keys.
@ -421,6 +427,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeValue:
case kTypeBlobIndex:
saved_key_.SetUserKey(
ikey_.user_key,
!pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */);
@ -432,6 +439,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else if (ikey_.type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
} else {
is_blob_ = true;
valid_ = true;
}
return;
} else {
valid_ = true;
return;
@ -573,6 +592,18 @@ void DBIter::MergeValuesNewToOld() {
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (kTypeBlobIndex == ikey.type) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
} else {
status_ =
Status::NotSupported("Blob DB does not support merge operator.");
}
valid_ = false;
return;
} else {
assert(false);
}
@ -679,7 +710,6 @@ void DBIter::PrevInternal() {
!iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
if (FindValueForCurrentKey()) {
valid_ = true;
if (!iter_->Valid()) {
return;
}
@ -746,6 +776,7 @@ bool DBIter::FindValueForCurrentKey() {
last_key_entry_type = ikey.type;
switch (last_key_entry_type) {
case kTypeValue:
case kTypeBlobIndex:
if (range_del_agg_.ShouldDelete(
ikey,
RangeDelAggregator::RangePositioningMode::kBackwardTraversal)) {
@ -791,6 +822,7 @@ bool DBIter::FindValueForCurrentKey() {
}
Status s;
is_blob_ = false;
switch (last_key_entry_type) {
case kTypeDeletion:
case kTypeSingleDeletion:
@ -806,6 +838,18 @@ bool DBIter::FindValueForCurrentKey() {
merge_operator_, saved_key_.GetUserKey(), nullptr,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
env_, &pinned_value_, true);
} else if (last_not_merge_type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
} else {
status_ =
Status::NotSupported("Blob DB does not support merge operator.");
}
valid_ = false;
return true;
} else {
assert(last_not_merge_type == kTypeValue);
s = MergeHelper::TimedFullMerge(
@ -817,6 +861,17 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeValue:
// do nothing - we've already has value in saved_value_
break;
case kTypeBlobIndex:
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return true;
}
is_blob_ = true;
break;
default:
assert(false);
break;
@ -850,7 +905,15 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
valid_ = false;
return false;
}
if (ikey.type == kTypeValue) {
if (ikey.type == kTypeBlobIndex && !allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
valid_ = false;
return true;
}
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
assert(iter_->IsValuePinned());
pinned_value_ = iter_->value();
valid_ = true;
@ -1161,10 +1224,11 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
const Comparator* user_key_comparator,
InternalIterator* internal_iter,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations) {
DBIter* db_iter = new DBIter(env, read_options, cf_options,
user_key_comparator, internal_iter, sequence,
false, max_sequential_skip_in_iterations);
uint64_t max_sequential_skip_in_iterations,
bool allow_blob) {
DBIter* db_iter = new DBIter(
env, read_options, cf_options, user_key_comparator, internal_iter,
sequence, false, max_sequential_skip_in_iterations, allow_blob);
return db_iter;
}
@ -1192,6 +1256,7 @@ inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); }
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
std::string* prop) {
if (prop_name == "rocksdb.iterator.super-version-number") {
@ -1208,11 +1273,11 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iteration,
uint64_t version_number) {
uint64_t version_number, bool allow_blob) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem)
DBIter(env, read_options, cf_options, cf_options.user_comparator, nullptr,
sequence, true, max_sequential_skip_in_iteration);
sequence, true, max_sequential_skip_in_iteration, allow_blob);
sv_number_ = version_number;
}
@ -1232,7 +1297,7 @@ Status ArenaWrappedDBIter::Refresh() {
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
Init(env, read_options_, *(cfd_->ioptions()), latest_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number);
cur_sv_number, allow_blob_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator());
@ -1248,12 +1313,12 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
DBImpl* db_impl, ColumnFamilyData* cfd) {
DBImpl* db_impl, ColumnFamilyData* cfd, bool allow_blob) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, cf_options, sequence,
max_sequential_skip_in_iterations, version_number);
max_sequential_skip_in_iterations, version_number, allow_blob);
if (db_impl != nullptr && cfd != nullptr) {
iter->StoreRefreshInfo(read_options, db_impl, cfd);
iter->StoreRefreshInfo(read_options, db_impl, cfd, allow_blob);
}
return iter;

@ -33,7 +33,8 @@ extern Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
const Comparator* user_key_comparator,
InternalIterator* internal_iter,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations);
uint64_t max_sequential_skip_in_iterations,
bool allow_blob = false);
// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed be allocated. This class is used as an entry point of
@ -63,20 +64,22 @@ class ArenaWrappedDBIter : public Iterator {
virtual Slice value() const override;
virtual Status status() const override;
virtual Status Refresh() override;
bool IsBlob() const;
virtual Status GetProperty(std::string prop_name, std::string* prop) override;
void Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
uint64_t version_number);
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
bool allow_blob);
void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl,
ColumnFamilyData* cfd) {
ColumnFamilyData* cfd, bool allow_blob) {
read_options_ = read_options;
db_impl_ = db_impl;
cfd_ = cfd;
allow_blob_ = allow_blob;
}
private:
@ -86,6 +89,7 @@ class ArenaWrappedDBIter : public Iterator {
ColumnFamilyData* cfd_ = nullptr;
DBImpl* db_impl_ = nullptr;
ReadOptions read_options_;
bool allow_blob_ = false;
};
// Generate the arena wrapped iterator class.
@ -95,6 +99,7 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr);
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool allow_blob = false);
} // namespace rocksdb

@ -27,7 +27,7 @@ namespace rocksdb {
// and the value type is embedded as the low 8 bits in the sequence
// number in internal keys, we need to use the highest-numbered
// ValueType, not the lowest).
const ValueType kValueTypeForSeek = kTypeSingleDeletion;
const ValueType kValueTypeForSeek = kTypeBlobIndex;
const ValueType kValueTypeForSeekForPrev = kTypeDeletion;
uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {

@ -47,6 +47,8 @@ enum ValueType : unsigned char {
kTypeNoop = 0xD, // WAL only.
kTypeColumnFamilyRangeDeletion = 0xE, // WAL only.
kTypeRangeDeletion = 0xF, // meta block
kTypeColumnFamilyBlobIndex = 0x10, // Blob DB only
kTypeBlobIndex = 0x11, // Blob DB only
kMaxValue = 0x7F // Not used for storing records.
};
@ -57,7 +59,7 @@ extern const ValueType kValueTypeForSeekForPrev;
// Checks whether a type is an inline value type
// (i.e. a type used in memtable skiplist and sst file datablock).
inline bool IsValueType(ValueType t) {
return t <= kTypeMerge || t == kTypeSingleDeletion;
return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex;
}
// Checks whether a type is from user operation

@ -539,6 +539,8 @@ struct Saver {
bool inplace_update_support;
Env* env_;
ReadCallback* callback_;
bool* is_blob_index;
bool CheckCallback(SequenceNumber _seq) {
if (callback_) {
return callback_->IsCommitted(_seq);
@ -581,11 +583,26 @@ static bool SaveValue(void* arg, const char* entry) {
s->seq = seq;
if ((type == kTypeValue || type == kTypeMerge) &&
if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) {
type = kTypeRangeDeletion;
}
switch (type) {
case kTypeBlobIndex:
if (s->is_blob_index == nullptr) {
ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index.");
*(s->status) = Status::NotSupported(
"Encounter unsupported blob value. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
} else if (*(s->merge_in_progress)) {
*(s->status) =
Status::NotSupported("Blob DB does not support merge operator.");
}
if (!s->status->ok()) {
*(s->found_final_value) = true;
return false;
}
// intentional fallthrough
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
@ -606,6 +623,9 @@ static bool SaveValue(void* arg, const char* entry) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();
}
*(s->found_final_value) = true;
if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = (type == kTypeBlobIndex);
}
return false;
}
case kTypeDeletion:
@ -662,7 +682,8 @@ static bool SaveValue(void* arg, const char* entry) {
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts, ReadCallback* callback) {
const ReadOptions& read_opts, ReadCallback* callback,
bool* is_blob_index) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
@ -709,6 +730,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
saver.statistics = moptions_.statistics;
saver.env_ = env_;
saver.callback_ = callback;
saver.is_blob_index = is_blob_index;
table_->Get(key, &saver, SaveValue);
*seq = saver.seq;

@ -189,14 +189,15 @@ class MemTable {
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts,
ReadCallback* callback = nullptr);
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr);
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts, ReadCallback* callback = nullptr) {
const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr) {
SequenceNumber seq;
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts,
callback);
callback, is_blob_index);
}
// Attempts to update the new_value inplace, else does normal Add

@ -104,9 +104,9 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context,
RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts,
ReadCallback* callback) {
ReadCallback* callback, bool* is_blob_index) {
return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg,
seq, read_opts, callback);
seq, read_opts, callback, is_blob_index);
}
bool MemTableListVersion::GetFromHistory(const LookupKey& key,
@ -122,14 +122,15 @@ bool MemTableListVersion::GetFromHistory(const LookupKey& key,
bool MemTableListVersion::GetFromList(
std::list<MemTable*>* list, const LookupKey& key, std::string* value,
Status* s, MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback) {
SequenceNumber* seq, const ReadOptions& read_opts, ReadCallback* callback,
bool* is_blob_index) {
*seq = kMaxSequenceNumber;
for (auto& memtable : *list) {
SequenceNumber current_seq = kMaxSequenceNumber;
bool done = memtable->Get(key, value, s, merge_context, range_del_agg,
&current_seq, read_opts, callback);
&current_seq, read_opts, callback, is_blob_index);
if (*seq == kMaxSequenceNumber) {
// Store the most recent sequence number of any operation on this key.
// Since we only care about the most recent change, we only need to

@ -55,14 +55,15 @@ class MemTableListVersion {
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts,
ReadCallback* callback = nullptr);
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr);
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts, ReadCallback* callback = nullptr) {
const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr) {
SequenceNumber seq;
return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts,
callback);
callback, is_blob_index);
}
// Similar to Get(), but searches the Memtable history of memtables that
@ -120,7 +121,8 @@ class MemTableListVersion {
std::string* value, Status* s, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts,
ReadCallback* callback = nullptr);
ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
void AddMemTable(MemTable* m);

@ -964,8 +964,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
PinnableSlice* value, Status* status,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, bool* value_found,
bool* key_exists, SequenceNumber* seq,
ReadCallback* callback) {
bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
bool* is_blob) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
@ -981,7 +981,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
value, value_found, merge_context, range_del_agg, this->env_, seq,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback);
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
@ -1030,6 +1030,12 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
return;
case GetContext::kMerge:
break;
case GetContext::kBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
return;
}
f = fp.GetNextFile();
}

@ -487,7 +487,7 @@ class Version {
Status* status, MergeContext* merge_context,
RangeDelAggregator* range_del_agg, bool* value_found = nullptr,
bool* key_exists = nullptr, SequenceNumber* seq = nullptr,
ReadCallback* callback = nullptr);
ReadCallback* callback = nullptr, bool* is_blob = nullptr);
// Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set.

@ -67,6 +67,7 @@ enum ContentFlags : uint32_t {
HAS_COMMIT = 1 << 7,
HAS_ROLLBACK = 1 << 8,
HAS_DELETE_RANGE = 1 << 9,
HAS_BLOB_INDEX = 1 << 10,
};
struct BatchContentClassifier : public WriteBatch::Handler {
@ -97,6 +98,11 @@ struct BatchContentClassifier : public WriteBatch::Handler {
return Status::OK();
}
Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_BLOB_INDEX;
return Status::OK();
}
Status MarkBeginPrepare() override {
content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
return Status::OK();
@ -328,6 +334,17 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
return Status::Corruption("bad WriteBatch Merge");
}
break;
case kTypeColumnFamilyBlobIndex:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch BlobIndex");
}
// intentional fallthrough
case kTypeBlobIndex:
if (!GetLengthPrefixedSlice(input, key) ||
!GetLengthPrefixedSlice(input, value)) {
return Status::Corruption("bad WriteBatch BlobIndex");
}
break;
case kTypeLogData:
assert(blob != nullptr);
if (!GetLengthPrefixedSlice(input, blob)) {
@ -424,6 +441,13 @@ Status WriteBatch::Iterate(Handler* handler) const {
empty_batch = false;
found++;
break;
case kTypeColumnFamilyBlobIndex:
case kTypeBlobIndex:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
s = handler->PutBlobIndexCF(column_family, key, value);
found++;
break;
case kTypeLogData:
handler->LogData(blob);
empty_batch = true;
@ -776,6 +800,25 @@ Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
value);
}
Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
uint32_t column_family_id,
const Slice& key, const Slice& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeBlobIndex));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyBlobIndex));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_BLOB_INDEX,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::PutLogData(const Slice& blob) {
LocalSavePoint save(this);
rep_.push_back(static_cast<char>(kTypeLogData));
@ -967,8 +1010,8 @@ class MemTableInserter : public WriteBatch::Handler {
return true;
}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
Status PutCFImpl(uint32_t column_family_id, const Slice& key,
const Slice& value, ValueType value_type) {
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
if (write_after_commit_) {
@ -986,7 +1029,7 @@ class MemTableInserter : public WriteBatch::Handler {
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
if (!moptions->inplace_update_support) {
mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_,
mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_,
get_post_process_info(mem));
} else if (moptions->inplace_callback == nullptr) {
assert(!concurrent_memtable_writes_);
@ -1024,11 +1067,11 @@ class MemTableInserter : public WriteBatch::Handler {
value, &merged_value);
if (status == UpdateStatus::UPDATED_INPLACE) {
// prev_value is updated in-place with final value.
mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
mem->Add(sequence_, value_type, key, Slice(prev_buffer, prev_size));
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
} else if (status == UpdateStatus::UPDATED) {
// merged_value contains the final value.
mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
mem->Add(sequence_, value_type, key, Slice(merged_value));
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
}
}
@ -1041,6 +1084,11 @@ class MemTableInserter : public WriteBatch::Handler {
return Status::OK();
}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
return PutCFImpl(column_family_id, key, value, kTypeValue);
}
Status DeleteImpl(uint32_t column_family_id, const Slice& key,
const Slice& value, ValueType delete_type) {
MemTable* mem = cf_mems_->GetMemTable();
@ -1209,6 +1257,12 @@ class MemTableInserter : public WriteBatch::Handler {
return Status::OK();
}
virtual Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
// Same as PutCF except for value type.
return PutCFImpl(column_family_id, key, value, kTypeBlobIndex);
}
void CheckMemtableFull() {
if (flush_scheduler_ != nullptr) {
auto* cfd = cf_mems_->current();

@ -99,6 +99,9 @@ class WriteBatchInternal {
static Status Merge(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value);
static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);
static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid);
static Status MarkRollback(WriteBatch* batch, const Slice& xid);

@ -206,6 +206,7 @@ class CompactionEventListener {
kDelete,
kSingleDelete,
kRangeDelete,
kBlobIndex,
kInvalid,
};

@ -233,6 +233,12 @@ class WriteBatch : public WriteBatchBase {
}
virtual void Merge(const Slice& /*key*/, const Slice& /*value*/) {}
virtual Status PutBlobIndexCF(uint32_t /*column_family_id*/,
const Slice& /*key*/,
const Slice& /*value*/) {
return Status::InvalidArgument("PutBlobIndexCF not implemented");
}
// The default implementation of LogData does nothing.
virtual void LogData(const Slice& blob);

@ -34,12 +34,15 @@ void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
} // namespace
GetContext::GetContext(
const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state, const Slice& user_key,
PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
RangeDelAggregator* _range_del_agg, Env* env, SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback)
GetContext::GetContext(const Comparator* ucmp,
const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state,
const Slice& user_key, PinnableSlice* pinnable_val,
bool* value_found, MergeContext* merge_context,
RangeDelAggregator* _range_del_agg, Env* env,
SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr,
ReadCallback* callback, bool* is_blob_index)
: ucmp_(ucmp),
merge_operator_(merge_operator),
logger_(logger),
@ -54,7 +57,8 @@ GetContext::GetContext(
seq_(seq),
replay_log_(nullptr),
pinned_iters_mgr_(_pinned_iters_mgr),
callback_(callback) {
callback_(callback),
is_blob_index_(is_blob_index) {
if (seq_) {
*seq_ = kMaxSequenceNumber;
}
@ -104,13 +108,19 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
auto type = parsed_key.type;
// Key matches. Process it
if ((type == kTypeValue || type == kTypeMerge) &&
if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
range_del_agg_ != nullptr && range_del_agg_->ShouldDelete(parsed_key)) {
type = kTypeRangeDeletion;
}
switch (type) {
case kTypeValue:
case kTypeBlobIndex:
assert(state_ == kNotFound || state_ == kMerge);
if (type == kTypeBlobIndex && is_blob_index_ == nullptr) {
// Blob value not supported. Stop.
state_ = kBlobIndex;
return false;
}
if (kNotFound == state_) {
state_ = kFound;
if (LIKELY(pinnable_val_ != nullptr)) {
@ -136,6 +146,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
}
}
}
if (is_blob_index_ != nullptr) {
*is_blob_index_ = (type == kTypeBlobIndex);
}
return false;
case kTypeDeletion:

@ -23,7 +23,8 @@ class GetContext {
kFound,
kDeleted,
kCorrupt,
kMerge // saver contains the current merge result (the operands)
kMerge, // saver contains the current merge result (the operands)
kBlobIndex,
};
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
@ -32,7 +33,7 @@ class GetContext {
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
Env* env, SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr);
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr);
void MarkKeyMayExist();
@ -93,6 +94,7 @@ class GetContext {
PinnedIteratorsManager* pinned_iters_mgr_;
ReadCallback* callback_;
bool sample_;
bool* is_blob_index_;
};
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,

Loading…
Cancel
Save