Use DbSessionId as cache key prefix when secondary cache is enabled (#8360)

Summary:
Currently, we either use the file system inode or a monotonically incrementing runtime ID as the block cache key prefix. However, if we use a monotonically incrementing runtime ID (in the case that the file system does not support inode id generation), in some cases, it cannot ensure uniqueness (e.g., we have secondary cache migrated from host to host). We use DbSessionID (20 bytes) + current file number (at most 10 bytes) as the new cache block key prefix when the secondary cache is enabled. So can accommodate scenarios such as transfer of cache state across hosts.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8360

Test Plan: add the test to lru_cache_test

Reviewed By: pdillinger

Differential Revision: D29006215

Pulled By: zhichao-cao

fbshipit-source-id: 6cff686b38d83904667a2bd39923cd030df16814
main
Zhichao Cao 4 years ago committed by Facebook GitHub Bot
parent db325a5904
commit f44e69c64a
  1. 55
      cache/lru_cache_test.cc
  2. 17
      db/column_family.cc
  3. 7
      db/column_family.h
  4. 3
      db/compaction/compaction_job.cc
  5. 12
      db/compaction/compaction_job_test.cc
  6. 5
      db/db_impl/db_impl.cc
  7. 2
      db/db_impl/db_impl_open.cc
  8. 9
      db/db_wal_test.cc
  9. 3
      db/flush_job.cc
  10. 3
      db/flush_job_test.cc
  11. 4
      db/memtable_list_test.cc
  12. 17
      db/repair.cc
  13. 16
      db/table_cache.cc
  14. 4
      db/table_cache.h
  15. 20
      db/version_set.cc
  16. 5
      db/version_set.h
  17. 30
      db/version_set_test.cc
  18. 3
      db/wal_manager_test.cc
  19. 3
      table/block_based/block_based_table_builder.cc
  20. 3
      table/block_based/block_based_table_factory.cc
  21. 16
      table/block_based/block_based_table_reader.cc
  22. 27
      table/block_based/block_based_table_reader.h
  23. 2
      table/sst_file_writer.cc
  24. 35
      table/table_builder.h
  25. 2
      table/table_test.cc
  26. 12
      tools/ldb_cmd.cc
  27. 2
      tools/ldb_cmd_test.cc
  28. 8
      utilities/fault_injection_fs.cc
  29. 16
      utilities/fault_injection_fs.h

@ -18,6 +18,7 @@
#include "test_util/testharness.h"
#include "util/coding.h"
#include "util/random.h"
#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
@ -216,11 +217,16 @@ class TestSecondaryCache : public SecondaryCache {
void ResetInjectFailure() { inject_failure_ = false; }
void SetDbSessionId(const std::string& db_session_id) {
db_session_id_ = db_session_id;
}
Status Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) override {
if (inject_failure_) {
return Status::Corruption("Insertion Data Corrupted");
}
assert(IsDbSessionIdAsKeyPrefix(key) == true);
size_t size;
char* buf;
Status s;
@ -273,6 +279,20 @@ class TestSecondaryCache : public SecondaryCache {
uint32_t num_lookups() { return num_lookups_; }
bool IsDbSessionIdAsKeyPrefix(const Slice& key) {
if (db_session_id_.size() == 0) {
return true;
}
if (key.size() < 20) {
return false;
}
std::string s_key = key.ToString();
if (s_key.substr(0, 20) != db_session_id_) {
return false;
}
return true;
}
private:
class TestSecondaryCacheHandle : public SecondaryCacheHandle {
public:
@ -300,12 +320,19 @@ class TestSecondaryCache : public SecondaryCache {
uint32_t num_inserts_;
uint32_t num_lookups_;
bool inject_failure_;
std::string db_session_id_;
};
class DBSecondaryCacheTest : public DBTestBase {
public:
DBSecondaryCacheTest()
: DBTestBase("/db_secondary_cache_test", /*env_do_fsync=*/true) {}
: DBTestBase("/db_secondary_cache_test", /*env_do_fsync=*/true) {
fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
}
std::shared_ptr<FaultInjectionTestFS> fault_fs_;
std::unique_ptr<Env> fault_env_;
};
class LRUSecondaryCacheTest : public LRUCacheTest {
@ -586,11 +613,16 @@ TEST_F(DBSecondaryCacheTest, TestSecondaryCacheCorrectness1) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
fault_fs_->SetFailGetUniqueId(true);
// Set the file paranoid check, so after flush, the file will be read
// all the blocks will be accessed.
options.paranoid_file_checks = true;
DestroyAndReopen(options);
std::string session_id;
ASSERT_OK(db_->GetDbSessionId(session_id));
secondary_cache->SetDbSessionId(session_id);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
@ -679,7 +711,12 @@ TEST_F(DBSecondaryCacheTest, TestSecondaryCacheCorrectness2) {
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.paranoid_file_checks = true;
options.env = fault_env_.get();
fault_fs_->SetFailGetUniqueId(true);
DestroyAndReopen(options);
std::string session_id;
ASSERT_OK(db_->GetDbSessionId(session_id));
secondary_cache->SetDbSessionId(session_id);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
@ -767,7 +804,13 @@ TEST_F(DBSecondaryCacheTest, NoSecondaryCacheInsertion) {
options.create_if_missing = true;
options.paranoid_file_checks = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
fault_fs_->SetFailGetUniqueId(true);
DestroyAndReopen(options);
std::string session_id;
ASSERT_OK(db_->GetDbSessionId(session_id));
secondary_cache->SetDbSessionId(session_id);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
@ -814,7 +857,12 @@ TEST_F(DBSecondaryCacheTest, SecondaryCacheIntensiveTesting) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
fault_fs_->SetFailGetUniqueId(true);
DestroyAndReopen(options);
std::string session_id;
ASSERT_OK(db_->GetDbSessionId(session_id));
secondary_cache->SetDbSessionId(session_id);
Random rnd(301);
const int N = 256;
for (int i = 0; i < N; i++) {
@ -859,7 +907,12 @@ TEST_F(DBSecondaryCacheTest, SecondaryCacheFailureTest) {
options.create_if_missing = true;
options.paranoid_file_checks = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
fault_fs_->SetFailGetUniqueId(true);
DestroyAndReopen(options);
std::string session_id;
ASSERT_OK(db_->GetDbSessionId(session_id));
secondary_cache->SetDbSessionId(session_id);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {

@ -504,7 +504,8 @@ ColumnFamilyData::ColumnFamilyData(
const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
const FileOptions& file_options, ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer)
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id)
: id_(id),
name_(name),
dummy_versions_(_dummy_versions),
@ -562,7 +563,8 @@ ColumnFamilyData::ColumnFamilyData(
internal_stats_.reset(
new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
block_cache_tracer, io_tracer));
block_cache_tracer, io_tracer,
db_session_id));
blob_file_cache_.reset(
new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
internal_stats_->GetBlobFileReadHist(), io_tracer));
@ -1451,12 +1453,13 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
WriteBufferManager* _write_buffer_manager,
WriteController* _write_controller,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer)
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(
ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr,
block_cache_tracer, io_tracer)),
block_cache_tracer, io_tracer, db_session_id)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
@ -1465,7 +1468,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
write_buffer_manager_(_write_buffer_manager),
write_controller_(_write_controller),
block_cache_tracer_(block_cache_tracer),
io_tracer_(io_tracer) {
io_tracer_(io_tracer),
db_session_id_(db_session_id) {
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_;
@ -1531,7 +1535,8 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
*db_options_, file_options_, this, block_cache_tracer_, io_tracer_);
*db_options_, file_options_, this, block_cache_tracer_, io_tracer_,
db_session_id_);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);

@ -532,7 +532,8 @@ class ColumnFamilyData {
const FileOptions& file_options,
ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer);
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id);
std::vector<std::string> GetDbPaths() const;
@ -668,7 +669,8 @@ class ColumnFamilySet {
WriteBufferManager* _write_buffer_manager,
WriteController* _write_controller,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer);
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id);
~ColumnFamilySet();
ColumnFamilyData* GetDefault() const;
@ -733,6 +735,7 @@ class ColumnFamilySet {
WriteController* write_controller_;
BlockCacheTracer* const block_cache_tracer_;
std::shared_ptr<IOTracer> io_tracer_;
std::string db_session_id_;
};
// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access

@ -1953,7 +1953,8 @@ Status CompactionJob::OpenCompactionOutputFile(
cfd->GetName(), sub_compact->compaction->output_level(),
bottommost_level_, TableFileCreationReason::kCompaction,
oldest_ancester_time, 0 /* oldest_key_time */, current_time, db_id_,
db_session_id_, sub_compact->compaction->max_output_file_size());
db_session_id_, sub_compact->compaction->max_output_file_size(),
file_number);
sub_compact->builder.reset(
NewTableBuilder(tboptions, sub_compact->outfile.get()));
LogFlush(db_options_.info_log);

@ -82,10 +82,11 @@ class CompactionJobTestBase : public testing::Test {
mutable_db_options_(),
table_cache_(NewLRUCache(50000, 16)),
write_buffer_manager_(db_options_.db_write_buffer_size),
versions_(new VersionSet(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_,
/*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_session_id*/ "")),
shutting_down_(false),
preserve_deletes_seqnum_(0),
mock_table_factory_(new mock::MockTableFactory()),
@ -269,7 +270,8 @@ class CompactionJobTestBase : public testing::Test {
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
compaction_job_stats_.Reset();
ASSERT_OK(SetIdentityFile(env_, dbname_));

@ -251,16 +251,17 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
co.metadata_charge_policy = kDontChargeCacheMetadata;
table_cache_ = NewLRUCache(co);
SetDbSessionId();
assert(!db_session_id_.empty());
versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
table_cache_.get(), write_buffer_manager_,
&write_controller_, &block_cache_tracer_,
io_tracer_));
io_tracer_, db_session_id_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
SetDbSessionId();
DumpDBFileSummary(immutable_db_options_, dbname_, db_session_id_);
immutable_db_options_.Dump(immutable_db_options_.info_log.get());
mutable_db_options_.Dump(immutable_db_options_.info_log.get());

@ -1402,7 +1402,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
0 /* level */, false /* is_bottommost */,
TableFileCreationReason::kRecovery, current_time,
0 /* oldest_key_time */, 0 /* file_creation_time */, db_id_,
db_session_id_, 0 /* target_file_size */);
db_session_id_, 0 /* target_file_size */, meta.fd.GetNumber());
s = BuildTable(
dbname_, versions_.get(), immutable_db_options_, tboptions,
file_options_for_compaction_, cfd->table_cache(), iter.get(),

@ -1262,10 +1262,11 @@ class RecoveryTestHelper {
std::unique_ptr<WalManager> wal_manager;
WriteController write_controller;
versions.reset(new VersionSet(
test->dbname_, &db_options, file_options, table_cache.get(),
&write_buffer_manager, &write_controller,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
versions.reset(new VersionSet(test->dbname_, &db_options, file_options,
table_cache.get(), &write_buffer_manager,
&write_controller,
/*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_session_id*/ ""));
wal_manager.reset(
new WalManager(db_options, file_options, /*io_tracer=*/nullptr));

@ -412,7 +412,8 @@ Status FlushJob::WriteLevel0Table() {
mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(),
0 /* level */, false /* is_bottommost */,
TableFileCreationReason::kFlush, creation_time, oldest_key_time,
current_time, db_id_, db_session_id_, 0 /* target_file_size */);
current_time, db_id_, db_session_id_, 0 /* target_file_size */,
meta_.fd.GetNumber());
s = BuildTable(
dbname_, versions_, db_options_, tboptions, file_options_,
cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,

@ -127,7 +127,8 @@ class FlushJobTestBase : public testing::Test {
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
EXPECT_OK(versions_->Recover(column_families, false));
}

@ -103,7 +103,7 @@ class MemTableListTest : public testing::Test {
VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr);
/*io_tracer=*/nullptr, /*db_session_id*/ "");
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());
@ -153,7 +153,7 @@ class MemTableListTest : public testing::Test {
VersionSet versions(dbname, &immutable_db_options, env_options,
table_cache.get(), &write_buffer_manager,
&write_controller, /*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr);
/*io_tracer=*/nullptr, /*db_session_id*/ "");
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
cf_descs.emplace_back("one", ColumnFamilyOptions());

@ -109,14 +109,20 @@ class Repairer {
// TableCache can be small since we expect each table to be opened
// once.
NewLRUCache(10, db_options_.table_cache_numshardbits)),
table_cache_(new TableCache(
default_iopts_, env_options_, raw_table_cache_.get(),
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)),
table_cache_(
// TODO: db_session_id for TableCache should be initialized after
// db_session_id_ is set.
new TableCache(default_iopts_, env_options_, raw_table_cache_.get(),
/*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_session_id*/ "")),
wb_(db_options_.db_write_buffer_size),
wc_(db_options_.delayed_write_rate),
// TODO: db_session_id for VersionSet should be initialized after
// db_session_id_ is set and use it for initialization.
vset_(dbname_, &immutable_db_options_, env_options_,
raw_table_cache_.get(), &wb_, &wc_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr),
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""),
next_file_number_(1),
db_lock_(nullptr),
closed_(false) {
@ -448,7 +454,8 @@ class Repairer {
-1 /* level */, false /* is_bottommost */,
TableFileCreationReason::kRecovery, current_time,
0 /* oldest_key_time */, 0 /* file_creation_time */,
"DB Repairer" /* db_id */, db_session_id_, 0 /*target_file_size*/);
"DB Repairer" /* db_id */, db_session_id_, 0 /*target_file_size*/,
meta.fd.GetNumber());
status = BuildTable(
dbname_, /* versions */ nullptr, immutable_db_options_, tboptions,
env_options_, table_cache_.get(), iter.get(),

@ -68,14 +68,16 @@ const int kLoadConcurency = 128;
TableCache::TableCache(const ImmutableOptions& ioptions,
const FileOptions& file_options, Cache* const cache,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer)
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id)
: ioptions_(ioptions),
file_options_(file_options),
cache_(cache),
immortal_tables_(false),
block_cache_tracer_(block_cache_tracer),
loader_mutex_(kLoadConcurency, kGetSliceNPHash64UnseededFnPtr),
io_tracer_(io_tracer) {
io_tracer_(io_tracer),
db_session_id_(db_session_id) {
if (ioptions_.row_cache) {
// If the same cache is shared by multiple instances, we need to
// disambiguate its entries.
@ -133,11 +135,11 @@ Status TableCache::GetTableReader(
file_read_hist, ioptions_.rate_limiter.get(), ioptions_.listeners));
s = ioptions_.table_factory->NewTableReader(
ro,
TableReaderOptions(ioptions_, prefix_extractor, file_options,
internal_comparator, skip_filters, immortal_tables_,
false /* force_direct_prefetch */, level,
fd.largest_seqno, block_cache_tracer_,
max_file_size_for_l0_meta_pin),
TableReaderOptions(
ioptions_, prefix_extractor, file_options, internal_comparator,
skip_filters, immortal_tables_, false /* force_direct_prefetch */,
level, fd.largest_seqno, block_cache_tracer_,
max_file_size_for_l0_meta_pin, db_session_id_, fd.GetNumber()),
std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0");

@ -51,7 +51,8 @@ class TableCache {
TableCache(const ImmutableOptions& ioptions,
const FileOptions& storage_options, Cache* cache,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer);
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id);
~TableCache();
// Return an iterator for the specified file number (the corresponding
@ -228,6 +229,7 @@ class TableCache {
BlockCacheTracer* const block_cache_tracer_;
Striped<port::Mutex, Slice> loader_mutex_;
std::shared_ptr<IOTracer> io_tracer_;
std::string db_session_id_;
};
} // namespace ROCKSDB_NAMESPACE

@ -3779,11 +3779,12 @@ VersionSet::VersionSet(const std::string& dbname,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer)
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id)
: column_family_set_(
new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller,
block_cache_tracer, io_tracer)),
block_cache_tracer, io_tracer, db_session_id)),
table_cache_(table_cache),
env_(_db_options->env),
fs_(_db_options->fs, io_tracer),
@ -3802,7 +3803,8 @@ VersionSet::VersionSet(const std::string& dbname,
manifest_file_size_(0),
file_options_(storage_options),
block_cache_tracer_(block_cache_tracer),
io_tracer_(io_tracer) {}
io_tracer_(io_tracer),
db_session_id_(db_session_id) {}
VersionSet::~VersionSet() {
// we need to delete column_family_set_ because its destructor depends on
@ -3823,9 +3825,9 @@ void VersionSet::Reset() {
if (column_family_set_) {
WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
WriteController* wc = column_family_set_->write_controller();
column_family_set_.reset(
new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache_,
wbm, wc, block_cache_tracer_, io_tracer_));
column_family_set_.reset(new ColumnFamilySet(
dbname_, db_options_, file_options_, table_cache_, wbm, wc,
block_cache_tracer_, io_tracer_, db_session_id_));
}
db_id_.clear();
next_file_number_.store(2);
@ -4837,7 +4839,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
WriteController wc(options->delayed_write_rate);
WriteBufferManager wb(options->db_write_buffer_size);
VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/);
nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/,
/*db_session_id*/ "");
Status status;
std::vector<ColumnFamilyDescriptor> dummy;
@ -5698,7 +5701,8 @@ ReactiveVersionSet::ReactiveVersionSet(
const std::shared_ptr<IOTracer>& io_tracer)
: VersionSet(dbname, _db_options, _file_options, table_cache,
write_buffer_manager, write_controller,
/*block_cache_tracer=*/nullptr, io_tracer) {}
/*block_cache_tracer=*/nullptr, io_tracer,
/*db_session_id*/ "") {}
ReactiveVersionSet::~ReactiveVersionSet() {}

@ -935,7 +935,8 @@ class VersionSet {
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer);
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id);
// No copying allowed
VersionSet(const VersionSet&) = delete;
void operator=(const VersionSet&) = delete;
@ -1396,6 +1397,8 @@ class VersionSet {
std::shared_ptr<IOTracer> io_tracer_;
std::string db_session_id_;
private:
// REQUIRES db mutex at beginning. may release and re-acquire db mutex
Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,

@ -729,7 +729,8 @@ class VersionSetTestBase {
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
reactive_versions_ = std::make_shared<ReactiveVersionSet>(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_, nullptr);
@ -832,7 +833,8 @@ class VersionSetTestBase {
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
EXPECT_OK(versions_->Recover(column_families_, false));
}
@ -1336,7 +1338,8 @@ TEST_F(VersionSetTest, WalAddition) {
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
@ -1402,7 +1405,8 @@ TEST_F(VersionSetTest, WalCloseWithoutSync) {
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 2);
@ -1454,7 +1458,8 @@ TEST_F(VersionSetTest, WalDeletion) {
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
@ -1491,7 +1496,8 @@ TEST_F(VersionSetTest, WalDeletion) {
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
@ -1608,7 +1614,8 @@ TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) {
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 1);
@ -1643,7 +1650,8 @@ TEST_F(VersionSetTest, DeleteAllWals) {
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
ASSERT_OK(new_versions->Recover(column_families_, false));
const auto& wals = new_versions->GetWalSet().GetWals();
ASSERT_EQ(wals.size(), 0);
@ -1684,7 +1692,8 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
std::unique_ptr<VersionSet> new_versions(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
std::string db_id;
ASSERT_OK(
new_versions->Recover(column_families_, /*read_only=*/false, &db_id));
@ -1737,7 +1746,8 @@ class VersionSetWithTimestampTest : public VersionSetTest {
std::unique_ptr<VersionSet> vset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false,
/*db_id=*/nullptr));
for (auto* cfd : *(vset->GetColumnFamilySet())) {

@ -54,7 +54,8 @@ class WalManagerTest : public testing::Test {
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ ""));
wal_manager_.reset(
new WalManager(db_options_, env_options_, nullptr /*IOTracer*/));

@ -873,7 +873,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
table_options.block_cache_compressed.get(), file->writable_file(),
&rep_->compressed_cache_key_prefix[0],
&rep_->compressed_cache_key_prefix_size);
&rep_->compressed_cache_key_prefix_size, tbo.db_session_id,
tbo.cur_file_num);
}
if (rep_->IsParallelCompressionEnabled()) {

@ -485,7 +485,8 @@ Status BlockBasedTableFactory::NewTableReader(
table_reader_options.largest_seqno,
table_reader_options.force_direct_prefetch, &tail_prefetch_stats_,
table_reader_options.block_cache_tracer,
table_reader_options.max_file_size_for_l0_meta_pin);
table_reader_options.max_file_size_for_l0_meta_pin,
table_reader_options.cur_db_session_id);
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(

@ -371,26 +371,29 @@ Cache::Handle* BlockBasedTable::GetEntryFromCache(
}
// Helper function to setup the cache key's prefix for the Table.
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) {
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep,
const std::string& db_session_id,
uint64_t cur_file_num) {
assert(kMaxCacheKeyPrefixSize >= 10);
rep->cache_key_prefix_size = 0;
rep->compressed_cache_key_prefix_size = 0;
if (rep->table_options.block_cache != nullptr) {
GenerateCachePrefix<Cache, FSRandomAccessFile>(
rep->table_options.block_cache.get(), rep->file->file(),
&rep->cache_key_prefix[0], &rep->cache_key_prefix_size);
&rep->cache_key_prefix[0], &rep->cache_key_prefix_size, db_session_id,
cur_file_num);
}
if (rep->table_options.persistent_cache != nullptr) {
GenerateCachePrefix<PersistentCache, FSRandomAccessFile>(
rep->table_options.persistent_cache.get(), rep->file->file(),
&rep->persistent_cache_key_prefix[0],
&rep->persistent_cache_key_prefix_size);
&rep->persistent_cache_key_prefix_size, "", cur_file_num);
}
if (rep->table_options.block_cache_compressed != nullptr) {
GenerateCachePrefix<Cache, FSRandomAccessFile>(
rep->table_options.block_cache_compressed.get(), rep->file->file(),
&rep->compressed_cache_key_prefix[0],
&rep->compressed_cache_key_prefix_size);
&rep->compressed_cache_key_prefix_size, "", cur_file_num);
}
}
@ -512,7 +515,8 @@ Status BlockBasedTable::Open(
const SequenceNumber largest_seqno, const bool force_direct_prefetch,
TailPrefetchStats* tail_prefetch_stats,
BlockCacheTracer* const block_cache_tracer,
size_t max_file_size_for_l0_meta_pin) {
size_t max_file_size_for_l0_meta_pin, const std::string& db_session_id,
uint64_t cur_file_num) {
table_reader->reset();
Status s;
@ -586,7 +590,7 @@ Status BlockBasedTable::Open(
rep->internal_prefix_transform.reset(
new InternalKeySliceTransform(prefix_extractor));
}
SetupCacheKeyPrefix(rep);
SetupCacheKeyPrefix(rep, db_session_id, cur_file_num);
std::unique_ptr<BlockBasedTable> new_table(
new BlockBasedTable(rep, block_cache_tracer));

@ -99,7 +99,9 @@ class BlockBasedTable : public TableReader {
bool force_direct_prefetch = false,
TailPrefetchStats* tail_prefetch_stats = nullptr,
BlockCacheTracer* const block_cache_tracer = nullptr,
size_t max_file_size_for_l0_meta_pin = 0);
size_t max_file_size_for_l0_meta_pin = 0,
const std::string& db_session_id = "",
uint64_t cur_file_num = 0);
bool PrefixMayMatch(const Slice& internal_key,
const ReadOptions& read_options,
@ -446,22 +448,39 @@ class BlockBasedTable : public TableReader {
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context);
static void SetupCacheKeyPrefix(Rep* rep);
static void SetupCacheKeyPrefix(Rep* rep, const std::string& db_session_id,
uint64_t cur_file_num);
// Generate a cache key prefix from the file
template <typename TCache, typename TFile>
static void GenerateCachePrefix(TCache* cc, TFile* file, char* buffer,
size_t* size) {
size_t* size,
const std::string& db_session_id,
uint64_t cur_file_num) {
// generate an id from the file
*size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
// If the prefix wasn't generated or was too long,
// create one from the cache.
// create one based on the DbSessionId and curent file number if they
// are set. Otherwise, created from NewId()
if (cc != nullptr && *size == 0) {
if (db_session_id.size() == 20) {
// db_session_id is 20 bytes as defined.
memcpy(buffer, db_session_id.c_str(), 20);
char* end;
if (cur_file_num != 0) {
end = EncodeVarint64(buffer + 20, cur_file_num);
} else {
end = EncodeVarint64(buffer + 20, cc->NewId());
}
// kMaxVarint64Length is 10 therefore, the prefix is at most 30 bytes.
*size = static_cast<size_t>(end - buffer);
} else {
char* end = EncodeVarint64(buffer, cc->NewId());
*size = static_cast<size_t>(end - buffer);
}
}
}
// Size of all data blocks, maybe approximate
uint64_t GetApproximateDataSize();

@ -255,7 +255,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
cf_id, r->column_family_name, unknown_level, false /* is_bottommost */,
TableFileCreationReason::kMisc, 0 /* creation_time */,
0 /* oldest_key_time */, 0 /* file_creation_time */,
"SST Writer" /* db_id */, db_session_id, 0 /* target_file_size */);
"SST Writer" /* db_id */, db_session_id, 0 /* target_file_size */, 0);
// XXX: when we can remove skip_filters from the SstFileWriter public API
// we can remove it from TableBuilderOptions.
table_builder_options.skip_filters = r->skip_filters;

@ -37,12 +37,15 @@ struct TableReaderOptions {
bool _skip_filters = false, bool _immortal = false,
bool _force_direct_prefetch = false, int _level = -1,
BlockCacheTracer* const _block_cache_tracer = nullptr,
size_t _max_file_size_for_l0_meta_pin = 0)
: TableReaderOptions(_ioptions, _prefix_extractor, _env_options,
_internal_comparator, _skip_filters, _immortal,
_force_direct_prefetch, _level,
size_t _max_file_size_for_l0_meta_pin = 0,
const std::string& _cur_db_session_id = "",
uint64_t _cur_file_num = 0)
: TableReaderOptions(
_ioptions, _prefix_extractor, _env_options, _internal_comparator,
_skip_filters, _immortal, _force_direct_prefetch, _level,
0 /* _largest_seqno */, _block_cache_tracer,
_max_file_size_for_l0_meta_pin) {}
_max_file_size_for_l0_meta_pin, _cur_db_session_id, _cur_file_num) {
}
// @param skip_filters Disables loading/accessing the filter block
TableReaderOptions(const ImmutableOptions& _ioptions,
@ -53,7 +56,9 @@ struct TableReaderOptions {
bool _force_direct_prefetch, int _level,
SequenceNumber _largest_seqno,
BlockCacheTracer* const _block_cache_tracer,
size_t _max_file_size_for_l0_meta_pin)
size_t _max_file_size_for_l0_meta_pin,
const std::string& _cur_db_session_id,
uint64_t _cur_file_num)
: ioptions(_ioptions),
prefix_extractor(_prefix_extractor),
env_options(_env_options),
@ -64,7 +69,9 @@ struct TableReaderOptions {
level(_level),
largest_seqno(_largest_seqno),
block_cache_tracer(_block_cache_tracer),
max_file_size_for_l0_meta_pin(_max_file_size_for_l0_meta_pin) {}
max_file_size_for_l0_meta_pin(_max_file_size_for_l0_meta_pin),
cur_db_session_id(_cur_db_session_id),
cur_file_num(_cur_file_num) {}
const ImmutableOptions& ioptions;
const SliceTransform* prefix_extractor;
@ -87,6 +94,10 @@ struct TableReaderOptions {
// Largest L0 file size whose meta-blocks may be pinned (can be zero when
// unknown).
const size_t max_file_size_for_l0_meta_pin;
std::string cur_db_session_id;
uint64_t cur_file_num;
};
struct TableBuilderOptions {
@ -102,7 +113,7 @@ struct TableBuilderOptions {
const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0,
const uint64_t _file_creation_time = 0, const std::string& _db_id = "",
const std::string& _db_session_id = "",
const uint64_t _target_file_size = 0)
const uint64_t _target_file_size = 0, const uint64_t _cur_file_num = 0)
: ioptions(_ioptions),
moptions(_moptions),
internal_comparator(_internal_comparator),
@ -119,7 +130,8 @@ struct TableBuilderOptions {
db_session_id(_db_session_id),
level_at_creation(_level),
is_bottommost(_is_bottommost),
reason(_reason) {}
reason(_reason),
cur_file_num(_cur_file_num) {}
TableBuilderOptions(
const ImmutableOptions& _ioptions, const MutableCFOptions& _moptions,
@ -133,7 +145,7 @@ struct TableBuilderOptions {
const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0,
const uint64_t _file_creation_time = 0, const std::string& _db_id = "",
const std::string& _db_session_id = "",
const uint64_t _target_file_size = 0)
const uint64_t _target_file_size = 0, const uint64_t _cur_file_num = 0)
: TableBuilderOptions(_ioptions, _moptions, _internal_comparator,
IntTblPropCollectorFactoryRange(
_int_tbl_prop_collector_factories->begin(),
@ -142,7 +154,7 @@ struct TableBuilderOptions {
_column_family_id, _column_family_name, _level,
_is_bottommost, _reason, _creation_time,
_oldest_key_time, _file_creation_time, _db_id,
_db_session_id, _target_file_size) {}
_db_session_id, _target_file_size, _cur_file_num) {}
const ImmutableOptions& ioptions;
const MutableCFOptions& moptions;
@ -168,6 +180,7 @@ struct TableBuilderOptions {
// want to skip filters, that should be (for example) null filter_policy
// in the table options of the ioptions.table_factory
bool skip_filters = false;
const uint64_t cur_file_num;
};
// TableBuilder provides the interface used to build a Table

@ -398,7 +398,7 @@ class TableConstructor : public Constructor {
TableReaderOptions(ioptions, moptions.prefix_extractor.get(), soptions,
internal_comparator, !kSkipFilters, !kImmortal,
false, level_, largest_seqno_, &block_cache_tracer_,
moptions.write_buffer_size),
moptions.write_buffer_size, "", uniq_id_),
std::move(file_reader_), TEST_GetSink()->contents().size(),
&table_reader_);
}

@ -1104,7 +1104,8 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex,
WriteBufferManager wb(options.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr);
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "");
Status s = versions.DumpManifest(options, file, verbose, hex, json);
if (!s.ok()) {
fprintf(stderr, "Error in processing file %s %s\n", file.c_str(),
@ -1246,7 +1247,8 @@ void GetLiveFilesChecksumInfoFromVersionSet(Options options,
WriteBufferManager wb(options.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr);
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "");
std::vector<std::string> cf_name_list;
s = versions.ListColumnFamilies(&cf_name_list, db_path,
immutable_db_options.fs.get());
@ -2027,7 +2029,8 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
WriteController wc(opt.delayed_write_rate);
WriteBufferManager wb(opt.db_write_buffer_size);
VersionSet versions(db_path_, &db_options, soptions, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr);
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "");
std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
ColumnFamilyOptions(opt));
@ -3713,7 +3716,8 @@ void UnsafeRemoveSstFileCommand::DoCommand() {
NewLRUCache(1 << 20 /* capacity */, options_.table_cache_numshardbits));
EnvOptions sopt;
VersionSet versions(db_path_, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr);
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "");
Status s = versions.Recover(column_families_);
ColumnFamilyData* cfd = nullptr;

@ -206,7 +206,7 @@ class FileChecksumTestHelper {
WriteBufferManager wb(options_.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options_);
VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb,
&wc, nullptr, nullptr);
&wc, nullptr, nullptr, "");
std::vector<std::string> cf_name_list;
Status s;
s = versions.ListColumnFamilies(&cf_name_list, dbname_,

@ -299,6 +299,14 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
return s;
}
size_t TestFSRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
if (fs_->ShouldFailGetUniqueId()) {
return 0;
} else {
return target_->GetUniqueId(id, max_size);
}
}
IOStatus FaultInjectionTestFS::NewDirectory(
const std::string& name, const IOOptions& options,
std::unique_ptr<FSDirectory>* result, IODebugContext* dbg) {

@ -145,6 +145,8 @@ class TestFSRandomAccessFile : public FSRandomAccessFile {
}
bool use_direct_io() const override { return target_->use_direct_io(); }
size_t GetUniqueId(char* id, size_t max_size) const override;
private:
std::unique_ptr<FSRandomAccessFile> target_;
FaultInjectionTestFS* fs_;
@ -178,7 +180,8 @@ class FaultInjectionTestFS : public FileSystemWrapper {
write_error_rand_(0),
write_error_one_in_(0),
metadata_write_error_one_in_(0),
ingest_data_corruption_before_write_(false) {}
ingest_data_corruption_before_write_(false),
fail_get_file_unique_id_(false) {}
virtual ~FaultInjectionTestFS() { error_.PermitUncheckedError(); }
const char* Name() const override { return "FaultInjectionTestFS"; }
@ -321,6 +324,16 @@ class FaultInjectionTestFS : public FileSystemWrapper {
return checksum_handoff_func_tpye_;
}
void SetFailGetUniqueId(bool flag) {
MutexLock l(&mutex_);
fail_get_file_unique_id_ = flag;
}
bool ShouldFailGetUniqueId() {
MutexLock l(&mutex_);
return fail_get_file_unique_id_;
}
// Specify what the operation, so we can inject the right type of error
enum ErrorOperation : char {
kRead = 0,
@ -484,6 +497,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
std::vector<FileType> write_error_allowed_types_;
bool ingest_data_corruption_before_write_;
ChecksumType checksum_handoff_func_tpye_;
bool fail_get_file_unique_id_;
};
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save