diff --git a/CMakeLists.txt b/CMakeLists.txt index 482d2fb0a..561a5e807 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -632,6 +632,7 @@ set(TESTS db/db_test2.cc db/db_universal_compaction_test.cc db/db_wal_test.cc + db/db_write_test.cc db/dbformat_test.cc db/deletefile_test.cc db/external_sst_file_basic_test.cc diff --git a/Makefile b/Makefile index fc8cf50ba..0af4af9c2 100644 --- a/Makefile +++ b/Makefile @@ -362,6 +362,7 @@ TESTS = \ db_properties_test \ db_table_properties_test \ db_statistics_test \ + db_write_test \ autovector_test \ cleanable_test \ column_family_test \ @@ -1063,6 +1064,9 @@ db_sst_test: db/db_sst_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_statistics_test: db/db_statistics_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_write_test: db/db_write_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 27bcc7844..4f5ee1c01 100644 --- a/TARGETS +++ b/TARGETS @@ -458,7 +458,8 @@ ROCKS_TESTS = [['merger_test', 'table/merger_test.cc', 'serial'], ['skiplist_test', 'memtable/skiplist_test.cc', 'serial'], ['lru_cache_test', 'cache/lru_cache_test.cc', 'serial'], ['plain_table_db_test', 'db/plain_table_db_test.cc', 'serial'], - ['blob_db_test', 'utilities/blob_db/blob_db_test.cc', 'serial']] + ['blob_db_test', 'utilities/blob_db/blob_db_test.cc', 'serial'], + ['db_write_test', 'db/db_write_test.cc', 'serial']] # Generate a test rule for each entry in ROCKS_TESTS for test_cfg in ROCKS_TESTS: diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 20b6efd09..3fa103ff7 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -91,9 +91,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (w.ShouldWriteToMemtable()) { ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); - WriteBatchInternal::SetSequence(w.batch, w.sequence); w.status = WriteBatchInternal::InsertInto( - &w, &column_family_memtables, &flush_scheduler_, + &w, w.sequence, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); } @@ -239,9 +238,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); assert(w.sequence == current_sequence); - WriteBatchInternal::SetSequence(w.batch, w.sequence); w.status = WriteBatchInternal::InsertInto( - &w, &column_family_memtables, &flush_scheduler_, + &w, w.sequence, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); } @@ -384,11 +382,10 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { assert(w.ShouldWriteToMemtable()); - WriteBatchInternal::SetSequence(w.batch, w.sequence); ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); w.status = WriteBatchInternal::InsertInto( - &w, &column_family_memtables, &flush_scheduler_, + &w, w.sequence, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); if (write_thread_.CompleteParallelMemTableWriter(&w)) { diff --git a/db/db_test_util.cc b/db/db_test_util.cc index ad2b1ee0b..d932b5542 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -43,9 +43,9 @@ SpecialEnv::SpecialEnv(Env* base) } DBTestBase::DBTestBase(const std::string path) - : option_config_(kDefault), - mem_env_(!getenv("MEM_ENV") ? nullptr : new MockEnv(Env::Default())), - env_(new SpecialEnv(mem_env_ ? mem_env_ : Env::Default())) { + : mem_env_(!getenv("MEM_ENV") ? nullptr : new MockEnv(Env::Default())), + env_(new SpecialEnv(mem_env_ ? mem_env_ : Env::Default())), + option_config_(kDefault) { env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::HIGH); dbname_ = test::TmpDir(env_) + path; @@ -245,7 +245,17 @@ bool DBTestBase::ChangeFilterOptions() { // Return the current option configuration. Options DBTestBase::CurrentOptions( - const anon::OptionsOverride& options_override) { + const anon::OptionsOverride& options_override) const { + return GetOptions(option_config_, GetDefaultOptions(), options_override); +} + +Options DBTestBase::CurrentOptions( + const Options& default_options, + const anon::OptionsOverride& options_override) const { + return GetOptions(option_config_, default_options, options_override); +} + +Options DBTestBase::GetDefaultOptions() { Options options; options.write_buffer_size = 4090 * 4096; options.target_file_size_base = 2 * 1024 * 1024; @@ -253,15 +263,14 @@ Options DBTestBase::CurrentOptions( options.max_open_files = 5000; options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords; options.compaction_pri = CompactionPri::kByCompensatedSize; - - return CurrentOptions(options, options_override); + return options; } -Options DBTestBase::CurrentOptions( - const Options& defaultOptions, - const anon::OptionsOverride& options_override) { +Options DBTestBase::GetOptions( + int option_config, const Options& default_options, + const anon::OptionsOverride& options_override) const { // this redundant copy is to minimize code change w/o having lint error. - Options options = defaultOptions; + Options options = default_options; BlockBasedTableOptions table_options; bool set_block_based_table_factory = true; #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \ @@ -272,7 +281,7 @@ Options DBTestBase::CurrentOptions( "NewWritableFile:O_DIRECT"); #endif - switch (option_config_) { + switch (option_config) { #ifndef ROCKSDB_LITE case kHashSkipList: options.prefix_extractor.reset(NewFixedPrefixTransform(1)); @@ -456,6 +465,10 @@ Options DBTestBase::CurrentOptions( #endif break; } + case kPipelinedWrite: { + options.enable_pipelined_write = true; + break; + } default: break; diff --git a/db/db_test_util.h b/db/db_test_util.h index 384b19243..574dc9ef6 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -606,9 +606,9 @@ class TestPutOperator : public MergeOperator { }; class DBTestBase : public testing::Test { - protected: + public: // Sequence of option configurations to try - enum OptionConfig { + enum OptionConfig : int { kDefault = 0, kBlockBasedTableWithPrefixHashIndex = 1, kBlockBasedTableWithWholeKeyHashIndex = 2, @@ -639,15 +639,15 @@ class DBTestBase : public testing::Test { kRowCache = 27, kRecycleLogFiles = 28, kConcurrentSkipList = 29, - kDirectIO = 30, + kPipelinedWrite = 30, kEnd = 31, - kLevelSubcompactions = 32, - kUniversalSubcompactions = 33, - kBlockBasedTableWithIndexRestartInterval = 34, - kBlockBasedTableWithPartitionedIndex = 35, - kPartitionedFilterWithNewTableReaderForCompactions = 36, + kDirectIO = 32, + kLevelSubcompactions = 33, + kUniversalSubcompactions = 34, + kBlockBasedTableWithIndexRestartInterval = 35, + kBlockBasedTableWithPartitionedIndex = 36, + kPartitionedFilterWithNewTableReaderForCompactions = 37, }; - int option_config_; public: std::string dbname_; @@ -658,6 +658,7 @@ class DBTestBase : public testing::Test { DB* db_; std::vector handles_; + int option_config_; Options last_options_; // Skip some options, as they may not be applicable to a specific test. @@ -708,12 +709,19 @@ class DBTestBase : public testing::Test { bool ChangeFilterOptions(); // Return the current option configuration. - Options CurrentOptions( - const anon::OptionsOverride& options_override = anon::OptionsOverride()); + Options CurrentOptions(const anon::OptionsOverride& options_override = + anon::OptionsOverride()) const; + + Options CurrentOptions(const Options& default_options, + const anon::OptionsOverride& options_override = + anon::OptionsOverride()) const; + + static Options GetDefaultOptions(); - Options CurrentOptions( - const Options& defaultOptions, - const anon::OptionsOverride& options_override = anon::OptionsOverride()); + Options GetOptions(int option_config, + const Options& default_options = GetDefaultOptions(), + const anon::OptionsOverride& options_override = + anon::OptionsOverride()) const; DBImpl* dbfull() { return reinterpret_cast(db_); } diff --git a/db/db_write_test.cc b/db/db_write_test.cc new file mode 100644 index 000000000..91281448a --- /dev/null +++ b/db/db_write_test.cc @@ -0,0 +1,82 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// This source code is also licensed under the GPLv2 license found in the +// COPYING file in the root directory of this source tree. + +#include +#include +#include +#include "db/db_test_util.h" +#include "db/write_batch_internal.h" +#include "port/stack_trace.h" +#include "util/sync_point.h" + +namespace rocksdb { + +// Test variations of WriteImpl. +class DBWriteTest : public DBTestBase, public testing::WithParamInterface { + public: + DBWriteTest() : DBTestBase("/db_write_test") {} + + void Open() { DBTestBase::Reopen(GetOptions(GetParam())); } +}; + +// Sequence number should be return through input write batch. +TEST_P(DBWriteTest, ReturnSeuqneceNumber) { + Random rnd(4422); + Open(); + for (int i = 0; i < 100; i++) { + WriteBatch batch; + batch.Put("key" + ToString(i), test::RandomHumanReadableString(&rnd, 10)); + ASSERT_OK(dbfull()->Write(WriteOptions(), &batch)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), + WriteBatchInternal::Sequence(&batch)); + } +} + +TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) { + constexpr size_t kThreads = 16; + constexpr size_t kNumKeys = 1000; + Random rnd(4422); + Open(); + ASSERT_EQ(0, dbfull()->GetLatestSequenceNumber()); + // Check each sequence is used once and only once. + std::vector flags(kNumKeys * kThreads + 1); + for (size_t i = 0; i < flags.size(); i++) { + flags[i].clear(); + } + auto writer = [&](size_t id) { + for (size_t k = 0; k < kNumKeys; k++) { + WriteBatch batch; + batch.Put("key" + ToString(id) + "-" + ToString(k), + test::RandomHumanReadableString(&rnd, 10)); + ASSERT_OK(dbfull()->Write(WriteOptions(), &batch)); + SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); + ASSERT_GT(sequence, 0); + ASSERT_LE(sequence, kNumKeys * kThreads); + // The sequence isn't consumed by someone else. + ASSERT_FALSE(flags[sequence].test_and_set()); + } + }; + std::vector threads; + for (size_t i = 0; i < kThreads; i++) { + threads.emplace_back(writer, i); + } + for (size_t i = 0; i < kThreads; i++) { + threads[i].join(); + } +} + +INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, + testing::Values(DBTestBase::kDefault, + DBTestBase::kPipelinedWrite)); + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/write_batch.cc b/db/write_batch.cc index 0cc48b087..8ecf5794f 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -852,24 +852,24 @@ class MemTableInserter : public WriteBatch::Handler { public: // cf_mems should not be shared with concurrent inserters - MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, - FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, - uint64_t recovering_log_number, DB* db, - bool concurrent_memtable_writes, - bool* has_valid_writes = nullptr) - : sequence_(sequence), - cf_mems_(cf_mems), - flush_scheduler_(flush_scheduler), - ignore_missing_column_families_(ignore_missing_column_families), - recovering_log_number_(recovering_log_number), - log_number_ref_(0), - db_(reinterpret_cast(db)), - concurrent_memtable_writes_(concurrent_memtable_writes), - post_info_created_(false), - has_valid_writes_(has_valid_writes), - rebuilding_trx_(nullptr) { - assert(cf_mems_); + MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems, + FlushScheduler* flush_scheduler, + bool ignore_missing_column_families, + uint64_t recovering_log_number, DB* db, + bool concurrent_memtable_writes, + bool* has_valid_writes = nullptr) + : sequence_(_sequence), + cf_mems_(cf_mems), + flush_scheduler_(flush_scheduler), + ignore_missing_column_families_(ignore_missing_column_families), + recovering_log_number_(recovering_log_number), + log_number_ref_(0), + db_(reinterpret_cast(db)), + concurrent_memtable_writes_(concurrent_memtable_writes), + post_info_created_(false), + has_valid_writes_(has_valid_writes), + rebuilding_trx_(nullptr) { + assert(cf_mems_); } ~MemTableInserter() { @@ -884,7 +884,7 @@ public: void set_log_number_ref(uint64_t log) { log_number_ref_ = log; } - SequenceNumber get_final_sequence() const { return sequence_; } + SequenceNumber sequence() const { return sequence_; } void PostProcess() { assert(concurrent_memtable_writes_); @@ -1304,6 +1304,7 @@ Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group, if (!w->ShouldWriteToMemtable()) { continue; } + SetSequence(w->batch, inserter.sequence()); inserter.set_log_number_ref(w->log_ref); w->status = w->batch->Iterate(&inserter); if (!w->status.ok()) { @@ -1314,16 +1315,17 @@ Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group, } Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer, + SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, bool concurrent_memtable_writes) { - MemTableInserter inserter(WriteBatchInternal::Sequence(writer->batch), - memtables, flush_scheduler, + assert(writer->ShouldWriteToMemtable()); + MemTableInserter inserter(sequence, memtables, flush_scheduler, ignore_missing_column_families, log_number, db, concurrent_memtable_writes); - assert(writer->ShouldWriteToMemtable()); + SetSequence(writer->batch, sequence); inserter.set_log_number_ref(writer->log_ref); Status s = writer->batch->Iterate(&inserter); if (concurrent_memtable_writes) { @@ -1337,13 +1339,12 @@ Status WriteBatchInternal::InsertInto( FlushScheduler* flush_scheduler, bool ignore_missing_column_families, uint64_t log_number, DB* db, bool concurrent_memtable_writes, SequenceNumber* last_seq_used, bool* has_valid_writes) { - MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables, - flush_scheduler, ignore_missing_column_families, - log_number, db, concurrent_memtable_writes, - has_valid_writes); + MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler, + ignore_missing_column_families, log_number, db, + concurrent_memtable_writes, has_valid_writes); Status s = batch->Iterate(&inserter); if (last_seq_used != nullptr) { - *last_seq_used = inserter.get_final_sequence(); + *last_seq_used = inserter.sequence(); } if (concurrent_memtable_writes) { inserter.PostProcess(); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 730edca75..a8ca6096c 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -172,7 +172,7 @@ class WriteBatchInternal { SequenceNumber* last_seq_used = nullptr, bool* has_valid_writes = nullptr); - static Status InsertInto(WriteThread::Writer* writer, + static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, bool ignore_missing_column_families = false, diff --git a/src.mk b/src.mk index b1634e0b9..c5645080a 100644 --- a/src.mk +++ b/src.mk @@ -255,6 +255,7 @@ MAIN_SOURCES = \ db/db_test.cc \ db/db_universal_compaction_test.cc \ db/db_wal_test.cc \ + db/db_write_test.cc \ db/dbformat_test.cc \ db/deletefile_test.cc \ db/external_sst_file_basic_test.cc \ diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 72c5d0e7e..984be3152 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -98,6 +98,7 @@ class BlobHandle { void set_compression(CompressionType t) { compression_ = t; } void EncodeTo(std::string* dst) const; + Status DecodeFrom(Slice* input); void clear(); @@ -919,7 +920,7 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1); if (previous_put) { - impl->AppendSN(last_file, -1); + impl->AppendSN(last_file, 0 /*sequence number*/); previous_put = false; } @@ -977,7 +978,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { if (handler1.previous_put) { // this is the sequence number of the write. - SequenceNumber sn = WriteBatchInternal::Sequence(&handler1.updates_blob); + SequenceNumber sn = WriteBatchInternal::Sequence(&handler1.updates_blob) + + WriteBatchInternal::Count(&handler1.updates_blob) - 1; AppendSN(handler1.last_file, sn); CloseIf(handler1.last_file); @@ -1196,8 +1198,8 @@ std::vector BlobDBImpl::MultiGet( } Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, - const std::string& index_entry, - std::string* value) { + const std::string& index_entry, std::string* value, + SequenceNumber* sequence) { Slice index_entry_slice(index_entry); BlobHandle handle; Status s = handle.DecodeFrom(&index_entry_slice); @@ -1245,68 +1247,86 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, std::shared_ptr reader = GetOrOpenRandomAccessReader(bfile, myenv_, env_options_); - std::string* valueptr = value; - std::string value_c; - if (bdb_options_.compression != kNoCompression) { - valueptr = &value_c; - } + if (value != nullptr) { + std::string* valueptr = value; + std::string value_c; + if (bdb_options_.compression != kNoCompression) { + valueptr = &value_c; + } - // allocate the buffer. This is safe in C++11 - valueptr->resize(handle.size()); - char* buffer = &(*valueptr)[0]; + // allocate the buffer. This is safe in C++11 + valueptr->resize(handle.size()); + char* buffer = &(*valueptr)[0]; - Slice blob_value; - s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer); - if (!s.ok() || blob_value.size() != handle.size()) { - if (debug_level_ >= 2) { - Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, - "Failed to read blob from file: %s blob_offset: %" PRIu64 - " blob_size: %" PRIu64 " read: %d key: %s status: '%s'", - bfile->PathName().c_str(), handle.offset(), handle.size(), - static_cast(blob_value.size()), key.data(), - s.ToString().c_str()); + Slice blob_value; + s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer); + if (!s.ok() || blob_value.size() != handle.size()) { + if (debug_level_ >= 2) { + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, + "Failed to read blob from file: %s blob_offset: %" PRIu64 + " blob_size: %" PRIu64 " read: %d key: %s status: '%s'", + bfile->PathName().c_str(), handle.offset(), handle.size(), + static_cast(blob_value.size()), key.data(), + s.ToString().c_str()); + } + return Status::NotFound("Blob Not Found as couldnt retrieve Blob"); } - return Status::NotFound("Blob Not Found as couldnt retrieve Blob"); - } - Slice crc_slice; - uint32_t crc_exp; - std::string crc_str; - crc_str.resize(sizeof(uint32_t)); - char* crc_buffer = &(crc_str[0]); - s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)), - sizeof(uint32_t), &crc_slice, crc_buffer); - if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) { - if (debug_level_ >= 2) { - Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, - "Failed to fetch blob crc file: %s blob_offset: %" PRIu64 - " blob_size: %" PRIu64 " key: %s status: '%s'", - bfile->PathName().c_str(), handle.offset(), handle.size(), key.data(), - s.ToString().c_str()); + Slice crc_slice; + uint32_t crc_exp; + std::string crc_str; + crc_str.resize(sizeof(uint32_t)); + char* crc_buffer = &(crc_str[0]); + s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)), + sizeof(uint32_t), &crc_slice, crc_buffer); + if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) { + if (debug_level_ >= 2) { + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, + "Failed to fetch blob crc file: %s blob_offset: %" PRIu64 + " blob_size: %" PRIu64 " key: %s status: '%s'", + bfile->PathName().c_str(), handle.offset(), handle.size(), + key.data(), s.ToString().c_str()); + } + return Status::NotFound("Blob Not Found as couldnt retrieve CRC"); } - return Status::NotFound("Blob Not Found as couldnt retrieve CRC"); - } - uint32_t crc = crc32c::Extend(0, blob_value.data(), blob_value.size()); - crc = crc32c::Mask(crc); // Adjust for storage - if (crc != crc_exp) { - if (debug_level_ >= 2) { - Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, - "Blob crc mismatch file: %s blob_offset: %" PRIu64 - " blob_size: %" PRIu64 " key: %s status: '%s'", - bfile->PathName().c_str(), handle.offset(), handle.size(), key.data(), - s.ToString().c_str()); + uint32_t crc = crc32c::Extend(0, blob_value.data(), blob_value.size()); + crc = crc32c::Mask(crc); // Adjust for storage + if (crc != crc_exp) { + if (debug_level_ >= 2) { + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, + "Blob crc mismatch file: %s blob_offset: %" PRIu64 + " blob_size: %" PRIu64 " key: %s status: '%s'", + bfile->PathName().c_str(), handle.offset(), handle.size(), + key.data(), s.ToString().c_str()); + } + return Status::Corruption("Corruption. Blob CRC mismatch"); + } + + if (bdb_options_.compression != kNoCompression) { + BlockContents contents; + s = UncompressBlockContentsForCompressionType( + blob_value.data(), blob_value.size(), &contents, + kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression, + *(cfd->ioptions())); + *value = contents.data.ToString(); } - return Status::Corruption("Corruption. Blob CRC mismatch"); } - if (bdb_options_.compression != kNoCompression) { - BlockContents contents; - s = UncompressBlockContentsForCompressionType( - blob_value.data(), blob_value.size(), &contents, - kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression, - *(cfd->ioptions())); - *value = contents.data.ToString(); + if (sequence != nullptr) { + char buffer[BlobLogRecord::kFooterSize]; + Slice footer_slice; + s = reader->Read(handle.offset() + handle.size(), + BlobLogRecord::kFooterSize, &footer_slice, buffer); + if (!s.ok()) { + return s; + } + BlobLogRecord record; + s = record.DecodeFooterFrom(footer_slice); + if (!s.ok()) { + return s; + } + *sequence = record.GetSN(); } return s; @@ -2205,6 +2225,19 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& opts, column_family, this); } +#ifndef NDEBUG +Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key, + SequenceNumber* sequence) { + std::string index_entry; + Status s = db_->Get(ReadOptions(), key, &index_entry); + if (!s.ok()) { + return s; + } + auto cfh = reinterpret_cast(DefaultColumnFamily()); + return CommonGet(cfh->cfd(), key, index_entry, nullptr, sequence); +} +#endif // !NDEBUG + } // namespace blob_db } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 5b9d1fba7..d258d9564 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -196,6 +196,10 @@ class BlobDBImpl : public BlobDB { ~BlobDBImpl(); +#ifndef NDEBUG + Status TEST_GetSequenceNumber(const Slice& key, SequenceNumber* sequence); +#endif // !NDEBUG + private: static bool ExtractTTLFromBlob(const Slice& value, Slice* newval, int32_t* ttl_val); @@ -203,7 +207,8 @@ class BlobDBImpl : public BlobDB { Status OpenPhase1(); Status CommonGet(const ColumnFamilyData* cfd, const Slice& key, - const std::string& index_entry, std::string* value); + const std::string& index_entry, std::string* value, + SequenceNumber* sequence = nullptr); // Just before flush starts acting on memtable files, // this handler is called. diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 6096a0963..3714661b0 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -490,6 +490,45 @@ TEST_F(BlobDBTest, Large) { ASSERT_EQ(value3, value); } +// Test sequence number store in blob file is correct. +TEST_F(BlobDBTest, SequenceNumber) { + Random rnd(223); + Reopen(BlobDBOptionsImpl(), Options()); + SequenceNumber sequence = blobdb_->GetLatestSequenceNumber(); + BlobDBImpl *blobdb_impl = reinterpret_cast(blobdb_); + for (int i = 0; i < 100; i++) { + std::string key = "key" + ToString(i); + PutRandom(key, &rnd); + sequence += 1; + ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber()); + SequenceNumber actual_sequence = 0; + ASSERT_OK(blobdb_impl->TEST_GetSequenceNumber(key, &actual_sequence)); + ASSERT_EQ(sequence, actual_sequence); + } + for (int i = 0; i < 100; i++) { + WriteBatch batch; + size_t batch_size = rnd.Next() % 10 + 1; + for (size_t k = 0; k < batch_size; k++) { + std::string value = test::RandomHumanReadableString(&rnd, 1000); + ASSERT_OK(batch.Put("key" + ToString(i) + "-" + ToString(k), value)); + } + ASSERT_OK(blobdb_->Write(WriteOptions(), &batch)); + sequence += batch_size; + ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber()); + for (size_t k = 0; k < batch_size; k++) { + std::string key = "key" + ToString(i) + "-" + ToString(k); + SequenceNumber actual_sequence; + ASSERT_OK(blobdb_impl->TEST_GetSequenceNumber(key, &actual_sequence)); + // We only write sequence for the last key in a batch. + if (k + 1 < batch_size) { + ASSERT_EQ(0, actual_sequence); + } else { + ASSERT_EQ(sequence, actual_sequence); + } + } + } +} + } // namespace blob_db } // namespace rocksdb