Adding pin_l0_filter_and_index_blocks_in_cache feature and related fixes.

Summary:
When a block based table file is opened, if prefetch_index_and_filter is true, it will prefetch the index and filter blocks, putting them into the block cache.
What this feature adds: when a L0 block based table file is opened, if pin_l0_filter_and_index_blocks_in_cache is true in the options (and prefetch_index_and_filter is true), then the filter and index blocks aren't released back to the block cache at the end of BlockBasedTableReader::Open(). Instead the table reader takes ownership of them, hence pinning them, ie. the LRU cache will never push them out. Meanwhile in the table reader, further accesses will not hit the block cache, thus avoiding lock contention.

Test Plan:
'export TEST_TMPDIR=/dev/shm/ && DISABLE_JEMALLOC=1 OPT=-g make all valgrind_check -j32' is OK.
I didn't run the Java tests, I don't have Java set up on my devserver.

Reviewers: sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D56133
main
Marton Trencseni 9 years ago
parent 2feafa3db9
commit 9b51987521
  1. 5
      db/builder.cc
  2. 2
      db/builder.h
  3. 5
      db/c.cc
  4. 2
      db/column_family.h
  5. 15
      db/db_impl.cc
  6. 118
      db/db_test2.cc
  7. 10
      db/flush_job.cc
  8. 28
      db/table_cache.cc
  9. 15
      db/table_cache.h
  10. 10
      db/version_builder.cc
  11. 35
      db/version_set.cc
  12. 1
      examples/rocksdb_option_file_example.ini
  13. 3
      include/rocksdb/c.h
  14. 4
      include/rocksdb/cache.h
  15. 6
      include/rocksdb/table.h
  16. 5
      java/rocksjni/table.cc
  17. 33
      java/src/main/java/org/rocksdb/BlockBasedTableConfig.java
  18. 12
      table/block_based_table_factory.cc
  19. 118
      table/block_based_table_reader.cc
  20. 9
      table/block_based_table_reader.h
  21. 7
      table/table_builder.h
  22. 2
      table/table_reader.h
  23. 21
      table/table_test.cc
  24. 1
      tools/benchmark.sh
  25. 5
      tools/db_bench_tool.cc
  26. 8
      tools/sst_dump_tool_imp.h
  27. 32
      util/cache.cc
  28. 21
      util/options_helper.h
  29. 4
      util/options_test.cc
  30. 1
      util/testutil.cc

@ -63,7 +63,7 @@ Status BuildTable(
const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, const Env::IOPriority io_priority,
TableProperties* table_properties) {
TableProperties* table_properties, int level) {
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
Status s;
@ -149,7 +149,8 @@ Status BuildTable(
ReadOptions(), env_options, internal_comparator, meta->fd, nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
false));
false /* for_compaction */, nullptr /* arena */,
false /* skip_filter */, level));
s = it->status();
if (s.ok() && paranoid_file_checks) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {

@ -61,6 +61,6 @@ extern Status BuildTable(
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr);
TableProperties* table_properties = nullptr, int level = -1);
} // namespace rocksdb

@ -1288,6 +1288,11 @@ void rocksdb_block_based_options_set_cache_index_and_filter_blocks(
options->rep.cache_index_and_filter_blocks = v;
}
void rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(
rocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.pin_l0_filter_and_index_blocks_in_cache = v;
}
void rocksdb_block_based_options_set_skip_table_builder_flush(
rocksdb_block_based_table_options_t* options, unsigned char v) {
options->rep.skip_table_builder_flush = v;

@ -465,6 +465,8 @@ class ColumnFamilySet {
// Don't call while iterating over ColumnFamilySet
void FreeDeadColumnFamilies();
Cache* get_table_cache() { return table_cache_; }
private:
friend class ColumnFamilyData;
// helper function that gets called from cfd destructor

@ -405,6 +405,21 @@ DBImpl::~DBImpl() {
}
logs_.clear();
// Table cache may have table handles holding blocks from the block cache.
// We need to release them before the block cache is destroyed. The block
// cache may be destroyed inside versions_.reset(), when column family data
// list is destroyed, so leaving handles in table cache after
// versions_.reset() may cause issues.
// Here we clean all unreferenced handles in table cache.
// Now we assume all user queries have finished, so only version set itself
// can possibly hold the blocks from block cache. After releasing unreferenced
// handles here, only handles held by version set left and inside
// versions_.reset(), we will release them. There, we need to make sure every
// time a handle is released, we erase it from the cache too. By doing that,
// we can guarantee that after versions_.reset(), table cache is empty
// so the cache can be safely destroyed.
table_cache_->EraseUnRefEntries();
// versions need to be destroyed before table_cache since it can hold
// references to table_cache.
versions_.reset();

@ -13,6 +13,11 @@
namespace rocksdb {
static uint64_t TestGetTickerCount(const Options& options,
Tickers ticker_type) {
return options.statistics->getTickerCount(ticker_type);
}
class DBTest2 : public DBTestBase {
public:
DBTest2() : DBTestBase("/db_test2") {}
@ -675,6 +680,119 @@ TEST_F(DBTest2, DISABLED_FirstSnapshotTest) {
db_->ReleaseSnapshot(s1);
}
class PinL0IndexAndFilterBlocksTest : public DBTestBase,
public testing::WithParamInterface<bool> {
public:
PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {}
virtual void SetUp() override { infinite_max_files_ = GetParam(); }
bool infinite_max_files_;
};
TEST_P(PinL0IndexAndFilterBlocksTest,
IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) {
Options options = CurrentOptions();
if (infinite_max_files_) {
options.max_open_files = -1;
}
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
table_options.filter_policy.reset(NewBloomFilterPolicy(20));
options.table_factory.reset(new BlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "key", "val"));
// Create a new table.
ASSERT_OK(Flush(1));
// index/filter blocks added to block cache right after table creation.
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
// only index/filter were added
ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));
std::string value;
// Miss and hit count should remain the same, they're all pinned.
db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
// Miss and hit count should remain the same, they're all pinned.
value = Get(1, "key");
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}
TEST_P(PinL0IndexAndFilterBlocksTest,
MultiLevelIndexAndFilterBlocksCachedWithPinning) {
Options options = CurrentOptions();
if (infinite_max_files_) {
options.max_open_files = -1;
}
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
table_options.filter_policy.reset(NewBloomFilterPolicy(20));
options.table_factory.reset(new BlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
Put(1, "a", "begin");
Put(1, "z", "end");
ASSERT_OK(Flush(1));
// move this table to L1
dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
// reset block cache
table_options.block_cache = NewLRUCache(64 * 1024);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
TryReopenWithColumnFamilies({"default", "pikachu"}, options);
// create new table at L0
Put(1, "a2", "begin2");
Put(1, "z2", "end2");
ASSERT_OK(Flush(1));
table_options.block_cache->EraseUnRefEntries();
// get base cache values
uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
std::string value;
// this should be read from L0
// so cache values don't change
value = Get(1, "a2");
ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
// this should be read from L1
// the file is opened, prefetching results in a cache filter miss
// the block is loaded and added to the cache,
// then the get results in a cache hit for L1
value = Get(1, "a");
ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
}
INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
PinL0IndexAndFilterBlocksTest, ::testing::Bool());
} // namespace rocksdb
int main(int argc, char** argv) {

@ -234,14 +234,14 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), meta,
cfd_->internal_comparator(),
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
existing_snapshots_, earliest_write_conflict_snapshot_,
output_compression_, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), Env::IO_HIGH, &table_properties_);
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
Env::IO_HIGH, &table_properties_, 0 /* level */);
info.table_properties = table_properties_;
LogFlush(db_options_.info_log);
}

@ -88,7 +88,7 @@ Status TableCache::GetTableReader(
const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
unique_ptr<TableReader>* table_reader, bool skip_filters) {
unique_ptr<TableReader>* table_reader, bool skip_filters, int level) {
std::string fname =
TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
unique_ptr<RandomAccessFile> file;
@ -109,18 +109,26 @@ Status TableCache::GetTableReader(
file_read_hist));
s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, env_options, internal_comparator,
skip_filters),
skip_filters, level),
std::move(file_reader), fd.GetFileSize(), table_reader);
TEST_SYNC_POINT("TableCache::GetTableReader:0");
}
return s;
}
void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) {
ReleaseHandle(handle);
uint64_t number = fd.GetNumber();
Slice key = GetSliceForFileNumber(&number);
cache_->Erase(key);
}
Status TableCache::FindTable(const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, Cache::Handle** handle,
const bool no_io, bool record_read_stats,
HistogramImpl* file_read_hist, bool skip_filters) {
HistogramImpl* file_read_hist, bool skip_filters,
int level) {
PERF_TIMER_GUARD(find_table_nanos);
Status s;
uint64_t number = fd.GetNumber();
@ -136,7 +144,7 @@ Status TableCache::FindTable(const EnvOptions& env_options,
unique_ptr<TableReader> table_reader;
s = GetTableReader(env_options, internal_comparator, fd,
false /* sequential mode */, record_read_stats,
file_read_hist, &table_reader, skip_filters);
file_read_hist, &table_reader, skip_filters, level);
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
@ -158,7 +166,7 @@ InternalIterator* TableCache::NewIterator(
const ReadOptions& options, const EnvOptions& env_options,
const InternalKeyComparator& icomparator, const FileDescriptor& fd,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
bool for_compaction, Arena* arena, bool skip_filters) {
bool for_compaction, Arena* arena, bool skip_filters, int level) {
PERF_TIMER_GUARD(new_table_iterator_nanos);
if (table_reader_ptr != nullptr) {
@ -173,7 +181,8 @@ InternalIterator* TableCache::NewIterator(
unique_ptr<TableReader> table_reader_unique_ptr;
Status s = GetTableReader(
env_options, icomparator, fd, /* sequential mode */ true,
/* record stats */ false, nullptr, &table_reader_unique_ptr);
/* record stats */ false, nullptr, &table_reader_unique_ptr,
false /* skip_filters */, level);
if (!s.ok()) {
return NewErrorInternalIterator(s, arena);
}
@ -184,7 +193,7 @@ InternalIterator* TableCache::NewIterator(
Status s = FindTable(env_options, icomparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
!for_compaction /* record read_stats */,
file_read_hist, skip_filters);
file_read_hist, skip_filters, level);
if (!s.ok()) {
return NewErrorInternalIterator(s, arena);
}
@ -216,7 +225,7 @@ Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, const Slice& k,
GetContext* get_context, HistogramImpl* file_read_hist,
bool skip_filters) {
bool skip_filters, int level) {
TableReader* t = fd.table_reader;
Status s;
Cache::Handle* handle = nullptr;
@ -265,7 +274,8 @@ Status TableCache::Get(const ReadOptions& options,
if (!t) {
s = FindTable(env_options_, internal_comparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters);
true /* record_read_stats */, file_read_hist, skip_filters,
level);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}

@ -45,34 +45,41 @@ class TableCache {
// the cache and should not be deleted, and is valid for as long as the
// returned iterator is live.
// @param skip_filters Disables loading/accessing the filter block
// @param level The level this table is at, -1 for "not set / don't know"
InternalIterator* NewIterator(
const ReadOptions& options, const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr,
HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
Arena* arena = nullptr, bool skip_filters = false);
Arena* arena = nullptr, bool skip_filters = false, int level = -1);
// If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value) repeatedly until
// it returns false.
// @param skip_filters Disables loading/accessing the filter block
// @param level The level this table is at, -1 for "not set / don't know"
Status Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, const Slice& k,
GetContext* get_context, HistogramImpl* file_read_hist = nullptr,
bool skip_filters = false);
bool skip_filters = false, int level = -1);
// Evict any entry for the specified file number
static void Evict(Cache* cache, uint64_t file_number);
// Clean table handle and erase it from the table cache
// Used in DB close, or the file is not live anymore.
void EraseHandle(const FileDescriptor& fd, Cache::Handle* handle);
// Find table reader
// @param skip_filters Disables loading/accessing the filter block
// @param level == -1 means not specified
Status FindTable(const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, Cache::Handle**,
const bool no_io = false, bool record_read_stats = true,
HistogramImpl* file_read_hist = nullptr,
bool skip_filters = false);
bool skip_filters = false, int level = -1);
// Get TableReader from a cache handle.
TableReader* GetTableReaderFromHandle(Cache::Handle* handle);
@ -106,7 +113,7 @@ class TableCache {
const FileDescriptor& fd, bool sequential_mode,
bool record_read_stats, HistogramImpl* file_read_hist,
unique_ptr<TableReader>* table_reader,
bool skip_filters = false);
bool skip_filters = false, int level = -1);
const ImmutableCFOptions& ioptions_;
const EnvOptions& env_options_;

@ -309,11 +309,11 @@ class VersionBuilder::Rep {
auto* file_meta = files_meta[file_idx].first;
int level = files_meta[file_idx].second;
table_cache_->FindTable(env_options_,
*(base_vstorage_->InternalComparator()),
file_meta->fd, &file_meta->table_reader_handle,
false /*no_io */, true /* record_read_stats */,
internal_stats->GetFileReadHist(level));
table_cache_->FindTable(
env_options_, *(base_vstorage_->InternalComparator()),
file_meta->fd, &file_meta->table_reader_handle, false /*no_io */,
true /* record_read_stats */,
internal_stats->GetFileReadHist(level), false, level);
if (file_meta->table_reader_handle != nullptr) {
// Load table_reader
file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(

@ -92,6 +92,7 @@ class FilePicker {
const InternalKeyComparator* internal_comparator)
: num_levels_(num_levels),
curr_level_(static_cast<unsigned int>(-1)),
returned_file_level_(static_cast<unsigned int>(-1)),
hit_file_level_(static_cast<unsigned int>(-1)),
search_left_bound_(0),
search_right_bound_(FileIndexer::kLevelMaxIndex),
@ -118,6 +119,8 @@ class FilePicker {
}
}
int GetCurrentLevel() { return returned_file_level_; }
FdWithKeyRange* GetNextFile() {
while (!search_ended_) { // Loops over different levels.
while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
@ -190,6 +193,7 @@ class FilePicker {
}
prev_file_ = f;
#endif
returned_file_level_ = curr_level_;
if (curr_level_ > 0 && cmp_largest < 0) {
// No more files to search in this level.
search_ended_ = !PrepareNextLevel();
@ -216,6 +220,7 @@ class FilePicker {
private:
unsigned int num_levels_;
unsigned int curr_level_;
unsigned int returned_file_level_;
unsigned int hit_file_level_;
int32_t search_left_bound_;
int32_t search_right_bound_;
@ -322,7 +327,7 @@ Version::~Version() {
f->refs--;
if (f->refs <= 0) {
if (f->table_reader_handle) {
cfd_->table_cache()->ReleaseHandle(f->table_reader_handle);
cfd_->table_cache()->EraseHandle(f->fd, f->table_reader_handle);
f->table_reader_handle = nullptr;
}
vset_->obsolete_files_.push_back(f);
@ -486,7 +491,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
const EnvOptions& env_options,
const InternalKeyComparator& icomparator,
HistogramImpl* file_read_hist, bool for_compaction,
bool prefix_enabled, bool skip_filters)
bool prefix_enabled, bool skip_filters, int level)
: TwoLevelIteratorState(prefix_enabled),
table_cache_(table_cache),
read_options_(read_options),
@ -494,7 +499,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
icomparator_(icomparator),
file_read_hist_(file_read_hist),
for_compaction_(for_compaction),
skip_filters_(skip_filters) {}
skip_filters_(skip_filters),
level_(level) {}
InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override {
if (meta_handle.size() != sizeof(FileDescriptor)) {
@ -506,7 +512,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
return table_cache_->NewIterator(
read_options_, env_options_, icomparator_, *fd,
nullptr /* don't need reference to table*/, file_read_hist_,
for_compaction_, nullptr /* arena */, skip_filters_);
for_compaction_, nullptr /* arena */, skip_filters_, level_);
}
}
@ -522,6 +528,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
HistogramImpl* file_read_hist_;
bool for_compaction_;
bool skip_filters_;
int level_;
};
// A wrapper of version builder which references the current version in
@ -789,7 +796,8 @@ void Version::AddIterators(const ReadOptions& read_options,
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr,
cfd_->internal_stats()->GetFileReadHist(0), false, arena));
cfd_->internal_stats()->GetFileReadHist(0), false, arena,
false /* skip_filters */, 0 /* level */));
}
// For levels > 0, we can use a concatenating iterator that sequentially
@ -804,7 +812,7 @@ void Version::AddIterators(const ReadOptions& read_options,
cfd_->internal_stats()->GetFileReadHist(level),
false /* for_compaction */,
cfd_->ioptions()->prefix_extractor != nullptr,
IsFilterSkipped(level));
IsFilterSkipped(level), level);
mem = arena->AllocateAligned(sizeof(LevelFileNumIterator));
auto* first_level_iter = new (mem) LevelFileNumIterator(
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level));
@ -909,7 +917,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
read_options, *internal_comparator(), f->fd, ikey, &get_context,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()));
fp.IsHitFileLastInLevel()),
fp.GetCurrentLevel());
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
@ -2062,9 +2071,16 @@ VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options,
env_options_(storage_options),
env_options_compactions_(env_options_) {}
void CloseTables(void* ptr, size_t) {
TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
table_reader->Close();
}
VersionSet::~VersionSet() {
// we need to delete column_family_set_ because its destructor depends on
// VersionSet
column_family_set_->get_table_cache()->ApplyToAllCacheEntries(&CloseTables,
false);
column_family_set_.reset();
for (auto file : obsolete_files_) {
delete file;
@ -3275,7 +3291,8 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) {
read_options, env_options_compactions_,
cfd->internal_comparator(), flevel->files[i].fd, nullptr,
nullptr, /* no per level latency histogram*/
true /* for compaction */);
true /* for_compaction */, nullptr /* arena */,
false /* skip_filters */, (int)which /* level */);
}
} else {
// Create concatenating iterator for the files from this level
@ -3285,7 +3302,7 @@ InternalIterator* VersionSet::MakeInputIterator(const Compaction* c) {
cfd->internal_comparator(),
nullptr /* no per level latency histogram */,
true /* for_compaction */, false /* prefix enabled */,
false /* skip_filters */),
false /* skip_filters */, (int)which /* level */),
new LevelFileNumIterator(cfd->internal_comparator(),
c->input_levels(which)));
}

@ -138,6 +138,7 @@
block_size=8192
block_restart_interval=16
cache_index_and_filter_blocks=false
pin_l0_filter_and_index_blocks_in_cache=false
index_type=kBinarySearch
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory

@ -451,6 +451,9 @@ extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_cache_index_and_filter_blocks(
rocksdb_block_based_table_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(
rocksdb_block_based_table_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_block_based_options_set_skip_table_builder_flush(
rocksdb_block_based_table_options_t* options, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory(

@ -146,6 +146,10 @@ class Cache {
virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
bool thread_safe) = 0;
// Remove all entries.
// Prerequisit: no entry is referenced.
virtual void EraseUnRefEntries() = 0;
private:
void LRU_Remove(Handle* e);
void LRU_Append(Handle* e);

@ -64,6 +64,12 @@ struct BlockBasedTableOptions {
// block during table initialization.
bool cache_index_and_filter_blocks = false;
// if cache_index_and_filter_blocks is true and the below is true, then
// filter and index blocks are stored in the cache, but a reference is
// held in the "table reader" object so the blocks are pinned and only
// evicted from cache when the table reader is freed.
bool pin_l0_filter_and_index_blocks_in_cache = false;
// The index type that will be used for this table.
enum IndexType : char {
// A space efficient index block that is optimized for

@ -38,13 +38,14 @@ jlong Java_org_rocksdb_PlainTableConfig_newTableFactoryHandle(
/*
* Class: org_rocksdb_BlockBasedTableConfig
* Method: newTableFactoryHandle
* Signature: (ZJIJIIZIZZJIBBI)J
* Signature: (ZJIJIIZIZZZJIBBI)J
*/
jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
JNIEnv* env, jobject jobj, jboolean no_block_cache, jlong block_cache_size,
jint block_cache_num_shardbits, jlong block_size, jint block_size_deviation,
jint block_restart_interval, jboolean whole_key_filtering,
jlong jfilterPolicy, jboolean cache_index_and_filter_blocks,
jboolean pin_l0_filter_and_index_blocks_in_cache,
jboolean hash_index_allow_collision, jlong block_cache_compressed_size,
jint block_cache_compressd_num_shard_bits, jbyte jchecksum_type,
jbyte jindex_type, jint jformat_version) {
@ -70,6 +71,8 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
options.filter_policy = *pFilterPolicy;
}
options.cache_index_and_filter_blocks = cache_index_and_filter_blocks;
options.pin_l0_filter_and_index_blocks_in_cache =
pin_l0_filter_and_index_blocks_in_cache;
options.hash_index_allow_collision = hash_index_allow_collision;
if (block_cache_compressed_size > 0) {
if (block_cache_compressd_num_shard_bits > 0) {

@ -21,6 +21,7 @@ public class BlockBasedTableConfig extends TableFormatConfig {
wholeKeyFiltering_ = true;
filter_ = null;
cacheIndexAndFilterBlocks_ = false;
pinL0FilterAndIndexBlocksInCache_ = false;
hashIndexAllowCollision_ = true;
blockCacheCompressedSize_ = 0;
blockCacheCompressedNumShardBits_ = 0;
@ -226,6 +227,29 @@ public class BlockBasedTableConfig extends TableFormatConfig {
return this;
}
/**
* Indicating if we'd like to pin L0 index/filter blocks to the block cache.
If not specified, defaults to false.
*
* @return if L0 index and filter blocks should be pinned to the block cache.
*/
public boolean pinL0FilterAndIndexBlocksInCache() {
return pinL0FilterAndIndexBlocksInCache_;
}
/**
* Indicating if we'd like to pin L0 index/filter blocks to the block cache.
If not specified, defaults to false.
*
* @param pinL0FilterAndIndexBlocksInCache pin blocks in block cache
* @return the reference to the current config.
*/
public BlockBasedTableConfig setPinL0FilterAndIndexBlocksInCache(
final boolean pinL0FilterAndIndexBlocksInCache) {
pinL0FilterAndIndexBlocksInCache_ = pinL0FilterAndIndexBlocksInCache;
return this;
}
/**
* Influence the behavior when kHashSearch is used.
if false, stores a precise prefix to block range mapping
@ -393,6 +417,7 @@ public class BlockBasedTableConfig extends TableFormatConfig {
blockCacheNumShardBits_, blockSize_, blockSizeDeviation_,
blockRestartInterval_, wholeKeyFiltering_,
filterHandle, cacheIndexAndFilterBlocks_,
pinL0FilterAndIndexBlocksInCache_,
hashIndexAllowCollision_, blockCacheCompressedSize_,
blockCacheCompressedNumShardBits_,
checksumType_.getValue(), indexType_.getValue(),
@ -403,11 +428,13 @@ public class BlockBasedTableConfig extends TableFormatConfig {
boolean noBlockCache, long blockCacheSize, int blockCacheNumShardBits,
long blockSize, int blockSizeDeviation, int blockRestartInterval,
boolean wholeKeyFiltering, long filterPolicyHandle,
boolean cacheIndexAndFilterBlocks, boolean hashIndexAllowCollision,
long blockCacheCompressedSize, int blockCacheCompressedNumShardBits,
byte checkSumType, byte indexType, int formatVersion);
boolean cacheIndexAndFilterBlocks, boolean pinL0FilterAndIndexBlocksInCache,
boolean hashIndexAllowCollision, long blockCacheCompressedSize,
int blockCacheCompressedNumShardBits, byte checkSumType,
byte indexType, int formatVersion);
private boolean cacheIndexAndFilterBlocks_;
private boolean pinL0FilterAndIndexBlocksInCache_;
private IndexType indexType_;
private boolean hashIndexAllowCollision_;
private ChecksumType checksumType_;

@ -64,7 +64,7 @@ Status BlockBasedTableFactory::NewTableReader(
table_reader_options.ioptions, table_reader_options.env_options,
table_options_, table_reader_options.internal_comparator, std::move(file),
file_size, table_reader, prefetch_enabled,
table_reader_options.skip_filters);
table_reader_options.skip_filters, table_reader_options.level);
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(
@ -94,6 +94,12 @@ Status BlockBasedTableFactory::SanitizeOptions(
return Status::InvalidArgument("Enable cache_index_and_filter_blocks, "
", but block cache is disabled");
}
if (table_options_.pin_l0_filter_and_index_blocks_in_cache &&
table_options_.no_block_cache) {
return Status::InvalidArgument(
"Enable pin_l0_filter_and_index_blocks_in_cache, "
", but block cache is disabled");
}
if (!BlockBasedTableSupportedVersion(table_options_.format_version)) {
return Status::InvalidArgument(
"Unsupported BlockBasedTable format_version. Please check "
@ -115,6 +121,10 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const {
snprintf(buffer, kBufferSize, " cache_index_and_filter_blocks: %d\n",
table_options_.cache_index_and_filter_blocks);
ret.append(buffer);
snprintf(buffer, kBufferSize,
" pin_l0_filter_and_index_blocks_in_cache: %d\n",
table_options_.pin_l0_filter_and_index_blocks_in_cache);
ret.append(buffer);
snprintf(buffer, kBufferSize, " index_type: %d\n",
table_options_.index_type);
ret.append(buffer);

@ -340,6 +340,28 @@ class HashIndexReader : public IndexReader {
BlockContents prefixes_contents_;
};
// CachableEntry represents the entries that *may* be fetched from block cache.
// field `value` is the item we want to get.
// field `cache_handle` is the cache handle to the block cache. If the value
// was not read from cache, `cache_handle` will be nullptr.
template <class TValue>
struct BlockBasedTable::CachableEntry {
CachableEntry(TValue* _value, Cache::Handle* _cache_handle)
: value(_value), cache_handle(_cache_handle) {}
CachableEntry() : CachableEntry(nullptr, nullptr) {}
void Release(Cache* cache) {
if (cache_handle) {
cache->Release(cache_handle);
value = nullptr;
cache_handle = nullptr;
}
}
bool IsSet() const { return cache_handle != nullptr; }
TValue* value = nullptr;
// if the entry is from the cache, cache_handle will be populated.
Cache::Handle* cache_handle = nullptr;
};
struct BlockBasedTable::Rep {
Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options,
@ -394,34 +416,21 @@ struct BlockBasedTable::Rep {
// and compatible with existing code, we introduce a wrapper that allows
// block to extract prefix without knowing if a key is internal or not.
unique_ptr<SliceTransform> internal_prefix_transform;
// only used in level 0 files:
// when pin_l0_filter_and_index_blocks_in_cache is true, we do use the
// LRU cache, but we always keep the filter & idndex block's handle checked
// out here (=we don't call Release()), plus the parsed out objects
// the LRU cache will never push flush them out, hence they're pinned
CachableEntry<FilterBlockReader> filter_entry;
CachableEntry<IndexReader> index_entry;
};
BlockBasedTable::~BlockBasedTable() {
Close();
delete rep_;
}
// CachableEntry represents the entries that *may* be fetched from block cache.
// field `value` is the item we want to get.
// field `cache_handle` is the cache handle to the block cache. If the value
// was not read from cache, `cache_handle` will be nullptr.
template <class TValue>
struct BlockBasedTable::CachableEntry {
CachableEntry(TValue* _value, Cache::Handle* _cache_handle)
: value(_value), cache_handle(_cache_handle) {}
CachableEntry() : CachableEntry(nullptr, nullptr) {}
void Release(Cache* cache) {
if (cache_handle) {
cache->Release(cache_handle);
value = nullptr;
cache_handle = nullptr;
}
}
TValue* value = nullptr;
// if the entry is from the cache, cache_handle will be populated.
Cache::Handle* cache_handle = nullptr;
};
// Helper function to setup the cache key's prefix for the Table.
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) {
assert(kMaxCacheKeyPrefixSize >= 10);
@ -498,7 +507,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
uint64_t file_size,
unique_ptr<TableReader>* table_reader,
const bool prefetch_index_and_filter,
const bool skip_filters) {
const bool skip_filters, const int level) {
table_reader->reset();
Footer footer;
@ -594,15 +603,34 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
assert(table_options.block_cache != nullptr);
// Hack: Call NewIndexIterator() to implicitly add index to the
// block_cache
// if pin_l0_filter_and_index_blocks_in_cache is true and this is
// a level0 file, then we will pass in this pointer to rep->index
// to NewIndexIterator(), which will save the index block in there
// else it's a nullptr and nothing special happens
CachableEntry<IndexReader>* index_entry = nullptr;
if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
level == 0) {
index_entry = &rep->index_entry;
}
unique_ptr<InternalIterator> iter(
new_table->NewIndexIterator(ReadOptions()));
new_table->NewIndexIterator(ReadOptions(), nullptr, index_entry));
s = iter->status();
if (s.ok()) {
// Hack: Call GetFilter() to implicitly add filter to the block_cache
auto filter_entry = new_table->GetFilter();
// if pin_l0_filter_and_index_blocks_in_cache is true, and this is
// a level0 file, then save it in rep_->filter_entry; it will be
// released in the destructor only, hence it will be pinned in the
// cache until this reader is alive
if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache &&
level == 0) {
rep->filter_entry = filter_entry;
} else {
filter_entry.Release(table_options.block_cache.get());
}
}
} else {
// If we don't use block cache for index/filter blocks access, we'll
// pre-load these blocks, which will kept in member variables in Rep
@ -886,14 +914,19 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
return {rep_->filter.get(), nullptr /* cache handle */};
}
PERF_TIMER_GUARD(read_filter_block_nanos);
Cache* block_cache = rep_->table_options.block_cache.get();
if (rep_->filter_policy == nullptr /* do not use filter */ ||
block_cache == nullptr /* no block cache at all */) {
return {nullptr /* filter */, nullptr /* cache handle */};
}
// we have a pinned filter block
if (rep_->filter_entry.IsSet()) {
return rep_->filter_entry;
}
PERF_TIMER_GUARD(read_filter_block_nanos);
// Fetching from the cache
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
@ -935,12 +968,19 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
}
InternalIterator* BlockBasedTable::NewIndexIterator(
const ReadOptions& read_options, BlockIter* input_iter) {
const ReadOptions& read_options, BlockIter* input_iter,
CachableEntry<IndexReader>* index_entry) {
// index reader has already been pre-populated.
if (rep_->index_reader) {
return rep_->index_reader->NewIterator(
input_iter, read_options.total_order_seek);
}
// we have a pinned index block
if (rep_->index_entry.IsSet()) {
return rep_->index_entry.value->NewIterator(input_iter,
read_options.total_order_seek);
}
PERF_TIMER_GUARD(read_index_block_nanos);
bool no_io = read_options.read_tier == kBlockCacheTier;
@ -996,7 +1036,15 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
assert(cache_handle);
auto* iter = index_reader->NewIterator(
input_iter, read_options.total_order_seek);
// the caller would like to take ownership of the index block
// don't call RegisterCleanup() in this case, the caller will take care of it
if (index_entry != nullptr) {
*index_entry = {index_reader, cache_handle};
} else {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle);
}
return iter;
}
@ -1224,7 +1272,13 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL);
}
// if rep_->filter_entry is not set, we should call Release(); otherwise
// don't call, in this case we have a local copy in rep_->filter_entry,
// it's pinned to the cache and will be released in the destructor
if (!rep_->filter_entry.IsSet()) {
filter_entry.Release(rep_->table_options.block_cache.get());
}
return may_match;
}
@ -1324,7 +1378,12 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
}
}
// if rep_->filter_entry is not set, we should call Release(); otherwise
// don't call, in this case we have a local copy in rep_->filter_entry,
// it's pinned to the cache and will be released in the destructor
if (!rep_->filter_entry.IsSet()) {
filter_entry.Release(rep_->table_options.block_cache.get());
}
return s;
}
@ -1612,6 +1671,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
return s;
}
void BlockBasedTable::Close() {
rep_->filter_entry.Release(rep_->table_options.block_cache.get());
rep_->index_entry.Release(rep_->table_options.block_cache.get());
}
Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) {
out_file->Append(
"Index Details:\n"

@ -76,7 +76,7 @@ class BlockBasedTable : public TableReader {
unique_ptr<RandomAccessFileReader>&& file,
uint64_t file_size, unique_ptr<TableReader>* table_reader,
bool prefetch_index_and_filter = true,
bool skip_filters = false);
bool skip_filters = false, int level = -1);
bool PrefixMayMatch(const Slice& internal_key);
@ -119,6 +119,8 @@ class BlockBasedTable : public TableReader {
// convert SST file to a human readable form
Status DumpTable(WritableFile* out_file) override;
void Close() override;
~BlockBasedTable();
bool TEST_filter_block_preloaded() const;
@ -155,8 +157,9 @@ class BlockBasedTable : public TableReader {
// 2. index is not present in block cache.
// 3. We disallowed any io to be performed, that is, read_options ==
// kBlockCacheTier
InternalIterator* NewIndexIterator(const ReadOptions& read_options,
BlockIter* input_iter = nullptr);
InternalIterator* NewIndexIterator(
const ReadOptions& read_options, BlockIter* input_iter = nullptr,
CachableEntry<IndexReader>* index_entry = nullptr);
// Read block cache from block caches (if set): block_cache and
// block_cache_compressed.

@ -29,17 +29,20 @@ struct TableReaderOptions {
TableReaderOptions(const ImmutableCFOptions& _ioptions,
const EnvOptions& _env_options,
const InternalKeyComparator& _internal_comparator,
bool _skip_filters = false)
bool _skip_filters = false, int _level = -1)
: ioptions(_ioptions),
env_options(_env_options),
internal_comparator(_internal_comparator),
skip_filters(_skip_filters) {}
skip_filters(_skip_filters),
level(_level) {}
const ImmutableCFOptions& ioptions;
const EnvOptions& env_options;
const InternalKeyComparator& internal_comparator;
// This is only used for BlockBasedTable (reader)
bool skip_filters;
// what level this table/file is on, -1 for "not set, don't know"
int level;
};
struct TableBuilderOptions {

@ -91,6 +91,8 @@ class TableReader {
virtual Status DumpTable(WritableFile* out_file) {
return Status::NotSupported("DumpTable() not supported");
}
virtual void Close() {}
};
} // namespace rocksdb

@ -333,6 +333,8 @@ class TableConstructor: public Constructor {
return convert_to_internal_key_;
}
void ResetTableReader() { table_reader_.reset(); }
private:
void Reset() {
uniq_id_ = 0;
@ -1017,6 +1019,7 @@ TEST_F(BlockBasedTableTest, BasicBlockBasedTableProperties) {
}
Slice content = block_builder.Finish();
ASSERT_EQ(content.size() + kBlockTrailerSize, props.data_size);
c.ResetTableReader();
}
TEST_F(BlockBasedTableTest, FilterPolicyNameProperties) {
@ -1034,6 +1037,7 @@ TEST_F(BlockBasedTableTest, FilterPolicyNameProperties) {
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
auto& props = *c.GetTableReader()->GetTableProperties();
ASSERT_EQ("rocksdb.BuiltinBloomFilter", props.filter_policy_name);
c.ResetTableReader();
}
//
@ -1075,6 +1079,7 @@ void PrefetchRange(TableConstructor* c, Options* opt,
// assert our expectation in cache warmup
AssertKeysInCache(table_reader, keys_in_cache, keys_not_in_cache);
c->ResetTableReader();
}
TEST_F(BlockBasedTableTest, PrefetchTest) {
@ -1102,6 +1107,7 @@ TEST_F(BlockBasedTableTest, PrefetchTest) {
stl_wrappers::KVMap kvmap;
const ImmutableCFOptions ioptions(opt);
c.Finish(opt, ioptions, table_options, *ikc, &keys, &kvmap);
c.ResetTableReader();
// We get the following data spread :
//
@ -1157,6 +1163,7 @@ TEST_F(BlockBasedTableTest, PrefetchTest) {
PrefetchRange(&c, &opt, &table_options, keys,
"k06", "k00", {}, {},
Status::InvalidArgument(Slice("k06 "), Slice("k07")));
c.ResetTableReader();
}
TEST_F(BlockBasedTableTest, TotalOrderSeekOnHashIndex) {
@ -1400,6 +1407,7 @@ TEST_F(TableTest, HashIndexTest) {
ASSERT_TRUE(BytewiseComparator()->Compare(prefix, ukey_prefix) < 0);
}
}
c.ResetTableReader();
}
// It's very hard to figure out the index block size of a block accurately.
@ -1440,6 +1448,7 @@ TEST_F(BlockBasedTableTest, IndexSizeStat) {
auto index_size = c.GetTableReader()->GetTableProperties()->index_size;
ASSERT_GT(index_size, last_index_size);
last_index_size = index_size;
c.ResetTableReader();
}
}
@ -1466,6 +1475,7 @@ TEST_F(BlockBasedTableTest, NumBlockStat) {
GetPlainInternalComparator(options.comparator), &ks, &kvmap);
ASSERT_EQ(kvmap.size(),
c.GetTableReader()->GetTableProperties()->num_data_blocks);
c.ResetTableReader();
}
// A simple tool that takes the snapshot of block cache statistics.
@ -1662,6 +1672,8 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) {
// release the iterator so that the block cache can reset correctly.
iter.reset();
c.ResetTableReader();
// -- PART 2: Open with very small block cache
// In this test, no block will ever get hit since the block cache is
// too small to fit even one entry.
@ -1702,6 +1714,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) {
ASSERT_EQ(props.GetCacheBytesRead(), 0);
}
iter.reset();
c.ResetTableReader();
// -- PART 3: Open table with bloom filter enabled but not in SST file
table_options.block_cache = NewLRUCache(4096);
@ -1716,6 +1729,8 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) {
// Generate table without filter policy
c3.Finish(options, ioptions3, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
c3.ResetTableReader();
// Open table with filter policy
table_options.filter_policy.reset(NewBloomFilterPolicy(1));
options.table_factory.reset(new BlockBasedTableFactory(table_options));
@ -1732,6 +1747,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) {
ASSERT_EQ(value, "hello");
BlockCachePropertiesSnapshot props(options.statistics.get());
props.AssertFilterBlockStat(0, 0);
c3.ResetTableReader();
}
void ValidateBlockSizeDeviation(int value, int expected) {
@ -1894,6 +1910,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) {
for (const std::string& key : keys) {
ASSERT_TRUE(table_reader->TEST_KeyInCache(ReadOptions(), key));
}
c.ResetTableReader();
// rerun with different block cache
table_options.block_cache = NewLRUCache(16 * 1024 * 1024);
@ -1904,6 +1921,7 @@ TEST_F(BlockBasedTableTest, BlockCacheLeak) {
for (const std::string& key : keys) {
ASSERT_TRUE(!table_reader->TEST_KeyInCache(ReadOptions(), key));
}
c.ResetTableReader();
}
// Plain table is not supported in ROCKSDB_LITE
@ -1991,6 +2009,7 @@ TEST_F(GeneralTableTest, ApproximateOffsetOfPlain) {
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k06"), 510000, 511000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k07"), 510000, 511000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000));
c.ResetTableReader();
}
static void DoCompressionTest(CompressionType comp) {
@ -2017,6 +2036,7 @@ static void DoCompressionTest(CompressionType comp) {
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k03"), 2000, 3000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k04"), 2000, 3000));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 6100));
c.ResetTableReader();
}
TEST_F(GeneralTableTest, ApproximateOffsetOfCompressed) {
@ -2342,6 +2362,7 @@ TEST_P(IndexBlockRestartIntervalTest, IndexBlockRestartInterval) {
kv_iter++;
}
ASSERT_EQ(kv_iter, kvmap.end());
c.ResetTableReader();
}
class PrefixTest : public testing::Test {

@ -74,6 +74,7 @@ const_params="
--level_compaction_dynamic_level_bytes=true \
--bytes_per_sync=$((8 * M)) \
--cache_index_and_filter_blocks=0 \
--pin_l0_filter_and_index_blocks_in_cache=1 \
--benchmark_write_rate_limit=$(( 1024 * 1024 * $mb_written_per_sec )) \
\
--hard_rate_limit=3 \

@ -340,6 +340,9 @@ DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed"
DEFINE_bool(cache_index_and_filter_blocks, false,
"Cache index/filter blocks in block cache.");
DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false,
"Pin index/filter blocks of L0 files in block cache.");
DEFINE_int32(block_size,
static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
"Number of bytes in a block.");
@ -2511,6 +2514,8 @@ class Benchmark {
}
block_based_options.cache_index_and_filter_blocks =
FLAGS_cache_index_and_filter_blocks;
block_based_options.pin_l0_filter_and_index_blocks_in_cache =
FLAGS_pin_l0_filter_and_index_blocks_in_cache;
block_based_options.block_cache = cache_;
block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.block_size = FLAGS_block_size;

@ -59,12 +59,14 @@ class SstFileReader {
bool output_hex_;
EnvOptions soptions_;
Status init_result_;
unique_ptr<TableReader> table_reader_;
unique_ptr<RandomAccessFileReader> file_;
// options_ and internal_comparator_ will also be used in
// ReadSequential internally (specifically, seek-related operations)
Options options_;
Status init_result_;
unique_ptr<TableReader> table_reader_;
unique_ptr<RandomAccessFileReader> file_;
const ImmutableCFOptions ioptions_;
InternalKeyComparator internal_comparator_;
unique_ptr<TableProperties> table_properties_;

@ -225,6 +225,8 @@ class LRUCache {
void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
bool thread_safe);
void EraseUnRefEntries();
private:
void LRU_Remove(LRUHandle* e);
void LRU_Append(LRUHandle* e);
@ -280,6 +282,29 @@ bool LRUCache::Unref(LRUHandle* e) {
// Call deleter and free
void LRUCache::EraseUnRefEntries() {
autovector<LRUHandle*> last_reference_list;
{
MutexLock l(&mutex_);
while (lru_.next != &lru_) {
LRUHandle* old = lru_.next;
assert(old->in_cache);
assert(old->refs ==
1); // LRU list contains elements which may be evicted
LRU_Remove(old);
table_.Remove(old->key(), old->hash);
old->in_cache = false;
Unref(old);
usage_ -= old->charge;
last_reference_list.push_back(old);
}
}
for (auto entry : last_reference_list) {
entry->Free();
}
}
void LRUCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t),
bool thread_safe) {
if (thread_safe) {
@ -615,6 +640,13 @@ class ShardedLRUCache : public Cache {
shards_[s].ApplyToAllCacheEntries(callback, thread_safe);
}
}
virtual void EraseUnRefEntries() override {
int num_shards = 1 << num_shard_bits_;
for (int s = 0; s < num_shards; s++) {
shards_[s].EraseUnRefEntries();
}
}
};
} // end anonymous namespace

@ -479,21 +479,28 @@ static std::unordered_map<std::string, OptionTypeInfo> cf_options_type_info = {
{offsetof(struct ColumnFamilyOptions, compaction_style),
OptionType::kCompactionStyle, OptionVerificationType::kNormal}}};
static std::unordered_map<std::string,
OptionTypeInfo> block_based_table_type_info = {
static std::unordered_map<std::string, OptionTypeInfo>
block_based_table_type_info = {
/* currently not supported
std::shared_ptr<Cache> block_cache = nullptr;
std::shared_ptr<Cache> block_cache_compressed = nullptr;
*/
{"flush_block_policy_factory",
{offsetof(struct BlockBasedTableOptions, flush_block_policy_factory),
OptionType::kFlushBlockPolicyFactory, OptionVerificationType::kByName}},
OptionType::kFlushBlockPolicyFactory,
OptionVerificationType::kByName}},
{"cache_index_and_filter_blocks",
{offsetof(struct BlockBasedTableOptions, cache_index_and_filter_blocks),
{offsetof(struct BlockBasedTableOptions,
cache_index_and_filter_blocks),
OptionType::kBoolean, OptionVerificationType::kNormal}},
{"pin_l0_filter_and_index_blocks_in_cache",
{offsetof(struct BlockBasedTableOptions,
pin_l0_filter_and_index_blocks_in_cache),
OptionType::kBoolean, OptionVerificationType::kNormal}},
{"index_type",
{offsetof(struct BlockBasedTableOptions, index_type),
OptionType::kBlockBasedTableIndexType, OptionVerificationType::kNormal}},
OptionType::kBlockBasedTableIndexType,
OptionVerificationType::kNormal}},
{"hash_index_allow_collision",
{offsetof(struct BlockBasedTableOptions, hash_index_allow_collision),
OptionType::kBoolean, OptionVerificationType::kNormal}},
@ -504,8 +511,8 @@ static std::unordered_map<std::string,
{offsetof(struct BlockBasedTableOptions, no_block_cache),
OptionType::kBoolean, OptionVerificationType::kNormal}},
{"block_size",
{offsetof(struct BlockBasedTableOptions, block_size), OptionType::kSizeT,
OptionVerificationType::kNormal}},
{offsetof(struct BlockBasedTableOptions, block_size),
OptionType::kSizeT, OptionVerificationType::kNormal}},
{"block_size_deviation",
{offsetof(struct BlockBasedTableOptions, block_size_deviation),
OptionType::kInt, OptionVerificationType::kNormal}},

@ -1602,7 +1602,9 @@ TEST_F(OptionsParserTest, BlockBasedTableOptionsAllFieldsSettable) {
// Need to update the option string if a new option is added.
ASSERT_OK(GetBlockBasedTableOptionsFromString(
*bbto,
"cache_index_and_filter_blocks=1;index_type=kHashSearch;"
"cache_index_and_filter_blocks=1;"
"pin_l0_filter_and_index_blocks_in_cache=1;"
"index_type=kHashSearch;"
"checksum=kxxHash;hash_index_allow_collision=1;no_block_cache=1;"
"block_cache=1M;block_cache_compressed=1k;block_size=1024;"
"block_size_deviation=8;block_restart_interval=4; "

@ -193,6 +193,7 @@ const SliceTransform* RandomSliceTransform(Random* rnd, int pre_defined) {
BlockBasedTableOptions RandomBlockBasedTableOptions(Random* rnd) {
BlockBasedTableOptions opt;
opt.cache_index_and_filter_blocks = rnd->Uniform(2);
opt.pin_l0_filter_and_index_blocks_in_cache = rnd->Uniform(2);
opt.index_type = rnd->Uniform(2) ? BlockBasedTableOptions::kBinarySearch
: BlockBasedTableOptions::kHashSearch;
opt.hash_index_allow_collision = rnd->Uniform(2);

Loading…
Cancel
Save