Implement a compressed block cache.

Summary:
Rocksdb can now support a uncompressed block cache, or a compressed
block cache or both. Lookups first look for a block in the
uncompressed cache, if it is not found only then it is looked up
in the compressed cache. If it is found in the compressed cache,
then it is uncompressed and inserted into the uncompressed cache.

It is possible that the same block resides in the compressed cache
as well as the uncompressed cache at the same time. Both caches
have their own individual LRU policy.

Test Plan: Unit test case attached.

Reviewers: kailiu, sdong, haobo, leveldb

Reviewed By: haobo

CC: xjin, haobo

Differential Revision: https://reviews.facebook.net/D12675
main
Dhruba Borthakur 12 years ago
parent 1e4375d2ef
commit b4ad5e89ae
  1. 9
      db/db_bench.cc
  2. 10
      db/db_impl.cc
  3. 2
      db/db_impl.h
  4. 89
      db/db_test.cc
  5. 5
      include/rocksdb/env.h
  6. 9
      include/rocksdb/options.h
  7. 7
      include/rocksdb/statistics.h
  8. 3
      table/block.cc
  9. 4
      table/block.h
  10. 62
      table/block_based_table_builder.cc
  11. 3
      table/block_based_table_builder.h
  12. 213
      table/block_based_table_reader.cc
  13. 12
      table/block_based_table_reader.h
  14. 66
      table/format.cc
  15. 14
      table/format.h
  16. 9
      tools/db_stress.cc
  17. 58
      util/env_posix.cc
  18. 10
      util/options.cc

@ -198,6 +198,9 @@ DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed"
DEFINE_int32(block_size, rocksdb::Options().block_size,
"Number of bytes in a block.");
DEFINE_int64(compressed_cache_size, -1,
"Number of bytes to use as a cache of compressed data.");
DEFINE_int32(open_files, rocksdb::Options().max_open_files,
"Maximum number of files to keep open at the same time"
" (use default if == 0)");
@ -752,6 +755,7 @@ class Duration {
class Benchmark {
private:
shared_ptr<Cache> cache_;
shared_ptr<Cache> compressed_cache_;
const FilterPolicy* filter_policy_;
const SliceTransform* prefix_extractor_;
DB* db_;
@ -907,6 +911,10 @@ class Benchmark {
NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits,
FLAGS_cache_remove_scan_count_limit) :
NewLRUCache(FLAGS_cache_size)) : nullptr),
compressed_cache_(FLAGS_compressed_cache_size >= 0 ?
(FLAGS_cache_numshardbits >= 1 ?
NewLRUCache(FLAGS_compressed_cache_size, FLAGS_cache_numshardbits) :
NewLRUCache(FLAGS_compressed_cache_size)) : nullptr),
filter_policy_(FLAGS_bloom_bits >= 0
? NewBloomFilterPolicy(FLAGS_bloom_bits)
: nullptr),
@ -1275,6 +1283,7 @@ class Benchmark {
Options options;
options.create_if_missing = !FLAGS_use_existing_db;
options.block_cache = cache_;
options.block_cache_compressed = compressed_cache_;
if (cache_ == nullptr) {
options.no_block_cache = true;
}

@ -1679,7 +1679,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
CompactionState* compact = new CompactionState(c.get());
status = DoCompactionWork(compact);
CleanupCompaction(compact);
CleanupCompaction(compact, status);
versions_->ReleaseCompactionFiles(c.get(), status);
c->ReleaseInputs();
FindObsoleteFiles(deletion_state);
@ -1728,7 +1728,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
return status;
}
void DBImpl::CleanupCompaction(CompactionState* compact) {
void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
mutex_.AssertHeld();
if (compact->builder != nullptr) {
// May happen if we get a shutdown call in the middle of compaction
@ -1740,6 +1740,12 @@ void DBImpl::CleanupCompaction(CompactionState* compact) {
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
pending_outputs_.erase(out.number);
// If this file was inserted into the table cache then remove
// them here because this compaction was not committed.
if (!status.ok()) {
table_cache_->Evict(out.number);
}
}
delete compact;
}

@ -194,7 +194,7 @@ class DBImpl : public DB {
void BackgroundCallFlush();
Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state);
Status BackgroundFlush(bool* madeProgress);
void CleanupCompaction(CompactionState* compact);
void CleanupCompaction(CompactionState* compact, Status status);
Status DoCompactionWork(CompactionState* compact);
Status OpenCompactionOutputFile(CompactionState* compact);

@ -262,6 +262,7 @@ class DBTest {
kDeletesFilterFirst,
kPrefixHashRep,
kUniversalCompaction,
kCompressedBlockCache,
kEnd
};
int option_config_;
@ -388,6 +389,9 @@ class DBTest {
case kUniversalCompaction:
options.compaction_style = kCompactionStyleUniversal;
break;
case kCompressedBlockCache:
options.block_cache_compressed = NewLRUCache(8*1024*1024);
break;
default:
break;
}
@ -452,6 +456,7 @@ class DBTest {
std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
ReadOptions options;
options.verify_checksums = true;
options.snapshot = snapshot;
std::string result;
Status s = db_->Get(options, k, &result);
@ -1753,6 +1758,90 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) {
}
}
TEST(DBTest, CompressedCache) {
int num_iter = 80;
// Run this test three iterations.
// Iteration 1: only a uncompressed block cache
// Iteration 2: only a compressed block cache
// Iteration 3: both block cache and compressed cache
for (int iter = 0; iter < 3; iter++) {
Options options = CurrentOptions();
options.write_buffer_size = 64*1024; // small write buffer
options.statistics = rocksdb::CreateDBStatistics();
switch (iter) {
case 0:
// only uncompressed block cache
options.block_cache = NewLRUCache(8*1024);
options.block_cache_compressed = nullptr;
break;
case 1:
// no block cache, only compressed cache
options.no_block_cache = true;
options.block_cache = nullptr;
options.block_cache_compressed = NewLRUCache(8*1024);
break;
case 2:
// both compressed and uncompressed block cache
options.block_cache = NewLRUCache(1024);
options.block_cache_compressed = NewLRUCache(8*1024);
break;
default:
ASSERT_TRUE(false);
}
Reopen(&options);
Random rnd(301);
// Write 8MB (80 values, each 100K)
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
std::vector<std::string> values;
Slice str;
for (int i = 0; i < num_iter; i++) {
if (i % 4 == 0) { // high compression ratio
str = RandomString(&rnd, 100000);
}
values.push_back(str.ToString(true));
ASSERT_OK(Put(Key(i), values[i]));
}
// flush all data from memtable so that reads are from block cache
dbfull()->Flush(FlushOptions());
for (int i = 0; i < num_iter; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
// check that we triggered the appropriate code paths in the cache
switch (iter) {
case 0:
// only uncompressed block cache
ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
0);
ASSERT_EQ(options.statistics.get()->getTickerCount
(BLOCK_CACHE_COMPRESSED_MISS), 0);
break;
case 1:
// no block cache, only compressed cache
ASSERT_EQ(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
0);
ASSERT_GT(options.statistics.get()->getTickerCount
(BLOCK_CACHE_COMPRESSED_MISS), 0);
break;
case 2:
// both compressed and uncompressed block cache
ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
0);
ASSERT_GT(options.statistics.get()->getTickerCount
(BLOCK_CACHE_COMPRESSED_MISS), 0);
break;
default:
ASSERT_TRUE(false);
}
}
}
TEST(DBTest, CompactionTrigger) {
Options options = CurrentOptions();
options.write_buffer_size = 100<<10; //100KB

@ -373,6 +373,11 @@ class WritableFile {
*block_size = preallocation_block_size_;
}
// For documentation, refer to RandomAccessFile::GetUniqueId()
virtual size_t GetUniqueId(char* id, size_t max_size) const {
return 0; // Default implementation to prevent issues with backwards
}
// Remove any kind of caching of data from the offset to offset+length
// of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop.

@ -39,7 +39,7 @@ using std::shared_ptr;
// sequence of key,value pairs. Each block may be compressed before
// being stored in a file. The following enum describes which
// compression method (if any) is used to compress a block.
enum CompressionType {
enum CompressionType : char {
// NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk.
kNoCompression = 0x0,
@ -48,7 +48,7 @@ enum CompressionType {
kBZip2Compression = 0x3
};
enum CompactionStyle {
enum CompactionStyle : char {
kCompactionStyleLevel = 0x0, // level based compaction style
kCompactionStyleUniversal = 0x1 // Universal compaction style
};
@ -177,6 +177,11 @@ struct Options {
// Default: nullptr
shared_ptr<Cache> block_cache;
// If non-NULL use the specified cache for compressed blocks.
// If NULL, rocksdb will not use a compressed block cache.
// Default: nullptr
shared_ptr<Cache> block_cache_compressed;
// Approximate size of user data packed per block. Note that the
// block size specified here corresponds to uncompressed data. The
// actual size of the unit read from disk may be smaller if

@ -82,6 +82,9 @@ enum Tickers {
// transaction log iterator refreshes
GET_UPDATES_SINCE_CALLS,
BLOCK_CACHE_COMPRESSED_MISS, // miss in the compressed block cache
BLOCK_CACHE_COMPRESSED_HIT, // hit in the compressed block cache
TICKER_ENUM_MAX
};
@ -116,7 +119,9 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{ BLOOM_FILTER_PREFIX_CHECKED, "rocksdb.bloom.filter.prefix.checked" },
{ BLOOM_FILTER_PREFIX_USEFUL, "rocksdb.bloom.filter.prefix.useful" },
{ NUMBER_OF_RESEEKS_IN_ITERATION, "rocksdb.number.reseeks.iteration" },
{ GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls" }
{ GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls" },
{ BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss" },
{ BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit" }
};
/**

@ -29,7 +29,8 @@ Block::Block(const BlockContents& contents)
: data_(contents.data.data()),
size_(contents.data.size()),
owned_(contents.heap_allocated),
cachable_(contents.cachable) {
cachable_(contents.cachable),
compression_type_(contents.compression_type) {
if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker
} else {

@ -11,6 +11,7 @@
#include <stddef.h>
#include <stdint.h>
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
namespace rocksdb {
@ -26,7 +27,9 @@ class Block {
size_t size() const { return size_; }
bool isCachable() const { return cachable_; }
CompressionType compressionType() const { return compression_type_; }
Iterator* NewIterator(const Comparator* comparator);
const char* data() { return data_; }
private:
uint32_t NumRestarts() const;
@ -36,6 +39,7 @@ class Block {
uint32_t restart_offset_; // Offset in data_ of restart array
bool owned_; // Block owns data_[]
bool cachable_;
CompressionType compression_type_;
// No copying allowed
Block(const Block&);

@ -12,12 +12,14 @@
#include <assert.h>
#include <map>
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/table.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "table/block_based_table_reader.h"
#include "table/block.h"
#include "table/block_builder.h"
#include "table/filter_block.h"
#include "table/format.h"
@ -91,6 +93,8 @@ struct BlockBasedTableBuilder::Rep {
bool closed = false; // Either Finish() or Abandon() has been called.
FilterBlockBuilder* filter_block;
char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
size_t compressed_cache_key_prefix_size;
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
@ -126,6 +130,11 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(const Options& options,
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
}
if (options.block_cache_compressed.get() != nullptr) {
BlockBasedTable::GenerateCachePrefix(options.block_cache_compressed, file,
&rep_->compressed_cache_key_prefix[0],
&rep_->compressed_cache_key_prefix_size);
}
}
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
@ -284,6 +293,9 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer+1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->status = InsertBlockInCache(block_contents, type, handle);
}
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
@ -294,6 +306,56 @@ Status BlockBasedTableBuilder::status() const {
return rep_->status;
}
static void DeleteCachedBlock(const Slice& key, void* value) {
Block* block = reinterpret_cast<Block*>(value);
delete block;
}
//
// Make a copy of the block contents and insert into compressed block cache
//
Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
const CompressionType type,
const BlockHandle* handle) {
Rep* r = rep_;
Cache* block_cache_compressed = r->options.block_cache_compressed.get();
if (type != kNoCompression && block_cache_compressed != nullptr) {
Cache::Handle* cache_handle = nullptr;
size_t size = block_contents.size();
char* ubuf = new char[size]; // make a new copy
memcpy(ubuf, block_contents.data(), size);
BlockContents results;
Slice sl(ubuf, size);
results.data = sl;
results.cachable = true; // XXX
results.heap_allocated = true;
results.compression_type = type;
Block* block = new Block(results);
// make cache key by appending the file offset to the cache prefix id
char* end = EncodeVarint64(
r->compressed_cache_key_prefix +
r->compressed_cache_key_prefix_size,
handle->offset());
Slice key(r->compressed_cache_key_prefix, static_cast<size_t>
(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache.
cache_handle = block_cache_compressed->Insert(key, block, block->size(),
&DeleteCachedBlock);
block_cache_compressed->Release(cache_handle);
// Invalidate OS cache.
r->file->InvalidateCache(r->offset, size);
}
return Status::OK();
}
Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_;
Flush();

@ -62,7 +62,8 @@ class BlockBasedTableBuilder : public TableBuilder {
bool ok() const { return status().ok(); }
void WriteBlock(BlockBuilder* block, BlockHandle* handle);
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);
Status InsertBlockInCache(const Slice& block_contents,
const CompressionType type, const BlockHandle* handle);
struct Rep;
Rep* rep_;

@ -11,7 +11,6 @@
#include "db/dbformat.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
@ -51,6 +50,8 @@ struct BlockBasedTable::Rep {
unique_ptr<RandomAccessFile> file;
char cache_key_prefix[kMaxCacheKeyPrefixSize];
size_t cache_key_prefix_size;
char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize];
size_t compressed_cache_key_prefix_size;
FilterBlockReader* filter;
const char* filter_data;
@ -67,18 +68,44 @@ BlockBasedTable::~BlockBasedTable() {
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) {
assert(kMaxCacheKeyPrefixSize >= 10);
rep->cache_key_prefix_size = 0;
if (rep->options.block_cache) {
rep->cache_key_prefix_size = rep->file->GetUniqueId(rep->cache_key_prefix,
kMaxCacheKeyPrefixSize);
if (rep->cache_key_prefix_size == 0) {
// If the prefix wasn't generated or was too long, we create one from the
// cache.
char* end = EncodeVarint64(rep->cache_key_prefix,
rep->options.block_cache->NewId());
rep->cache_key_prefix_size =
static_cast<size_t>(end - rep->cache_key_prefix);
}
rep->compressed_cache_key_prefix_size = 0;
if (rep->options.block_cache != nullptr) {
GenerateCachePrefix(rep->options.block_cache, rep->file.get(),
&rep->cache_key_prefix[0],
&rep->cache_key_prefix_size);
}
if (rep->options.block_cache_compressed != nullptr) {
GenerateCachePrefix(rep->options.block_cache_compressed, rep->file.get(),
&rep->compressed_cache_key_prefix[0],
&rep->compressed_cache_key_prefix_size);
}
}
void BlockBasedTable::GenerateCachePrefix(shared_ptr<Cache> cc,
RandomAccessFile* file, char* buffer, size_t* size) {
// generate an id from the file
*size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
// If the prefix wasn't generated or was too long,
// create one from the cache.
if (*size == 0) {
char* end = EncodeVarint64(buffer, cc->NewId());
*size = static_cast<size_t>(end - buffer);
}
}
void BlockBasedTable::GenerateCachePrefix(shared_ptr<Cache> cc,
WritableFile* file, char* buffer, size_t* size) {
// generate an id from the file
*size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
// If the prefix wasn't generated or was too long,
// create one from the cache.
if (*size == 0) {
char* end = EncodeVarint64(buffer, cc->NewId());
*size = static_cast<size_t>(end - buffer);
}
}
@ -94,9 +121,11 @@ Status ReadBlock(RandomAccessFile* file,
const BlockHandle& handle,
Block** result,
Env* env,
bool* didIO = nullptr) {
bool* didIO = nullptr,
bool do_uncompress = true) {
BlockContents contents;
Status s = ReadBlockContents(file, options, handle, &contents, env);
Status s = ReadBlockContents(file, options, handle, &contents,
env, do_uncompress);
if (s.ok()) {
*result = new Block(contents);
}
@ -143,6 +172,7 @@ Status BlockBasedTable::Open(const Options& options,
if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
assert(index_block->compressionType() == kNoCompression);
BlockBasedTable::Rep* rep = new BlockBasedTable::Rep(soptions);
rep->options = options;
rep->file = std::move(file);
@ -193,6 +223,7 @@ void BlockBasedTable::ReadMeta(const Footer& footer) {
// Do not propagate errors since meta info is not needed for operation
return;
}
assert(meta->compressionType() == kNoCompression);
Iterator* iter = meta->NewIterator(BytewiseComparator());
// read filter
@ -238,7 +269,7 @@ void BlockBasedTable::ReadFilter(const Slice& filter_handle_value) {
ReadOptions opt;
BlockContents block;
if (!ReadBlockContents(rep_->file.get(), opt, filter_handle, &block,
rep_->options.env).ok()) {
rep_->options.env, false).ok()) {
return;
}
if (block.heap_allocated) {
@ -260,7 +291,8 @@ Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) {
ReadOptions(),
handle,
&block_contents,
rep->options.env
rep->options.env,
false
);
if (!s.ok()) {
@ -351,9 +383,13 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
const bool no_io = (options.read_tier == kBlockCacheTier);
BlockBasedTable* table = reinterpret_cast<BlockBasedTable*>(arg);
Cache* block_cache = table->rep_->options.block_cache.get();
Cache* block_cache_compressed = table->rep_->options.
block_cache_compressed.get();
std::shared_ptr<Statistics> statistics = table->rep_->options.statistics;
Block* block = nullptr;
Block* cblock = nullptr;
Cache::Handle* cache_handle = nullptr;
Cache::Handle* compressed_cache_handle = nullptr;
BlockHandle handle;
Slice input = index_value;
@ -362,26 +398,88 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
// can add more features in the future.
if (s.ok()) {
if (block_cache != nullptr) {
if (block_cache != nullptr || block_cache_compressed != nullptr) {
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
const size_t cache_key_prefix_size = table->rep_->cache_key_prefix_size;
assert(cache_key_prefix_size != 0);
assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
memcpy(cache_key, table->rep_->cache_key_prefix,
cache_key_prefix_size);
char* end = EncodeVarint64(cache_key + cache_key_prefix_size,
handle.offset());
Slice key(cache_key, static_cast<size_t>(end-cache_key));
cache_handle = block_cache->Lookup(key);
if (cache_handle != nullptr) {
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
char* end = cache_key;
// create key for block cache
if (block_cache != nullptr) {
assert(table->rep_->cache_key_prefix_size != 0);
assert(table->rep_->cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
memcpy(cache_key, table->rep_->cache_key_prefix,
table->rep_->cache_key_prefix_size);
end = EncodeVarint64(cache_key + table->rep_->cache_key_prefix_size,
handle.offset());
}
Slice key(cache_key, static_cast<size_t>(end - cache_key));
// create key for compressed block cache
end = compressed_cache_key;
if (block_cache_compressed != nullptr) {
assert(table->rep_->compressed_cache_key_prefix_size != 0);
assert(table->rep_->compressed_cache_key_prefix_size <=
kMaxCacheKeyPrefixSize);
memcpy(compressed_cache_key, table->rep_->compressed_cache_key_prefix,
table->rep_->compressed_cache_key_prefix_size);
end = EncodeVarint64(compressed_cache_key +
table->rep_->compressed_cache_key_prefix_size,
handle.offset());
}
Slice ckey(compressed_cache_key, static_cast<size_t>
(end - compressed_cache_key));
// Lookup uncompressed cache first
if (block_cache != nullptr) {
cache_handle = block_cache->Lookup(key);
if (cache_handle != nullptr) {
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
RecordTick(statistics, BLOCK_CACHE_HIT);
}
}
// If not found in uncompressed cache, lookup compressed cache
if (block == nullptr && block_cache_compressed != nullptr) {
compressed_cache_handle = block_cache_compressed->Lookup(ckey);
// if we found in the compressed cache, then uncompress and
// insert into uncompressed cache
if (compressed_cache_handle != nullptr) {
// found compressed block
cblock = reinterpret_cast<Block*>(block_cache_compressed->
Value(compressed_cache_handle));
assert(cblock->compressionType() != kNoCompression);
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
s = UncompressBlockContents(cblock->data(), cblock->size(),
&contents);
// Insert uncompressed block into block cache
if (s.ok()) {
block = new Block(contents); // uncompressed block
assert(block->compressionType() == kNoCompression);
if (block_cache != nullptr && block->isCachable() &&
options.fill_cache) {
cache_handle = block_cache->Insert(key, block, block->size(),
&DeleteCachedBlock);
assert(reinterpret_cast<Block*>(block_cache->Value(cache_handle))
== block);
}
}
// Release hold on compressed cache entry
block_cache_compressed->Release(compressed_cache_handle);
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT);
}
}
if (block != nullptr) {
BumpPerfCount(&perf_context.block_cache_hit_count);
RecordTick(statistics, BLOCK_CACHE_HIT);
} else if (no_io) {
// Did not find in block_cache and can't do IO
return NewErrorIterator(Status::Incomplete("no blocking io"));
} else {
Histograms histogram = for_compaction ?
READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
{ // block for stop watch
@ -390,19 +488,54 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
table->rep_->file.get(),
options,
handle,
&block,
&cblock,
table->rep_->options.env,
didIO
didIO,
block_cache_compressed == nullptr
);
}
if (s.ok()) {
if (block->isCachable() && options.fill_cache) {
cache_handle = block_cache->Insert(
key, block, block->size(), &DeleteCachedBlock);
assert(cblock->compressionType() == kNoCompression ||
block_cache_compressed != nullptr);
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
if (cblock->compressionType() != kNoCompression) {
s = UncompressBlockContents(cblock->data(), cblock->size(),
&contents);
}
if (s.ok()) {
if (cblock->compressionType() != kNoCompression) {
block = new Block(contents); // uncompressed block
} else {
block = cblock;
cblock = nullptr;
}
if (block->isCachable() && options.fill_cache) {
// Insert compressed block into compressed block cache.
// Release the hold on the compressed cache entry immediately.
if (block_cache_compressed != nullptr && cblock != nullptr) {
compressed_cache_handle = block_cache_compressed->Insert(
ckey, cblock, cblock->size(), &DeleteCachedBlock);
block_cache_compressed->Release(compressed_cache_handle);
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
cblock = nullptr;
}
// insert into uncompressed block cache
assert((block->compressionType() == kNoCompression));
if (block_cache != nullptr) {
cache_handle = block_cache->Insert(
key, block, block->size(), &DeleteCachedBlock);
RecordTick(statistics, BLOCK_CACHE_MISS);
assert(reinterpret_cast<Block*>(block_cache->Value(
cache_handle))== block);
}
}
}
}
RecordTick(statistics, BLOCK_CACHE_MISS);
if (cblock != nullptr) {
delete cblock;
}
}
} else if (no_io) {
// Could not read from block_cache and can't do IO
@ -416,10 +549,10 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
Iterator* iter;
if (block != nullptr) {
iter = block->NewIterator(table->rep_->options.comparator);
if (cache_handle == nullptr) {
iter->RegisterCleanup(&DeleteBlock, block, nullptr);
} else {
if (cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
} else {
iter->RegisterCleanup(&DeleteBlock, block, nullptr);
}
} else {
iter = NewErrorIterator(s);

@ -10,10 +10,12 @@
#pragma once
#include <memory>
#include <stdint.h>
#include "rocksdb/cache.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/table_stats.h"
#include "rocksdb/table.h"
#include "util/coding.h"
namespace rocksdb {
@ -103,6 +105,7 @@ class BlockBasedTable : public TableReader {
// after a call to Seek(key), until handle_result returns false.
// May not make such a call if filter policy says that key is not present.
friend class TableCache;
friend class BlockBasedTableBuilder;
void ReadMeta(const Footer& footer);
void ReadFilter(const Slice& filter_handle_value);
@ -114,6 +117,15 @@ class BlockBasedTable : public TableReader {
compaction_optimized_(false) {
rep_ = rep;
}
// Generate a cache key prefix from the file
static void GenerateCachePrefix(shared_ptr<Cache> cc,
RandomAccessFile* file, char* buffer, size_t* size);
static void GenerateCachePrefix(shared_ptr<Cache> cc,
WritableFile* file, char* buffer, size_t* size);
// The longest prefix of the cache key used to identify blocks.
// For Posix files the unique ID is three varints.
static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1;
// No copying allowed
explicit BlockBasedTable(const TableReader&) = delete;

@ -76,7 +76,8 @@ Status ReadBlockContents(RandomAccessFile* file,
const ReadOptions& options,
const BlockHandle& handle,
BlockContents* result,
Env* env) {
Env* env,
bool do_uncompress) {
result->data = Slice();
result->cachable = false;
result->heap_allocated = false;
@ -116,41 +117,55 @@ Status ReadBlockContents(RandomAccessFile* file,
BumpPerfTime(&perf_context.block_checksum_time, &timer);
}
// If the caller has requested that the block not be uncompressed
if (!do_uncompress || data[n] == kNoCompression) {
if (data != buf) {
// File implementation gave us pointer to some other data.
// Use it directly under the assumption that it will be live
// while the file is open.
delete[] buf;
result->data = Slice(data, n);
result->heap_allocated = false;
result->cachable = false; // Do not double-cache
} else {
result->data = Slice(buf, n);
result->heap_allocated = true;
result->cachable = true;
}
result->compression_type = (rocksdb::CompressionType)data[n];
s = Status::OK();
} else {
s = UncompressBlockContents(buf, n, result);
delete[] buf;
}
BumpPerfTime(&perf_context.block_decompress_time, &timer);
return s;
}
//
// The 'data' points to the raw block contents that was read in from file.
// This method allocates a new heap buffer and the raw block
// contents are uncompresed into this buffer. This
// buffer is returned via 'result' and it is upto the caller to
// free this buffer.
Status UncompressBlockContents(const char* data, size_t n,
BlockContents* result) {
char* ubuf = nullptr;
int decompress_size = 0;
assert(data[n] != kNoCompression);
switch (data[n]) {
case kNoCompression:
if (data != buf) {
// File implementation gave us pointer to some other data.
// Use it directly under the assumption that it will be live
// while the file is open.
delete[] buf;
result->data = Slice(data, n);
result->heap_allocated = false;
result->cachable = false; // Do not double-cache
} else {
result->data = Slice(buf, n);
result->heap_allocated = true;
result->cachable = true;
}
// Ok
break;
case kSnappyCompression: {
size_t ulength = 0;
static char snappy_corrupt_msg[] =
"Snappy not supported or corrupted Snappy compressed block contents";
if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
return Status::Corruption(snappy_corrupt_msg);
}
ubuf = new char[ulength];
if (!port::Snappy_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
return Status::Corruption(snappy_corrupt_msg);
}
delete[] buf;
result->data = Slice(ubuf, ulength);
result->heap_allocated = true;
result->cachable = true;
@ -161,10 +176,8 @@ Status ReadBlockContents(RandomAccessFile* file,
static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents";
if (!ubuf) {
delete[] buf;
return Status::Corruption(zlib_corrupt_msg);
}
delete[] buf;
result->data = Slice(ubuf, decompress_size);
result->heap_allocated = true;
result->cachable = true;
@ -174,21 +187,16 @@ Status ReadBlockContents(RandomAccessFile* file,
static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents";
if (!ubuf) {
delete[] buf;
return Status::Corruption(bzip2_corrupt_msg);
}
delete[] buf;
result->data = Slice(ubuf, decompress_size);
result->heap_allocated = true;
result->cachable = true;
break;
default:
delete[] buf;
return Status::Corruption("bad block type");
}
BumpPerfTime(&perf_context.block_decompress_time, &timer);
result->compression_type = kNoCompression; // not compressed any more
return Status::OK();
}

@ -12,6 +12,7 @@
#include <stdint.h>
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
namespace rocksdb {
@ -90,6 +91,7 @@ struct BlockContents {
Slice data; // Actual contents of data
bool cachable; // True iff data can be cached
bool heap_allocated; // True iff caller should delete[] data.data()
CompressionType compression_type;
};
// Read the block identified by "handle" from "file". On failure
@ -98,7 +100,17 @@ extern Status ReadBlockContents(RandomAccessFile* file,
const ReadOptions& options,
const BlockHandle& handle,
BlockContents* result,
Env* env);
Env* env,
bool do_uncompress);
// The 'data' points to the raw block contents read in from file.
// This method allocates a new heap buffer and the raw block
// contents are uncompresed into this buffer. This buffer is
// returned via 'result' and it is upto the caller to
// free this buffer.
extern Status UncompressBlockContents(const char* data,
size_t n,
BlockContents* result);
// Implementation details follow. Clients should ignore,

@ -115,6 +115,10 @@ DEFINE_int32(open_files, rocksdb::Options().max_open_files,
"Maximum number of files to keep open at the same time "
"(use default if == 0)");
DEFINE_int64(compressed_cache_size, -1,
"Number of bytes to use as a cache of compressed data."
" Negative means use default settings.");
DEFINE_int32(compaction_style, rocksdb::Options().compaction_style, "");
DEFINE_int32(level0_file_num_compaction_trigger,
@ -672,6 +676,9 @@ class StressTest {
public:
StressTest()
: cache_(NewLRUCache(FLAGS_cache_size)),
compressed_cache_(FLAGS_compressed_cache_size >= 0 ?
NewLRUCache(FLAGS_compressed_cache_size) :
nullptr),
filter_policy_(FLAGS_bloom_bits >= 0
? NewBloomFilterPolicy(FLAGS_bloom_bits)
: nullptr),
@ -1341,6 +1348,7 @@ class StressTest {
assert(db_ == nullptr);
Options options;
options.block_cache = cache_;
options.block_cache_compressed = compressed_cache_;
options.write_buffer_size = FLAGS_write_buffer_size;
options.max_write_buffer_number = FLAGS_max_write_buffer_number;
options.min_write_buffer_number_to_merge =
@ -1469,6 +1477,7 @@ class StressTest {
private:
shared_ptr<Cache> cache_;
shared_ptr<Cache> compressed_cache_;
const FilterPolicy* filter_policy_;
const SliceTransform* prefix_extractor_;
DB* db_;

@ -101,6 +101,36 @@ static void TestKillRandom(int odds, const std::string& srcfile,
#endif
#if defined(OS_LINUX)
namespace {
static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
if (max_size < kMaxVarint64Length*3) {
return 0;
}
struct stat buf;
int result = fstat(fd, &buf);
if (result == -1) {
return 0;
}
long version = 0;
result = ioctl(fd, FS_IOC_GETVERSION, &version);
if (result == -1) {
return 0;
}
uint64_t uversion = (uint64_t)version;
char* rid = id;
rid = EncodeVarint64(rid, buf.st_dev);
rid = EncodeVarint64(rid, buf.st_ino);
rid = EncodeVarint64(rid, uversion);
assert(rid >= id);
return static_cast<size_t>(rid-id);
}
}
#endif
class PosixSequentialFile: public SequentialFile {
private:
std::string filename_;
@ -187,30 +217,7 @@ class PosixRandomAccessFile: public RandomAccessFile {
#if defined(OS_LINUX)
virtual size_t GetUniqueId(char* id, size_t max_size) const {
// TODO: possibly allow this function to handle tighter bounds.
if (max_size < kMaxVarint64Length*3) {
return 0;
}
struct stat buf;
int result = fstat(fd_, &buf);
if (result == -1) {
return 0;
}
long version = 0;
result = ioctl(fd_, FS_IOC_GETVERSION, &version);
if (result == -1) {
return 0;
}
uint64_t uversion = (uint64_t)version;
char* rid = id;
rid = EncodeVarint64(rid, buf.st_dev);
rid = EncodeVarint64(rid, buf.st_ino);
rid = EncodeVarint64(rid, uversion);
assert(rid >= id);
return static_cast<size_t>(rid-id);
return GetUniqueIdFromFile(fd_, id, max_size);
}
#endif
@ -711,6 +718,9 @@ class PosixWritableFile : public WritableFile {
return IOError(filename_, errno);
}
}
virtual size_t GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(fd_, id, max_size);
}
#endif
};

@ -34,6 +34,8 @@ Options::Options()
max_write_buffer_number(2),
min_write_buffer_number_to_merge(1),
max_open_files(1000),
block_cache(nullptr),
block_cache_compressed(nullptr),
block_size(4096),
block_restart_interval(16),
compression(kSnappyCompression),
@ -129,10 +131,16 @@ Options::Dump(Logger* log) const
Log(log," Options.max_write_buffer_number: %d", max_write_buffer_number);
Log(log," Options.max_open_files: %d", max_open_files);
Log(log," Options.block_cache: %p", block_cache.get());
Log(log," Options.block_cache_compressed: %p",
block_cache_compressed.get());
if (block_cache) {
Log(log," Options.block_cache_size: %zd",
Log(log," Options.block_cache_size: %zd",
block_cache->GetCapacity());
}
if (block_cache_compressed) {
Log(log,"Options.block_cache_compressed_size: %zd",
block_cache_compressed->GetCapacity());
}
Log(log," Options.block_size: %zd", block_size);
Log(log," Options.block_restart_interval: %d", block_restart_interval);
if (!compression_per_level.empty()) {

Loading…
Cancel
Save