Fixing blob db sequence number handling

Summary:
Blob DB relies on the base DB returning the sequence number through the write batch after DB::Write(). However, after recent changes to the write path, DB::Write() no longer returns the sequence number in some cases. Fix this by having WriteBatchInternal::InsertInto() always encode the sequence number into the write batch.
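
For context, the contract being restored can be sketched as follows; this is a minimal illustration rather than the Blob DB code path itself. The helper name below is hypothetical, and it assumes access to the internal header db/write_batch_internal.h. After DB::Write() returns, the starting sequence number assigned to the batch can be read back with WriteBatchInternal::Sequence(); Blob DB depends on this to record sequence numbers in blob files, and the new db/db_write_test.cc asserts the same property for both the default and pipelined write paths.

// Sketch only: a hypothetical helper showing the behavior this fix guarantees,
// i.e. DB::Write() leaves the starting sequence number encoded in the batch
// it was given. Requires RocksDB internal headers.
#include "db/write_batch_internal.h"
#include "rocksdb/db.h"

rocksdb::Status WriteAndGetStartSequence(rocksdb::DB* db,
                                         rocksdb::WriteBatch* batch,
                                         rocksdb::SequenceNumber* seq) {
  rocksdb::Status s = db->Write(rocksdb::WriteOptions(), batch);
  if (s.ok()) {
    // With this change, WriteBatchInternal::InsertInto() always calls
    // SetSequence() on the batch, so Sequence() reflects this write.
    *seq = rocksdb::WriteBatchInternal::Sequence(batch);
  }
  return s;
}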

Stacking on #2375.
Closes https://github.com/facebook/rocksdb/pull/2385

Differential Revision: D5148358

Pulled By: yiwu-arbug

fbshipit-source-id: 8bda0aa07b9334ed03ed381548b39d167dc20c33
Author: Yi Wu (committed by Facebook GitHub Bot)
Commit: ad19eb8686 (parent 51ac91f586)
Changed files (13):
  1. CMakeLists.txt (1 line changed)
  2. Makefile (4 lines changed)
  3. TARGETS (3 lines changed)
  4. db/db_impl_write.cc (9 lines changed)
  5. db/db_test_util.cc (35 lines changed)
  6. db/db_test_util.h (36 lines changed)
  7. db/db_write_test.cc (82 lines changed)
  8. db/write_batch.cc (55 lines changed)
  9. db/write_batch_internal.h (2 lines changed)
  10. src.mk (1 line changed)
  11. utilities/blob_db/blob_db_impl.cc (147 lines changed)
  12. utilities/blob_db/blob_db_impl.h (7 lines changed)
  13. utilities/blob_db/blob_db_test.cc (39 lines changed)

--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -632,6 +632,7 @@ set(TESTS
         db/db_test2.cc
         db/db_universal_compaction_test.cc
         db/db_wal_test.cc
+        db/db_write_test.cc
         db/dbformat_test.cc
         db/deletefile_test.cc
         db/external_sst_file_basic_test.cc

--- a/Makefile
+++ b/Makefile
@@ -362,6 +362,7 @@ TESTS = \
 	db_properties_test \
 	db_table_properties_test \
 	db_statistics_test \
+	db_write_test \
 	autovector_test \
 	cleanable_test \
 	column_family_test \
@@ -1063,6 +1064,9 @@ db_sst_test: db/db_sst_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 db_statistics_test: db/db_statistics_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
+db_write_test: db/db_write_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
 external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)

--- a/TARGETS
+++ b/TARGETS
@@ -458,7 +458,8 @@ ROCKS_TESTS = [['merger_test', 'table/merger_test.cc', 'serial'],
 ['skiplist_test', 'memtable/skiplist_test.cc', 'serial'],
 ['lru_cache_test', 'cache/lru_cache_test.cc', 'serial'],
 ['plain_table_db_test', 'db/plain_table_db_test.cc', 'serial'],
-['blob_db_test', 'utilities/blob_db/blob_db_test.cc', 'serial']]
+['blob_db_test', 'utilities/blob_db/blob_db_test.cc', 'serial'],
+['db_write_test', 'db/db_write_test.cc', 'serial']]
 # Generate a test rule for each entry in ROCKS_TESTS
 for test_cfg in ROCKS_TESTS:

--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -91,9 +91,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       if (w.ShouldWriteToMemtable()) {
         ColumnFamilyMemTablesImpl column_family_memtables(
             versions_->GetColumnFamilySet());
-        WriteBatchInternal::SetSequence(w.batch, w.sequence);
         w.status = WriteBatchInternal::InsertInto(
-            &w, &column_family_memtables, &flush_scheduler_,
+            &w, w.sequence, &column_family_memtables, &flush_scheduler_,
             write_options.ignore_missing_column_families, 0 /*log_number*/, this,
             true /*concurrent_memtable_writes*/);
       }
@@ -239,9 +238,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
         ColumnFamilyMemTablesImpl column_family_memtables(
             versions_->GetColumnFamilySet());
         assert(w.sequence == current_sequence);
-        WriteBatchInternal::SetSequence(w.batch, w.sequence);
         w.status = WriteBatchInternal::InsertInto(
-            &w, &column_family_memtables, &flush_scheduler_,
+            &w, w.sequence, &column_family_memtables, &flush_scheduler_,
             write_options.ignore_missing_column_families, 0 /*log_number*/,
             this, true /*concurrent_memtable_writes*/);
       }
@@ -384,11 +382,10 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
     if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
       assert(w.ShouldWriteToMemtable());
-      WriteBatchInternal::SetSequence(w.batch, w.sequence);
       ColumnFamilyMemTablesImpl column_family_memtables(
           versions_->GetColumnFamilySet());
       w.status = WriteBatchInternal::InsertInto(
-          &w, &column_family_memtables, &flush_scheduler_,
+          &w, w.sequence, &column_family_memtables, &flush_scheduler_,
           write_options.ignore_missing_column_families, 0 /*log_number*/, this,
           true /*concurrent_memtable_writes*/);
       if (write_thread_.CompleteParallelMemTableWriter(&w)) {

--- a/db/db_test_util.cc
+++ b/db/db_test_util.cc
@@ -43,9 +43,9 @@ SpecialEnv::SpecialEnv(Env* base)
 }
 DBTestBase::DBTestBase(const std::string path)
-    : option_config_(kDefault),
-      mem_env_(!getenv("MEM_ENV") ? nullptr : new MockEnv(Env::Default())),
-      env_(new SpecialEnv(mem_env_ ? mem_env_ : Env::Default())) {
+    : mem_env_(!getenv("MEM_ENV") ? nullptr : new MockEnv(Env::Default())),
+      env_(new SpecialEnv(mem_env_ ? mem_env_ : Env::Default())),
+      option_config_(kDefault) {
   env_->SetBackgroundThreads(1, Env::LOW);
   env_->SetBackgroundThreads(1, Env::HIGH);
   dbname_ = test::TmpDir(env_) + path;
@@ -245,7 +245,17 @@ bool DBTestBase::ChangeFilterOptions() {
 // Return the current option configuration.
 Options DBTestBase::CurrentOptions(
-    const anon::OptionsOverride& options_override) {
+    const anon::OptionsOverride& options_override) const {
+  return GetOptions(option_config_, GetDefaultOptions(), options_override);
+}
+
+Options DBTestBase::CurrentOptions(
+    const Options& default_options,
+    const anon::OptionsOverride& options_override) const {
+  return GetOptions(option_config_, default_options, options_override);
+}
+
+Options DBTestBase::GetDefaultOptions() {
   Options options;
   options.write_buffer_size = 4090 * 4096;
   options.target_file_size_base = 2 * 1024 * 1024;
@@ -253,15 +263,14 @@ Options DBTestBase::CurrentOptions(
   options.max_open_files = 5000;
   options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
   options.compaction_pri = CompactionPri::kByCompensatedSize;
-  return CurrentOptions(options, options_override);
+  return options;
 }
-Options DBTestBase::CurrentOptions(
-    const Options& defaultOptions,
-    const anon::OptionsOverride& options_override) {
+Options DBTestBase::GetOptions(
+    int option_config, const Options& default_options,
+    const anon::OptionsOverride& options_override) const {
   // this redundant copy is to minimize code change w/o having lint error.
-  Options options = defaultOptions;
+  Options options = default_options;
   BlockBasedTableOptions table_options;
   bool set_block_based_table_factory = true;
 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
@@ -272,7 +281,7 @@ Options DBTestBase::CurrentOptions(
       "NewWritableFile:O_DIRECT");
 #endif
-  switch (option_config_) {
+  switch (option_config) {
 #ifndef ROCKSDB_LITE
     case kHashSkipList:
       options.prefix_extractor.reset(NewFixedPrefixTransform(1));
@@ -456,6 +465,10 @@ Options DBTestBase::CurrentOptions(
 #endif
       break;
     }
+    case kPipelinedWrite: {
+      options.enable_pipelined_write = true;
+      break;
+    }
     default:
       break;

--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -606,9 +606,9 @@ class TestPutOperator : public MergeOperator {
 };
 class DBTestBase : public testing::Test {
- protected:
+ public:
   // Sequence of option configurations to try
-  enum OptionConfig {
+  enum OptionConfig : int {
     kDefault = 0,
     kBlockBasedTableWithPrefixHashIndex = 1,
     kBlockBasedTableWithWholeKeyHashIndex = 2,
@@ -639,15 +639,15 @@ class DBTestBase : public testing::Test {
     kRowCache = 27,
     kRecycleLogFiles = 28,
     kConcurrentSkipList = 29,
-    kDirectIO = 30,
+    kPipelinedWrite = 30,
     kEnd = 31,
-    kLevelSubcompactions = 32,
-    kUniversalSubcompactions = 33,
-    kBlockBasedTableWithIndexRestartInterval = 34,
-    kBlockBasedTableWithPartitionedIndex = 35,
-    kPartitionedFilterWithNewTableReaderForCompactions = 36,
+    kDirectIO = 32,
+    kLevelSubcompactions = 33,
+    kUniversalSubcompactions = 34,
+    kBlockBasedTableWithIndexRestartInterval = 35,
+    kBlockBasedTableWithPartitionedIndex = 36,
+    kPartitionedFilterWithNewTableReaderForCompactions = 37,
   };
-  int option_config_;
  public:
   std::string dbname_;
@@ -658,6 +658,7 @@ class DBTestBase : public testing::Test {
   DB* db_;
   std::vector<ColumnFamilyHandle*> handles_;
+  int option_config_;
   Options last_options_;
   // Skip some options, as they may not be applicable to a specific test.
@@ -708,12 +709,19 @@ class DBTestBase : public testing::Test {
   bool ChangeFilterOptions();
   // Return the current option configuration.
-  Options CurrentOptions(
-      const anon::OptionsOverride& options_override = anon::OptionsOverride());
-  Options CurrentOptions(
-      const Options& defaultOptions,
-      const anon::OptionsOverride& options_override = anon::OptionsOverride());
+  Options CurrentOptions(const anon::OptionsOverride& options_override =
+                             anon::OptionsOverride()) const;
+  Options CurrentOptions(const Options& default_options,
+                         const anon::OptionsOverride& options_override =
+                             anon::OptionsOverride()) const;
+  static Options GetDefaultOptions();
+  Options GetOptions(int option_config,
+                     const Options& default_options = GetDefaultOptions(),
+                     const anon::OptionsOverride& options_override =
+                         anon::OptionsOverride()) const;
   DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_); }

--- /dev/null
+++ b/db/db_write_test.cc
@@ -0,0 +1,82 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under the BSD-style license found in the
+//  LICENSE file in the root directory of this source tree. An additional grant
+//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is also licensed under the GPLv2 license found in the
+//  COPYING file in the root directory of this source tree.
+
+#include <memory>
+#include <thread>
+#include <vector>
+#include "db/db_test_util.h"
+#include "db/write_batch_internal.h"
+#include "port/stack_trace.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+// Test variations of WriteImpl.
+class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
+ public:
+  DBWriteTest() : DBTestBase("/db_write_test") {}
+
+  void Open() { DBTestBase::Reopen(GetOptions(GetParam())); }
+};
+
+// Sequence number should be return through input write batch.
+TEST_P(DBWriteTest, ReturnSeuqneceNumber) {
+  Random rnd(4422);
+  Open();
+  for (int i = 0; i < 100; i++) {
+    WriteBatch batch;
+    batch.Put("key" + ToString(i), test::RandomHumanReadableString(&rnd, 10));
+    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
+    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(),
+              WriteBatchInternal::Sequence(&batch));
+  }
+}
+
+TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) {
+  constexpr size_t kThreads = 16;
+  constexpr size_t kNumKeys = 1000;
+  Random rnd(4422);
+  Open();
+  ASSERT_EQ(0, dbfull()->GetLatestSequenceNumber());
+  // Check each sequence is used once and only once.
+  std::vector<std::atomic_flag> flags(kNumKeys * kThreads + 1);
+  for (size_t i = 0; i < flags.size(); i++) {
+    flags[i].clear();
+  }
+  auto writer = [&](size_t id) {
+    for (size_t k = 0; k < kNumKeys; k++) {
+      WriteBatch batch;
+      batch.Put("key" + ToString(id) + "-" + ToString(k),
+                test::RandomHumanReadableString(&rnd, 10));
+      ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
+      SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
+      ASSERT_GT(sequence, 0);
+      ASSERT_LE(sequence, kNumKeys * kThreads);
+      // The sequence isn't consumed by someone else.
+      ASSERT_FALSE(flags[sequence].test_and_set());
+    }
+  };
+  std::vector<std::thread> threads;
+  for (size_t i = 0; i < kThreads; i++) {
+    threads.emplace_back(writer, i);
+  }
+  for (size_t i = 0; i < kThreads; i++) {
+    threads[i].join();
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
+                        testing::Values(DBTestBase::kDefault,
+                                        DBTestBase::kPipelinedWrite));
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  rocksdb::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}

--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -852,24 +852,24 @@ class MemTableInserter : public WriteBatch::Handler {
  public:
   // cf_mems should not be shared with concurrent inserters
-  MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
+  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                    FlushScheduler* flush_scheduler,
                    bool ignore_missing_column_families,
                    uint64_t recovering_log_number, DB* db,
                    bool concurrent_memtable_writes,
                    bool* has_valid_writes = nullptr)
-      : sequence_(sequence),
+      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(reinterpret_cast<DBImpl*>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr) {
    assert(cf_mems_);
  }
  ~MemTableInserter() {
@@ -884,7 +884,7 @@ public:
   void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
-  SequenceNumber get_final_sequence() const { return sequence_; }
+  SequenceNumber sequence() const { return sequence_; }
   void PostProcess() {
     assert(concurrent_memtable_writes_);
@@ -1304,6 +1304,7 @@ Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group,
     if (!w->ShouldWriteToMemtable()) {
       continue;
     }
+    SetSequence(w->batch, inserter.sequence());
     inserter.set_log_number_ref(w->log_ref);
     w->status = w->batch->Iterate(&inserter);
     if (!w->status.ok()) {
@@ -1314,16 +1315,17 @@ Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group,
 }
 Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
+                                      SequenceNumber sequence,
                                       ColumnFamilyMemTables* memtables,
                                       FlushScheduler* flush_scheduler,
                                       bool ignore_missing_column_families,
                                       uint64_t log_number, DB* db,
                                       bool concurrent_memtable_writes) {
-  MemTableInserter inserter(WriteBatchInternal::Sequence(writer->batch),
-                            memtables, flush_scheduler,
+  assert(writer->ShouldWriteToMemtable());
+  MemTableInserter inserter(sequence, memtables, flush_scheduler,
                             ignore_missing_column_families, log_number, db,
                             concurrent_memtable_writes);
-  assert(writer->ShouldWriteToMemtable());
+  SetSequence(writer->batch, sequence);
   inserter.set_log_number_ref(writer->log_ref);
   Status s = writer->batch->Iterate(&inserter);
   if (concurrent_memtable_writes) {
@@ -1337,13 +1339,12 @@ Status WriteBatchInternal::InsertInto(
     FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
     uint64_t log_number, DB* db, bool concurrent_memtable_writes,
     SequenceNumber* last_seq_used, bool* has_valid_writes) {
-  MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables,
-                            flush_scheduler, ignore_missing_column_families,
-                            log_number, db, concurrent_memtable_writes,
-                            has_valid_writes);
+  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
+                            ignore_missing_column_families, log_number, db,
+                            concurrent_memtable_writes, has_valid_writes);
   Status s = batch->Iterate(&inserter);
   if (last_seq_used != nullptr) {
-    *last_seq_used = inserter.get_final_sequence();
+    *last_seq_used = inserter.sequence();
   }
   if (concurrent_memtable_writes) {
     inserter.PostProcess();

--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -172,7 +172,7 @@ class WriteBatchInternal {
                            SequenceNumber* last_seq_used = nullptr,
                            bool* has_valid_writes = nullptr);
-  static Status InsertInto(WriteThread::Writer* writer,
+  static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
                            ColumnFamilyMemTables* memtables,
                            FlushScheduler* flush_scheduler,
                            bool ignore_missing_column_families = false,

--- a/src.mk
+++ b/src.mk
@@ -255,6 +255,7 @@ MAIN_SOURCES = \
   db/db_test.cc \
   db/db_universal_compaction_test.cc \
   db/db_wal_test.cc \
+  db/db_write_test.cc \
   db/dbformat_test.cc \
   db/deletefile_test.cc \
   db/external_sst_file_basic_test.cc \

--- a/utilities/blob_db/blob_db_impl.cc
+++ b/utilities/blob_db/blob_db_impl.cc
@@ -98,6 +98,7 @@ class BlobHandle {
   void set_compression(CompressionType t) { compression_ = t; }
   void EncodeTo(std::string* dst) const;
   Status DecodeFrom(Slice* input);
   void clear();
@@ -919,7 +920,7 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
   Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
   if (previous_put) {
-    impl->AppendSN(last_file, -1);
+    impl->AppendSN(last_file, 0 /*sequence number*/);
     previous_put = false;
   }
@@ -977,7 +978,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
   if (handler1.previous_put) {
     // this is the sequence number of the write.
-    SequenceNumber sn = WriteBatchInternal::Sequence(&handler1.updates_blob);
+    SequenceNumber sn = WriteBatchInternal::Sequence(&handler1.updates_blob) +
+                        WriteBatchInternal::Count(&handler1.updates_blob) - 1;
     AppendSN(handler1.last_file, sn);
     CloseIf(handler1.last_file);
@@ -1196,8 +1198,8 @@ std::vector<Status> BlobDBImpl::MultiGet(
 }
 Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
-                             const std::string& index_entry,
-                             std::string* value) {
+                             const std::string& index_entry, std::string* value,
+                             SequenceNumber* sequence) {
   Slice index_entry_slice(index_entry);
   BlobHandle handle;
   Status s = handle.DecodeFrom(&index_entry_slice);
@@ -1245,68 +1247,86 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
   std::shared_ptr<RandomAccessFileReader> reader =
       GetOrOpenRandomAccessReader(bfile, myenv_, env_options_);
+  if (value != nullptr) {
     std::string* valueptr = value;
     std::string value_c;
     if (bdb_options_.compression != kNoCompression) {
       valueptr = &value_c;
     }
     // allocate the buffer. This is safe in C++11
     valueptr->resize(handle.size());
     char* buffer = &(*valueptr)[0];
     Slice blob_value;
     s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer);
     if (!s.ok() || blob_value.size() != handle.size()) {
       if (debug_level_ >= 2) {
         Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
             "Failed to read blob from file: %s blob_offset: %" PRIu64
             " blob_size: %" PRIu64 " read: %d key: %s status: '%s'",
             bfile->PathName().c_str(), handle.offset(), handle.size(),
             static_cast<int>(blob_value.size()), key.data(),
             s.ToString().c_str());
       }
       return Status::NotFound("Blob Not Found as couldnt retrieve Blob");
     }
     Slice crc_slice;
     uint32_t crc_exp;
     std::string crc_str;
     crc_str.resize(sizeof(uint32_t));
     char* crc_buffer = &(crc_str[0]);
     s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)),
                      sizeof(uint32_t), &crc_slice, crc_buffer);
     if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) {
       if (debug_level_ >= 2) {
         Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
             "Failed to fetch blob crc file: %s blob_offset: %" PRIu64
             " blob_size: %" PRIu64 " key: %s status: '%s'",
-            bfile->PathName().c_str(), handle.offset(), handle.size(), key.data(),
-            s.ToString().c_str());
+            bfile->PathName().c_str(), handle.offset(), handle.size(),
+            key.data(), s.ToString().c_str());
       }
       return Status::NotFound("Blob Not Found as couldnt retrieve CRC");
     }
     uint32_t crc = crc32c::Extend(0, blob_value.data(), blob_value.size());
     crc = crc32c::Mask(crc);  // Adjust for storage
     if (crc != crc_exp) {
       if (debug_level_ >= 2) {
         Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
             "Blob crc mismatch file: %s blob_offset: %" PRIu64
             " blob_size: %" PRIu64 " key: %s status: '%s'",
-            bfile->PathName().c_str(), handle.offset(), handle.size(), key.data(),
-            s.ToString().c_str());
+            bfile->PathName().c_str(), handle.offset(), handle.size(),
+            key.data(), s.ToString().c_str());
       }
       return Status::Corruption("Corruption. Blob CRC mismatch");
     }
     if (bdb_options_.compression != kNoCompression) {
       BlockContents contents;
       s = UncompressBlockContentsForCompressionType(
           blob_value.data(), blob_value.size(), &contents,
           kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression,
           *(cfd->ioptions()));
       *value = contents.data.ToString();
     }
+  }
+  if (sequence != nullptr) {
+    char buffer[BlobLogRecord::kFooterSize];
+    Slice footer_slice;
+    s = reader->Read(handle.offset() + handle.size(),
+                     BlobLogRecord::kFooterSize, &footer_slice, buffer);
+    if (!s.ok()) {
+      return s;
+    }
+    BlobLogRecord record;
+    s = record.DecodeFooterFrom(footer_slice);
+    if (!s.ok()) {
+      return s;
+    }
+    *sequence = record.GetSN();
+  }
   return s;
@@ -2205,6 +2225,19 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& opts,
                                   column_family, this);
 }
+#ifndef NDEBUG
+Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key,
+                                          SequenceNumber* sequence) {
+  std::string index_entry;
+  Status s = db_->Get(ReadOptions(), key, &index_entry);
+  if (!s.ok()) {
+    return s;
+  }
+  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
+  return CommonGet(cfh->cfd(), key, index_entry, nullptr, sequence);
+}
+#endif  // !NDEBUG
 }  // namespace blob_db
 }  // namespace rocksdb
 #endif  // ROCKSDB_LITE

--- a/utilities/blob_db/blob_db_impl.h
+++ b/utilities/blob_db/blob_db_impl.h
@@ -196,6 +196,10 @@ class BlobDBImpl : public BlobDB {
   ~BlobDBImpl();
+#ifndef NDEBUG
+  Status TEST_GetSequenceNumber(const Slice& key, SequenceNumber* sequence);
+#endif  // !NDEBUG
  private:
   static bool ExtractTTLFromBlob(const Slice& value, Slice* newval,
                                  int32_t* ttl_val);
@@ -203,7 +207,8 @@ class BlobDBImpl : public BlobDB {
   Status OpenPhase1();
   Status CommonGet(const ColumnFamilyData* cfd, const Slice& key,
-                   const std::string& index_entry, std::string* value);
+                   const std::string& index_entry, std::string* value,
+                   SequenceNumber* sequence = nullptr);
   // Just before flush starts acting on memtable files,
   // this handler is called.

--- a/utilities/blob_db/blob_db_test.cc
+++ b/utilities/blob_db/blob_db_test.cc
@@ -490,6 +490,45 @@ TEST_F(BlobDBTest, Large) {
   ASSERT_EQ(value3, value);
 }
+
+// Test sequence number store in blob file is correct.
+TEST_F(BlobDBTest, SequenceNumber) {
+  Random rnd(223);
+  Reopen(BlobDBOptionsImpl(), Options());
+  SequenceNumber sequence = blobdb_->GetLatestSequenceNumber();
+  BlobDBImpl *blobdb_impl = reinterpret_cast<BlobDBImpl *>(blobdb_);
+  for (int i = 0; i < 100; i++) {
+    std::string key = "key" + ToString(i);
+    PutRandom(key, &rnd);
+    sequence += 1;
+    ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber());
+    SequenceNumber actual_sequence = 0;
+    ASSERT_OK(blobdb_impl->TEST_GetSequenceNumber(key, &actual_sequence));
+    ASSERT_EQ(sequence, actual_sequence);
+  }
+  for (int i = 0; i < 100; i++) {
+    WriteBatch batch;
+    size_t batch_size = rnd.Next() % 10 + 1;
+    for (size_t k = 0; k < batch_size; k++) {
+      std::string value = test::RandomHumanReadableString(&rnd, 1000);
+      ASSERT_OK(batch.Put("key" + ToString(i) + "-" + ToString(k), value));
+    }
+    ASSERT_OK(blobdb_->Write(WriteOptions(), &batch));
+    sequence += batch_size;
+    ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber());
+    for (size_t k = 0; k < batch_size; k++) {
+      std::string key = "key" + ToString(i) + "-" + ToString(k);
+      SequenceNumber actual_sequence;
+      ASSERT_OK(blobdb_impl->TEST_GetSequenceNumber(key, &actual_sequence));
+      // We only write sequence for the last key in a batch.
+      if (k + 1 < batch_size) {
+        ASSERT_EQ(0, actual_sequence);
+      } else {
+        ASSERT_EQ(sequence, actual_sequence);
+      }
+    }
+  }
+}
+
 }  // namespace blob_db
 }  // namespace rocksdb
