Introduce Read amplification bitmap (read amp statistics)

Summary:
Add ReadOptions::read_amp_bytes_per_bit option which allow us to create a bitmap for every data block we read
the bitmap will contain (block_size / read_amp_bytes_per_bit) bits.

We will use this bitmap to mark which bytes have been used of the block so we can calculate the read amplification

Test Plan: added new tests

Reviewers: andrewkr, yhchiang, sdong

Reviewed By: sdong

Subscribers: yiwu, leveldb, march, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D58707
main
Islam AbdelRahman 9 years ago
parent c7004840d2
commit b49b92cf28
  1. 161
      db/db_test2.cc
  2. 10
      include/rocksdb/statistics.h
  3. 23
      include/rocksdb/table.h
  4. 24
      table/block.cc
  5. 144
      table/block.h
  6. 82
      table/block_based_table_reader.cc
  7. 8
      table/block_based_table_reader.h
  8. 257
      table/block_test.cc
  9. 5
      tools/db_bench_tool.cc
  10. 5
      util/options_helper.h
  11. 2
      util/options_settable_test.cc

@ -1859,6 +1859,167 @@ TEST_F(DBTest2, MaxSuccessiveMergesInRecovery) {
options.max_successive_merges = 3;
Reopen(options);
}
size_t GetEncodedEntrySize(size_t key_size, size_t value_size) {
std::string buffer;
PutVarint32(&buffer, static_cast<uint32_t>(0));
PutVarint32(&buffer, static_cast<uint32_t>(key_size));
PutVarint32(&buffer, static_cast<uint32_t>(value_size));
return buffer.size() + key_size + value_size;
}
TEST_F(DBTest2, ReadAmpBitmap) {
Options options = CurrentOptions();
BlockBasedTableOptions bbto;
// Disable delta encoding to make it easier to calculate read amplification
bbto.use_delta_encoding = false;
// Huge block cache to make it easier to calculate read amplification
bbto.block_cache = NewLRUCache(1024 * 1024 * 1024);
bbto.read_amp_bytes_per_bit = 16;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.statistics = rocksdb::CreateDBStatistics();
DestroyAndReopen(options);
const size_t kNumEntries = 10000;
Random rnd(301);
for (size_t i = 0; i < kNumEntries; i++) {
ASSERT_OK(Put(Key(static_cast<int>(i)), RandomString(&rnd, 100)));
}
ASSERT_OK(Flush());
Close();
Reopen(options);
// Read keys/values randomly and verify that reported read amp error
// is less than 2%
uint64_t total_useful_bytes = 0;
std::set<int> read_keys;
std::string value;
for (size_t i = 0; i < kNumEntries * 5; i++) {
int key_idx = rnd.Next() % kNumEntries;
std::string k = Key(key_idx);
ASSERT_OK(db_->Get(ReadOptions(), k, &value));
if (read_keys.find(key_idx) == read_keys.end()) {
auto ik = InternalKey(k, 0, ValueType::kTypeValue);
total_useful_bytes += GetEncodedEntrySize(ik.size(), value.size());
read_keys.insert(key_idx);
}
double expected_read_amp =
static_cast<double>(total_useful_bytes) /
options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
double read_amp =
static_cast<double>(options.statistics->getTickerCount(
READ_AMP_ESTIMATE_USEFUL_BYTES)) /
options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
double error_pct = fabs(expected_read_amp - read_amp) * 100;
// Error between reported read amp and real read amp should be less than 2%
EXPECT_LE(error_pct, 2);
}
// Make sure we read every thing in the DB (which is smaller than our cache)
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(iter->value().ToString(), Get(iter->key().ToString()));
}
delete iter;
// Read amp is 100% since we read all what we loaded in memory
ASSERT_EQ(options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES),
options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES));
}
TEST_F(DBTest2, ReadAmpBitmapLiveInCacheAfterDBClose) {
if (dbname_.find("dev/shm") != std::string::npos) {
// /dev/shm dont support getting a unique file id, this mean that
// running this test on /dev/shm will fail because lru_cache will load
// the blocks again regardless of them being already in the cache
return;
}
std::shared_ptr<Cache> lru_cache = NewLRUCache(1024 * 1024 * 1024);
std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
Options options = CurrentOptions();
BlockBasedTableOptions bbto;
// Disable delta encoding to make it easier to calculate read amplification
bbto.use_delta_encoding = false;
// Huge block cache to make it easier to calculate read amplification
bbto.block_cache = lru_cache;
bbto.read_amp_bytes_per_bit = 16;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.statistics = stats;
DestroyAndReopen(options);
const int kNumEntries = 10000;
Random rnd(301);
for (int i = 0; i < kNumEntries; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
}
ASSERT_OK(Flush());
Close();
Reopen(options);
uint64_t total_useful_bytes = 0;
std::set<int> read_keys;
std::string value;
// Iter1: Read half the DB, Read even keys
// Key(0), Key(2), Key(4), Key(6), Key(8), ...
for (int i = 0; i < kNumEntries; i += 2) {
std::string k = Key(i);
ASSERT_OK(db_->Get(ReadOptions(), k, &value));
if (read_keys.find(i) == read_keys.end()) {
auto ik = InternalKey(k, 0, ValueType::kTypeValue);
total_useful_bytes += GetEncodedEntrySize(ik.size(), value.size());
read_keys.insert(i);
}
}
size_t total_useful_bytes_iter1 =
options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
size_t total_loaded_bytes_iter1 =
options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
Close();
std::shared_ptr<Statistics> new_statistics = rocksdb::CreateDBStatistics();
// Destroy old statistics obj that the blocks in lru_cache are pointing to
options.statistics.reset();
// Use the statistics object that we just created
options.statistics = new_statistics;
Reopen(options);
// Iter2: Read half the DB, Read odd keys
// Key(1), Key(3), Key(5), Key(7), Key(9), ...
for (int i = 1; i < kNumEntries; i += 2) {
std::string k = Key(i);
ASSERT_OK(db_->Get(ReadOptions(), k, &value));
if (read_keys.find(i) == read_keys.end()) {
auto ik = InternalKey(k, 0, ValueType::kTypeValue);
total_useful_bytes += GetEncodedEntrySize(ik.size(), value.size());
read_keys.insert(i);
}
}
size_t total_useful_bytes_iter2 =
options.statistics->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
size_t total_loaded_bytes_iter2 =
options.statistics->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
// We reached read_amp of 100% because we read all the keys in the DB
ASSERT_EQ(total_useful_bytes_iter1 + total_useful_bytes_iter2,
total_loaded_bytes_iter1 + total_loaded_bytes_iter2);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -197,6 +197,14 @@ enum Tickers : uint32_t {
ROW_CACHE_HIT,
ROW_CACHE_MISS,
// Read amplification statistics.
// Read amplification can be calculated using this formula
// (READ_AMP_TOTAL_READ_BYTES / READ_AMP_ESTIMATE_USEFUL_BYTES)
//
// REQUIRES: ReadOptions::read_amp_bytes_per_bit to be enabled
READ_AMP_ESTIMATE_USEFUL_BYTES, // Estimate of total bytes actually used.
READ_AMP_TOTAL_READ_BYTES, // Total size of loaded data blocks.
TICKER_ENUM_MAX
};
@ -291,6 +299,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"},
{ROW_CACHE_HIT, "rocksdb.row.cache.hit"},
{ROW_CACHE_MISS, "rocksdb.row.cache.miss"},
{READ_AMP_ESTIMATE_USEFUL_BYTES, "rocksdb.read.amp.estimate.useful.bytes"},
{READ_AMP_TOTAL_READ_BYTES, "rocksdb.read.amp.total.read.bytes"},
};
/**

@ -173,6 +173,29 @@ struct BlockBasedTableOptions {
// algorithms.
bool verify_compression = false;
// If used, For every data block we load into memory, we will create a bitmap
// of size ((block_size / `read_amp_bytes_per_bit`) / 8) bytes. This bitmap
// will be used to figure out the percentage we actually read of the blocks.
//
// When this feature is used Tickers::READ_AMP_ESTIMATE_USEFUL_BYTES and
// Tickers::READ_AMP_TOTAL_READ_BYTES can be used to calculate the
// read amplification using this formula
// (READ_AMP_TOTAL_READ_BYTES / READ_AMP_ESTIMATE_USEFUL_BYTES)
//
// value => memory usage (percentage of loaded blocks memory)
// 1 => 12.50 %
// 2 => 06.25 %
// 4 => 03.12 %
// 8 => 01.56 %
// 16 => 00.78 %
//
// Note: This number must be a power of 2, if not it will be sanitized
// to be the next lowest power of 2, for example a value of 7 will be
// treated as 4, a value of 19 will be treated as 16.
//
// Default: 0 (disabled)
uint32_t read_amp_bytes_per_bit = 0;
// We currently have three versions:
// 0 -- This version is currently written out by all RocksDB's versions by
// default. Can be read by really old RocksDB's. Doesn't support changing

@ -16,9 +16,11 @@
#include <unordered_map>
#include <vector>
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/comparator.h"
#include "table/format.h"
#include "table/block_prefix_index.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/perf_context_imp.h"
@ -344,7 +346,8 @@ uint32_t Block::NumRestarts() const {
return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
}
Block::Block(BlockContents&& contents)
Block::Block(BlockContents&& contents, size_t read_amp_bytes_per_bit,
Statistics* statistics)
: contents_(std::move(contents)),
data_(contents_.data.data()),
size_(contents_.data.size()) {
@ -359,10 +362,14 @@ Block::Block(BlockContents&& contents)
size_ = 0;
}
}
if (read_amp_bytes_per_bit != 0 && statistics && size_ != 0) {
read_amp_bitmap_.reset(new BlockReadAmpBitmap(
restart_offset_, read_amp_bytes_per_bit, statistics));
}
}
InternalIterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter,
bool total_order_seek) {
bool total_order_seek, Statistics* stats) {
if (size_ < 2*sizeof(uint32_t)) {
if (iter != nullptr) {
iter->SetStatus(Status::Corruption("bad block contents"));
@ -385,10 +392,17 @@ InternalIterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter,
if (iter != nullptr) {
iter->Initialize(cmp, data_, restart_offset_, num_restarts,
prefix_index_ptr);
prefix_index_ptr, read_amp_bitmap_.get());
} else {
iter = new BlockIter(cmp, data_, restart_offset_, num_restarts,
prefix_index_ptr);
prefix_index_ptr, read_amp_bitmap_.get());
}
if (read_amp_bitmap_) {
if (read_amp_bitmap_->GetStatistics() != stats) {
// DB changed the Statistics pointer, we need to notify read_amp_bitmap_
read_amp_bitmap_->SetStatistics(stats);
}
}
}

@ -20,6 +20,7 @@
#include "db/pinned_iterators_manager.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
#include "table/block_prefix_index.h"
#include "table/internal_iterator.h"
@ -32,10 +33,119 @@ class Comparator;
class BlockIter;
class BlockPrefixIndex;
// BlockReadAmpBitmap is a bitmap that map the rocksdb::Block data bytes to
// a bitmap with ratio bytes_per_bit. Whenever we access a range of bytes in
// the Block we update the bitmap and increment READ_AMP_ESTIMATE_USEFUL_BYTES.
class BlockReadAmpBitmap {
public:
explicit BlockReadAmpBitmap(size_t block_size, size_t bytes_per_bit,
Statistics* statistics)
: bitmap_(nullptr), bytes_per_bit_pow_(0), statistics_(statistics) {
assert(block_size > 0 && bytes_per_bit > 0);
// convert bytes_per_bit to be a power of 2
while (bytes_per_bit >>= 1) {
bytes_per_bit_pow_++;
}
// num_bits_needed = ceil(block_size / bytes_per_bit)
size_t num_bits_needed = (block_size >> bytes_per_bit_pow_) +
(block_size % (1 << bytes_per_bit_pow_) != 0);
// bitmap_size = ceil(num_bits_needed / kBitsPerEntry)
size_t bitmap_size = (num_bits_needed / kBitsPerEntry) +
(num_bits_needed % kBitsPerEntry != 0);
// Create bitmap and set all the bits to 0
bitmap_ = new std::atomic<uint32_t>[bitmap_size];
memset(bitmap_, 0, bitmap_size * kBytesPersEntry);
RecordTick(GetStatistics(), READ_AMP_TOTAL_READ_BYTES,
num_bits_needed << bytes_per_bit_pow_);
}
~BlockReadAmpBitmap() { delete[] bitmap_; }
void Mark(uint32_t start_offset, uint32_t end_offset) {
assert(end_offset >= start_offset);
// Every new bit we set will bump this counter
uint32_t new_useful_bytes = 0;
// Index of first bit in mask (start_offset / bytes_per_bit)
uint32_t start_bit = start_offset >> bytes_per_bit_pow_;
// Index of last bit in mask (end_offset / bytes_per_bit)
uint32_t end_bit = end_offset >> bytes_per_bit_pow_;
// Index of middle bit (unique to this range)
uint32_t mid_bit = start_bit + 1;
// It's guaranteed that ranges sent to Mark() wont overlap, this mean that
// we dont need to set the middle bits, we can simply set only one bit of
// the middle bits, and check this bit if we want to know if the whole
// range is set or not.
if (mid_bit < end_bit) {
if (GetAndSet(mid_bit) == 0) {
new_useful_bytes += (end_bit - mid_bit) << bytes_per_bit_pow_;
} else {
// If the middle bit is set, it's guaranteed that start and end bits
// are also set
return;
}
} else {
// This range dont have a middle bit, the whole range fall in 1 or 2 bits
}
if (GetAndSet(start_bit) == 0) {
new_useful_bytes += (1 << bytes_per_bit_pow_);
}
if (GetAndSet(end_bit) == 0) {
new_useful_bytes += (1 << bytes_per_bit_pow_);
}
if (new_useful_bytes > 0) {
RecordTick(GetStatistics(), READ_AMP_ESTIMATE_USEFUL_BYTES,
new_useful_bytes);
}
}
Statistics* GetStatistics() {
return statistics_.load(std::memory_order_relaxed);
}
void SetStatistics(Statistics* stats) { statistics_.store(stats); }
uint32_t GetBytesPerBit() { return 1 << bytes_per_bit_pow_; }
private:
// Get the current value of bit at `bit_idx` and set it to 1
inline bool GetAndSet(uint32_t bit_idx) {
const uint32_t byte_idx = bit_idx / kBitsPerEntry;
const uint32_t bit_mask = 1 << (bit_idx % kBitsPerEntry);
return bitmap_[byte_idx].fetch_or(bit_mask, std::memory_order_relaxed) &
bit_mask;
}
const uint32_t kBytesPersEntry = sizeof(uint32_t); // 4 bytes
const uint32_t kBitsPerEntry = kBytesPersEntry * 8; // 32 bits
// Bitmap used to record the bytes that we read, use atomic to protect
// against multiple threads updating the same bit
std::atomic<uint32_t>* bitmap_;
// (1 << bytes_per_bit_pow_) is bytes_per_bit. Use power of 2 to optimize
// muliplication and division
uint8_t bytes_per_bit_pow_;
// Pointer to DB Statistics object, Since this bitmap may outlive the DB
// this pointer maybe invalid, but the DB will update it to a valid pointer
// by using SetStatistics() before calling Mark()
std::atomic<Statistics*> statistics_;
};
class Block {
public:
// Initialize the block with the specified contents.
explicit Block(BlockContents&& contents);
explicit Block(BlockContents&& contents, size_t read_amp_bytes_per_bit = 0,
Statistics* statistics = nullptr);
~Block() = default;
@ -70,7 +180,8 @@ class Block {
// and prefix_index_ are null, so this option does not matter.
InternalIterator* NewIterator(const Comparator* comparator,
BlockIter* iter = nullptr,
bool total_order_seek = true);
bool total_order_seek = true,
Statistics* stats = nullptr);
void SetBlockPrefixIndex(BlockPrefixIndex* prefix_index);
// Report an approximation of how much memory has been used.
@ -82,6 +193,7 @@ class Block {
size_t size_; // contents_.data.size()
uint32_t restart_offset_; // Offset in data_ of restart array
std::unique_ptr<BlockPrefixIndex> prefix_index_;
std::unique_ptr<BlockReadAmpBitmap> read_amp_bitmap_;
// No copying allowed
Block(const Block&);
@ -99,17 +211,22 @@ class BlockIter : public InternalIterator {
restart_index_(0),
status_(Status::OK()),
prefix_index_(nullptr),
key_pinned_(false) {}
key_pinned_(false),
read_amp_bitmap_(nullptr),
last_bitmap_offset_(0) {}
BlockIter(const Comparator* comparator, const char* data, uint32_t restarts,
uint32_t num_restarts, BlockPrefixIndex* prefix_index)
uint32_t num_restarts, BlockPrefixIndex* prefix_index,
BlockReadAmpBitmap* read_amp_bitmap)
: BlockIter() {
Initialize(comparator, data, restarts, num_restarts, prefix_index);
Initialize(comparator, data, restarts, num_restarts, prefix_index,
read_amp_bitmap);
}
void Initialize(const Comparator* comparator, const char* data,
uint32_t restarts, uint32_t num_restarts,
BlockPrefixIndex* prefix_index) {
BlockPrefixIndex* prefix_index,
BlockReadAmpBitmap* read_amp_bitmap) {
assert(data_ == nullptr); // Ensure it is called only once
assert(num_restarts > 0); // Ensure the param is valid
@ -120,6 +237,8 @@ class BlockIter : public InternalIterator {
current_ = restarts_;
restart_index_ = num_restarts_;
prefix_index_ = prefix_index;
read_amp_bitmap_ = read_amp_bitmap;
last_bitmap_offset_ = current_ + 1;
}
void SetStatus(Status s) {
@ -134,6 +253,12 @@ class BlockIter : public InternalIterator {
}
virtual Slice value() const override {
assert(Valid());
if (read_amp_bitmap_ && current_ < restarts_ &&
current_ != last_bitmap_offset_) {
read_amp_bitmap_->Mark(current_ /* current entry offset */,
NextEntryOffset() - 1);
last_bitmap_offset_ = current_;
}
return value_;
}
@ -164,6 +289,8 @@ class BlockIter : public InternalIterator {
virtual bool IsValuePinned() const override { return true; }
size_t TEST_CurrentEntrySize() { return NextEntryOffset() - current_; }
private:
const Comparator* comparator_;
const char* data_; // underlying block contents
@ -179,6 +306,11 @@ class BlockIter : public InternalIterator {
BlockPrefixIndex* prefix_index_;
bool key_pinned_;
// read-amp bitmap
BlockReadAmpBitmap* read_amp_bitmap_;
// last `current_` value we report to read-amp bitmp
mutable uint32_t last_bitmap_offset_;
struct CachedPrevEntry {
explicit CachedPrevEntry(uint32_t _offset, const char* _key_ptr,
size_t _key_offset, size_t _key_size, Slice _value)

@ -66,14 +66,16 @@ namespace {
Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer,
const ReadOptions& options, const BlockHandle& handle,
std::unique_ptr<Block>* result,
const ImmutableCFOptions &ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options) {
const ImmutableCFOptions& ioptions, bool do_uncompress,
const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
size_t read_amp_bytes_per_bit) {
BlockContents contents;
Status s = ReadBlockContents(file, footer, options, handle, &contents, ioptions,
do_uncompress, compression_dict, cache_options);
if (s.ok()) {
result->reset(new Block(std::move(contents)));
result->reset(new Block(std::move(contents), read_amp_bytes_per_bit,
ioptions.statistics));
}
return s;
@ -188,7 +190,8 @@ class BinarySearchIndexReader : public IndexReader {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
&index_block, ioptions, true /* decompress */,
Slice() /*compression dict*/, cache_options);
Slice() /*compression dict*/, cache_options,
0 /* read_amp_bytes_per_bit */);
if (s.ok()) {
*index_reader = new BinarySearchIndexReader(
@ -227,17 +230,20 @@ class BinarySearchIndexReader : public IndexReader {
// key.
class HashIndexReader : public IndexReader {
public:
static Status Create(
const SliceTransform* hash_key_extractor, const Footer& footer,
RandomAccessFileReader* file, const ImmutableCFOptions &ioptions,
const Comparator* comparator, const BlockHandle& index_handle,
InternalIterator* meta_index_iter, IndexReader** index_reader,
bool hash_index_allow_collision,
const PersistentCacheOptions& cache_options) {
static Status Create(const SliceTransform* hash_key_extractor,
const Footer& footer, RandomAccessFileReader* file,
const ImmutableCFOptions& ioptions,
const Comparator* comparator,
const BlockHandle& index_handle,
InternalIterator* meta_index_iter,
IndexReader** index_reader,
bool hash_index_allow_collision,
const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
&index_block, ioptions, true /* decompress */,
Slice() /*compression dict*/, cache_options);
Slice() /*compression dict*/, cache_options,
0 /* read_amp_bytes_per_bit */);
if (!s.ok()) {
return s;
@ -791,7 +797,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
rep->file.get(), rep->footer, ReadOptions(),
rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options);
rep->persistent_cache_options, 0 /* read_amp_bytes_per_bit */);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log,
@ -809,9 +815,9 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ImmutableCFOptions &ioptions, const ReadOptions& read_options,
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict) {
const Slice& compression_dict, size_t read_amp_bytes_per_bit) {
Status s;
Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
@ -861,7 +867,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Insert uncompressed block into block cache
if (s.ok()) {
block->value = new Block(std::move(contents)); // uncompressed block
block->value = new Block(std::move(contents), read_amp_bytes_per_bit,
statistics); // uncompressed block
assert(block->value->compression_type() == kNoCompression);
if (block_cache != nullptr && block->value->cachable() &&
read_options.fill_cache) {
@ -886,9 +893,9 @@ Status BlockBasedTable::GetDataBlockFromCache(
Status BlockBasedTable::PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, const ImmutableCFOptions &ioptions,
const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict) {
const Slice& compression_dict, size_t read_amp_bytes_per_bit) {
assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr);
@ -906,7 +913,8 @@ Status BlockBasedTable::PutDataBlockToCache(
}
if (raw_block->compression_type() != kNoCompression) {
block->value = new Block(std::move(contents)); // uncompressed block
block->value = new Block(std::move(contents), read_amp_bytes_per_bit,
statistics); // compressed block
} else {
block->value = raw_block;
raw_block = nullptr;
@ -1206,8 +1214,9 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
}
s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro, &block,
rep->table_options.format_version, compression_dict);
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
&block, rep->table_options.format_version, compression_dict,
rep->table_options.read_amp_bytes_per_bit);
if (block.value == nullptr && !no_io && ro.fill_cache) {
std::unique_ptr<Block> raw_block;
@ -1216,14 +1225,15 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&raw_block, rep->ioptions,
block_cache_compressed == nullptr,
compression_dict, rep->persistent_cache_options);
compression_dict, rep->persistent_cache_options,
rep->table_options.read_amp_bytes_per_bit);
}
if (s.ok()) {
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
ro, rep->ioptions, &block, raw_block.release(),
rep->table_options.format_version,
compression_dict);
s = PutDataBlockToCache(
key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions,
&block, raw_block.release(), rep->table_options.format_version,
compression_dict, rep->table_options.read_amp_bytes_per_bit);
}
}
}
@ -1242,7 +1252,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
std::unique_ptr<Block> block_value;
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&block_value, rep->ioptions, true /* compress */,
compression_dict, rep->persistent_cache_options);
compression_dict, rep->persistent_cache_options,
rep->table_options.read_amp_bytes_per_bit);
if (s.ok()) {
block.value = block_value.release();
}
@ -1251,7 +1262,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
InternalIterator* iter;
if (s.ok()) {
assert(block.value != nullptr);
iter = block.value->NewIterator(&rep->internal_comparator, input_iter);
iter = block.value->NewIterator(&rep->internal_comparator, input_iter, true,
rep->ioptions.statistics);
if (block.cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
block.cache_handle);
@ -1607,12 +1619,12 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
handle, cache_key_storage);
Slice ckey;
s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr,
rep_->ioptions, options, &block,
rep_->table_options.format_version,
rep_->compression_dict_block
? rep_->compression_dict_block->data
: Slice());
s = GetDataBlockFromCache(
cache_key, ckey, block_cache, nullptr, rep_->ioptions, options, &block,
rep_->table_options.format_version,
rep_->compression_dict_block ? rep_->compression_dict_block->data
: Slice(),
0 /* read_amp_bytes_per_bit */);
assert(s.ok());
bool in_cache = block.value != nullptr;
if (in_cache) {

@ -190,9 +190,9 @@ class BlockBasedTable : public TableReader {
static Status GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ImmutableCFOptions &ioptions, const ReadOptions& read_options,
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict);
const Slice& compression_dict, size_t read_amp_bytes_per_bit);
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
@ -207,9 +207,9 @@ class BlockBasedTable : public TableReader {
static Status PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, const ImmutableCFOptions &ioptions,
const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict);
const Slice& compression_dict, size_t read_amp_bytes_per_bit);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.

@ -4,7 +4,11 @@
// of patent rights can be found in the PATENTS file in the same directory.
//
#include <stdio.h>
#include <algorithm>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "db/dbformat.h"
@ -216,6 +220,259 @@ TEST_F(BlockTest, IndexHashWithSharedPrefix) {
CheckBlockContents(std::move(contents), kMaxKey, keys, values);
}
// A slow and accurate version of BlockReadAmpBitmap that simply store
// all the marked ranges in a set.
class BlockReadAmpBitmapSlowAndAccurate {
public:
void Mark(size_t start_offset, size_t end_offset) {
assert(end_offset >= start_offset);
marked_ranges_.emplace(end_offset, start_offset);
}
// Return true if any byte in this range was Marked
bool IsAnyInRangeMarked(size_t start_offset, size_t end_offset) {
auto it = marked_ranges_.lower_bound(std::make_pair(start_offset, 0));
if (it == marked_ranges_.end()) {
return false;
}
return start_offset <= it->first && end_offset >= it->second;
}
private:
std::set<std::pair<size_t, size_t>> marked_ranges_ = {};
};
TEST_F(BlockTest, BlockReadAmpBitmap) {
std::vector<size_t> block_sizes = {
1, // 1 byte
32, // 32 bytes
61, // 61 bytes
64, // 64 bytes
512, // 0.5 KB
1024, // 1 KB
1024 * 4, // 4 KB
1024 * 10, // 10 KB
1024 * 50, // 50 KB
1024 * 1024, // 1 MB
1024 * 1024 * 4, // 4 MB
1024 * 1024 * 50, // 10 MB
777,
124653,
};
const size_t kBytesPerBit = 64;
Random rnd(301);
for (size_t block_size : block_sizes) {
std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
BlockReadAmpBitmap read_amp_bitmap(block_size, kBytesPerBit, stats.get());
BlockReadAmpBitmapSlowAndAccurate read_amp_slow_and_accurate;
size_t needed_bits = (block_size / kBytesPerBit);
if (block_size % kBytesPerBit != 0) {
needed_bits++;
}
size_t bitmap_size = needed_bits / 32;
if (needed_bits % 32 != 0) {
bitmap_size++;
}
size_t bits_in_bitmap = bitmap_size * 32;
ASSERT_EQ(stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES),
needed_bits * kBytesPerBit);
// Generate some random entries
std::vector<size_t> random_entry_offsets;
for (int i = 0; i < 1000; i++) {
random_entry_offsets.push_back(rnd.Next() % block_size);
}
std::sort(random_entry_offsets.begin(), random_entry_offsets.end());
auto it =
std::unique(random_entry_offsets.begin(), random_entry_offsets.end());
random_entry_offsets.resize(
std::distance(random_entry_offsets.begin(), it));
std::vector<std::pair<uint32_t, uint32_t>> random_entries;
for (size_t i = 0; i < random_entry_offsets.size(); i++) {
size_t entry_start = random_entry_offsets[i];
size_t entry_end;
if (i + 1 < random_entry_offsets.size()) {
entry_end = random_entry_offsets[i + 1] - 1;
} else {
entry_end = block_size - 1;
}
random_entries.emplace_back(entry_start, entry_end);
}
for (size_t i = 0; i < random_entries.size(); i++) {
auto &current_entry = random_entries[rnd.Next() % random_entries.size()];
read_amp_bitmap.Mark(current_entry.first, current_entry.second);
read_amp_slow_and_accurate.Mark(current_entry.first,
current_entry.second);
size_t total_bits = 0;
for (size_t bit_idx = 0; bit_idx < bits_in_bitmap; bit_idx++) {
size_t start_rng = bit_idx * kBytesPerBit;
size_t end_rng = (start_rng + kBytesPerBit) - 1;
total_bits +=
read_amp_slow_and_accurate.IsAnyInRangeMarked(start_rng, end_rng);
}
size_t expected_estimate_useful = total_bits * kBytesPerBit;
size_t got_estimate_useful =
stats->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
ASSERT_EQ(expected_estimate_useful, got_estimate_useful);
}
}
}
TEST_F(BlockTest, BlockWithReadAmpBitmap) {
Random rnd(301);
Options options = Options();
std::unique_ptr<InternalKeyComparator> ic;
ic.reset(new test::PlainInternalKeyComparator(options.comparator));
std::vector<std::string> keys;
std::vector<std::string> values;
BlockBuilder builder(16);
int num_records = 10000;
GenerateRandomKVs(&keys, &values, 0, num_records, 1);
// add a bunch of records to a block
for (int i = 0; i < num_records; i++) {
builder.Add(keys[i], values[i]);
}
Slice rawblock = builder.Finish();
const size_t kBytesPerBit = 8;
// Read the block sequentially using Next()
{
std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = true;
Block reader(std::move(contents), kBytesPerBit, stats.get());
// read contents of block sequentially
size_t read_bytes = 0;
BlockIter *iter = static_cast<BlockIter *>(
reader.NewIterator(options.comparator, nullptr, true, stats.get()));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
iter->value();
read_bytes += iter->TEST_CurrentEntrySize();
double semi_acc_read_amp =
static_cast<double>(read_bytes) / rawblock.size();
double read_amp = static_cast<double>(stats->getTickerCount(
READ_AMP_ESTIMATE_USEFUL_BYTES)) /
stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
// Error in read amplification will be less than 1% if we are reading
// sequentially
double error_pct = fabs(semi_acc_read_amp - read_amp) * 100;
EXPECT_LT(error_pct, 1);
}
delete iter;
}
// Read the block sequentially using Seek()
{
std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = true;
Block reader(std::move(contents), kBytesPerBit, stats.get());
size_t read_bytes = 0;
BlockIter *iter = static_cast<BlockIter *>(
reader.NewIterator(options.comparator, nullptr, true, stats.get()));
for (int i = 0; i < num_records; i++) {
Slice k(keys[i]);
// search in block for this key
iter->Seek(k);
iter->value();
read_bytes += iter->TEST_CurrentEntrySize();
double semi_acc_read_amp =
static_cast<double>(read_bytes) / rawblock.size();
double read_amp = static_cast<double>(stats->getTickerCount(
READ_AMP_ESTIMATE_USEFUL_BYTES)) /
stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
// Error in read amplification will be less than 1% if we are reading
// sequentially
double error_pct = fabs(semi_acc_read_amp - read_amp) * 100;
EXPECT_LT(error_pct, 1);
}
delete iter;
}
// Read the block randomly
{
std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = true;
Block reader(std::move(contents), kBytesPerBit, stats.get());
size_t read_bytes = 0;
BlockIter *iter = static_cast<BlockIter *>(
reader.NewIterator(options.comparator, nullptr, true, stats.get()));
std::unordered_set<int> read_keys;
for (int i = 0; i < num_records; i++) {
int index = rnd.Uniform(num_records);
Slice k(keys[index]);
iter->Seek(k);
iter->value();
if (read_keys.find(index) == read_keys.end()) {
read_keys.insert(index);
read_bytes += iter->TEST_CurrentEntrySize();
}
double semi_acc_read_amp =
static_cast<double>(read_bytes) / rawblock.size();
double read_amp = static_cast<double>(stats->getTickerCount(
READ_AMP_ESTIMATE_USEFUL_BYTES)) /
stats->getTickerCount(READ_AMP_TOTAL_READ_BYTES);
double error_pct = fabs(semi_acc_read_amp - read_amp) * 100;
// Error in read amplification will be less than 2% if we are reading
// randomly
EXPECT_LT(error_pct, 2);
}
delete iter;
}
}
TEST_F(BlockTest, ReadAmpBitmapPow2) {
std::shared_ptr<Statistics> stats = rocksdb::CreateDBStatistics();
ASSERT_EQ(BlockReadAmpBitmap(100, 1, stats.get()).GetBytesPerBit(), 1);
ASSERT_EQ(BlockReadAmpBitmap(100, 2, stats.get()).GetBytesPerBit(), 2);
ASSERT_EQ(BlockReadAmpBitmap(100, 4, stats.get()).GetBytesPerBit(), 4);
ASSERT_EQ(BlockReadAmpBitmap(100, 8, stats.get()).GetBytesPerBit(), 8);
ASSERT_EQ(BlockReadAmpBitmap(100, 16, stats.get()).GetBytesPerBit(), 16);
ASSERT_EQ(BlockReadAmpBitmap(100, 32, stats.get()).GetBytesPerBit(), 32);
ASSERT_EQ(BlockReadAmpBitmap(100, 3, stats.get()).GetBytesPerBit(), 2);
ASSERT_EQ(BlockReadAmpBitmap(100, 7, stats.get()).GetBytesPerBit(), 4);
ASSERT_EQ(BlockReadAmpBitmap(100, 11, stats.get()).GetBytesPerBit(), 8);
ASSERT_EQ(BlockReadAmpBitmap(100, 17, stats.get()).GetBytesPerBit(), 16);
ASSERT_EQ(BlockReadAmpBitmap(100, 33, stats.get()).GetBytesPerBit(), 32);
ASSERT_EQ(BlockReadAmpBitmap(100, 35, stats.get()).GetBytesPerBit(), 32);
}
} // namespace rocksdb
int main(int argc, char **argv) {

@ -384,6 +384,10 @@ DEFINE_int32(index_block_restart_interval,
"Number of keys between restart points "
"for delta encoding of keys in index block.");
DEFINE_int32(read_amp_bytes_per_bit,
rocksdb::BlockBasedTableOptions().read_amp_bytes_per_bit,
"Number of bytes per bit to be used in block read-amp bitmap");
DEFINE_int64(compressed_cache_size, -1,
"Number of bytes to use as a cache of compressed data.");
@ -2805,6 +2809,7 @@ class Benchmark {
block_based_options.skip_table_builder_flush =
FLAGS_skip_table_builder_flush;
block_based_options.format_version = 2;
block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
options.table_factory.reset(
NewBlockBasedTableFactory(block_based_options));
}

@ -558,7 +558,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionType::kUInt32T, OptionVerificationType::kNormal}},
{"verify_compression",
{offsetof(struct BlockBasedTableOptions, verify_compression),
OptionType::kBoolean, OptionVerificationType::kNormal}}};
OptionType::kBoolean, OptionVerificationType::kNormal}},
{"read_amp_bytes_per_bit",
{offsetof(struct BlockBasedTableOptions, read_amp_bytes_per_bit),
OptionType::kSizeT, OptionVerificationType::kNormal}}};
static std::unordered_map<std::string, OptionTypeInfo> plain_table_type_info = {
{"user_key_len",

@ -159,7 +159,7 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
"filter_policy=bloomfilter:4:true;whole_key_filtering=1;"
"skip_table_builder_flush=1;format_version=1;"
"hash_index_allow_collision=false;"
"verify_compression=true;",
"verify_compression=true;read_amp_bytes_per_bit=0",
new_bbto));
ASSERT_EQ(unset_bytes_base,

Loading…
Cancel
Save