Add the index/filter block cache

Summary: This diff leverages the existing block cache and extends it to cache index/filter blocks.
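
For illustration, a minimal, self-contained sketch of the user-visible effect (not part of the diff; the path, cache size, and bloom bits are made up): flushing a memtable creates a table file whose index/filter blocks are immediately added to the shared block cache, observable through the new tickers introduced below.

```cpp
#include <cassert>
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/statistics.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.block_cache = rocksdb::NewLRUCache(8 << 20);          // shared cache
  options.filter_policy = rocksdb::NewBloomFilterPolicy(10);    // enables filter blocks
  options.statistics = rocksdb::CreateDBStatistics();

  rocksdb::DB* db = nullptr;
  assert(rocksdb::DB::Open(options, "/tmp/block_cache_demo", &db).ok());
  db->Put(rocksdb::WriteOptions(), "key", "val");
  db->Flush(rocksdb::FlushOptions());  // builds a new table file

  // Opening the new table does one cold lookup per index/filter block:
  auto stats = options.statistics;
  assert(stats->getTickerCount(rocksdb::BLOCK_CACHE_INDEX_MISS) == 1);
  assert(stats->getTickerCount(rocksdb::BLOCK_CACHE_FILTER_MISS) == 1);
  assert(stats->getTickerCount(rocksdb::BLOCK_CACHE_ADD) == 2);

  delete db;
  delete options.filter_policy;
  return 0;
}
```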

Test Plan:
Added new tests in db_test and table_test

The correctness is checked by:

1. make check
2. make valgrind_check

Performance was tested by:

1. 10 runs of build_tools/regression_build_test.sh on the two versions of rocksdb, before and after the code change. The results suggest no significant difference between them; for the two key operations `overwrite` and `readrandom`, the average IOPS are ~20k and ~260k respectively, with very small variance.
2. db_stress.

Reviewers: dhruba

Reviewed By: dhruba

CC: leveldb, haobo, xjin

Differential Revision: https://reviews.facebook.net/D13167
commit 88ba331c1a (parent aed9f1fa5e), branch main
Kai Liu, 11 years ago
Changed files (lines changed):

1. db/db_test.cc (108)
2. include/rocksdb/options.h (3)
3. include/rocksdb/statistics.h (32)
4. table/block_based_table_reader.cc (813)
5. table/block_based_table_reader.h (49)
6. table/filter_block.cc (6)
7. table/filter_block.h (9)
8. table/table_test.cc (187)
9. tools/db_stress.cc (10)

@@ -698,6 +698,63 @@ TEST(DBTest, ReadWrite) {
   } while (ChangeOptions());
 }
 
+// Make sure that when options.block_cache is set, after a new table is
+// created its index/filter blocks are added to block cache.
+TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
+  Options options = CurrentOptions();
+  std::unique_ptr<const FilterPolicy> filter_policy(NewBloomFilterPolicy(20));
+  options.filter_policy = filter_policy.get();
+  options.create_if_missing = true;
+  options.statistics = rocksdb::CreateDBStatistics();
+  DestroyAndReopen(&options);
+
+  ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
+  // Create a new table.
+  dbfull()->Flush(FlushOptions());
+
+  // index/filter blocks added to block cache right after table creation.
+  ASSERT_EQ(1,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_MISS));
+  ASSERT_EQ(1,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS));
+  ASSERT_EQ(2, /* only index/filter were added */
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD));
+  ASSERT_EQ(0,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_DATA_MISS));
+
+  // Make sure filter block is in cache.
+  std::string value;
+  ReadOptions ropt;
+  db_->KeyMayExist(ReadOptions(), "key", &value);
+
+  // Miss count should remain the same.
+  ASSERT_EQ(1,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS));
+  ASSERT_EQ(1,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT));
+
+  db_->KeyMayExist(ReadOptions(), "key", &value);
+  ASSERT_EQ(1,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS));
+  ASSERT_EQ(2,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT));
+
+  // Make sure index block is in cache.
+  auto index_block_hit =
+      options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT);
+  value = Get("key");
+  ASSERT_EQ(1,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS));
+  ASSERT_EQ(index_block_hit + 1,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT));
+
+  value = Get("key");
+  ASSERT_EQ(1,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS));
+  ASSERT_EQ(index_block_hit + 2,
+            options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT));
+}
+
 static std::string Key(int i) {
   char buf[100];
   snprintf(buf, sizeof(buf), "key%06d", i);
@@ -768,6 +825,7 @@ TEST(DBTest, PutDeleteGet) {
   } while (ChangeOptions());
 }
+
 TEST(DBTest, GetFromImmutableLayer) {
   do {
     Options options = CurrentOptions();
@@ -917,43 +975,46 @@ TEST(DBTest, KeyMayExist) {
     value.clear();
 
     long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
-    long cache_miss =
-        options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
+    long cache_added =
+        options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD);
     ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found));
    ASSERT_TRUE(!value_found);
     // assert that no new files were opened and no new blocks were
     // read into block cache.
     ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
-    ASSERT_EQ(cache_miss,
-              options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
+    ASSERT_EQ(cache_added,
+              options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD));
 
     ASSERT_OK(db_->Delete(WriteOptions(), "a"));
 
     numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
-    cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
+    cache_added =
+        options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD);
     ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value));
     ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
-    ASSERT_EQ(cache_miss,
-              options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
+    ASSERT_EQ(cache_added,
+              options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD));
 
     dbfull()->Flush(FlushOptions());
     dbfull()->CompactRange(nullptr, nullptr);
 
     numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
-    cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
+    cache_added =
+        options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD);
     ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value));
     ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
-    ASSERT_EQ(cache_miss,
-              options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
+    ASSERT_EQ(cache_added,
+              options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD));
 
     ASSERT_OK(db_->Delete(WriteOptions(), "c"));
 
     numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
-    cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
+    cache_added =
+        options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD);
     ASSERT_TRUE(!db_->KeyMayExist(ropts, "c", &value));
     ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
-    ASSERT_EQ(cache_miss,
-              options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
+    ASSERT_EQ(cache_added,
+              options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD));
 
     delete options.filter_policy;
   } while (ChangeOptions());
@@ -987,8 +1048,8 @@ TEST(DBTest, NonBlockingIteration) {
     // verify that a non-blocking iterator does not find any
     // kvs. Neither does it do any IOs to storage.
     long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
-    long cache_miss =
-        options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
+    long cache_added =
+        options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD);
     iter = db_->NewIterator(non_blocking_opts);
     count = 0;
     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@@ -997,8 +1058,8 @@ TEST(DBTest, NonBlockingIteration) {
     ASSERT_EQ(count, 0);
     ASSERT_TRUE(iter->status().IsIncomplete());
     ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
-    ASSERT_EQ(cache_miss,
-              options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
+    ASSERT_EQ(cache_added,
+              options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD));
     delete iter;
 
     // read in the specified block via a regular get
@@ -1006,7 +1067,8 @@ TEST(DBTest, NonBlockingIteration) {
     // verify that we can find it via a non-blocking scan
     numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
-    cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
+    cache_added =
+        options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD);
     iter = db_->NewIterator(non_blocking_opts);
     count = 0;
     for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@@ -1015,8 +1077,8 @@ TEST(DBTest, NonBlockingIteration) {
     }
     ASSERT_EQ(count, 1);
     ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
-    ASSERT_EQ(cache_miss,
-              options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
+    ASSERT_EQ(cache_added,
+              options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD));
     delete iter;
 
   } while (ChangeOptions());
@@ -3534,7 +3596,7 @@ TEST(DBTest, BloomFilter) {
   env_->count_random_reads_ = true;
   Options options = CurrentOptions();
   options.env = env_;
-  options.block_cache = NewLRUCache(0);  // Prevent cache hits
+  options.no_block_cache = true;
   options.filter_policy = NewBloomFilterPolicy(10);
   Reopen(&options);
 
@@ -4128,7 +4190,7 @@ TEST(DBTest, ReadCompaction) {
   options.write_buffer_size = 64 * 1024;
   options.filter_policy = nullptr;
   options.block_size = 4096;
-  options.block_cache = NewLRUCache(0);  // Prevent cache hits
+  options.no_block_cache = true;
   Reopen(&options);
 
@@ -4708,7 +4770,7 @@ TEST(DBTest, PrefixScan) {
   env_->count_random_reads_ = true;
   Options options = CurrentOptions();
   options.env = env_;
-  options.block_cache = NewLRUCache(0);  // Prevent cache hits
+  options.no_block_cache = true;
   options.filter_policy = NewBloomFilterPolicy(10);
   options.prefix_extractor = prefix_extractor;
   options.whole_key_filtering = false;
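
Note on the three test changes above: `NewLRUCache(0)` was previously used to defeat caching, but with this diff even a zero-capacity block cache would participate in index/filter caching, so the tests switch to the pre-existing `options.no_block_cache` flag instead. A hedged sketch (field name as of this revision of Options):

```cpp
#include "rocksdb/options.h"

// Sketch: opt a DB out of block caching entirely, which is what the updated
// tests above rely on (no_block_cache is assumed per this revision).
rocksdb::Options NoBlockCacheOptions() {
  rocksdb::Options options;
  options.no_block_cache = true;  // block_cache must be left unset
  return options;
}
```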

@@ -653,7 +653,8 @@ struct ReadOptions {
   // Default: false
   bool verify_checksums;
 
-  // Should the data read for this iteration be cached in memory?
+  // Should the "data block"/"index block"/"filter block" read for this
+  // iteration be cached in memory?
   // Callers may wish to set this field to false for bulk scans.
   // Default: true
   bool fill_cache;
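
Since `fill_cache` now governs index/filter blocks too, here is a short sketch of the bulk-scan use the comment mentions (`db` is assumed to be an open rocksdb::DB):

```cpp
#include <memory>
#include "rocksdb/db.h"

// Sketch: iterate without populating the block cache, useful for one-off
// bulk scans that would otherwise evict hot blocks.
void BulkScan(rocksdb::DB* db) {
  rocksdb::ReadOptions scan_opts;
  scan_opts.fill_cache = false;  // applies to data, index and filter blocks
  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(scan_opts));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // consume it->key() / it->value()
  }
}
```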

@@ -23,9 +23,32 @@ namespace rocksdb {
  *  And incrementing TICKER_ENUM_MAX.
  */
 enum Tickers {
+  // total block cache misses
+  // REQUIRES: BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS +
+  //                               BLOCK_CACHE_FILTER_MISS +
+  //                               BLOCK_CACHE_DATA_MISS;
   BLOCK_CACHE_MISS,
+  // total block cache hit
+  // REQUIRES: BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT +
+  //                              BLOCK_CACHE_FILTER_HIT +
+  //                              BLOCK_CACHE_DATA_HIT;
   BLOCK_CACHE_HIT,
-  BLOOM_FILTER_USEFUL, // no. of times bloom filter has avoided file reads.
+  // # of blocks added to block cache.
+  BLOCK_CACHE_ADD,
+  // # of times cache miss when accessing index block from block cache.
+  BLOCK_CACHE_INDEX_MISS,
+  // # of times cache hit when accessing index block from block cache.
+  BLOCK_CACHE_INDEX_HIT,
+  // # of times cache miss when accessing filter block from block cache.
+  BLOCK_CACHE_FILTER_MISS,
+  // # of times cache hit when accessing filter block from block cache.
+  BLOCK_CACHE_FILTER_HIT,
+  // # of times cache miss when accessing data block from block cache.
+  BLOCK_CACHE_DATA_MISS,
+  // # of times cache hit when accessing data block from block cache.
+  BLOCK_CACHE_DATA_HIT,
+  // # of times bloom filter has avoided file reads.
+  BLOOM_FILTER_USEFUL,
 
   /**
    * COMPACTION_KEY_DROP_* count the reasons for key drop during compaction
@@ -93,6 +116,13 @@ enum Tickers {
 const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
   { BLOCK_CACHE_MISS, "rocksdb.block.cache.miss" },
   { BLOCK_CACHE_HIT, "rocksdb.block.cache.hit" },
+  { BLOCK_CACHE_ADD, "rocksdb.block.cache.add" },
+  { BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss" },
+  { BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit" },
+  { BLOCK_CACHE_FILTER_MISS, "rocksdb.block.cache.filter.miss" },
+  { BLOCK_CACHE_FILTER_HIT, "rocksdb.block.cache.filter.hit" },
+  { BLOCK_CACHE_DATA_MISS, "rocksdb.block.cache.data.miss" },
+  { BLOCK_CACHE_DATA_HIT, "rocksdb.block.cache.data.hit" },
   { BLOOM_FILTER_USEFUL, "rocksdb.bloom.filter.useful" },
   { COMPACTION_KEY_DROP_NEWER_ENTRY, "rocksdb.compaction.key.drop.new" },
   { COMPACTION_KEY_DROP_OBSOLETE, "rocksdb.compaction.key.drop.obsolete" },
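
The two REQUIRES invariants above can be checked at runtime; a hedged sketch (assuming a Statistics object from CreateDBStatistics()):

```cpp
#include <cassert>
#include "rocksdb/statistics.h"

// Sketch: verify the per-type tickers sum to the overall miss/hit tickers.
void CheckBlockCacheTickerInvariants(rocksdb::Statistics* stats) {
  auto miss = stats->getTickerCount(rocksdb::BLOCK_CACHE_MISS);
  auto index_miss = stats->getTickerCount(rocksdb::BLOCK_CACHE_INDEX_MISS);
  auto filter_miss = stats->getTickerCount(rocksdb::BLOCK_CACHE_FILTER_MISS);
  auto data_miss = stats->getTickerCount(rocksdb::BLOCK_CACHE_DATA_MISS);
  assert(miss == index_miss + filter_miss + data_miss);

  auto hit = stats->getTickerCount(rocksdb::BLOCK_CACHE_HIT);
  auto index_hit = stats->getTickerCount(rocksdb::BLOCK_CACHE_INDEX_HIT);
  auto filter_hit = stats->getTickerCount(rocksdb::BLOCK_CACHE_FILTER_HIT);
  auto data_hit = stats->getTickerCount(rocksdb::BLOCK_CACHE_DATA_HIT);
  assert(hit == index_hit + filter_hit + data_hit);
}
```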

@@ -33,13 +33,9 @@ namespace rocksdb {
 // We are using the fact that we know for Posix files the unique ID is three
 // varints.
 const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1;
+using std::unique_ptr;
 
 struct BlockBasedTable::Rep {
-  ~Rep() {
-    delete filter;
-    delete [] filter_data;
-    delete index_block;
-  }
   Rep(const EnvOptions& storage_options) :
       soptions(storage_options) {
   }
@@ -52,11 +48,16 @@ struct BlockBasedTable::Rep {
   size_t cache_key_prefix_size;
   char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize];
   size_t compressed_cache_key_prefix_size;
-  FilterBlockReader* filter;
-  const char* filter_data;
 
-  BlockHandle metaindex_handle;  // Handle to metaindex_block: saved from footer
-  Block* index_block;
+  // Handle to metaindex_block: saved from footer
+  BlockHandle metaindex_handle;
+  // Handle to index: saved from footer
+  BlockHandle index_handle;
+  // index_block will be populated and used only when options.block_cache is
+  // NULL; otherwise we will get the index block via the block cache.
+  unique_ptr<Block> index_block;
+  unique_ptr<FilterBlockReader> filter;
 
   TableStats table_stats;
 };
@@ -64,6 +65,30 @@ BlockBasedTable::~BlockBasedTable() {
   delete rep_;
 }
 
+// CachableEntry represents the entries that *may* be fetched from block cache.
+// field `value` is the item we want to get.
+// field `cache_handle` is the cache handle to the block cache. If the value
+// was not read from cache, `cache_handle` will be nullptr.
+template <class TValue>
+struct BlockBasedTable::CachableEntry {
+  CachableEntry(TValue* value, Cache::Handle* cache_handle)
+    : value(value)
+    , cache_handle(cache_handle) {
+  }
+  CachableEntry(): CachableEntry(nullptr, nullptr) { }
+  void Release(Cache* cache) {
+    if (cache_handle) {
+      cache->Release(cache_handle);
+      value = nullptr;
+      cache_handle = nullptr;
+    }
+  }
+
+  TValue* value = nullptr;
+  // if the entry is from the cache, cache_handle will be populated.
+  Cache::Handle* cache_handle = nullptr;
+};
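
A note on the Release() contract: a CachableEntry that came from the cache pins the underlying block until released. The same pin/release discipline can be shown self-contained against the public rocksdb::Cache API (names below are illustrative; signatures as of this revision):

```cpp
#include <cassert>
#include <string>
#include "rocksdb/cache.h"

// Standalone sketch of the lookup/release discipline CachableEntry wraps.
static void DeleteString(const rocksdb::Slice& /*key*/, void* value) {
  delete reinterpret_cast<std::string*>(value);
}

int main() {
  std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(1024);
  auto* value = new std::string("pretend this is an index block");
  rocksdb::Cache::Handle* handle =
      cache->Insert("key", value, value->size(), &DeleteString);
  // The handle pins the entry: it cannot be evicted while we hold it.
  assert(cache->Value(handle) == value);
  cache->Release(handle);  // unpin; the cache may now evict (and delete) it
  return 0;
}
```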
 // Helper function to setup the cache key's prefix for the Table.
 void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) {
   assert(kMaxCacheKeyPrefixSize >= 10);
@@ -116,13 +141,14 @@ namespace {  // anonymous namespace, not visible externally
 // Set *didIO to true if didIO is not null.
 // On failure return non-OK.
 // On success fill *result and return OK - caller owns *result
-Status ReadBlock(RandomAccessFile* file,
-                 const ReadOptions& options,
-                 const BlockHandle& handle,
-                 Block** result,
-                 Env* env,
-                 bool* didIO = nullptr,
-                 bool do_uncompress = true) {
+Status ReadBlockFromFile(
+    RandomAccessFile* file,
+    const ReadOptions& options,
+    const BlockHandle& handle,
+    Block** result,
+    Env* env,
+    bool* didIO = nullptr,
+    bool do_uncompress = true) {
   BlockContents contents;
   Status s = ReadBlockContents(file, options, handle, &contents,
                                env, do_uncompress);
@@ -136,6 +162,62 @@ Status ReadBlock(RandomAccessFile* file,
   return s;
 }
 
+void DeleteBlock(void* arg, void* ignored) {
+  delete reinterpret_cast<Block*>(arg);
+}
+
+void DeleteCachedBlock(const Slice& key, void* value) {
+  Block* block = reinterpret_cast<Block*>(value);
+  delete block;
+}
+
+void DeleteCachedFilter(const Slice& key, void* value) {
+  auto filter = reinterpret_cast<FilterBlockReader*>(value);
+  delete filter;
+}
+
+void ReleaseBlock(void* arg, void* h) {
+  Cache* cache = reinterpret_cast<Cache*>(arg);
+  Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
+  cache->Release(handle);
+}
+
+Slice GetCacheKey(const char* cache_key_prefix,
+                  size_t cache_key_prefix_size,
+                  const BlockHandle& handle,
+                  char* cache_key) {
+  assert(cache_key != nullptr);
+  assert(cache_key_prefix_size != 0);
+  assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
+
+  memcpy(cache_key, cache_key_prefix, cache_key_prefix_size);
+  char* end = EncodeVarint64(cache_key + cache_key_prefix_size,
+                             handle.offset());
+  return Slice(cache_key, static_cast<size_t>(end - cache_key));
+}
+
+Cache::Handle* GetFromBlockCache(
+    Cache* block_cache,
+    const Slice& key,
+    Tickers block_cache_miss_ticker,
+    Tickers block_cache_hit_ticker,
+    std::shared_ptr<Statistics> statistics) {
+  auto cache_handle = block_cache->Lookup(key);
+  if (cache_handle != nullptr) {
+    BumpPerfCount(&perf_context.block_cache_hit_count);
+    // overall cache hit
+    RecordTick(statistics, BLOCK_CACHE_HIT);
+    // block-type specific cache hit
+    RecordTick(statistics, block_cache_hit_ticker);
+  } else {
+    // overall cache miss
+    RecordTick(statistics, BLOCK_CACHE_MISS);
+    // block-type specific cache miss
+    RecordTick(statistics, block_cache_miss_ticker);
+  }
+
+  return cache_handle;
+}
+
 }  // end of anonymous namespace
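
GetCacheKey builds the key as a per-file prefix followed by the block offset encoded as a varint, so all blocks of one file share a prefix. A standalone illustration (EncodeVarint64 re-implemented here only for the sketch; the real helper lives in util/coding.h):

```cpp
#include <cstdint>
#include <string>

// Demo varint64 encoding: 7 payload bits per byte, high bit = continuation.
static char* EncodeVarint64Demo(char* dst, uint64_t v) {
  unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
  while (v >= 128) {
    *(ptr++) = (v & 127) | 128;
    v >>= 7;
  }
  *(ptr++) = static_cast<unsigned char>(v);
  return reinterpret_cast<char*>(ptr);
}

// Sketch of the cache-key layout: file-unique prefix + varint block offset.
std::string MakeBlockCacheKey(const std::string& file_prefix,
                              uint64_t block_offset) {
  char buf[10];  // a varint64 needs at most 10 bytes
  char* end = EncodeVarint64Demo(buf, block_offset);
  return file_prefix + std::string(buf, end - buf);
}
```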
 Status BlockBasedTable::Open(const Options& options,
@@ -164,27 +246,85 @@ Status BlockBasedTable::Open(const Options& options,
   s = footer.DecodeFrom(&footer_input);
   if (!s.ok()) return s;
 
-  Block* index_block = nullptr;
-  // TODO: we never really verify check sum for index block
-  s = ReadBlock(file.get(), ReadOptions(), footer.index_handle(), &index_block,
-                options.env);
-
-  if (s.ok()) {
-    // We've successfully read the footer and the index block: we're
-    // ready to serve requests.
-    assert(index_block->compressionType() == kNoCompression);
-    BlockBasedTable::Rep* rep = new BlockBasedTable::Rep(soptions);
-    rep->options = options;
-    rep->file = std::move(file);
-    rep->metaindex_handle = footer.metaindex_handle();
-    rep->index_block = index_block;
-    SetupCacheKeyPrefix(rep);
-    rep->filter_data = nullptr;
-    rep->filter = nullptr;
-    table_reader->reset(new BlockBasedTable(rep));
-    ((BlockBasedTable*) (table_reader->get()))->ReadMeta(footer);
-  } else {
-    if (index_block) delete index_block;
+  // We've successfully read the footer: we're ready to serve requests.
+  Rep* rep = new BlockBasedTable::Rep(soptions);
+  rep->options = options;
+  rep->file = std::move(file);
+  rep->metaindex_handle = footer.metaindex_handle();
+  rep->index_handle = footer.index_handle();
+  SetupCacheKeyPrefix(rep);
+  unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep));
+
+  // Read meta index
+  std::unique_ptr<Block> meta;
+  std::unique_ptr<Iterator> meta_iter;
+  s = ReadMetaBlock(rep, &meta, &meta_iter);
+
+  // Read the stats
+  meta_iter->Seek(kStatsBlock);
+  if (meta_iter->Valid() && meta_iter->key() == Slice(kStatsBlock)) {
+    s = meta_iter->status();
+    if (s.ok()) {
+      s = ReadStats(meta_iter->value(), rep, &rep->table_stats);
+    }
+
+    if (!s.ok()) {
+      auto err_msg =
+        "[Warning] Encountered error while reading data from stats block " +
+        s.ToString();
+      Log(rep->options.info_log, err_msg.c_str());
+    }
+  }
+
+  // Initialize index/filter blocks. If block cache is not specified,
+  // these blocks will be kept in member variables in Rep, which will
+  // reside in the memory as long as this table object is alive; otherwise
+  // they will be added to block cache.
+  if (!options.block_cache) {
+    Block* index_block = nullptr;
+    // TODO: we never really verify check sum for index block
+    s = ReadBlockFromFile(
+        rep->file.get(),
+        ReadOptions(),
+        footer.index_handle(),
+        &index_block,
+        options.env
+    );
+
+    if (s.ok()) {
+      assert(index_block->compressionType() == kNoCompression);
+      rep->index_block.reset(index_block);
+
+      // Set the filter block
+      if (rep->options.filter_policy) {
+        std::string key = kFilterBlockPrefix;
+        key.append(rep->options.filter_policy->Name());
+        meta_iter->Seek(key);
+
+        if (meta_iter->Valid() && meta_iter->key() == Slice(key)) {
+          rep->filter.reset(ReadFilter(meta_iter->value(), rep));
+        }
+      }
+    } else {
+      delete index_block;
+    }
+  } else {
+    // Call IndexBlockReader() to implicitly add index to the block_cache
+    unique_ptr<Iterator> iter(
+        new_table->IndexBlockReader(ReadOptions())
+    );
+    s = iter->status();
+
+    if (s.ok()) {
+      // Call GetFilter() to implicitly add filter to the block_cache
+      auto filter_entry = new_table->GetFilter();
+      filter_entry.Release(options.block_cache.get());
+    }
+  }
+
+  if (s.ok()) {
+    *table_reader = std::move(new_table);
   }
 
   return s;
@@ -213,72 +353,71 @@ TableStats& BlockBasedTable::GetTableStats() {
   return rep_->table_stats;
 }
 
-void BlockBasedTable::ReadMeta(const Footer& footer) {
+// Load the meta-block from the file. On success, return the loaded meta block
+// and its iterator.
+Status BlockBasedTable::ReadMetaBlock(
+    Rep* rep,
+    std::unique_ptr<Block>* meta_block,
+    std::unique_ptr<Iterator>* iter) {
   // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
   // it is an empty block.
   // TODO: we never really verify check sum for meta index block
   Block* meta = nullptr;
-  if (!ReadBlock(rep_->file.get(), ReadOptions(), footer.metaindex_handle(),
-                 &meta, rep_->options.env).ok()) {
-    // Do not propagate errors since meta info is not needed for operation
-    return;
-  }
-  assert(meta->compressionType() == kNoCompression);
+  Status s = ReadBlockFromFile(
+      rep->file.get(),
+      ReadOptions(),
+      rep->metaindex_handle,
+      &meta,
+      rep->options.env);
 
-  Iterator* iter = meta->NewIterator(BytewiseComparator());
-  // read filter
-  if (rep_->options.filter_policy) {
-    std::string key = kFilterBlockPrefix;
-    key.append(rep_->options.filter_policy->Name());
-    iter->Seek(key);
-    if (iter->Valid() && iter->key() == Slice(key)) {
-      ReadFilter(iter->value());
-    }
-  }
-
-  // read stats
-  iter->Seek(kStatsBlock);
-  if (iter->Valid() && iter->key() == Slice(kStatsBlock)) {
-    auto s = iter->status();
-    if (s.ok()) {
-      s = ReadStats(iter->value(), rep_);
-    }
-    if (!s.ok()) {
-      auto err_msg =
-        "[Warning] Encountered error while reading data from stats block " +
-        s.ToString();
-      Log(rep_->options.info_log, "%s", err_msg.c_str());
-    }
+  if (!s.ok()) {
+    auto err_msg =
+      "[Warning] Encountered error while reading data from stats block " +
+      s.ToString();
+    Log(rep->options.info_log, "%s", err_msg.c_str());
+  }
+  if (!s.ok()) {
+    delete meta;
+    return s;
   }
 
-  delete iter;
-  delete meta;
+  meta_block->reset(meta);
+  // meta block uses bytewise comparator.
+  iter->reset(meta->NewIterator(BytewiseComparator()));
+  return Status::OK();
 }
 
-void BlockBasedTable::ReadFilter(const Slice& filter_handle_value) {
+FilterBlockReader* BlockBasedTable::ReadFilter(
+    const Slice& filter_handle_value,
+    BlockBasedTable::Rep* rep,
+    size_t* filter_size) {
   Slice v = filter_handle_value;
   BlockHandle filter_handle;
   if (!filter_handle.DecodeFrom(&v).ok()) {
-    return;
+    return nullptr;
   }
 
-  // TODO: We might want to unify with ReadBlock() if we start
-  // requiring checksum verification in BlockBasedTable::Open.
+  // TODO: We might want to unify with ReadBlockFromFile() if we start
+  // requiring checksum verification in Table::Open.
   ReadOptions opt;
   BlockContents block;
-  if (!ReadBlockContents(rep_->file.get(), opt, filter_handle, &block,
-                         rep_->options.env, false).ok()) {
-    return;
+  if (!ReadBlockContents(rep->file.get(), opt, filter_handle, &block,
+                         rep->options.env, false).ok()) {
+    return nullptr;
   }
-  if (block.heap_allocated) {
-    rep_->filter_data = block.data.data();  // Will need to delete later
+
+  if (filter_size) {
+    *filter_size = block.data.size();
   }
-  rep_->filter = new FilterBlockReader(rep_->options, block.data);
+
+  return new FilterBlockReader(
+      rep->options, block.data, block.heap_allocated);
 }
 
-Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) {
+Status BlockBasedTable::ReadStats(
+    const Slice& handle_value, Rep* rep, TableStats* table_stats) {
+  assert(table_stats);
+
   Slice v = handle_value;
   BlockHandle handle;
   if (!handle.DecodeFrom(&v).ok()) {
@@ -304,15 +443,15 @@ Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) {
     stats_block.NewIterator(BytewiseComparator())
   );
 
-  auto& table_stats = rep->table_stats;
   // All pre-defined stats of type uint64_t
   std::unordered_map<std::string, uint64_t*> predefined_uint64_stats = {
-    { BlockBasedTableStatsNames::kDataSize,      &table_stats.data_size },
-    { BlockBasedTableStatsNames::kIndexSize,     &table_stats.index_size },
-    { BlockBasedTableStatsNames::kRawKeySize,    &table_stats.raw_key_size },
-    { BlockBasedTableStatsNames::kRawValueSize,  &table_stats.raw_value_size },
-    { BlockBasedTableStatsNames::kNumDataBlocks, &table_stats.num_data_blocks},
-    { BlockBasedTableStatsNames::kNumEntries,    &table_stats.num_entries },
+    { BlockBasedTableStatsNames::kDataSize,      &table_stats->data_size },
+    { BlockBasedTableStatsNames::kIndexSize,     &table_stats->index_size },
+    { BlockBasedTableStatsNames::kRawKeySize,    &table_stats->raw_key_size },
+    { BlockBasedTableStatsNames::kRawValueSize,  &table_stats->raw_value_size },
+    { BlockBasedTableStatsNames::kNumDataBlocks,
+      &table_stats->num_data_blocks },
+    { BlockBasedTableStatsNames::kNumEntries,    &table_stats->num_entries },
   };
 
   std::string last_key;
@@ -346,11 +485,11 @@ Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) {
       }
       *(pos->second) = val;
     } else if (key == BlockBasedTableStatsNames::kFilterPolicy) {
-      table_stats.filter_policy_name = raw_val.ToString();
+      table_stats->filter_policy_name = raw_val.ToString();
    } else {
       // handle user-collected
-      table_stats.user_collected_stats.insert(
-        std::make_pair(iter->key().ToString(), raw_val.ToString())
+      table_stats->user_collected_stats.insert(
+        std::make_pair(key, raw_val.ToString())
       );
     }
   }
@@ -358,19 +497,81 @@ Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) {
   return s;
 }
 
-static void DeleteBlock(void* arg, void* ignored) {
-  delete reinterpret_cast<Block*>(arg);
-}
-
-static void DeleteCachedBlock(const Slice& key, void* value) {
-  Block* block = reinterpret_cast<Block*>(value);
-  delete block;
-}
-
-static void ReleaseBlock(void* arg, void* h) {
-  Cache* cache = reinterpret_cast<Cache*>(arg);
-  Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
-  cache->Release(handle);
+Status BlockBasedTable::GetBlock(
+    const BlockBasedTable* table,
+    const BlockHandle& handle,
+    const ReadOptions& options,
+    const bool for_compaction,
+    const Tickers block_cache_miss_ticker,
+    const Tickers block_cache_hit_ticker,
+    bool* didIO,
+    CachableEntry<Block>* entry) {
+  bool no_io = options.read_tier == kBlockCacheTier;
+  Cache* block_cache = table->rep_->options.block_cache.get();
+  auto statistics = table->rep_->options.statistics;
+  Status s;
+
+  if (block_cache != nullptr) {
+    char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
+    auto key = GetCacheKey(
+        table->rep_->cache_key_prefix,
+        table->rep_->cache_key_prefix_size,
+        handle,
+        cache_key
+    );
+
+    entry->cache_handle = GetFromBlockCache(
+        block_cache,
+        key,
+        block_cache_miss_ticker,
+        block_cache_hit_ticker,
+        table->rep_->options.statistics
+    );
+
+    if (entry->cache_handle != nullptr) {
+      entry->value =
+        reinterpret_cast<Block*>(block_cache->Value(entry->cache_handle));
+    } else if (no_io) {
+      // Did not find in block_cache and can't do IO
+      return Status::Incomplete("no blocking io");
+    } else {
+      Histograms histogram = for_compaction ?
+        READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
+      {
+        // block for stop watch
+        StopWatch sw(table->rep_->options.env, statistics, histogram);
+        s = ReadBlockFromFile(
+            table->rep_->file.get(),
+            options,
+            handle,
+            &entry->value,
+            table->rep_->options.env,
+            didIO
+        );
+      }
+
+      if (s.ok()) {
+        if (options.fill_cache && entry->value->isCachable()) {
+          entry->cache_handle = block_cache->Insert(
+              key, entry->value, entry->value->size(), &DeleteCachedBlock);
+          RecordTick(statistics, BLOCK_CACHE_ADD);
+        }
+      }
+    }
+  } else if (no_io) {
+    // Could not read from block_cache and can't do IO
+    return Status::Incomplete("no blocking io");
+  } else {
+    s = ReadBlockFromFile(
+        table->rep_->file.get(),
+        options,
+        handle,
+        &entry->value,
+        table->rep_->options.env,
+        didIO
+    );
+  }
+
+  return s;
 }
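
The `no_io` path above is driven by ReadOptions::read_tier. A hedged sketch of a cache-only read from the user side (assuming `db` is open; the enum name is as of this revision):

```cpp
#include <string>
#include "rocksdb/db.h"

// Sketch: a read forbidden from touching storage. With this diff,
// GetBlock/GetFilter honor kBlockCacheTier by returning
// Status::Incomplete("no blocking io") instead of reading the file
// when the needed block is not already cached.
rocksdb::Status CacheOnlyGet(rocksdb::DB* db, const std::string& key,
                             std::string* value) {
  rocksdb::ReadOptions opts;
  opts.read_tier = rocksdb::kBlockCacheTier;  // memtable + block cache only
  return db->Get(opts, key, value);  // may return Status::Incomplete
}
```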
 // Convert an index iterator value (i.e., an encoded BlockHandle)
@@ -397,153 +598,160 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
   // We intentionally allow extra stuff in index_value so that we
   // can add more features in the future.
 
-  if (s.ok()) {
-    if (block_cache != nullptr || block_cache_compressed != nullptr) {
-      char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
-      char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
-      char* end = cache_key;
-
-      // create key for block cache
-      if (block_cache != nullptr) {
-        assert(table->rep_->cache_key_prefix_size != 0);
-        assert(table->rep_->cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
-        memcpy(cache_key, table->rep_->cache_key_prefix,
-               table->rep_->cache_key_prefix_size);
-        end = EncodeVarint64(cache_key + table->rep_->cache_key_prefix_size,
-                             handle.offset());
-      }
-
-      Slice key(cache_key, static_cast<size_t>(end - cache_key));
-
-      // create key for compressed block cache
-      end = compressed_cache_key;
-      if (block_cache_compressed != nullptr) {
-        assert(table->rep_->compressed_cache_key_prefix_size != 0);
-        assert(table->rep_->compressed_cache_key_prefix_size <=
-               kMaxCacheKeyPrefixSize);
-        memcpy(compressed_cache_key, table->rep_->compressed_cache_key_prefix,
-               table->rep_->compressed_cache_key_prefix_size);
-        end = EncodeVarint64(compressed_cache_key +
-                             table->rep_->compressed_cache_key_prefix_size,
-                             handle.offset());
-      }
-      Slice ckey(compressed_cache_key, static_cast<size_t>
-                 (end - compressed_cache_key));
-
-      // Lookup uncompressed cache first
-      if (block_cache != nullptr) {
-        cache_handle = block_cache->Lookup(key);
-        if (cache_handle != nullptr) {
-          block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
-          RecordTick(statistics, BLOCK_CACHE_HIT);
-        }
-      }
-
-      // If not found in uncompressed cache, lookup compressed cache
-      if (block == nullptr && block_cache_compressed != nullptr) {
-        compressed_cache_handle = block_cache_compressed->Lookup(ckey);
-
-        // if we found in the compressed cache, then uncompress and
-        // insert into uncompressed cache
-        if (compressed_cache_handle != nullptr) {
-          // found compressed block
-          cblock = reinterpret_cast<Block*>(block_cache_compressed->
-                                            Value(compressed_cache_handle));
-          assert(cblock->compressionType() != kNoCompression);
-
-          // Retrieve the uncompressed contents into a new buffer
-          BlockContents contents;
-          s = UncompressBlockContents(cblock->data(), cblock->size(),
-                                      &contents);
-
-          // Insert uncompressed block into block cache
-          if (s.ok()) {
-            block = new Block(contents); // uncompressed block
-            assert(block->compressionType() == kNoCompression);
-            if (block_cache != nullptr && block->isCachable() &&
-                options.fill_cache) {
-              cache_handle = block_cache->Insert(key, block, block->size(),
-                                                 &DeleteCachedBlock);
-              assert(reinterpret_cast<Block*>(block_cache->Value(cache_handle))
-                     == block);
-            }
-          }
-
-          // Release hold on compressed cache entry
-          block_cache_compressed->Release(compressed_cache_handle);
-          RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT);
-        }
-      }
-
-      if (block != nullptr) {
-        BumpPerfCount(&perf_context.block_cache_hit_count);
-      } else if (no_io) {
-        // Did not find in block_cache and can't do IO
-        return NewErrorIterator(Status::Incomplete("no blocking io"));
-      } else {
-        Histograms histogram = for_compaction ?
-          READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
-        {  // block for stop watch
-          StopWatch sw(table->rep_->options.env, statistics, histogram);
-          s = ReadBlock(
-            table->rep_->file.get(),
-            options,
-            handle,
-            &cblock,
-            table->rep_->options.env,
-            didIO,
-            block_cache_compressed == nullptr
-          );
-        }
-        if (s.ok()) {
-          assert(cblock->compressionType() == kNoCompression ||
-                 block_cache_compressed != nullptr);
-
-          // Retrieve the uncompressed contents into a new buffer
-          BlockContents contents;
-          if (cblock->compressionType() != kNoCompression) {
-            s = UncompressBlockContents(cblock->data(), cblock->size(),
-                                        &contents);
-          }
-          if (s.ok()) {
-            if (cblock->compressionType() != kNoCompression) {
-              block = new Block(contents); // uncompressed block
-            } else {
-              block = cblock;
-              cblock = nullptr;
-            }
-            if (block->isCachable() && options.fill_cache) {
-              // Insert compressed block into compressed block cache.
-              // Release the hold on the compressed cache entry immediately.
-              if (block_cache_compressed != nullptr && cblock != nullptr) {
-                compressed_cache_handle = block_cache_compressed->Insert(
-                    ckey, cblock, cblock->size(), &DeleteCachedBlock);
-                block_cache_compressed->Release(compressed_cache_handle);
-                RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
-                cblock = nullptr;
-              }
-              // insert into uncompressed block cache
-              assert((block->compressionType() == kNoCompression));
-              if (block_cache != nullptr) {
-                cache_handle = block_cache->Insert(
-                  key, block, block->size(), &DeleteCachedBlock);
-                RecordTick(statistics, BLOCK_CACHE_MISS);
-                assert(reinterpret_cast<Block*>(block_cache->Value(
-                    cache_handle))== block);
-              }
-            }
-          }
-        }
-        if (cblock != nullptr) {
-          delete cblock;
-        }
-      }
-    } else if (no_io) {
-      // Could not read from block_cache and can't do IO
-      return NewErrorIterator(Status::Incomplete("no blocking io"));
-    } else {
-      s = ReadBlock(table->rep_->file.get(), options, handle, &block,
-                    table->rep_->options.env, didIO);
-    }
-  }
+  if (!s.ok()) {
+    return NewErrorIterator(s);
+  }
+
+  if (block_cache != nullptr || block_cache_compressed != nullptr) {
+    char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
+    char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
+    Slice key,  /* key to the block cache */
+          ckey  /* key to the compressed block cache */;
+
+    // create key for block cache
+    if (block_cache != nullptr) {
+      key = GetCacheKey(
+          table->rep_->cache_key_prefix,
+          table->rep_->cache_key_prefix_size,
+          handle,
+          cache_key
+      );
+    }
+
+    if (block_cache_compressed != nullptr) {
+      ckey = GetCacheKey(
+          table->rep_->compressed_cache_key_prefix,
+          table->rep_->compressed_cache_key_prefix_size,
+          handle,
+          compressed_cache_key
+      );
+    }
+
+    // Lookup uncompressed cache first
+    if (block_cache != nullptr) {
+      assert(!key.empty());
+      cache_handle = block_cache->Lookup(key);
+      if (cache_handle != nullptr) {
+        block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
+        RecordTick(statistics, BLOCK_CACHE_HIT);
+        RecordTick(statistics, BLOCK_CACHE_DATA_HIT);
+      } else {
+        RecordTick(statistics, BLOCK_CACHE_MISS);
+        RecordTick(statistics, BLOCK_CACHE_DATA_MISS);
+      }
+    }
+
+    // If not found in uncompressed cache, lookup compressed cache
+    if (block == nullptr && block_cache_compressed != nullptr) {
+      assert(!ckey.empty());
+      compressed_cache_handle = block_cache_compressed->Lookup(ckey);
+
+      // if we found in the compressed cache, then uncompress and
+      // insert into uncompressed cache
+      if (compressed_cache_handle != nullptr) {
+        // found compressed block
+        cblock = reinterpret_cast<Block*>(block_cache_compressed->
+                                          Value(compressed_cache_handle));
+        assert(cblock->compressionType() != kNoCompression);
+
+        // Retrieve the uncompressed contents into a new buffer
+        BlockContents contents;
+        s = UncompressBlockContents(cblock->data(), cblock->size(),
+                                    &contents);
+
+        // Insert uncompressed block into block cache
+        if (s.ok()) {
+          block = new Block(contents); // uncompressed block
+          assert(block->compressionType() == kNoCompression);
+          if (block_cache != nullptr && block->isCachable() &&
+              options.fill_cache) {
+            cache_handle = block_cache->Insert(key, block, block->size(),
+                                               &DeleteCachedBlock);
+            assert(reinterpret_cast<Block*>(block_cache->Value(cache_handle))
+                   == block);
+          }
+        }
+
+        // Release hold on compressed cache entry
+        block_cache_compressed->Release(compressed_cache_handle);
+        RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT);
+      }
+    }
+
+    if (block != nullptr) {
+      BumpPerfCount(&perf_context.block_cache_hit_count);
+    } else if (no_io) {
+      // Did not find in block_cache and can't do IO
+      return NewErrorIterator(Status::Incomplete("no blocking io"));
+    } else {
+      Histograms histogram = for_compaction ?
+        READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
+      {  // block for stop watch
+        StopWatch sw(table->rep_->options.env, statistics, histogram);
+        s = ReadBlockFromFile(
+            table->rep_->file.get(),
+            options,
+            handle,
+            &cblock,
+            table->rep_->options.env,
+            didIO,
+            block_cache_compressed == nullptr
+        );
+      }
+      if (s.ok()) {
+        assert(cblock->compressionType() == kNoCompression ||
+               block_cache_compressed != nullptr);
+
+        // Retrieve the uncompressed contents into a new buffer
+        BlockContents contents;
+        if (cblock->compressionType() != kNoCompression) {
+          s = UncompressBlockContents(cblock->data(), cblock->size(),
+                                      &contents);
+        }
+        if (s.ok()) {
+          if (cblock->compressionType() != kNoCompression) {
+            block = new Block(contents); // uncompressed block
+          } else {
+            block = cblock;
+            cblock = nullptr;
+          }
+          if (block->isCachable() && options.fill_cache) {
+            // Insert compressed block into compressed block cache.
+            // Release the hold on the compressed cache entry immediately.
+            if (block_cache_compressed != nullptr && cblock != nullptr) {
+              compressed_cache_handle = block_cache_compressed->Insert(
+                  ckey, cblock, cblock->size(), &DeleteCachedBlock);
+              block_cache_compressed->Release(compressed_cache_handle);
+              RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
+              cblock = nullptr;
+            }
+            // insert into uncompressed block cache
+            assert((block->compressionType() == kNoCompression));
+            if (block_cache != nullptr) {
+              cache_handle = block_cache->Insert(
+                  key, block, block->size(), &DeleteCachedBlock);
+              RecordTick(statistics, BLOCK_CACHE_ADD);
+              assert(reinterpret_cast<Block*>(block_cache->Value(
+                  cache_handle)) == block);
+            }
+          }
+        }
+      }
+      if (cblock != nullptr) {
+        delete cblock;
+      }
+    }
+  } else if (no_io) {
+    // Could not read from block_cache and can't do IO
+    return NewErrorIterator(Status::Incomplete("no blocking io"));
+  } else {
+    s = ReadBlockFromFile(
+        table->rep_->file.get(),
+        options,
+        handle,
+        &block,
+        table->rep_->options.env,
+        didIO
+    );
+  }
 
   Iterator* iter;
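
The control flow above reduces to a three-level lookup. A compilable, library-free restatement of just that ordering (all names are stand-ins, not rocksdb API):

```cpp
#include <functional>

// Standalone sketch of the lookup order implemented above: uncompressed
// block cache first, then compressed block cache, then the file itself.
using LookupFn = std::function<void*()>;

void* LookupBlock(const LookupFn& uncompressed_cache,
                  const LookupFn& compressed_cache,
                  const LookupFn& read_from_file) {
  if (void* b = uncompressed_cache()) return b;  // cheapest: no copy, no IO
  if (void* b = compressed_cache()) return b;    // must uncompress, no IO
  return read_from_file();                       // full IO path
}
```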
@@ -560,6 +768,102 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
   return iter;
 }
 
+BlockBasedTable::CachableEntry<FilterBlockReader>
+BlockBasedTable::GetFilter(bool no_io) const {
+  if (!rep_->options.filter_policy || !rep_->options.block_cache) {
+    return {rep_->filter.get(), nullptr};
+  }
+
+  // Fetching from the cache
+  Cache* block_cache = rep_->options.block_cache.get();
+  char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
+  auto key = GetCacheKey(
+      rep_->cache_key_prefix,
+      rep_->cache_key_prefix_size,
+      rep_->metaindex_handle,
+      cache_key
+  );
+
+  auto cache_handle = GetFromBlockCache(
+      block_cache,
+      key,
+      BLOCK_CACHE_FILTER_MISS,
+      BLOCK_CACHE_FILTER_HIT,
+      rep_->options.statistics
+  );
+
+  FilterBlockReader* filter = nullptr;
+  if (cache_handle != nullptr) {
+    filter = reinterpret_cast<FilterBlockReader*>(
+        block_cache->Value(cache_handle));
+  } else if (no_io) {
+    // Do not invoke any io.
+    return CachableEntry<FilterBlockReader>();
+  } else {
+    size_t filter_size = 0;
+    std::unique_ptr<Block> meta;
+    std::unique_ptr<Iterator> iter;
+    auto s = ReadMetaBlock(rep_, &meta, &iter);
+
+    if (s.ok()) {
+      std::string filter_block_key = kFilterBlockPrefix;
+      filter_block_key.append(rep_->options.filter_policy->Name());
+      iter->Seek(filter_block_key);
+
+      if (iter->Valid() && iter->key() == Slice(filter_block_key)) {
+        filter = ReadFilter(iter->value(), rep_, &filter_size);
+        assert(filter);
+        assert(filter_size > 0);
+
+        cache_handle = block_cache->Insert(
+            key, filter, filter_size, &DeleteCachedFilter);
+        RecordTick(rep_->options.statistics, BLOCK_CACHE_ADD);
+      }
+    }
+  }
+
+  return { filter, cache_handle };
+}
+
+// Get the iterator from the index block.
+Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const {
+  if (rep_->index_block) {
+    assert(!rep_->options.block_cache);
+    return rep_->index_block->NewIterator(rep_->options.comparator);
+  }
+
+  // get index block from cache
+  assert(rep_->options.block_cache);
+  bool didIO = false;
+  CachableEntry<Block> entry;
+
+  auto s = GetBlock(
+      this,
+      rep_->index_handle,
+      options,
+      false,  /* for compaction */
+      BLOCK_CACHE_INDEX_MISS,
+      BLOCK_CACHE_INDEX_HIT,
+      &didIO,
+      &entry
+  );
+
+  Iterator* iter;
+  if (entry.value != nullptr) {
+    iter = entry.value->NewIterator(rep_->options.comparator);
+    if (entry.cache_handle) {
+      iter->RegisterCleanup(
+          &ReleaseBlock, rep_->options.block_cache.get(), entry.cache_handle
+      );
+    } else {
+      iter->RegisterCleanup(&DeleteBlock, entry.value, nullptr);
+    }
+  } else {
+    iter = NewErrorIterator(s);
+  }
+  return iter;
+}
+
 Iterator* BlockBasedTable::BlockReader(void* arg,
                                        const ReadOptions& options,
                                        const EnvOptions& soptions,
@@ -577,27 +881,32 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
 // 2) Compare(prefix(key), key) <= 0.
 // 3) If Compare(key1, key2) <= 0, then Compare(prefix(key1), prefix(key2)) <= 0
 //
-// TODO(tylerharter): right now, this won't cause I/O since blooms are
-// in memory.  When blooms may need to be paged in, we should refactor so that
-// this is only ever called lazily.  In particular, this shouldn't be called
-// while the DB lock is held like it is now.
+// Otherwise, this method guarantees no I/O will be incurred.
+//
+// REQUIRES: this method shouldn't be called while the DB lock is held.
 bool BlockBasedTable::PrefixMayMatch(const Slice& internal_prefix) {
-  FilterBlockReader* filter = rep_->filter;
   bool may_match = true;
   Status s;
 
-  if (filter == nullptr) {
+  if (!rep_->options.filter_policy) {
     return true;
   }
 
-  std::unique_ptr<Iterator> iiter(rep_->index_block->NewIterator(
-      rep_->options.comparator));
+  // To prevent any io operation in this method, we set `read_tier` to make
+  // sure we always read index or filter only when they have already been
+  // loaded to memory.
+  ReadOptions no_io_read_options;
+  no_io_read_options.read_tier = kBlockCacheTier;
+  unique_ptr<Iterator> iiter(
+      IndexBlockReader(no_io_read_options)
+  );
   iiter->Seek(internal_prefix);
 
   if (!iiter->Valid()) {
     // we're past end of file
     may_match = false;
   } else if (ExtractUserKey(iiter->key()).starts_with(
                  ExtractUserKey(internal_prefix))) {
     // we need to check for this subtle case because our only
     // guarantee is that "the key is a string >= last key in that data
     // block" according to the doc/table_format.txt spec.
@@ -619,7 +928,11 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_prefix) {
     BlockHandle handle;
     s = handle.DecodeFrom(&handle_value);
     assert(s.ok());
-    may_match = filter->PrefixMayMatch(handle.offset(), internal_prefix);
+    auto filter_entry = GetFilter(true /* no io */);
+    may_match =
+      filter_entry.value != nullptr &&
+      filter_entry.value->PrefixMayMatch(handle.offset(), internal_prefix);
+    filter_entry.Release(rep_->options.block_cache.get());
   }
 
   RecordTick(rep_->options.statistics, BLOOM_FILTER_PREFIX_CHECKED);
@@ -641,9 +954,12 @@ Iterator* BlockBasedTable::NewIterator(const ReadOptions& options) {
   }
 
   return NewTwoLevelIterator(
-      rep_->index_block->NewIterator(rep_->options.comparator),
-      &BlockBasedTable::BlockReader, const_cast<BlockBasedTable*>(this),
-      options, rep_->soptions);
+      IndexBlockReader(options),
+      &BlockBasedTable::BlockReader,
+      const_cast<BlockBasedTable*>(this),
+      options,
+      rep_->soptions
+  );
 }
 
 Status BlockBasedTable::Get(
@@ -654,15 +970,20 @@ Status BlockBasedTable::Get(
                        const Slice& v, bool didIO),
     void (*mark_key_may_exist_handler)(void* handle_context)) {
   Status s;
-  Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
+  Iterator* iiter = IndexBlockReader(readOptions);
+  auto filter_entry = GetFilter(readOptions.read_tier == kBlockCacheTier);
+  FilterBlockReader* filter = filter_entry.value;
   bool done = false;
   for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
     Slice handle_value = iiter->value();
-    FilterBlockReader* filter = rep_->filter;
     BlockHandle handle;
-    if (filter != nullptr &&
-        handle.DecodeFrom(&handle_value).ok() &&
-        !filter->KeyMayMatch(handle.offset(), key)) {
+    bool may_not_exist_in_filter =
+      filter != nullptr &&
+      handle.DecodeFrom(&handle_value).ok() &&
+      !filter->KeyMayMatch(handle.offset(), key);
+
+    if (may_not_exist_in_filter) {
       // Not found
       // TODO: think about interaction with Merge.  If a user key cannot
       // cross one data block, we should be fine.
@@ -670,7 +991,7 @@ Status BlockBasedTable::Get(
       break;
     } else {
       bool didIO = false;
-      std::unique_ptr<Iterator> block_iter(
+      unique_ptr<Iterator> block_iter(
           BlockReader(this, readOptions, iiter->value(), &didIO));
 
       if (readOptions.read_tier && block_iter->status().IsIncomplete()) {
@@ -692,6 +1013,8 @@ Status BlockBasedTable::Get(
       s = block_iter->status();
     }
   }
+
+  filter_entry.Release(rep_->options.block_cache.get());
   if (s.ok()) {
     s = iiter->status();
   }
@@ -714,8 +1037,8 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
 }
 
 uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) {
-  Iterator* index_iter =
-      rep_->index_block->NewIterator(rep_->options.comparator);
+  Iterator* index_iter = IndexBlockReader(ReadOptions());
+
   index_iter->Seek(key);
   uint64_t result;
   if (index_iter->Valid()) {
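
IndexBlockReader above ties the lifetime of the cache handle (or the heap-allocated block) to the returned iterator via Iterator::RegisterCleanup. A self-contained sketch of that mechanism using only public headers:

```cpp
#include <cstdio>
#include "rocksdb/iterator.h"

// Sketch: RegisterCleanup runs when the iterator is destroyed, which is how
// IndexBlockReader keeps a cached index block pinned exactly as long as some
// iterator still points into it.
static void SayDone(void* arg1, void* /*arg2*/) {
  std::printf("cleanup: %s\n", static_cast<const char*>(arg1));
}

int main() {
  rocksdb::Iterator* iter = rocksdb::NewEmptyIterator();
  iter->RegisterCleanup(&SayDone, const_cast<char*>("block released"), nullptr);
  delete iter;  // prints "cleanup: block released"
  return 0;
}
```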

@@ -13,6 +13,7 @@
 #include "rocksdb/cache.h"
 #include "rocksdb/env.h"
 #include "rocksdb/iterator.h"
+#include "rocksdb/statistics.h"
 #include "rocksdb/table_stats.h"
 #include "rocksdb/table.h"
 #include "util/coding.h"
@@ -27,6 +28,7 @@ class RandomAccessFile;
 struct ReadOptions;
 class TableCache;
 class TableReader;
+class FilterBlockReader;
 
 using std::unique_ptr;
 
@@ -91,6 +93,9 @@ class BlockBasedTable : public TableReader {
   ~BlockBasedTable();
 
  private:
+  template <class TValue>
+  struct CachableEntry;
+
   struct Rep;
   Rep* rep_;
   bool compaction_optimized_;
@@ -98,9 +103,37 @@ class BlockBasedTable : public TableReader {
   static Iterator* BlockReader(void*, const ReadOptions&,
                                const EnvOptions& soptions, const Slice&,
                                bool for_compaction);
+
   static Iterator* BlockReader(void*, const ReadOptions&, const Slice&,
                                bool* didIO, bool for_compaction = false);
 
+  // if `no_io == true`, we will not try to read filter from sst file
+  // if it is not cached yet.
+  CachableEntry<FilterBlockReader> GetFilter(bool no_io = false) const;
+
+  Iterator* IndexBlockReader(const ReadOptions& options) const;
+
+  // Read the block, either from sst file or from cache. This method will try
+  // to read from cache only when block_cache is set or ReadOptions doesn't
+  // explicitly prohibit storage IO.
+  //
+  // If the block is read from cache, the statistics for cache miss/hit of
+  // the given type of block will be updated. User can specify
+  // `block_cache_miss_ticker` and `block_cache_hit_ticker` for the statistics
+  // update.
+  //
+  // On success, the `result` parameter will be populated, which contains a
+  // pointer to the block and its cache handle, which will be nullptr if it's
+  // not read from the cache.
+  static Status GetBlock(const BlockBasedTable* table,
+                         const BlockHandle& handle,
+                         const ReadOptions& options,
+                         bool for_compaction,
+                         Tickers block_cache_miss_ticker,
+                         Tickers block_cache_hit_ticker,
+                         bool* didIO,
+                         CachableEntry<Block>* result);
+
   // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
   // after a call to Seek(key), until handle_result returns false.
   // May not make such a call if filter policy says that key is not present.
@@ -111,6 +144,22 @@ class BlockBasedTable : public TableReader {
-  void ReadFilter(const Slice& filter_handle_value);
-  static Status ReadStats(const Slice& handle_value, Rep* rep);
+  // Read the meta block from sst.
+  static Status ReadMetaBlock(
+      Rep* rep,
+      std::unique_ptr<Block>* meta_block,
+      std::unique_ptr<Iterator>* iter);
+
+  // Create the filter from the filter block.
+  static FilterBlockReader* ReadFilter(
+      const Slice& filter_handle_value,
+      Rep* rep,
+      size_t* filter_size = nullptr);
+
+  // Read the table stats from stats block.
+  static Status ReadStats(
+      const Slice& handle_value, Rep* rep, TableStats* stats);
+
   static void SetupCacheKeyPrefix(Rep* rep);
 
   explicit BlockBasedTable(Rep* rep) :

@@ -127,7 +127,8 @@ void FilterBlockBuilder::GenerateFilter() {
   start_.clear();
 }
 
-FilterBlockReader::FilterBlockReader(const Options& opt, const Slice& contents)
+FilterBlockReader::FilterBlockReader(
+    const Options& opt, const Slice& contents, bool delete_contents_after_use)
     : policy_(opt.filter_policy),
       prefix_extractor_(opt.prefix_extractor),
       whole_key_filtering_(opt.whole_key_filtering),
@@ -143,6 +144,9 @@ FilterBlockReader::FilterBlockReader(const Options& opt, const Slice& contents)
   data_ = contents.data();
   offset_ = data_ + last_word;
   num_ = (n - 5 - last_word) / 4;
+  if (delete_contents_after_use) {
+    filter_data.reset(contents.data());
+  }
 }
 
 bool FilterBlockReader::KeyMayMatch(uint64_t block_offset,
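
The new `delete_contents_after_use` flag makes FilterBlockReader adopt heap-allocated filter contents, replacing the raw `filter_data` pointer that Rep used to track. A minimal standalone sketch of the same adoption idiom (the type below is illustrative, not rocksdb code):

```cpp
#include <memory>

// Sketch: when a buffer was new[]-allocated on the object's behalf, the
// object adopts it so it is freed exactly once; otherwise the caller must
// keep the buffer alive for the object's lifetime.
struct BufferOwner {
  std::unique_ptr<const char[]> owned;  // mirrors FilterBlockReader::filter_data
  const char* data = nullptr;
  BufferOwner(const char* contents, bool take_ownership) : data(contents) {
    if (take_ownership) {
      owned.reset(contents);  // delete[]'d when the owner is destroyed
    }
  }
};
```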

@@ -12,6 +12,8 @@
 // into a single filter block.
 
 #pragma once
+
+#include <memory>
 #include <stddef.h>
 #include <stdint.h>
 #include <string>
@@ -62,7 +64,10 @@ class FilterBlockBuilder {
 class FilterBlockReader {
  public:
   // REQUIRES: "contents" and *policy must stay live while *this is live.
-  FilterBlockReader(const Options& opt, const Slice& contents);
+  FilterBlockReader(
+      const Options& opt,
+      const Slice& contents,
+      bool delete_contents_after_use = false);
 
   bool KeyMayMatch(uint64_t block_offset, const Slice& key);
   bool PrefixMayMatch(uint64_t block_offset, const Slice& prefix);
@@ -74,6 +79,8 @@ class FilterBlockReader {
   const char* offset_;  // Pointer to beginning of offset array (at block-end)
   size_t num_;          // Number of entries in offset array
   size_t base_lg_;      // Encoding parameter (see kFilterBaseLg in .cc file)
+  std::unique_ptr<const char[]> filter_data;
+
   bool MayMatch(uint64_t block_offset, const Slice& entry);
 };

@@ -12,18 +12,19 @@
 #include <vector>
 
 #include "db/dbformat.h"
+#include "db/db_statistics.h"
 #include "db/memtable.h"
 #include "db/write_batch_internal.h"
+#include "rocksdb/cache.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
 #include "rocksdb/iterator.h"
-#include "rocksdb/table.h"
 #include "rocksdb/memtablerep.h"
-#include "table/block.h"
+#include "table/block_based_table_builder.h"
+#include "table/block_based_table_reader.h"
 #include "table/block_builder.h"
+#include "table/block.h"
 #include "table/format.h"
-#include "table/block_based_table_reader.h"
-#include "table/block_based_table_builder.h"
 #include "util/random.h"
 #include "util/testharness.h"
 #include "util/testutil.h"
@@ -486,8 +487,7 @@ struct TestArgs {
 };
 
-static std::vector<TestArgs> Generate_Arg_List()
-{
+static std::vector<TestArgs> Generate_Arg_List() {
   std::vector<TestArgs> ret;
   TestType test_type[4] = {TABLE_TEST, BLOCK_TEST, MEMTABLE_TEST, DB_TEST};
   int test_type_len = 4;
@@ -928,6 +928,181 @@ TEST(TableTest, NumBlockStat) {
   );
 }
 
+class BlockCacheStats {
+ public:
+  explicit BlockCacheStats(std::shared_ptr<Statistics> statistics) {
+    block_cache_miss =
+      statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
+    block_cache_hit =
+      statistics.get()->getTickerCount(BLOCK_CACHE_HIT);
+    index_block_cache_miss =
+      statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_MISS);
+    index_block_cache_hit =
+      statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_HIT);
+    data_block_cache_miss =
+      statistics.get()->getTickerCount(BLOCK_CACHE_DATA_MISS);
+    data_block_cache_hit =
+      statistics.get()->getTickerCount(BLOCK_CACHE_DATA_HIT);
+  }
+
+  // Check if the fetched stats match the expected ones.
+  void AssertEqual(
+      long index_block_cache_miss,
+      long index_block_cache_hit,
+      long data_block_cache_miss,
+      long data_block_cache_hit) const {
+    ASSERT_EQ(index_block_cache_miss, this->index_block_cache_miss);
+    ASSERT_EQ(index_block_cache_hit, this->index_block_cache_hit);
+    ASSERT_EQ(data_block_cache_miss, this->data_block_cache_miss);
+    ASSERT_EQ(data_block_cache_hit, this->data_block_cache_hit);
+    ASSERT_EQ(
+        index_block_cache_miss + data_block_cache_miss,
+        this->block_cache_miss
+    );
+    ASSERT_EQ(
+        index_block_cache_hit + data_block_cache_hit,
+        this->block_cache_hit
+    );
+  }
+
+ private:
+  long block_cache_miss = 0;
+  long block_cache_hit = 0;
+  long index_block_cache_miss = 0;
+  long index_block_cache_hit = 0;
+  long data_block_cache_miss = 0;
+  long data_block_cache_hit = 0;
+};
+
+TEST(TableTest, BlockCacheTest) {
+  // -- Table construction
+  Options options;
+  options.create_if_missing = true;
+  options.statistics = CreateDBStatistics();
+  options.block_cache = NewLRUCache(1024);
+  std::vector<std::string> keys;
+  KVMap kvmap;
+
+  BlockBasedTableConstructor c(BytewiseComparator());
+  c.Add("key", "value");
+  c.Finish(options, &keys, &kvmap);
+
+  // -- PART 1: Open with a regular (1KB) block cache.
+  unique_ptr<Iterator> iter;
+
+  // At first, no block will be accessed.
+  {
+    BlockCacheStats stats(options.statistics);
+    // index will be added to block cache.
+    stats.AssertEqual(
+        1,  // index block miss
+        0,
+        0,
+        0
+    );
+  }
+
+  // Only index block will be accessed
+  {
+    iter.reset(c.NewIterator());
+    BlockCacheStats stats(options.statistics);
+    // NOTE: to help better highlight the "delta" of each ticker, I use
+    // <last_value> + <added_value> to indicate the increment of changed
+    // value; other numbers remain the same.
+    stats.AssertEqual(
+        1,
+        0 + 1,  // index block hit
+        0,
+        0
+    );
+  }
+
+  // Only data block will be accessed
+  {
+    iter->SeekToFirst();
+    BlockCacheStats stats(options.statistics);
+    stats.AssertEqual(
+        1,
+        1,
+        0 + 1,  // data block miss
+        0
+    );
+  }
+
+  // Data block will be in cache
+  {
+    iter.reset(c.NewIterator());
+    iter->SeekToFirst();
+    BlockCacheStats stats(options.statistics);
+    stats.AssertEqual(
+        1,
+        1 + 1,  // index block hit
+        1,
+        0 + 1   // data block hit
+    );
+  }
+  // release the iterator so that the block cache can reset correctly.
+  iter.reset();
+
+  // -- PART 2: Open without block cache
+  options.block_cache.reset();
+  options.statistics = CreateDBStatistics();  // reset the stats
+  c.Reopen(options);
+
+  {
+    iter.reset(c.NewIterator());
+    iter->SeekToFirst();
+    ASSERT_EQ("key", iter->key().ToString());
+    BlockCacheStats stats(options.statistics);
+    // Nothing is affected at all
+    stats.AssertEqual(0, 0, 0, 0);
+  }
+
+  // -- PART 3: Open with very small block cache
+  // In this test, no block will ever get hit since the block cache is
+  // too small to fit even one entry.
+  options.block_cache = NewLRUCache(1);
+  c.Reopen(options);
+  {
+    BlockCacheStats stats(options.statistics);
+    stats.AssertEqual(
+        1,  // index block miss
+        0,
+        0,
+        0
+    );
+  }
+
+  {
+    // Both index and data block get accessed.
+    // It first caches the index block and then the data block. But since the
+    // cache size is only 1, the index block will be purged after the data
+    // block is inserted.
+    iter.reset(c.NewIterator());
+    BlockCacheStats stats(options.statistics);
+    stats.AssertEqual(
+        1 + 1,  // index block miss
+        0,
+        0,  // data block miss
+        0
+    );
+  }
+
+  {
+    // SeekToFirst() accesses the data block. For a similar reason, we expect
+    // a data block cache miss.
+    iter->SeekToFirst();
+    BlockCacheStats stats(options.statistics);
+    stats.AssertEqual(
+        2,
+        0,
+        0 + 1,  // data block miss
+        0
+    );
+  }
+}
+
 TEST(TableTest, ApproximateOffsetOfPlain) {
   BlockBasedTableConstructor c(BytewiseComparator());
   c.Add("k01", "hello");

@@ -1285,11 +1285,11 @@ class StressTest {
       ttl_state = NumberToString(FLAGS_ttl);
     }
     fprintf(stdout, "Time to live(sec)   : %s\n", ttl_state.c_str());
-    fprintf(stdout, "Read percentage     : %d\n", FLAGS_readpercent);
-    fprintf(stdout, "Prefix percentage   : %d\n", FLAGS_prefixpercent);
-    fprintf(stdout, "Write percentage    : %d\n", FLAGS_writepercent);
-    fprintf(stdout, "Delete percentage   : %d\n", FLAGS_delpercent);
-    fprintf(stdout, "Iterate percentage  : %d\n", FLAGS_iterpercent);
+    fprintf(stdout, "Read percentage     : %d%%\n", FLAGS_readpercent);
+    fprintf(stdout, "Prefix percentage   : %d%%\n", FLAGS_prefixpercent);
+    fprintf(stdout, "Write percentage    : %d%%\n", FLAGS_writepercent);
+    fprintf(stdout, "Delete percentage   : %d%%\n", FLAGS_delpercent);
+    fprintf(stdout, "Iterate percentage  : %d%%\n", FLAGS_iterpercent);
     fprintf(stdout, "Write-buffer-size   : %d\n", FLAGS_write_buffer_size);
     fprintf(stdout, "Iterations          : %lu\n", FLAGS_num_iterations);
     fprintf(stdout, "Max key             : %ld\n", FLAGS_max_key);
