From deff48bcefa75e07019d024da443c4bba91fb244 Mon Sep 17 00:00:00 2001 From: Gang Liao Date: Mon, 20 Jun 2022 20:58:11 -0700 Subject: [PATCH] Add blob source to retrieve blobs in RocksDB (#10198) Summary: There is currently no caching mechanism for blobs, which is not ideal especially when the database resides on remote storage (where we cannot rely on the OS page cache). As part of this task, we would like to make it possible for the application to configure a blob cache. In this task, we formally introduced the blob source to RocksDB. BlobSource is a new abstraction layer that provides universal access to blobs, regardless of whether they are in the blob cache, secondary cache, or (remote) storage. Depending on user settings, it always fetches blobs from multi-tier cache and storage with minimal cost. Note: The new `MultiGetBlob()` implementation is not included in the current PR. To go faster, we aim to create a separate PR for it in parallel! This PR is a part of https://github.com/facebook/rocksdb/issues/10156 Pull Request resolved: https://github.com/facebook/rocksdb/pull/10198 Reviewed By: ltamasi Differential Revision: D37294735 Pulled By: gangliao fbshipit-source-id: 9cb50422d9dd1bc03798501c2778b6c7520c7a1e --- db/blob/blob_source.cc | 3 +- db/blob/blob_source.h | 11 +- db/blob/db_blob_basic_test.cc | 172 +++++++++++++++++++++++++++ db/column_family.cc | 11 +- db/column_family.h | 9 +- db/compaction/compaction_job_test.cc | 12 +- db/db_impl/db_impl.cc | 2 +- db/db_wal_test.cc | 10 +- db/flush_job_test.cc | 2 +- db/memtable_list_test.cc | 6 +- db/repair.cc | 2 +- db/version_edit_handler.cc | 9 +- db/version_set.cc | 53 ++++----- db/version_set.h | 4 +- db/version_set_test.cc | 20 ++-- db/version_util.h | 2 +- db/wal_manager_test.cc | 2 +- tools/ldb_cmd.cc | 6 +- tools/ldb_cmd_test.cc | 2 +- 19 files changed, 261 insertions(+), 77 deletions(-) diff --git a/db/blob/blob_source.cc b/db/blob/blob_source.cc index d8e8cfb18..83d00db10 100644 --- 
a/db/blob/blob_source.cc +++ b/db/blob/blob_source.cc @@ -111,7 +111,8 @@ Status BlobSource::GetBlob(const ReadOptions& read_options, const bool no_io = read_options.read_tier == kBlockCacheTier; if (no_io) { - return Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); + s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); + return s; } // Can't find the blob from the cache. Since I/O is allowed, read from the diff --git a/db/blob/blob_source.h b/db/blob/blob_source.h index e6f5ff316..2a1a13c80 100644 --- a/db/blob/blob_source.h +++ b/db/blob/blob_source.h @@ -42,6 +42,13 @@ class BlobSource { FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, uint64_t* bytes_read); + inline Status GetBlobFileReader( + uint64_t blob_file_number, + CacheHandleGuard* blob_file_reader) { + return blob_file_cache_->GetBlobFileReader(blob_file_number, + blob_file_reader); + } + bool TEST_BlobInCache(uint64_t file_number, uint64_t file_size, uint64_t offset) const; @@ -73,8 +80,8 @@ class BlobSource { priority); } - const std::string db_id_; - const std::string db_session_id_; + const std::string& db_id_; + const std::string& db_session_id_; Statistics* statistics_; diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc index 40b878f75..c63e1f654 100644 --- a/db/blob/db_blob_basic_test.cc +++ b/db/blob/db_blob_basic_test.cc @@ -5,6 +5,7 @@ #include #include +#include #include "db/blob/blob_index.h" #include "db/blob/blob_log_format.h" @@ -48,6 +49,177 @@ TEST_F(DBBlobBasicTest, GetBlob) { .IsIncomplete()); } +TEST_F(DBBlobBasicTest, GetBlobFromCache) { + Options options = GetDefaultOptions(); + + LRUCacheOptions co; + co.capacity = 2048; + co.num_shard_bits = 2; + co.metadata_charge_policy = kDontChargeCacheMetadata; + auto backing_cache = NewLRUCache(co); + + options.enable_blob_files = true; + options.blob_cache = backing_cache; + + BlockBasedTableOptions block_based_options; + block_based_options.no_block_cache = false; + 
block_based_options.block_cache = backing_cache; + block_based_options.cache_index_and_filter_blocks = true; + options.table_factory.reset(NewBlockBasedTableFactory(block_based_options)); + + Reopen(options); + + constexpr char key[] = "key"; + constexpr char blob_value[] = "blob_value"; + + ASSERT_OK(Put(key, blob_value)); + + ASSERT_OK(Flush()); + + ReadOptions read_options; + + read_options.fill_cache = false; + + { + PinnableSlice result; + + read_options.read_tier = kReadAllTier; + ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result)); + ASSERT_EQ(result, blob_value); + + result.Reset(); + read_options.read_tier = kBlockCacheTier; + + // Try again with no I/O allowed. Since we didn't re-fill the cache, the + // blob itself can only be read from the blob file, so the read should + // return Incomplete. + ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result) + .IsIncomplete()); + ASSERT_TRUE(result.empty()); + } + + read_options.fill_cache = true; + + { + PinnableSlice result; + + read_options.read_tier = kReadAllTier; + ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result)); + ASSERT_EQ(result, blob_value); + + result.Reset(); + read_options.read_tier = kBlockCacheTier; + + // Try again with no I/O allowed. The table and the necessary blocks/blobs + // should already be in their respective caches. 
+ ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result)); + ASSERT_EQ(result, blob_value); + } +} + +TEST_F(DBBlobBasicTest, IterateBlobsFromCache) { + Options options = GetDefaultOptions(); + + LRUCacheOptions co; + co.capacity = 2048; + co.num_shard_bits = 2; + co.metadata_charge_policy = kDontChargeCacheMetadata; + auto backing_cache = NewLRUCache(co); + + options.enable_blob_files = true; + options.blob_cache = backing_cache; + + BlockBasedTableOptions block_based_options; + block_based_options.no_block_cache = false; + block_based_options.block_cache = backing_cache; + block_based_options.cache_index_and_filter_blocks = true; + options.table_factory.reset(NewBlockBasedTableFactory(block_based_options)); + + Reopen(options); + + int num_blobs = 5; + std::vector keys; + std::vector blobs; + + for (int i = 0; i < num_blobs; ++i) { + keys.push_back("key" + std::to_string(i)); + blobs.push_back("blob" + std::to_string(i)); + ASSERT_OK(Put(keys[i], blobs[i])); + } + ASSERT_OK(Flush()); + + ReadOptions read_options; + + { + read_options.fill_cache = false; + read_options.read_tier = kReadAllTier; + + std::unique_ptr iter(db_->NewIterator(read_options)); + ASSERT_OK(iter->status()); + + int i = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key().ToString(), keys[i]); + ASSERT_EQ(iter->value().ToString(), blobs[i]); + ++i; + } + ASSERT_EQ(i, num_blobs); + } + + { + read_options.fill_cache = false; + read_options.read_tier = kBlockCacheTier; + + std::unique_ptr iter(db_->NewIterator(read_options)); + ASSERT_OK(iter->status()); + + // Try again with no I/O allowed. Since we didn't re-fill the cache, + // the blob itself can only be read from the blob file, so iter->Valid() + // should be false. 
+ iter->SeekToFirst(); + ASSERT_NOK(iter->status()); + ASSERT_FALSE(iter->Valid()); + } + + { + read_options.fill_cache = true; + read_options.read_tier = kReadAllTier; + + std::unique_ptr iter(db_->NewIterator(read_options)); + ASSERT_OK(iter->status()); + + // Read blobs from the file and refill the cache. + int i = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key().ToString(), keys[i]); + ASSERT_EQ(iter->value().ToString(), blobs[i]); + ++i; + } + ASSERT_EQ(i, num_blobs); + } + + { + read_options.fill_cache = false; + read_options.read_tier = kBlockCacheTier; + + std::unique_ptr iter(db_->NewIterator(read_options)); + ASSERT_OK(iter->status()); + + // Try again with no I/O allowed. The table and the necessary blocks/blobs + // should already be in their respective caches. + int i = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key().ToString(), keys[i]); + ASSERT_EQ(iter->value().ToString(), blobs[i]); + ++i; + } + ASSERT_EQ(i, num_blobs); + } +} + TEST_F(DBBlobBasicTest, MultiGetBlobs) { constexpr size_t min_blob_size = 6; diff --git a/db/column_family.cc b/db/column_family.cc index 5014e79c0..27c5dd4c6 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -17,6 +17,7 @@ #include #include "db/blob/blob_file_cache.h" +#include "db/blob/blob_source.h" #include "db/compaction/compaction_picker.h" #include "db/compaction/compaction_picker_fifo.h" #include "db/compaction/compaction_picker_level.h" @@ -516,7 +517,7 @@ ColumnFamilyData::ColumnFamilyData( const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options, const FileOptions* file_options, ColumnFamilySet* column_family_set, BlockCacheTracer* const block_cache_tracer, - const std::shared_ptr& io_tracer, + const std::shared_ptr& io_tracer, const std::string& db_id, const std::string& db_session_id) : id_(id), name_(name), @@ -580,6 +581,8 @@ 
ColumnFamilyData::ColumnFamilyData( blob_file_cache_.reset( new BlobFileCache(_table_cache, ioptions(), soptions(), id_, internal_stats_->GetBlobFileReadHist(), io_tracer)); + blob_source_.reset(new BlobSource(ioptions(), db_id, db_session_id, + blob_file_cache_.get())); if (ioptions_.compaction_style == kCompactionStyleLevel) { compaction_picker_.reset( @@ -1504,13 +1507,14 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, WriteController* _write_controller, BlockCacheTracer* const block_cache_tracer, const std::shared_ptr& io_tracer, + const std::string& db_id, const std::string& db_session_id) : max_column_family_(0), file_options_(file_options), dummy_cfd_(new ColumnFamilyData( ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr, - block_cache_tracer, io_tracer, db_session_id)), + block_cache_tracer, io_tracer, db_id, db_session_id)), default_cfd_cache_(nullptr), db_name_(dbname), db_options_(db_options), @@ -1519,6 +1523,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, write_controller_(_write_controller), block_cache_tracer_(block_cache_tracer), io_tracer_(io_tracer), + db_id_(db_id), db_session_id_(db_session_id) { // initialize linked list dummy_cfd_->prev_ = dummy_cfd_; @@ -1586,7 +1591,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( ColumnFamilyData* new_cfd = new ColumnFamilyData( id, name, dummy_versions, table_cache_, write_buffer_manager_, options, *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_, - db_session_id_); + db_id_, db_session_id_); column_families_.insert({name, id}); column_family_data_.insert({id, new_cfd}); max_column_family_ = std::max(max_column_family_, id); diff --git a/db/column_family.h b/db/column_family.h index b615971b3..4449e2ceb 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -47,6 +47,7 @@ class InstrumentedMutex; class InstrumentedMutexLock; struct SuperVersionContext; 
class BlobFileCache; +class BlobSource; extern const double kIncSlowdownRatio; // This file contains a list of data structures for managing column family @@ -376,7 +377,7 @@ class ColumnFamilyData { SequenceNumber earliest_seq); TableCache* table_cache() const { return table_cache_.get(); } - BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); } + BlobSource* blob_source() const { return blob_source_.get(); } // See documentation in compaction_picker.h // REQUIRES: DB mutex held @@ -539,7 +540,7 @@ class ColumnFamilyData { ColumnFamilySet* column_family_set, BlockCacheTracer* const block_cache_tracer, const std::shared_ptr& io_tracer, - const std::string& db_session_id); + const std::string& db_id, const std::string& db_session_id); std::vector GetDbPaths() const; @@ -563,6 +564,7 @@ class ColumnFamilyData { std::unique_ptr table_cache_; std::unique_ptr blob_file_cache_; + std::unique_ptr blob_source_; std::unique_ptr internal_stats_; @@ -673,7 +675,7 @@ class ColumnFamilySet { WriteController* _write_controller, BlockCacheTracer* const block_cache_tracer, const std::shared_ptr& io_tracer, - const std::string& db_session_id); + const std::string& db_id, const std::string& db_session_id); ~ColumnFamilySet(); ColumnFamilyData* GetDefault() const; @@ -735,6 +737,7 @@ class ColumnFamilySet { WriteController* write_controller_; BlockCacheTracer* const block_cache_tracer_; std::shared_ptr io_tracer_; + const std::string& db_id_; std::string db_session_id_; }; diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index daf5b67a6..2079d313c 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -208,11 +208,11 @@ class CompactionJobTestBase : public testing::Test { mutable_db_options_(), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), - versions_(new VersionSet(dbname_, &db_options_, env_options_, - table_cache_.get(), 
&write_buffer_manager_, - &write_controller_, - /*block_cache_tracer=*/nullptr, - /*io_tracer=*/nullptr, /*db_session_id*/ "")), + versions_(new VersionSet( + dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, + /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "")), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()), error_handler_(nullptr, db_options_, &mutex_), @@ -444,7 +444,7 @@ class CompactionJobTestBase : public testing::Test { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); compaction_job_stats_.Reset(); ASSERT_OK(SetIdentityFile(env_, dbname_)); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 58c3035c9..1aa315efc 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -263,7 +263,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_, table_cache_.get(), write_buffer_manager_, &write_controller_, &block_cache_tracer_, - io_tracer_, db_session_id_)); + io_tracer_, db_id_, db_session_id_)); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 4ff1c7369..646786b21 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1261,11 +1261,11 @@ class RecoveryTestHelper { std::unique_ptr wal_manager; WriteController write_controller; - versions.reset(new VersionSet(test->dbname_, &db_options, file_options, - table_cache.get(), &write_buffer_manager, - &write_controller, - /*block_cache_tracer=*/nullptr, - /*io_tracer=*/nullptr, /*db_session_id*/ "")); + versions.reset(new VersionSet( + test->dbname_, &db_options, file_options, table_cache.get(), 
+ &write_buffer_manager, &write_controller, + /*block_cache_tracer=*/nullptr, + /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "")); wal_manager.reset( new WalManager(db_options, file_options, /*io_tracer=*/nullptr)); diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 0f9c2a928..c9824f490 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -128,7 +128,7 @@ class FlushJobTestBase : public testing::Test { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); EXPECT_OK(versions_->Recover(column_families, false)); } diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 06cfdb062..6804e311c 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -103,7 +103,8 @@ class MemTableListTest : public testing::Test { VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, &write_controller, /*block_cache_tracer=*/nullptr, - /*io_tracer=*/nullptr, /*db_session_id*/ ""); + /*io_tracer=*/nullptr, /*db_id*/ "", + /*db_session_id*/ ""); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); @@ -153,7 +154,8 @@ class MemTableListTest : public testing::Test { VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, &write_controller, /*block_cache_tracer=*/nullptr, - /*io_tracer=*/nullptr, /*db_session_id*/ ""); + /*io_tracer=*/nullptr, /*db_id*/ "", + /*db_session_id*/ ""); std::vector cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); diff --git a/db/repair.cc b/db/repair.cc index f0beb3ff4..81a2c81ba 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -122,7 +122,7 @@ 
class Repairer { vset_(dbname_, &immutable_db_options_, file_options_, raw_table_cache_.get(), &wb_, &wc_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - db_session_id_), + /*db_id=*/"", db_session_id_), next_file_number_(1), db_lock_(nullptr), closed_(false) { diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index f3e24016b..08d04c7c0 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -12,8 +12,8 @@ #include #include -#include "db/blob/blob_file_cache.h" #include "db/blob/blob_file_reader.h" +#include "db/blob/blob_source.h" #include "logging/logging.h" #include "monitoring/persistent_stats_history.h" @@ -831,11 +831,10 @@ Status VersionEditHandlerPointInTime::VerifyFile(const std::string& fpath, Status VersionEditHandlerPointInTime::VerifyBlobFile( ColumnFamilyData* cfd, uint64_t blob_file_num, const BlobFileAddition& blob_addition) { - BlobFileCache* blob_file_cache = cfd->blob_file_cache(); - assert(blob_file_cache); + BlobSource* blob_source = cfd->blob_source(); + assert(blob_source); CacheHandleGuard blob_file_reader; - Status s = - blob_file_cache->GetBlobFileReader(blob_file_num, &blob_file_reader); + Status s = blob_source->GetBlobFileReader(blob_file_num, &blob_file_reader); if (!s.ok()) { return s; } diff --git a/db/version_set.cc b/db/version_set.cc index 3b6157fda..6b6035102 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -25,6 +25,7 @@ #include "db/blob/blob_file_reader.h" #include "db/blob/blob_index.h" #include "db/blob/blob_log_format.h" +#include "db/blob/blob_source.h" #include "db/compaction/compaction.h" #include "db/compaction/file_pri.h" #include "db/dbformat.h" @@ -1833,7 +1834,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->logger), db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->stats), table_cache_((cfd_ == nullptr) ? 
nullptr : cfd_->table_cache()), - blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr), + blob_source_(cfd_ ? cfd_->blob_source() : nullptr), merge_operator_( (cfd_ == nullptr) ? nullptr : cfd_->ioptions()->merge_operator.get()), storage_info_( @@ -1880,34 +1881,22 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, PinnableSlice* value, uint64_t* bytes_read) const { assert(value); - if (read_options.read_tier == kBlockCacheTier) { - return Status::Incomplete("Cannot read blob: no disk I/O allowed"); - } - if (blob_index.HasTTL() || blob_index.IsInlined()) { return Status::Corruption("Unexpected TTL/inlined blob index"); } const uint64_t blob_file_number = blob_index.file_number(); - if (!storage_info_.GetBlobFileMetaData(blob_file_number)) { + auto blob_file_meta = storage_info_.GetBlobFileMetaData(blob_file_number); + if (!blob_file_meta) { return Status::Corruption("Invalid blob file number"); } - CacheHandleGuard blob_file_reader; - - { - assert(blob_file_cache_); - const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number, - &blob_file_reader); - if (!s.ok()) { - return s; - } - } - - assert(blob_file_reader.GetValue()); - const Status s = blob_file_reader.GetValue()->GetBlob( - read_options, user_key, blob_index.offset(), blob_index.size(), + assert(blob_source_); + value->Reset(); + const Status s = blob_source_->GetBlob( + read_options, user_key, blob_file_number, blob_index.offset(), + blob_file_meta->GetBlobFileSize(), blob_index.size(), blob_index.compression(), prefetch_buffer, value, bytes_read); return s; @@ -1948,9 +1937,9 @@ void Version::MultiGetBlob( } CacheHandleGuard blob_file_reader; - assert(blob_file_cache_); - status = blob_file_cache_->GetBlobFileReader(blob_file_number, - &blob_file_reader); + assert(blob_source_); + status = + blob_source_->GetBlobFileReader(blob_file_number, &blob_file_reader); assert(!status.ok() || blob_file_reader.GetValue()); auto& blobs_in_file = elem.second; @@ 
-4118,11 +4107,12 @@ VersionSet::VersionSet(const std::string& dbname, WriteController* write_controller, BlockCacheTracer* const block_cache_tracer, const std::shared_ptr& io_tracer, + const std::string& db_id, const std::string& db_session_id) - : column_family_set_( - new ColumnFamilySet(dbname, _db_options, storage_options, table_cache, - write_buffer_manager, write_controller, - block_cache_tracer, io_tracer, db_session_id)), + : column_family_set_(new ColumnFamilySet( + dbname, _db_options, storage_options, table_cache, + write_buffer_manager, write_controller, block_cache_tracer, io_tracer, + db_id, db_session_id)), table_cache_(table_cache), env_(_db_options->env), fs_(_db_options->fs, io_tracer), @@ -4164,9 +4154,13 @@ void VersionSet::Reset() { if (column_family_set_) { WriteBufferManager* wbm = column_family_set_->write_buffer_manager(); WriteController* wc = column_family_set_->write_controller(); + // db_id becomes the source of truth after DBImpl::Recover(): + // https://github.com/facebook/rocksdb/blob/v7.3.1/db/db_impl/db_impl_open.cc#L527 + // Note: we may not be able to recover db_id from MANIFEST if + // options.write_dbid_to_manifest is false (default). 
column_family_set_.reset(new ColumnFamilySet( dbname_, db_options_, file_options_, table_cache_, wbm, wc, - block_cache_tracer_, io_tracer_, db_session_id_)); + block_cache_tracer_, io_tracer_, db_id_, db_session_id_)); } db_id_.clear(); next_file_number_.store(2); @@ -5219,6 +5213,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, WriteBufferManager wb(options->db_write_buffer_size); VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc, nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/, + /*db_id*/ "", /*db_session_id*/ ""); Status status; @@ -6152,7 +6147,7 @@ ReactiveVersionSet::ReactiveVersionSet( const std::shared_ptr& io_tracer) : VersionSet(dbname, _db_options, _file_options, table_cache, write_buffer_manager, write_controller, - /*block_cache_tracer=*/nullptr, io_tracer, + /*block_cache_tracer=*/nullptr, io_tracer, /*db_id*/ "", /*db_session_id*/ "") {} ReactiveVersionSet::~ReactiveVersionSet() {} diff --git a/db/version_set.h b/db/version_set.h index 4bfb4bf74..93c887ac0 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -964,7 +964,7 @@ class Version { Logger* info_log_; Statistics* db_statistics_; TableCache* table_cache_; - BlobFileCache* blob_file_cache_; + BlobSource* blob_source_; const MergeOperator* merge_operator_; VersionStorageInfo storage_info_; @@ -1025,7 +1025,7 @@ class VersionSet { WriteController* write_controller, BlockCacheTracer* const block_cache_tracer, const std::shared_ptr& io_tracer, - const std::string& db_session_id); + const std::string& db_id, const std::string& db_session_id); // No copying allowed VersionSet(const VersionSet&) = delete; void operator=(const VersionSet&) = delete; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index acf1af9f7..c4c125bfc 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1146,7 +1146,7 @@ class VersionSetTestBase { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, 
&write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); reactive_versions_ = std::make_shared( dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, nullptr); @@ -1250,7 +1250,7 @@ class VersionSetTestBase { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); EXPECT_OK(versions_->Recover(column_families_, false)); } @@ -1756,7 +1756,7 @@ TEST_F(VersionSetTest, WalAddition) { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -1823,7 +1823,7 @@ TEST_F(VersionSetTest, WalCloseWithoutSync) { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 2); @@ -1876,7 +1876,7 @@ TEST_F(VersionSetTest, WalDeletion) { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -1914,7 +1914,7 @@ 
TEST_F(VersionSetTest, WalDeletion) { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -2032,7 +2032,7 @@ TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -2068,7 +2068,7 @@ TEST_F(VersionSetTest, DeleteAllWals) { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 0); @@ -2110,7 +2110,7 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); std::string db_id; ASSERT_OK( new_versions->Recover(column_families_, /*read_only=*/false, &db_id)); @@ -2164,7 +2164,7 @@ class VersionSetWithTimestampTest : public VersionSetTest { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - 
/*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false, /*db_id=*/nullptr)); for (auto* cfd : *(vset->GetColumnFamilySet())) { diff --git a/db/version_util.h b/db/version_util.h index 8390c9ab1..84d151e6c 100644 --- a/db/version_util.h +++ b/db/version_util.h @@ -25,7 +25,7 @@ class OfflineManifestWriter { options.table_cache_numshardbits)), versions_(db_path, &immutable_db_options_, sopt_, tc_.get(), &wb_, &wc_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "") {} + /*db_id*/ "", /*db_session_id*/ "") {} Status Recover(const std::vector& column_families) { return versions_.Recover(column_families); diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index d1e79ba09..72c258f54 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -54,7 +54,7 @@ class WalManagerTest : public testing::Test { new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ "")); + /*db_id*/ "", /*db_session_id*/ "")); wal_manager_.reset( new WalManager(db_options_, env_options_, nullptr /*IOTracer*/)); diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index a119b5f75..25c4463ce 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1306,7 +1306,7 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, ImmutableDBOptions immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ ""); + /*db_id*/ "", /*db_session_id*/ ""); Status s = versions.DumpManifest(options, file, verbose, hex, json); if (!s.ok()) { fprintf(stderr, "Error in processing file %s %s\n", file.c_str(), @@ -1448,7 +1448,7 @@ Status GetLiveFilesChecksumInfoFromVersionSet(Options options, ImmutableDBOptions 
immutable_db_options(options); VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ ""); + /*db_id*/ "", /*db_session_id*/ ""); std::vector cf_name_list; s = versions.ListColumnFamilies(&cf_name_list, db_path, immutable_db_options.fs.get()); @@ -2255,7 +2255,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, WriteBufferManager wb(opt.db_write_buffer_size); VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, - /*db_session_id*/ ""); + /*db_id*/ "", /*db_session_id*/ ""); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, ColumnFamilyOptions(opt)); diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index a46df317b..06b47829b 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -208,7 +208,7 @@ class FileChecksumTestHelper { WriteBufferManager wb(options_.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options_); VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb, - &wc, nullptr, nullptr, ""); + &wc, nullptr, nullptr, "", ""); std::vector cf_name_list; Status s; s = versions.ListColumnFamilies(&cf_name_list, dbname_,