From b4ad5e89ae1f5017b7cdd1ef261b146b645d9f3c Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Sun, 1 Sep 2013 23:23:40 -0700 Subject: [PATCH] Implement a compressed block cache. Summary: Rocksdb can now support a uncompressed block cache, or a compressed block cache or both. Lookups first look for a block in the uncompressed cache, if it is not found only then it is looked up in the compressed cache. If it is found in the compressed cache, then it is uncompressed and inserted into the uncompressed cache. It is possible that the same block resides in the compressed cache as well as the uncompressed cache at the same time. Both caches have their own individual LRU policy. Test Plan: Unit test case attached. Reviewers: kailiu, sdong, haobo, leveldb Reviewed By: haobo CC: xjin, haobo Differential Revision: https://reviews.facebook.net/D12675 --- db/db_bench.cc | 9 ++ db/db_impl.cc | 10 +- db/db_impl.h | 2 +- db/db_test.cc | 89 ++++++++++++ include/rocksdb/env.h | 5 + include/rocksdb/options.h | 9 +- include/rocksdb/statistics.h | 7 +- table/block.cc | 3 +- table/block.h | 4 + table/block_based_table_builder.cc | 62 +++++++++ table/block_based_table_builder.h | 3 +- table/block_based_table_reader.cc | 213 +++++++++++++++++++++++------ table/block_based_table_reader.h | 12 ++ table/format.cc | 66 +++++---- table/format.h | 14 +- tools/db_stress.cc | 9 ++ util/env_posix.cc | 58 ++++---- util/options.cc | 10 +- 18 files changed, 482 insertions(+), 103 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 2c5c56d53..e322fdef5 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -198,6 +198,9 @@ DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed" DEFINE_int32(block_size, rocksdb::Options().block_size, "Number of bytes in a block."); +DEFINE_int64(compressed_cache_size, -1, + "Number of bytes to use as a cache of compressed data."); + DEFINE_int32(open_files, rocksdb::Options().max_open_files, "Maximum number of files to keep open at the same time" " (use default if == 0)"); @@ -752,6 +755,7 @@ class Duration { class Benchmark { private: shared_ptr cache_; + shared_ptr compressed_cache_; const FilterPolicy* filter_policy_; const SliceTransform* prefix_extractor_; DB* db_; @@ -907,6 +911,10 @@ class Benchmark { NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits, FLAGS_cache_remove_scan_count_limit) : NewLRUCache(FLAGS_cache_size)) : nullptr), + compressed_cache_(FLAGS_compressed_cache_size >= 0 ? + (FLAGS_cache_numshardbits >= 1 ? + NewLRUCache(FLAGS_compressed_cache_size, FLAGS_cache_numshardbits) : + NewLRUCache(FLAGS_compressed_cache_size)) : nullptr), filter_policy_(FLAGS_bloom_bits >= 0 ? NewBloomFilterPolicy(FLAGS_bloom_bits) : nullptr), @@ -1275,6 +1283,7 @@ class Benchmark { Options options; options.create_if_missing = !FLAGS_use_existing_db; options.block_cache = cache_; + options.block_cache_compressed = compressed_cache_; if (cache_ == nullptr) { options.no_block_cache = true; } diff --git a/db/db_impl.cc b/db/db_impl.cc index 88a95f718..d214a55c8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1679,7 +1679,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel. CompactionState* compact = new CompactionState(c.get()); status = DoCompactionWork(compact); - CleanupCompaction(compact); + CleanupCompaction(compact, status); versions_->ReleaseCompactionFiles(c.get(), status); c->ReleaseInputs(); FindObsoleteFiles(deletion_state); @@ -1728,7 +1728,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, return status; } -void DBImpl::CleanupCompaction(CompactionState* compact) { +void DBImpl::CleanupCompaction(CompactionState* compact, Status status) { mutex_.AssertHeld(); if (compact->builder != nullptr) { // May happen if we get a shutdown call in the middle of compaction @@ -1740,6 +1740,12 @@ void DBImpl::CleanupCompaction(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; pending_outputs_.erase(out.number); + + // If this file was inserted into the table cache then remove + // them here because this compaction was not committed. + if (!status.ok()) { + table_cache_->Evict(out.number); + } } delete compact; } diff --git a/db/db_impl.h b/db/db_impl.h index 556be2c77..5654e1773 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -194,7 +194,7 @@ class DBImpl : public DB { void BackgroundCallFlush(); Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state); Status BackgroundFlush(bool* madeProgress); - void CleanupCompaction(CompactionState* compact); + void CleanupCompaction(CompactionState* compact, Status status); Status DoCompactionWork(CompactionState* compact); Status OpenCompactionOutputFile(CompactionState* compact); diff --git a/db/db_test.cc b/db/db_test.cc index aaf50cbd5..d17467e65 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -262,6 +262,7 @@ class DBTest { kDeletesFilterFirst, kPrefixHashRep, kUniversalCompaction, + kCompressedBlockCache, kEnd }; int option_config_; @@ -388,6 +389,9 @@ class DBTest { case kUniversalCompaction: options.compaction_style = kCompactionStyleUniversal; break; + case kCompressedBlockCache: + options.block_cache_compressed = NewLRUCache(8*1024*1024); + break; default: break; } @@ -452,6 +456,7 @@ class DBTest { std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) { ReadOptions options; + options.verify_checksums = true; options.snapshot = snapshot; std::string result; Status s = db_->Get(options, k, &result); @@ -1753,6 +1758,90 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) { } } +TEST(DBTest, CompressedCache) { + int num_iter = 80; + + // Run this test three iterations. + // Iteration 1: only a uncompressed block cache + // Iteration 2: only a compressed block cache + // Iteration 3: both block cache and compressed cache + for (int iter = 0; iter < 3; iter++) { + Options options = CurrentOptions(); + options.write_buffer_size = 64*1024; // small write buffer + options.statistics = rocksdb::CreateDBStatistics(); + + switch (iter) { + case 0: + // only uncompressed block cache + options.block_cache = NewLRUCache(8*1024); + options.block_cache_compressed = nullptr; + break; + case 1: + // no block cache, only compressed cache + options.no_block_cache = true; + options.block_cache = nullptr; + options.block_cache_compressed = NewLRUCache(8*1024); + break; + case 2: + // both compressed and uncompressed block cache + options.block_cache = NewLRUCache(1024); + options.block_cache_compressed = NewLRUCache(8*1024); + break; + default: + ASSERT_TRUE(false); + } + Reopen(&options); + + Random rnd(301); + + // Write 8MB (80 values, each 100K) + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + std::vector values; + Slice str; + for (int i = 0; i < num_iter; i++) { + if (i % 4 == 0) { // high compression ratio + str = RandomString(&rnd, 100000); + } + values.push_back(str.ToString(true)); + ASSERT_OK(Put(Key(i), values[i])); + } + + // flush all data from memtable so that reads are from block cache + dbfull()->Flush(FlushOptions()); + + for (int i = 0; i < num_iter; i++) { + ASSERT_EQ(Get(Key(i)), values[i]); + } + + // check that we triggered the appropriate code paths in the cache + switch (iter) { + case 0: + // only uncompressed block cache + ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), + 0); + ASSERT_EQ(options.statistics.get()->getTickerCount + (BLOCK_CACHE_COMPRESSED_MISS), 0); + break; + case 1: + // no block cache, only compressed cache + ASSERT_EQ(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), + 0); + ASSERT_GT(options.statistics.get()->getTickerCount + (BLOCK_CACHE_COMPRESSED_MISS), 0); + break; + case 2: + // both compressed and uncompressed block cache + ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), + 0); + ASSERT_GT(options.statistics.get()->getTickerCount + (BLOCK_CACHE_COMPRESSED_MISS), 0); + break; + default: + ASSERT_TRUE(false); + } + } +} + TEST(DBTest, CompactionTrigger) { Options options = CurrentOptions(); options.write_buffer_size = 100<<10; //100KB diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 78ac83427..c9b2befa5 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -373,6 +373,11 @@ class WritableFile { *block_size = preallocation_block_size_; } + // For documentation, refer to RandomAccessFile::GetUniqueId() + virtual size_t GetUniqueId(char* id, size_t max_size) const { + return 0; // Default implementation to prevent issues with backwards + } + // Remove any kind of caching of data from the offset to offset+length // of this file. If the length is 0, then it refers to the end of file. // If the system is not caching the file contents, then this is a noop. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 907d4c7ca..055019e76 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -39,7 +39,7 @@ using std::shared_ptr; // sequence of key,value pairs. Each block may be compressed before // being stored in a file. The following enum describes which // compression method (if any) is used to compress a block. -enum CompressionType { +enum CompressionType : char { // NOTE: do not change the values of existing entries, as these are // part of the persistent format on disk. kNoCompression = 0x0, @@ -48,7 +48,7 @@ enum CompressionType { kBZip2Compression = 0x3 }; -enum CompactionStyle { +enum CompactionStyle : char { kCompactionStyleLevel = 0x0, // level based compaction style kCompactionStyleUniversal = 0x1 // Universal compaction style }; @@ -177,6 +177,11 @@ struct Options { // Default: nullptr shared_ptr block_cache; + // If non-NULL use the specified cache for compressed blocks. + // If NULL, rocksdb will not use a compressed block cache. + // Default: nullptr + shared_ptr block_cache_compressed; + // Approximate size of user data packed per block. Note that the // block size specified here corresponds to uncompressed data. The // actual size of the unit read from disk may be smaller if diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index e9302b973..44cf1d3ab 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -82,6 +82,9 @@ enum Tickers { // transaction log iterator refreshes GET_UPDATES_SINCE_CALLS, + BLOCK_CACHE_COMPRESSED_MISS, // miss in the compressed block cache + BLOCK_CACHE_COMPRESSED_HIT, // hit in the compressed block cache + TICKER_ENUM_MAX }; @@ -116,7 +119,9 @@ const std::vector> TickersNameMap = { { BLOOM_FILTER_PREFIX_CHECKED, "rocksdb.bloom.filter.prefix.checked" }, { BLOOM_FILTER_PREFIX_USEFUL, "rocksdb.bloom.filter.prefix.useful" }, { NUMBER_OF_RESEEKS_IN_ITERATION, "rocksdb.number.reseeks.iteration" }, - { GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls" } + { GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls" }, + { BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss" }, + { BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit" } }; /** diff --git a/table/block.cc b/table/block.cc index fbcf722c4..3f969fe2a 100644 --- a/table/block.cc +++ b/table/block.cc @@ -29,7 +29,8 @@ Block::Block(const BlockContents& contents) : data_(contents.data.data()), size_(contents.data.size()), owned_(contents.heap_allocated), - cachable_(contents.cachable) { + cachable_(contents.cachable), + compression_type_(contents.compression_type) { if (size_ < sizeof(uint32_t)) { size_ = 0; // Error marker } else { diff --git a/table/block.h b/table/block.h index ed936f15f..7fac00657 100644 --- a/table/block.h +++ b/table/block.h @@ -11,6 +11,7 @@ #include #include #include "rocksdb/iterator.h" +#include "rocksdb/options.h" namespace rocksdb { @@ -26,7 +27,9 @@ class Block { size_t size() const { return size_; } bool isCachable() const { return cachable_; } + CompressionType compressionType() const { return compression_type_; } Iterator* NewIterator(const Comparator* comparator); + const char* data() { return data_; } private: uint32_t NumRestarts() const; @@ -36,6 +39,7 @@ class Block { uint32_t restart_offset_; // Offset in data_ of restart array bool owned_; // Block owns data_[] bool cachable_; + CompressionType compression_type_; // No copying allowed Block(const Block&); diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index fde6c81e8..7b00d6708 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -12,12 +12,14 @@ #include #include +#include "rocksdb/cache.h" #include "rocksdb/comparator.h" #include "rocksdb/table.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" #include "table/block_based_table_reader.h" +#include "table/block.h" #include "table/block_builder.h" #include "table/filter_block.h" #include "table/format.h" @@ -91,6 +93,8 @@ struct BlockBasedTableBuilder::Rep { bool closed = false; // Either Finish() or Abandon() has been called. FilterBlockBuilder* filter_block; + char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize]; + size_t compressed_cache_key_prefix_size; // We do not emit the index entry for a block until we have seen the // first key for the next data block. This allows us to use shorter @@ -126,6 +130,11 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(const Options& options, if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); } + if (options.block_cache_compressed.get() != nullptr) { + BlockBasedTable::GenerateCachePrefix(options.block_cache_compressed, file, + &rep_->compressed_cache_key_prefix[0], + &rep_->compressed_cache_key_prefix_size); + } } BlockBasedTableBuilder::~BlockBasedTableBuilder() { @@ -284,6 +293,9 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type EncodeFixed32(trailer+1, crc32c::Mask(crc)); r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); + if (r->status.ok()) { + r->status = InsertBlockInCache(block_contents, type, handle); + } if (r->status.ok()) { r->offset += block_contents.size() + kBlockTrailerSize; } @@ -294,6 +306,56 @@ Status BlockBasedTableBuilder::status() const { return rep_->status; } +static void DeleteCachedBlock(const Slice& key, void* value) { + Block* block = reinterpret_cast(value); + delete block; +} + +// +// Make a copy of the block contents and insert into compressed block cache +// +Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, + const CompressionType type, + const BlockHandle* handle) { + Rep* r = rep_; + Cache* block_cache_compressed = r->options.block_cache_compressed.get(); + + if (type != kNoCompression && block_cache_compressed != nullptr) { + + Cache::Handle* cache_handle = nullptr; + size_t size = block_contents.size(); + + char* ubuf = new char[size]; // make a new copy + memcpy(ubuf, block_contents.data(), size); + + BlockContents results; + Slice sl(ubuf, size); + results.data = sl; + results.cachable = true; // XXX + results.heap_allocated = true; + results.compression_type = type; + + Block* block = new Block(results); + + // make cache key by appending the file offset to the cache prefix id + char* end = EncodeVarint64( + r->compressed_cache_key_prefix + + r->compressed_cache_key_prefix_size, + handle->offset()); + Slice key(r->compressed_cache_key_prefix, static_cast + (end - r->compressed_cache_key_prefix)); + + // Insert into compressed block cache. + cache_handle = block_cache_compressed->Insert(key, block, block->size(), + &DeleteCachedBlock); + block_cache_compressed->Release(cache_handle); + + // Invalidate OS cache. + r->file->InvalidateCache(r->offset, size); + } + return Status::OK(); +} + Status BlockBasedTableBuilder::Finish() { Rep* r = rep_; Flush(); diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index b7c82b68f..87307a0cb 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -62,7 +62,8 @@ class BlockBasedTableBuilder : public TableBuilder { bool ok() const { return status().ok(); } void WriteBlock(BlockBuilder* block, BlockHandle* handle); void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle); - + Status InsertBlockInCache(const Slice& block_contents, + const CompressionType type, const BlockHandle* handle); struct Rep; Rep* rep_; diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index bec087033..a1c11e9d2 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -11,7 +11,6 @@ #include "db/dbformat.h" -#include "rocksdb/cache.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" @@ -51,6 +50,8 @@ struct BlockBasedTable::Rep { unique_ptr file; char cache_key_prefix[kMaxCacheKeyPrefixSize]; size_t cache_key_prefix_size; + char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize]; + size_t compressed_cache_key_prefix_size; FilterBlockReader* filter; const char* filter_data; @@ -67,18 +68,44 @@ BlockBasedTable::~BlockBasedTable() { void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) { assert(kMaxCacheKeyPrefixSize >= 10); rep->cache_key_prefix_size = 0; - if (rep->options.block_cache) { - rep->cache_key_prefix_size = rep->file->GetUniqueId(rep->cache_key_prefix, - kMaxCacheKeyPrefixSize); - - if (rep->cache_key_prefix_size == 0) { - // If the prefix wasn't generated or was too long, we create one from the - // cache. - char* end = EncodeVarint64(rep->cache_key_prefix, - rep->options.block_cache->NewId()); - rep->cache_key_prefix_size = - static_cast(end - rep->cache_key_prefix); - } + rep->compressed_cache_key_prefix_size = 0; + if (rep->options.block_cache != nullptr) { + GenerateCachePrefix(rep->options.block_cache, rep->file.get(), + &rep->cache_key_prefix[0], + &rep->cache_key_prefix_size); + } + if (rep->options.block_cache_compressed != nullptr) { + GenerateCachePrefix(rep->options.block_cache_compressed, rep->file.get(), + &rep->compressed_cache_key_prefix[0], + &rep->compressed_cache_key_prefix_size); + } +} + +void BlockBasedTable::GenerateCachePrefix(shared_ptr cc, + RandomAccessFile* file, char* buffer, size_t* size) { + + // generate an id from the file + *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize); + + // If the prefix wasn't generated or was too long, + // create one from the cache. + if (*size == 0) { + char* end = EncodeVarint64(buffer, cc->NewId()); + *size = static_cast(end - buffer); + } +} + +void BlockBasedTable::GenerateCachePrefix(shared_ptr cc, + WritableFile* file, char* buffer, size_t* size) { + + // generate an id from the file + *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize); + + // If the prefix wasn't generated or was too long, + // create one from the cache. + if (*size == 0) { + char* end = EncodeVarint64(buffer, cc->NewId()); + *size = static_cast(end - buffer); } } @@ -94,9 +121,11 @@ Status ReadBlock(RandomAccessFile* file, const BlockHandle& handle, Block** result, Env* env, - bool* didIO = nullptr) { + bool* didIO = nullptr, + bool do_uncompress = true) { BlockContents contents; - Status s = ReadBlockContents(file, options, handle, &contents, env); + Status s = ReadBlockContents(file, options, handle, &contents, + env, do_uncompress); if (s.ok()) { *result = new Block(contents); } @@ -143,6 +172,7 @@ Status BlockBasedTable::Open(const Options& options, if (s.ok()) { // We've successfully read the footer and the index block: we're // ready to serve requests. + assert(index_block->compressionType() == kNoCompression); BlockBasedTable::Rep* rep = new BlockBasedTable::Rep(soptions); rep->options = options; rep->file = std::move(file); @@ -193,6 +223,7 @@ void BlockBasedTable::ReadMeta(const Footer& footer) { // Do not propagate errors since meta info is not needed for operation return; } + assert(meta->compressionType() == kNoCompression); Iterator* iter = meta->NewIterator(BytewiseComparator()); // read filter @@ -238,7 +269,7 @@ void BlockBasedTable::ReadFilter(const Slice& filter_handle_value) { ReadOptions opt; BlockContents block; if (!ReadBlockContents(rep_->file.get(), opt, filter_handle, &block, - rep_->options.env).ok()) { + rep_->options.env, false).ok()) { return; } if (block.heap_allocated) { @@ -260,7 +291,8 @@ Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) { ReadOptions(), handle, &block_contents, - rep->options.env + rep->options.env, + false ); if (!s.ok()) { @@ -351,9 +383,13 @@ Iterator* BlockBasedTable::BlockReader(void* arg, const bool no_io = (options.read_tier == kBlockCacheTier); BlockBasedTable* table = reinterpret_cast(arg); Cache* block_cache = table->rep_->options.block_cache.get(); + Cache* block_cache_compressed = table->rep_->options. + block_cache_compressed.get(); std::shared_ptr statistics = table->rep_->options.statistics; Block* block = nullptr; + Block* cblock = nullptr; Cache::Handle* cache_handle = nullptr; + Cache::Handle* compressed_cache_handle = nullptr; BlockHandle handle; Slice input = index_value; @@ -362,26 +398,88 @@ Iterator* BlockBasedTable::BlockReader(void* arg, // can add more features in the future. if (s.ok()) { - if (block_cache != nullptr) { + if (block_cache != nullptr || block_cache_compressed != nullptr) { char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - const size_t cache_key_prefix_size = table->rep_->cache_key_prefix_size; - assert(cache_key_prefix_size != 0); - assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); - memcpy(cache_key, table->rep_->cache_key_prefix, - cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + cache_key_prefix_size, - handle.offset()); - Slice key(cache_key, static_cast(end-cache_key)); - cache_handle = block_cache->Lookup(key); - if (cache_handle != nullptr) { - block = reinterpret_cast(block_cache->Value(cache_handle)); + char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + char* end = cache_key; + + // create key for block cache + if (block_cache != nullptr) { + assert(table->rep_->cache_key_prefix_size != 0); + assert(table->rep_->cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + memcpy(cache_key, table->rep_->cache_key_prefix, + table->rep_->cache_key_prefix_size); + end = EncodeVarint64(cache_key + table->rep_->cache_key_prefix_size, + handle.offset()); + } + Slice key(cache_key, static_cast(end - cache_key)); + + // create key for compressed block cache + end = compressed_cache_key; + if (block_cache_compressed != nullptr) { + assert(table->rep_->compressed_cache_key_prefix_size != 0); + assert(table->rep_->compressed_cache_key_prefix_size <= + kMaxCacheKeyPrefixSize); + memcpy(compressed_cache_key, table->rep_->compressed_cache_key_prefix, + table->rep_->compressed_cache_key_prefix_size); + end = EncodeVarint64(compressed_cache_key + + table->rep_->compressed_cache_key_prefix_size, + handle.offset()); + } + Slice ckey(compressed_cache_key, static_cast + (end - compressed_cache_key)); + + // Lookup uncompressed cache first + if (block_cache != nullptr) { + cache_handle = block_cache->Lookup(key); + if (cache_handle != nullptr) { + block = reinterpret_cast(block_cache->Value(cache_handle)); + RecordTick(statistics, BLOCK_CACHE_HIT); + } + } + // If not found in uncompressed cache, lookup compressed cache + if (block == nullptr && block_cache_compressed != nullptr) { + compressed_cache_handle = block_cache_compressed->Lookup(ckey); + + // if we found in the compressed cache, then uncompress and + // insert into uncompressed cache + if (compressed_cache_handle != nullptr) { + // found compressed block + cblock = reinterpret_cast(block_cache_compressed-> + Value(compressed_cache_handle)); + assert(cblock->compressionType() != kNoCompression); + + // Retrieve the uncompressed contents into a new buffer + BlockContents contents; + s = UncompressBlockContents(cblock->data(), cblock->size(), + &contents); + + // Insert uncompressed block into block cache + if (s.ok()) { + block = new Block(contents); // uncompressed block + assert(block->compressionType() == kNoCompression); + if (block_cache != nullptr && block->isCachable() && + options.fill_cache) { + cache_handle = block_cache->Insert(key, block, block->size(), + &DeleteCachedBlock); + assert(reinterpret_cast(block_cache->Value(cache_handle)) + == block); + } + } + // Release hold on compressed cache entry + block_cache_compressed->Release(compressed_cache_handle); + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT); + } + } + + if (block != nullptr) { BumpPerfCount(&perf_context.block_cache_hit_count); - RecordTick(statistics, BLOCK_CACHE_HIT); } else if (no_io) { // Did not find in block_cache and can't do IO return NewErrorIterator(Status::Incomplete("no blocking io")); } else { + Histograms histogram = for_compaction ? READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; { // block for stop watch @@ -390,19 +488,54 @@ Iterator* BlockBasedTable::BlockReader(void* arg, table->rep_->file.get(), options, handle, - &block, + &cblock, table->rep_->options.env, - didIO + didIO, + block_cache_compressed == nullptr ); } if (s.ok()) { - if (block->isCachable() && options.fill_cache) { - cache_handle = block_cache->Insert( - key, block, block->size(), &DeleteCachedBlock); + assert(cblock->compressionType() == kNoCompression || + block_cache_compressed != nullptr); + + // Retrieve the uncompressed contents into a new buffer + BlockContents contents; + if (cblock->compressionType() != kNoCompression) { + s = UncompressBlockContents(cblock->data(), cblock->size(), + &contents); + } + if (s.ok()) { + if (cblock->compressionType() != kNoCompression) { + block = new Block(contents); // uncompressed block + } else { + block = cblock; + cblock = nullptr; + } + if (block->isCachable() && options.fill_cache) { + // Insert compressed block into compressed block cache. + // Release the hold on the compressed cache entry immediately. + if (block_cache_compressed != nullptr && cblock != nullptr) { + compressed_cache_handle = block_cache_compressed->Insert( + ckey, cblock, cblock->size(), &DeleteCachedBlock); + block_cache_compressed->Release(compressed_cache_handle); + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); + cblock = nullptr; + } + // insert into uncompressed block cache + assert((block->compressionType() == kNoCompression)); + if (block_cache != nullptr) { + cache_handle = block_cache->Insert( + key, block, block->size(), &DeleteCachedBlock); + RecordTick(statistics, BLOCK_CACHE_MISS); + assert(reinterpret_cast(block_cache->Value( + cache_handle))== block); + } + } } } - - RecordTick(statistics, BLOCK_CACHE_MISS); + if (cblock != nullptr) { + delete cblock; + } } } else if (no_io) { // Could not read from block_cache and can't do IO @@ -416,10 +549,10 @@ Iterator* BlockBasedTable::BlockReader(void* arg, Iterator* iter; if (block != nullptr) { iter = block->NewIterator(table->rep_->options.comparator); - if (cache_handle == nullptr) { - iter->RegisterCleanup(&DeleteBlock, block, nullptr); - } else { + if (cache_handle != nullptr) { iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle); + } else { + iter->RegisterCleanup(&DeleteBlock, block, nullptr); } } else { iter = NewErrorIterator(s); diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 848c55655..b6e87b2f8 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -10,10 +10,12 @@ #pragma once #include #include +#include "rocksdb/cache.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/table_stats.h" #include "rocksdb/table.h" +#include "util/coding.h" namespace rocksdb { @@ -103,6 +105,7 @@ class BlockBasedTable : public TableReader { // after a call to Seek(key), until handle_result returns false. // May not make such a call if filter policy says that key is not present. friend class TableCache; + friend class BlockBasedTableBuilder; void ReadMeta(const Footer& footer); void ReadFilter(const Slice& filter_handle_value); @@ -114,6 +117,15 @@ class BlockBasedTable : public TableReader { compaction_optimized_(false) { rep_ = rep; } + // Generate a cache key prefix from the file + static void GenerateCachePrefix(shared_ptr cc, + RandomAccessFile* file, char* buffer, size_t* size); + static void GenerateCachePrefix(shared_ptr cc, + WritableFile* file, char* buffer, size_t* size); + + // The longest prefix of the cache key used to identify blocks. + // For Posix files the unique ID is three varints. + static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1; // No copying allowed explicit BlockBasedTable(const TableReader&) = delete; diff --git a/table/format.cc b/table/format.cc index fdda0c9ca..1eb50ef33 100644 --- a/table/format.cc +++ b/table/format.cc @@ -76,7 +76,8 @@ Status ReadBlockContents(RandomAccessFile* file, const ReadOptions& options, const BlockHandle& handle, BlockContents* result, - Env* env) { + Env* env, + bool do_uncompress) { result->data = Slice(); result->cachable = false; result->heap_allocated = false; @@ -116,41 +117,55 @@ Status ReadBlockContents(RandomAccessFile* file, BumpPerfTime(&perf_context.block_checksum_time, &timer); } + // If the caller has requested that the block not be uncompressed + if (!do_uncompress || data[n] == kNoCompression) { + if (data != buf) { + // File implementation gave us pointer to some other data. + // Use it directly under the assumption that it will be live + // while the file is open. + delete[] buf; + result->data = Slice(data, n); + result->heap_allocated = false; + result->cachable = false; // Do not double-cache + } else { + result->data = Slice(buf, n); + result->heap_allocated = true; + result->cachable = true; + } + result->compression_type = (rocksdb::CompressionType)data[n]; + s = Status::OK(); + } else { + s = UncompressBlockContents(buf, n, result); + delete[] buf; + } + BumpPerfTime(&perf_context.block_decompress_time, &timer); + return s; +} + +// +// The 'data' points to the raw block contents that was read in from file. +// This method allocates a new heap buffer and the raw block +// contents are uncompresed into this buffer. This +// buffer is returned via 'result' and it is upto the caller to +// free this buffer. +Status UncompressBlockContents(const char* data, size_t n, + BlockContents* result) { char* ubuf = nullptr; int decompress_size = 0; + assert(data[n] != kNoCompression); switch (data[n]) { - case kNoCompression: - if (data != buf) { - // File implementation gave us pointer to some other data. - // Use it directly under the assumption that it will be live - // while the file is open. - delete[] buf; - result->data = Slice(data, n); - result->heap_allocated = false; - result->cachable = false; // Do not double-cache - } else { - result->data = Slice(buf, n); - result->heap_allocated = true; - result->cachable = true; - } - - // Ok - break; case kSnappyCompression: { size_t ulength = 0; static char snappy_corrupt_msg[] = "Snappy not supported or corrupted Snappy compressed block contents"; if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) { - delete[] buf; return Status::Corruption(snappy_corrupt_msg); } ubuf = new char[ulength]; if (!port::Snappy_Uncompress(data, n, ubuf)) { - delete[] buf; delete[] ubuf; return Status::Corruption(snappy_corrupt_msg); } - delete[] buf; result->data = Slice(ubuf, ulength); result->heap_allocated = true; result->cachable = true; @@ -161,10 +176,8 @@ Status ReadBlockContents(RandomAccessFile* file, static char zlib_corrupt_msg[] = "Zlib not supported or corrupted Zlib compressed block contents"; if (!ubuf) { - delete[] buf; return Status::Corruption(zlib_corrupt_msg); } - delete[] buf; result->data = Slice(ubuf, decompress_size); result->heap_allocated = true; result->cachable = true; @@ -174,21 +187,16 @@ Status ReadBlockContents(RandomAccessFile* file, static char bzip2_corrupt_msg[] = "Bzip2 not supported or corrupted Bzip2 compressed block contents"; if (!ubuf) { - delete[] buf; return Status::Corruption(bzip2_corrupt_msg); } - delete[] buf; result->data = Slice(ubuf, decompress_size); result->heap_allocated = true; result->cachable = true; break; default: - delete[] buf; return Status::Corruption("bad block type"); } - - BumpPerfTime(&perf_context.block_decompress_time, &timer); - + result->compression_type = kNoCompression; // not compressed any more return Status::OK(); } diff --git a/table/format.h b/table/format.h index 82f86884e..2f1c1e8dc 100644 --- a/table/format.h +++ b/table/format.h @@ -12,6 +12,7 @@ #include #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "rocksdb/options.h" #include "rocksdb/table.h" namespace rocksdb { @@ -90,6 +91,7 @@ struct BlockContents { Slice data; // Actual contents of data bool cachable; // True iff data can be cached bool heap_allocated; // True iff caller should delete[] data.data() + CompressionType compression_type; }; // Read the block identified by "handle" from "file". On failure @@ -98,7 +100,17 @@ extern Status ReadBlockContents(RandomAccessFile* file, const ReadOptions& options, const BlockHandle& handle, BlockContents* result, - Env* env); + Env* env, + bool do_uncompress); + +// The 'data' points to the raw block contents read in from file. +// This method allocates a new heap buffer and the raw block +// contents are uncompresed into this buffer. This buffer is +// returned via 'result' and it is upto the caller to +// free this buffer. +extern Status UncompressBlockContents(const char* data, + size_t n, + BlockContents* result); // Implementation details follow. Clients should ignore, diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 352997d65..71eabef8f 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -115,6 +115,10 @@ DEFINE_int32(open_files, rocksdb::Options().max_open_files, "Maximum number of files to keep open at the same time " "(use default if == 0)"); +DEFINE_int64(compressed_cache_size, -1, + "Number of bytes to use as a cache of compressed data." + " Negative means use default settings."); + DEFINE_int32(compaction_style, rocksdb::Options().compaction_style, ""); DEFINE_int32(level0_file_num_compaction_trigger, @@ -672,6 +676,9 @@ class StressTest { public: StressTest() : cache_(NewLRUCache(FLAGS_cache_size)), + compressed_cache_(FLAGS_compressed_cache_size >= 0 ? + NewLRUCache(FLAGS_compressed_cache_size) : + nullptr), filter_policy_(FLAGS_bloom_bits >= 0 ? NewBloomFilterPolicy(FLAGS_bloom_bits) : nullptr), @@ -1341,6 +1348,7 @@ class StressTest { assert(db_ == nullptr); Options options; options.block_cache = cache_; + options.block_cache_compressed = compressed_cache_; options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.min_write_buffer_number_to_merge = @@ -1469,6 +1477,7 @@ class StressTest { private: shared_ptr cache_; + shared_ptr compressed_cache_; const FilterPolicy* filter_policy_; const SliceTransform* prefix_extractor_; DB* db_; diff --git a/util/env_posix.cc b/util/env_posix.cc index 994c45a2a..b85e0aff6 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -101,6 +101,36 @@ static void TestKillRandom(int odds, const std::string& srcfile, #endif +#if defined(OS_LINUX) +namespace { + static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) { + if (max_size < kMaxVarint64Length*3) { + return 0; + } + + struct stat buf; + int result = fstat(fd, &buf); + if (result == -1) { + return 0; + } + + long version = 0; + result = ioctl(fd, FS_IOC_GETVERSION, &version); + if (result == -1) { + return 0; + } + uint64_t uversion = (uint64_t)version; + + char* rid = id; + rid = EncodeVarint64(rid, buf.st_dev); + rid = EncodeVarint64(rid, buf.st_ino); + rid = EncodeVarint64(rid, uversion); + assert(rid >= id); + return static_cast(rid-id); + } +} +#endif + class PosixSequentialFile: public SequentialFile { private: std::string filename_; @@ -187,30 +217,7 @@ class PosixRandomAccessFile: public RandomAccessFile { #if defined(OS_LINUX) virtual size_t GetUniqueId(char* id, size_t max_size) const { - // TODO: possibly allow this function to handle tighter bounds. - if (max_size < kMaxVarint64Length*3) { - return 0; - } - - struct stat buf; - int result = fstat(fd_, &buf); - if (result == -1) { - return 0; - } - - long version = 0; - result = ioctl(fd_, FS_IOC_GETVERSION, &version); - if (result == -1) { - return 0; - } - uint64_t uversion = (uint64_t)version; - - char* rid = id; - rid = EncodeVarint64(rid, buf.st_dev); - rid = EncodeVarint64(rid, buf.st_ino); - rid = EncodeVarint64(rid, uversion); - assert(rid >= id); - return static_cast(rid-id); + return GetUniqueIdFromFile(fd_, id, max_size); } #endif @@ -711,6 +718,9 @@ class PosixWritableFile : public WritableFile { return IOError(filename_, errno); } } + virtual size_t GetUniqueId(char* id, size_t max_size) const { + return GetUniqueIdFromFile(fd_, id, max_size); + } #endif }; diff --git a/util/options.cc b/util/options.cc index f828b026a..170a8ae55 100644 --- a/util/options.cc +++ b/util/options.cc @@ -34,6 +34,8 @@ Options::Options() max_write_buffer_number(2), min_write_buffer_number_to_merge(1), max_open_files(1000), + block_cache(nullptr), + block_cache_compressed(nullptr), block_size(4096), block_restart_interval(16), compression(kSnappyCompression), @@ -129,10 +131,16 @@ Options::Dump(Logger* log) const Log(log," Options.max_write_buffer_number: %d", max_write_buffer_number); Log(log," Options.max_open_files: %d", max_open_files); Log(log," Options.block_cache: %p", block_cache.get()); + Log(log," Options.block_cache_compressed: %p", + block_cache_compressed.get()); if (block_cache) { - Log(log," Options.block_cache_size: %zd", + Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity()); } + if (block_cache_compressed) { + Log(log,"Options.block_cache_compressed_size: %zd", + block_cache_compressed->GetCapacity()); + } Log(log," Options.block_size: %zd", block_size); Log(log," Options.block_restart_interval: %d", block_restart_interval); if (!compression_per_level.empty()) {