Add blob source to retrieve blobs in RocksDB (#10198)

Summary:
There is currently no caching mechanism for blobs, which is not ideal, especially when the database resides on remote storage (where we cannot rely on the OS page cache). As part of this task, we would like to make it possible for the application to configure a blob cache.
In this task, we formally introduced the blob source to RocksDB. BlobSource is a new abstraction layer that provides universal access to blobs, regardless of whether they are in the blob cache, secondary cache, or (remote) storage. Depending on user settings, it always fetches blobs from the multi-tier cache and storage with minimal cost.

Note: The new `MultiGetBlob()` implementation is not included in the current PR. To go faster, we aim to create a separate PR for it in parallel!

This PR is a part of https://github.com/facebook/rocksdb/issues/10156

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10198

Reviewed By: ltamasi

Differential Revision: D37294735

Pulled By: gangliao

fbshipit-source-id: 9cb50422d9dd1bc03798501c2778b6c7520c7a1e
main
Gang Liao 3 years ago committed by Facebook GitHub Bot
parent 4207872fc3
commit deff48bcef
  1. 3
      db/blob/blob_source.cc
  2. 11
      db/blob/blob_source.h
  3. 172
      db/blob/db_blob_basic_test.cc
  4. 11
      db/column_family.cc
  5. 9
      db/column_family.h
  6. 12
      db/compaction/compaction_job_test.cc
  7. 2
      db/db_impl/db_impl.cc
  8. 10
      db/db_wal_test.cc
  9. 2
      db/flush_job_test.cc
  10. 6
      db/memtable_list_test.cc
  11. 2
      db/repair.cc
  12. 9
      db/version_edit_handler.cc
  13. 53
      db/version_set.cc
  14. 4
      db/version_set.h
  15. 20
      db/version_set_test.cc
  16. 2
      db/version_util.h
  17. 2
      db/wal_manager_test.cc
  18. 6
      tools/ldb_cmd.cc
  19. 2
      tools/ldb_cmd_test.cc

@ -111,7 +111,8 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
const bool no_io = read_options.read_tier == kBlockCacheTier; const bool no_io = read_options.read_tier == kBlockCacheTier;
if (no_io) { if (no_io) {
return Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
return s;
} }
// Can't find the blob from the cache. Since I/O is allowed, read from the // Can't find the blob from the cache. Since I/O is allowed, read from the

@ -42,6 +42,13 @@ class BlobSource {
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value, FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
uint64_t* bytes_read); uint64_t* bytes_read);
// Obtains the reader for the blob file identified by `blob_file_number` by
// forwarding the request to the underlying blob file cache.
//
// `blob_file_reader` is the caller-provided guard that is handed through to
// the cache's GetBlobFileReader(). The Status produced by that call is
// returned unchanged.
inline Status GetBlobFileReader(
uint64_t blob_file_number,
CacheHandleGuard<BlobFileReader>* blob_file_reader) {
return blob_file_cache_->GetBlobFileReader(blob_file_number,
blob_file_reader);
}
bool TEST_BlobInCache(uint64_t file_number, uint64_t file_size, bool TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
uint64_t offset) const; uint64_t offset) const;
@ -73,8 +80,8 @@ class BlobSource {
priority); priority);
} }
const std::string db_id_; const std::string& db_id_;
const std::string db_session_id_; const std::string& db_session_id_;
Statistics* statistics_; Statistics* statistics_;

@ -5,6 +5,7 @@
#include <array> #include <array>
#include <sstream> #include <sstream>
#include <string>
#include "db/blob/blob_index.h" #include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h" #include "db/blob/blob_log_format.h"
@ -48,6 +49,177 @@ TEST_F(DBBlobBasicTest, GetBlob) {
.IsIncomplete()); .IsIncomplete());
} }
// Checks point lookups of a blob value under the two read tiers:
//  - after a read with fill_cache == false, a cache-only read must come back
//    Incomplete and leave the result empty;
//  - after a read with fill_cache == true, a cache-only read must succeed.
TEST_F(DBBlobBasicTest, GetBlobFromCache) {
  Options options = GetDefaultOptions();

  // A single small LRU cache serves as both the blob cache and the block
  // cache.
  LRUCacheOptions cache_opts;
  cache_opts.capacity = 2048;
  cache_opts.num_shard_bits = 2;
  cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
  auto shared_cache = NewLRUCache(cache_opts);

  options.enable_blob_files = true;
  options.blob_cache = shared_cache;

  BlockBasedTableOptions table_opts;
  table_opts.no_block_cache = false;
  table_opts.block_cache = shared_cache;
  table_opts.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_opts));

  Reopen(options);

  constexpr char lookup_key[] = "key";
  constexpr char expected_blob[] = "blob_value";

  ASSERT_OK(Put(lookup_key, expected_blob));
  ASSERT_OK(Flush());

  ReadOptions read_options;

  // Phase 1: reads are not allowed to populate the cache.
  read_options.fill_cache = false;
  {
    PinnableSlice fetched;

    read_options.read_tier = kReadAllTier;
    ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), lookup_key,
                       &fetched));
    ASSERT_EQ(fetched, expected_blob);

    fetched.Reset();

    // With I/O disallowed and nothing cached by the previous read, the
    // lookup is expected to report Incomplete and produce no value.
    read_options.read_tier = kBlockCacheTier;
    ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), lookup_key,
                         &fetched)
                    .IsIncomplete());
    ASSERT_TRUE(fetched.empty());
  }

  // Phase 2: reads may populate the cache.
  read_options.fill_cache = true;
  {
    PinnableSlice fetched;

    read_options.read_tier = kReadAllTier;
    ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), lookup_key,
                       &fetched));
    ASSERT_EQ(fetched, expected_blob);

    fetched.Reset();

    // The previous read was allowed to fill the caches, so the same lookup
    // is expected to succeed without any I/O.
    read_options.read_tier = kBlockCacheTier;
    ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), lookup_key,
                       &fetched));
    ASSERT_EQ(fetched, expected_blob);
  }
}
// Checks iteration over blob values under the two read tiers:
//  - a full-tier scan with fill_cache == false returns every key/blob pair;
//  - a following cache-only scan yields an invalid iterator with a non-OK
//    status;
//  - a full-tier scan with fill_cache == true returns every pair again;
//  - a following cache-only scan then returns every pair as well.
TEST_F(DBBlobBasicTest, IterateBlobsFromCache) {
  Options options = GetDefaultOptions();

  // A single small LRU cache serves as both the blob cache and the block
  // cache.
  LRUCacheOptions cache_opts;
  cache_opts.capacity = 2048;
  cache_opts.num_shard_bits = 2;
  cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
  auto shared_cache = NewLRUCache(cache_opts);

  options.enable_blob_files = true;
  options.blob_cache = shared_cache;

  BlockBasedTableOptions table_opts;
  table_opts.no_block_cache = false;
  table_opts.block_cache = shared_cache;
  table_opts.cache_index_and_filter_blocks = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_opts));

  Reopen(options);

  const int blob_count = 5;
  std::vector<std::string> keys;
  std::vector<std::string> blobs;

  for (int n = 0; n < blob_count; ++n) {
    keys.emplace_back("key" + std::to_string(n));
    blobs.emplace_back("blob" + std::to_string(n));
    ASSERT_OK(Put(keys[n], blobs[n]));
  }
  ASSERT_OK(Flush());

  ReadOptions read_options;

  // Full-tier scan that is not allowed to populate the cache.
  {
    read_options.fill_cache = false;
    read_options.read_tier = kReadAllTier;

    std::unique_ptr<Iterator> it(db_->NewIterator(read_options));
    ASSERT_OK(it->status());

    int seen = 0;
    it->SeekToFirst();
    while (it->Valid()) {
      ASSERT_OK(it->status());
      ASSERT_EQ(it->key().ToString(), keys[seen]);
      ASSERT_EQ(it->value().ToString(), blobs[seen]);
      ++seen;
      it->Next();
    }
    ASSERT_EQ(seen, blob_count);
  }

  // Cache-only scan: the previous scan did not fill the cache, so positioning
  // the iterator is expected to fail and leave it invalid.
  {
    read_options.fill_cache = false;
    read_options.read_tier = kBlockCacheTier;

    std::unique_ptr<Iterator> it(db_->NewIterator(read_options));
    ASSERT_OK(it->status());

    it->SeekToFirst();
    ASSERT_NOK(it->status());
    ASSERT_FALSE(it->Valid());
  }

  // Full-tier scan that is allowed to populate the cache.
  {
    read_options.fill_cache = true;
    read_options.read_tier = kReadAllTier;

    std::unique_ptr<Iterator> it(db_->NewIterator(read_options));
    ASSERT_OK(it->status());

    int seen = 0;
    it->SeekToFirst();
    while (it->Valid()) {
      ASSERT_OK(it->status());
      ASSERT_EQ(it->key().ToString(), keys[seen]);
      ASSERT_EQ(it->value().ToString(), blobs[seen]);
      ++seen;
      it->Next();
    }
    ASSERT_EQ(seen, blob_count);
  }

  // Cache-only scan again: after the cache-filling scan above, every entry is
  // expected to be readable without I/O.
  {
    read_options.fill_cache = false;
    read_options.read_tier = kBlockCacheTier;

    std::unique_ptr<Iterator> it(db_->NewIterator(read_options));
    ASSERT_OK(it->status());

    int seen = 0;
    it->SeekToFirst();
    while (it->Valid()) {
      ASSERT_OK(it->status());
      ASSERT_EQ(it->key().ToString(), keys[seen]);
      ASSERT_EQ(it->value().ToString(), blobs[seen]);
      ++seen;
      it->Next();
    }
    ASSERT_EQ(seen, blob_count);
  }
}
TEST_F(DBBlobBasicTest, MultiGetBlobs) { TEST_F(DBBlobBasicTest, MultiGetBlobs) {
constexpr size_t min_blob_size = 6; constexpr size_t min_blob_size = 6;

@ -17,6 +17,7 @@
#include <vector> #include <vector>
#include "db/blob/blob_file_cache.h" #include "db/blob/blob_file_cache.h"
#include "db/blob/blob_source.h"
#include "db/compaction/compaction_picker.h" #include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h" #include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h" #include "db/compaction/compaction_picker_level.h"
@ -516,7 +517,7 @@ ColumnFamilyData::ColumnFamilyData(
const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options, const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
const FileOptions* file_options, ColumnFamilySet* column_family_set, const FileOptions* file_options, ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer, BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer, const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id,
const std::string& db_session_id) const std::string& db_session_id)
: id_(id), : id_(id),
name_(name), name_(name),
@ -580,6 +581,8 @@ ColumnFamilyData::ColumnFamilyData(
blob_file_cache_.reset( blob_file_cache_.reset(
new BlobFileCache(_table_cache, ioptions(), soptions(), id_, new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
internal_stats_->GetBlobFileReadHist(), io_tracer)); internal_stats_->GetBlobFileReadHist(), io_tracer));
blob_source_.reset(new BlobSource(ioptions(), db_id, db_session_id,
blob_file_cache_.get()));
if (ioptions_.compaction_style == kCompactionStyleLevel) { if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset( compaction_picker_.reset(
@ -1504,13 +1507,14 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
WriteController* _write_controller, WriteController* _write_controller,
BlockCacheTracer* const block_cache_tracer, BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id,
const std::string& db_session_id) const std::string& db_session_id)
: max_column_family_(0), : max_column_family_(0),
file_options_(file_options), file_options_(file_options),
dummy_cfd_(new ColumnFamilyData( dummy_cfd_(new ColumnFamilyData(
ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr, ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr, nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr,
block_cache_tracer, io_tracer, db_session_id)), block_cache_tracer, io_tracer, db_id, db_session_id)),
default_cfd_cache_(nullptr), default_cfd_cache_(nullptr),
db_name_(dbname), db_name_(dbname),
db_options_(db_options), db_options_(db_options),
@ -1519,6 +1523,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
write_controller_(_write_controller), write_controller_(_write_controller),
block_cache_tracer_(block_cache_tracer), block_cache_tracer_(block_cache_tracer),
io_tracer_(io_tracer), io_tracer_(io_tracer),
db_id_(db_id),
db_session_id_(db_session_id) { db_session_id_(db_session_id) {
// initialize linked list // initialize linked list
dummy_cfd_->prev_ = dummy_cfd_; dummy_cfd_->prev_ = dummy_cfd_;
@ -1586,7 +1591,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
ColumnFamilyData* new_cfd = new ColumnFamilyData( ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options, id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
*db_options_, &file_options_, this, block_cache_tracer_, io_tracer_, *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
db_session_id_); db_id_, db_session_id_);
column_families_.insert({name, id}); column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd}); column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id); max_column_family_ = std::max(max_column_family_, id);

@ -47,6 +47,7 @@ class InstrumentedMutex;
class InstrumentedMutexLock; class InstrumentedMutexLock;
struct SuperVersionContext; struct SuperVersionContext;
class BlobFileCache; class BlobFileCache;
class BlobSource;
extern const double kIncSlowdownRatio; extern const double kIncSlowdownRatio;
// This file contains a list of data structures for managing column family // This file contains a list of data structures for managing column family
@ -376,7 +377,7 @@ class ColumnFamilyData {
SequenceNumber earliest_seq); SequenceNumber earliest_seq);
TableCache* table_cache() const { return table_cache_.get(); } TableCache* table_cache() const { return table_cache_.get(); }
BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); } BlobSource* blob_source() const { return blob_source_.get(); }
// See documentation in compaction_picker.h // See documentation in compaction_picker.h
// REQUIRES: DB mutex held // REQUIRES: DB mutex held
@ -539,7 +540,7 @@ class ColumnFamilyData {
ColumnFamilySet* column_family_set, ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer, BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id); const std::string& db_id, const std::string& db_session_id);
std::vector<std::string> GetDbPaths() const; std::vector<std::string> GetDbPaths() const;
@ -563,6 +564,7 @@ class ColumnFamilyData {
std::unique_ptr<TableCache> table_cache_; std::unique_ptr<TableCache> table_cache_;
std::unique_ptr<BlobFileCache> blob_file_cache_; std::unique_ptr<BlobFileCache> blob_file_cache_;
std::unique_ptr<BlobSource> blob_source_;
std::unique_ptr<InternalStats> internal_stats_; std::unique_ptr<InternalStats> internal_stats_;
@ -673,7 +675,7 @@ class ColumnFamilySet {
WriteController* _write_controller, WriteController* _write_controller,
BlockCacheTracer* const block_cache_tracer, BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id); const std::string& db_id, const std::string& db_session_id);
~ColumnFamilySet(); ~ColumnFamilySet();
ColumnFamilyData* GetDefault() const; ColumnFamilyData* GetDefault() const;
@ -735,6 +737,7 @@ class ColumnFamilySet {
WriteController* write_controller_; WriteController* write_controller_;
BlockCacheTracer* const block_cache_tracer_; BlockCacheTracer* const block_cache_tracer_;
std::shared_ptr<IOTracer> io_tracer_; std::shared_ptr<IOTracer> io_tracer_;
const std::string& db_id_;
std::string db_session_id_; std::string db_session_id_;
}; };

@ -208,11 +208,11 @@ class CompactionJobTestBase : public testing::Test {
mutable_db_options_(), mutable_db_options_(),
table_cache_(NewLRUCache(50000, 16)), table_cache_(NewLRUCache(50000, 16)),
write_buffer_manager_(db_options_.db_write_buffer_size), write_buffer_manager_(db_options_.db_write_buffer_size),
versions_(new VersionSet(dbname_, &db_options_, env_options_, versions_(new VersionSet(
table_cache_.get(), &write_buffer_manager_, dbname_, &db_options_, env_options_, table_cache_.get(),
&write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_session_id*/ "")), /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "")),
shutting_down_(false), shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()), mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_), error_handler_(nullptr, db_options_, &mutex_),
@ -444,7 +444,7 @@ class CompactionJobTestBase : public testing::Test {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
compaction_job_stats_.Reset(); compaction_job_stats_.Reset();
ASSERT_OK(SetIdentityFile(env_, dbname_)); ASSERT_OK(SetIdentityFile(env_, dbname_));

@ -263,7 +263,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_, versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
table_cache_.get(), write_buffer_manager_, table_cache_.get(), write_buffer_manager_,
&write_controller_, &block_cache_tracer_, &write_controller_, &block_cache_tracer_,
io_tracer_, db_session_id_)); io_tracer_, db_id_, db_session_id_));
column_family_memtables_.reset( column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

@ -1261,11 +1261,11 @@ class RecoveryTestHelper {
std::unique_ptr<WalManager> wal_manager; std::unique_ptr<WalManager> wal_manager;
WriteController write_controller; WriteController write_controller;
versions.reset(new VersionSet(test->dbname_, &db_options, file_options, versions.reset(new VersionSet(
table_cache.get(), &write_buffer_manager, test->dbname_, &db_options, file_options, table_cache.get(),
&write_controller, &write_buffer_manager, &write_controller,
/*block_cache_tracer=*/nullptr, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_session_id*/ "")); /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ ""));
wal_manager.reset( wal_manager.reset(
new WalManager(db_options, file_options, /*io_tracer=*/nullptr)); new WalManager(db_options, file_options, /*io_tracer=*/nullptr));

@ -128,7 +128,7 @@ class FlushJobTestBase : public testing::Test {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
EXPECT_OK(versions_->Recover(column_families, false)); EXPECT_OK(versions_->Recover(column_families, false));
} }

@ -103,7 +103,8 @@ class MemTableListTest : public testing::Test {
VersionSet versions(dbname, &immutable_db_options, env_options, VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager, table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr, &write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_session_id*/ ""); /*io_tracer=*/nullptr, /*db_id*/ "",
/*db_session_id*/ "");
std::vector<ColumnFamilyDescriptor> cf_descs; std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions());
@ -153,7 +154,8 @@ class MemTableListTest : public testing::Test {
VersionSet versions(dbname, &immutable_db_options, env_options, VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager, table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr, &write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_session_id*/ ""); /*io_tracer=*/nullptr, /*db_id*/ "",
/*db_session_id*/ "");
std::vector<ColumnFamilyDescriptor> cf_descs; std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions());

@ -122,7 +122,7 @@ class Repairer {
vset_(dbname_, &immutable_db_options_, file_options_, vset_(dbname_, &immutable_db_options_, file_options_,
raw_table_cache_.get(), &wb_, &wc_, raw_table_cache_.get(), &wb_, &wc_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
db_session_id_), /*db_id=*/"", db_session_id_),
next_file_number_(1), next_file_number_(1),
db_lock_(nullptr), db_lock_(nullptr),
closed_(false) { closed_(false) {

@ -12,8 +12,8 @@
#include <cinttypes> #include <cinttypes>
#include <sstream> #include <sstream>
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h" #include "db/blob/blob_file_reader.h"
#include "db/blob/blob_source.h"
#include "logging/logging.h" #include "logging/logging.h"
#include "monitoring/persistent_stats_history.h" #include "monitoring/persistent_stats_history.h"
@ -831,11 +831,10 @@ Status VersionEditHandlerPointInTime::VerifyFile(const std::string& fpath,
Status VersionEditHandlerPointInTime::VerifyBlobFile( Status VersionEditHandlerPointInTime::VerifyBlobFile(
ColumnFamilyData* cfd, uint64_t blob_file_num, ColumnFamilyData* cfd, uint64_t blob_file_num,
const BlobFileAddition& blob_addition) { const BlobFileAddition& blob_addition) {
BlobFileCache* blob_file_cache = cfd->blob_file_cache(); BlobSource* blob_source = cfd->blob_source();
assert(blob_file_cache); assert(blob_source);
CacheHandleGuard<BlobFileReader> blob_file_reader; CacheHandleGuard<BlobFileReader> blob_file_reader;
Status s = Status s = blob_source->GetBlobFileReader(blob_file_num, &blob_file_reader);
blob_file_cache->GetBlobFileReader(blob_file_num, &blob_file_reader);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }

@ -25,6 +25,7 @@
#include "db/blob/blob_file_reader.h" #include "db/blob/blob_file_reader.h"
#include "db/blob/blob_index.h" #include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h" #include "db/blob/blob_log_format.h"
#include "db/blob/blob_source.h"
#include "db/compaction/compaction.h" #include "db/compaction/compaction.h"
#include "db/compaction/file_pri.h" #include "db/compaction/file_pri.h"
#include "db/dbformat.h" #include "db/dbformat.h"
@ -1833,7 +1834,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->logger), info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->logger),
db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->stats), db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->stats),
table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()), table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr), blob_source_(cfd_ ? cfd_->blob_source() : nullptr),
merge_operator_( merge_operator_(
(cfd_ == nullptr) ? nullptr : cfd_->ioptions()->merge_operator.get()), (cfd_ == nullptr) ? nullptr : cfd_->ioptions()->merge_operator.get()),
storage_info_( storage_info_(
@ -1880,34 +1881,22 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
PinnableSlice* value, uint64_t* bytes_read) const { PinnableSlice* value, uint64_t* bytes_read) const {
assert(value); assert(value);
if (read_options.read_tier == kBlockCacheTier) {
return Status::Incomplete("Cannot read blob: no disk I/O allowed");
}
if (blob_index.HasTTL() || blob_index.IsInlined()) { if (blob_index.HasTTL() || blob_index.IsInlined()) {
return Status::Corruption("Unexpected TTL/inlined blob index"); return Status::Corruption("Unexpected TTL/inlined blob index");
} }
const uint64_t blob_file_number = blob_index.file_number(); const uint64_t blob_file_number = blob_index.file_number();
if (!storage_info_.GetBlobFileMetaData(blob_file_number)) { auto blob_file_meta = storage_info_.GetBlobFileMetaData(blob_file_number);
if (!blob_file_meta) {
return Status::Corruption("Invalid blob file number"); return Status::Corruption("Invalid blob file number");
} }
CacheHandleGuard<BlobFileReader> blob_file_reader; assert(blob_source_);
value->Reset();
{ const Status s = blob_source_->GetBlob(
assert(blob_file_cache_); read_options, user_key, blob_file_number, blob_index.offset(),
const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number, blob_file_meta->GetBlobFileSize(), blob_index.size(),
&blob_file_reader);
if (!s.ok()) {
return s;
}
}
assert(blob_file_reader.GetValue());
const Status s = blob_file_reader.GetValue()->GetBlob(
read_options, user_key, blob_index.offset(), blob_index.size(),
blob_index.compression(), prefetch_buffer, value, bytes_read); blob_index.compression(), prefetch_buffer, value, bytes_read);
return s; return s;
@ -1948,9 +1937,9 @@ void Version::MultiGetBlob(
} }
CacheHandleGuard<BlobFileReader> blob_file_reader; CacheHandleGuard<BlobFileReader> blob_file_reader;
assert(blob_file_cache_); assert(blob_source_);
status = blob_file_cache_->GetBlobFileReader(blob_file_number, status =
&blob_file_reader); blob_source_->GetBlobFileReader(blob_file_number, &blob_file_reader);
assert(!status.ok() || blob_file_reader.GetValue()); assert(!status.ok() || blob_file_reader.GetValue());
auto& blobs_in_file = elem.second; auto& blobs_in_file = elem.second;
@ -4118,11 +4107,12 @@ VersionSet::VersionSet(const std::string& dbname,
WriteController* write_controller, WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer, BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id,
const std::string& db_session_id) const std::string& db_session_id)
: column_family_set_( : column_family_set_(new ColumnFamilySet(
new ColumnFamilySet(dbname, _db_options, storage_options, table_cache, dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller, write_buffer_manager, write_controller, block_cache_tracer, io_tracer,
block_cache_tracer, io_tracer, db_session_id)), db_id, db_session_id)),
table_cache_(table_cache), table_cache_(table_cache),
env_(_db_options->env), env_(_db_options->env),
fs_(_db_options->fs, io_tracer), fs_(_db_options->fs, io_tracer),
@ -4164,9 +4154,13 @@ void VersionSet::Reset() {
if (column_family_set_) { if (column_family_set_) {
WriteBufferManager* wbm = column_family_set_->write_buffer_manager(); WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
WriteController* wc = column_family_set_->write_controller(); WriteController* wc = column_family_set_->write_controller();
// db_id becomes the source of truth after DBImpl::Recover():
// https://github.com/facebook/rocksdb/blob/v7.3.1/db/db_impl/db_impl_open.cc#L527
// Note: we may not be able to recover db_id from MANIFEST if
// options.write_dbid_to_manifest is false (default).
column_family_set_.reset(new ColumnFamilySet( column_family_set_.reset(new ColumnFamilySet(
dbname_, db_options_, file_options_, table_cache_, wbm, wc, dbname_, db_options_, file_options_, table_cache_, wbm, wc,
block_cache_tracer_, io_tracer_, db_session_id_)); block_cache_tracer_, io_tracer_, db_id_, db_session_id_));
} }
db_id_.clear(); db_id_.clear();
next_file_number_.store(2); next_file_number_.store(2);
@ -5219,6 +5213,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
WriteBufferManager wb(options->db_write_buffer_size); WriteBufferManager wb(options->db_write_buffer_size);
VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc, VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/, nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/,
/*db_id*/ "",
/*db_session_id*/ ""); /*db_session_id*/ "");
Status status; Status status;
@ -6152,7 +6147,7 @@ ReactiveVersionSet::ReactiveVersionSet(
const std::shared_ptr<IOTracer>& io_tracer) const std::shared_ptr<IOTracer>& io_tracer)
: VersionSet(dbname, _db_options, _file_options, table_cache, : VersionSet(dbname, _db_options, _file_options, table_cache,
write_buffer_manager, write_controller, write_buffer_manager, write_controller,
/*block_cache_tracer=*/nullptr, io_tracer, /*block_cache_tracer=*/nullptr, io_tracer, /*db_id*/ "",
/*db_session_id*/ "") {} /*db_session_id*/ "") {}
ReactiveVersionSet::~ReactiveVersionSet() {} ReactiveVersionSet::~ReactiveVersionSet() {}

@ -964,7 +964,7 @@ class Version {
Logger* info_log_; Logger* info_log_;
Statistics* db_statistics_; Statistics* db_statistics_;
TableCache* table_cache_; TableCache* table_cache_;
BlobFileCache* blob_file_cache_; BlobSource* blob_source_;
const MergeOperator* merge_operator_; const MergeOperator* merge_operator_;
VersionStorageInfo storage_info_; VersionStorageInfo storage_info_;
@ -1025,7 +1025,7 @@ class VersionSet {
WriteController* write_controller, WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer, BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id); const std::string& db_id, const std::string& db_session_id);
// No copying allowed // No copying allowed
VersionSet(const VersionSet&) = delete; VersionSet(const VersionSet&) = delete;
void operator=(const VersionSet&) = delete; void operator=(const VersionSet&) = delete;

@ -1146,7 +1146,7 @@ class VersionSetTestBase {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
reactive_versions_ = std::make_shared<ReactiveVersionSet>( reactive_versions_ = std::make_shared<ReactiveVersionSet>(
dbname_, &db_options_, env_options_, table_cache_.get(), dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, nullptr); &write_buffer_manager_, &write_controller_, nullptr);
@ -1250,7 +1250,7 @@ class VersionSetTestBase {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
EXPECT_OK(versions_->Recover(column_families_, false)); EXPECT_OK(versions_->Recover(column_families_, false));
} }
@ -1756,7 +1756,7 @@ TEST_F(VersionSetTest, WalAddition) {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false)); ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false));
const auto& wals = new_versions->GetWalSet().GetWals(); const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1); ASSERT_EQ(wals.size(), 1);
@ -1823,7 +1823,7 @@ TEST_F(VersionSetTest, WalCloseWithoutSync) {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false)); ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals(); const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 2); ASSERT_EQ(wals.size(), 2);
@ -1876,7 +1876,7 @@ TEST_F(VersionSetTest, WalDeletion) {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false)); ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals(); const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1); ASSERT_EQ(wals.size(), 1);
@ -1914,7 +1914,7 @@ TEST_F(VersionSetTest, WalDeletion) {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false)); ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals(); const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1); ASSERT_EQ(wals.size(), 1);
@ -2032,7 +2032,7 @@ TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false)); ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals(); const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1); ASSERT_EQ(wals.size(), 1);
@ -2068,7 +2068,7 @@ TEST_F(VersionSetTest, DeleteAllWals) {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false)); ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals(); const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 0); ASSERT_EQ(wals.size(), 0);
@ -2110,7 +2110,7 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
std::string db_id; std::string db_id;
ASSERT_OK( ASSERT_OK(
new_versions->Recover(column_families_, /*read_only=*/false, &db_id)); new_versions->Recover(column_families_, /*read_only=*/false, &db_id));
@ -2164,7 +2164,7 @@ class VersionSetWithTimestampTest : public VersionSetTest {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false, ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false,
/*db_id=*/nullptr)); /*db_id=*/nullptr));
for (auto* cfd : *(vset->GetColumnFamilySet())) { for (auto* cfd : *(vset->GetColumnFamilySet())) {

@ -25,7 +25,7 @@ class OfflineManifestWriter {
options.table_cache_numshardbits)), options.table_cache_numshardbits)),
versions_(db_path, &immutable_db_options_, sopt_, tc_.get(), &wb_, &wc_, versions_(db_path, &immutable_db_options_, sopt_, tc_.get(), &wb_, &wc_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "") {} /*db_id*/ "", /*db_session_id*/ "") {}
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families) { Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families) {
return versions_.Recover(column_families); return versions_.Recover(column_families);

@ -54,7 +54,7 @@ class WalManagerTest : public testing::Test {
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, &write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "")); /*db_id*/ "", /*db_session_id*/ ""));
wal_manager_.reset( wal_manager_.reset(
new WalManager(db_options_, env_options_, nullptr /*IOTracer*/)); new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));

@ -1306,7 +1306,7 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex,
ImmutableDBOptions immutable_db_options(options); ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""); /*db_id*/ "", /*db_session_id*/ "");
Status s = versions.DumpManifest(options, file, verbose, hex, json); Status s = versions.DumpManifest(options, file, verbose, hex, json);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Error in processing file %s %s\n", file.c_str(), fprintf(stderr, "Error in processing file %s %s\n", file.c_str(),
@ -1448,7 +1448,7 @@ Status GetLiveFilesChecksumInfoFromVersionSet(Options options,
ImmutableDBOptions immutable_db_options(options); ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc, VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""); /*db_id*/ "", /*db_session_id*/ "");
std::vector<std::string> cf_name_list; std::vector<std::string> cf_name_list;
s = versions.ListColumnFamilies(&cf_name_list, db_path, s = versions.ListColumnFamilies(&cf_name_list, db_path,
immutable_db_options.fs.get()); immutable_db_options.fs.get());
@ -2255,7 +2255,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
WriteBufferManager wb(opt.db_write_buffer_size); WriteBufferManager wb(opt.db_write_buffer_size);
VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc, VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""); /*db_id*/ "", /*db_session_id*/ "");
std::vector<ColumnFamilyDescriptor> dummy; std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
ColumnFamilyOptions(opt)); ColumnFamilyOptions(opt));

@ -208,7 +208,7 @@ class FileChecksumTestHelper {
WriteBufferManager wb(options_.db_write_buffer_size); WriteBufferManager wb(options_.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options_); ImmutableDBOptions immutable_db_options(options_);
VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb, VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb,
&wc, nullptr, nullptr, ""); &wc, nullptr, nullptr, "", "");
std::vector<std::string> cf_name_list; std::vector<std::string> cf_name_list;
Status s; Status s;
s = versions.ListColumnFamilies(&cf_name_list, dbname_, s = versions.ListColumnFamilies(&cf_name_list, dbname_,

Loading…
Cancel
Save