diff --git a/db/db_bench.cc b/db/db_bench.cc
index 2c5c56d53..e322fdef5 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -198,6 +198,9 @@ DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed"
 DEFINE_int32(block_size, rocksdb::Options().block_size,
              "Number of bytes in a block.");
 
+DEFINE_int64(compressed_cache_size, -1,
+             "Number of bytes to use as a cache of compressed data.");
+
 DEFINE_int32(open_files, rocksdb::Options().max_open_files,
              "Maximum number of files to keep open at the same time"
              " (use default if == 0)");
@@ -752,6 +755,7 @@ class Duration {
 class Benchmark {
  private:
   shared_ptr<Cache> cache_;
+  shared_ptr<Cache> compressed_cache_;
   const FilterPolicy* filter_policy_;
   const SliceTransform* prefix_extractor_;
   DB* db_;
@@ -907,6 +911,10 @@ class Benchmark {
       NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits,
                   FLAGS_cache_remove_scan_count_limit) :
       NewLRUCache(FLAGS_cache_size)) : nullptr),
+    compressed_cache_(FLAGS_compressed_cache_size >= 0 ?
+      (FLAGS_cache_numshardbits >= 1 ?
+       NewLRUCache(FLAGS_compressed_cache_size, FLAGS_cache_numshardbits) :
+       NewLRUCache(FLAGS_compressed_cache_size)) : nullptr),
     filter_policy_(FLAGS_bloom_bits >= 0 ?
                    NewBloomFilterPolicy(FLAGS_bloom_bits) :
                    nullptr),
@@ -1275,6 +1283,7 @@ class Benchmark {
     Options options;
     options.create_if_missing = !FLAGS_use_existing_db;
     options.block_cache = cache_;
+    options.block_cache_compressed = compressed_cache_;
     if (cache_ == nullptr) {
       options.no_block_cache = true;
     }
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 88a95f718..d214a55c8 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1679,7 +1679,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
     MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
     CompactionState* compact = new CompactionState(c.get());
     status = DoCompactionWork(compact);
-    CleanupCompaction(compact);
+    CleanupCompaction(compact, status);
     versions_->ReleaseCompactionFiles(c.get(), status);
     c->ReleaseInputs();
     FindObsoleteFiles(deletion_state);
@@ -1728,7 +1728,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
   return status;
 }
 
-void DBImpl::CleanupCompaction(CompactionState* compact) {
+void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
   mutex_.AssertHeld();
   if (compact->builder != nullptr) {
     // May happen if we get a shutdown call in the middle of compaction
@@ -1740,6 +1740,12 @@ void DBImpl::CleanupCompaction(CompactionState* compact) {
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     pending_outputs_.erase(out.number);
+
+    // If this file was inserted into the table cache, then remove
+    // it here because this compaction was not committed.
+    if (!status.ok()) {
+      table_cache_->Evict(out.number);
+    }
   }
   delete compact;
 }
diff --git a/db/db_impl.h b/db/db_impl.h
index 556be2c77..5654e1773 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -194,7 +194,7 @@ class DBImpl : public DB {
   void BackgroundCallFlush();
   Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state);
   Status BackgroundFlush(bool* madeProgress);
-  void CleanupCompaction(CompactionState* compact);
+  void CleanupCompaction(CompactionState* compact, Status status);
   Status DoCompactionWork(CompactionState* compact);
   Status OpenCompactionOutputFile(CompactionState* compact);
diff --git a/db/db_test.cc b/db/db_test.cc
index aaf50cbd5..d17467e65 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -262,6 +262,7 @@ class DBTest {
     kDeletesFilterFirst,
     kPrefixHashRep,
     kUniversalCompaction,
+    kCompressedBlockCache,
     kEnd
   };
   int option_config_;
@@ -388,6 +389,9 @@ class DBTest {
       case kUniversalCompaction:
         options.compaction_style = kCompactionStyleUniversal;
         break;
+      case kCompressedBlockCache:
+        options.block_cache_compressed = NewLRUCache(8*1024*1024);
+        break;
       default:
         break;
     }
@@ -452,6 +456,7 @@ class DBTest {
   std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
     ReadOptions options;
+    options.verify_checksums = true;
     options.snapshot = snapshot;
     std::string result;
     Status s = db_->Get(options, k, &result);
@@ -1753,6 +1758,90 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) {
   }
 }
 
+TEST(DBTest, CompressedCache) {
+  int num_iter = 80;
+
+  // Run this test for three iterations.
+  // Iteration 1: only an uncompressed block cache
+  // Iteration 2: only a compressed block cache
+  // Iteration 3: both an uncompressed and a compressed block cache
+  for (int iter = 0; iter < 3; iter++) {
+    Options options = CurrentOptions();
+    options.write_buffer_size = 64*1024;        // small write buffer
+    options.statistics = rocksdb::CreateDBStatistics();
+
+    switch (iter) {
+      case 0:
+        // only uncompressed block cache
+        options.block_cache = NewLRUCache(8*1024);
+        options.block_cache_compressed = nullptr;
+        break;
+      case 1:
+        // no block cache, only compressed cache
+        options.no_block_cache = true;
+        options.block_cache = nullptr;
+        options.block_cache_compressed = NewLRUCache(8*1024);
+        break;
+      case 2:
+        // both compressed and uncompressed block cache
+        options.block_cache = NewLRUCache(1024);
+        options.block_cache_compressed = NewLRUCache(8*1024);
+        break;
+      default:
+        ASSERT_TRUE(false);
+    }
+    Reopen(&options);
+
+    Random rnd(301);
+
+    // Write 8MB (80 values, each 100K)
+    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+    std::vector<std::string> values;
+    Slice str;
+    for (int i = 0; i < num_iter; i++) {
+      if (i % 4 == 0) {        // high compression ratio
+        str = RandomString(&rnd, 100000);
+      }
+      values.push_back(str.ToString(true));
+      ASSERT_OK(Put(Key(i), values[i]));
+    }
+
+    // flush all data from memtable so that reads are from block cache
+    dbfull()->Flush(FlushOptions());
+
+    for (int i = 0; i < num_iter; i++) {
+      ASSERT_EQ(Get(Key(i)), values[i]);
+    }
+
+    // check that we triggered the appropriate code paths in the cache
+    switch (iter) {
+      case 0:
+        // only uncompressed block cache
+        ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
+                  0);
+        ASSERT_EQ(options.statistics.get()->getTickerCount
+                  (BLOCK_CACHE_COMPRESSED_MISS), 0);
+        break;
+      case 1:
+        // no block cache, only compressed cache
+        ASSERT_EQ(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
+                  0);
+        ASSERT_GT(options.statistics.get()->getTickerCount
+                  (BLOCK_CACHE_COMPRESSED_MISS), 0);
+        break;
+      case 2:
+        // both compressed and uncompressed block cache
+        ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS),
+                  0);
+        ASSERT_GT(options.statistics.get()->getTickerCount
+                  (BLOCK_CACHE_COMPRESSED_MISS), 0);
+        break;
+      default:
+        ASSERT_TRUE(false);
+    }
+  }
+}
+
 TEST(DBTest, CompactionTrigger) {
   Options options = CurrentOptions();
   options.write_buffer_size = 100<<10; //100KB
diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 78ac83427..c9b2befa5 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -373,6 +373,11 @@ class WritableFile {
     *block_size = preallocation_block_size_;
   }
 
+  // For documentation, refer to RandomAccessFile::GetUniqueId()
+  virtual size_t GetUniqueId(char* id, size_t max_size) const {
+    return 0; // Default implementation to prevent issues with backwards compatibility.
+  }
+
   // Remove any kind of caching of data from the offset to offset+length
   // of this file. If the length is 0, then it refers to the end of file.
   // If the system is not caching the file contents, then this is a noop.
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 907d4c7ca..055019e76 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -39,7 +39,7 @@ using std::shared_ptr;
 // sequence of key,value pairs.  Each block may be compressed before
 // being stored in a file. The following enum describes which
 // compression method (if any) is used to compress a block.
-enum CompressionType {
+enum CompressionType : char {
   // NOTE: do not change the values of existing entries, as these are
   // part of the persistent format on disk.
   kNoCompression = 0x0,
@@ -48,7 +48,7 @@ enum CompressionType {
   kBZip2Compression = 0x3
 };
 
-enum CompactionStyle {
+enum CompactionStyle : char {
   kCompactionStyleLevel = 0x0,     // level based compaction style
   kCompactionStyleUniversal = 0x1  // Universal compaction style
 };
@@ -177,6 +177,11 @@ struct Options {
   // Default: nullptr
   shared_ptr<Cache> block_cache;
 
+  // If non-NULL, use the specified cache for compressed blocks.
+  // If NULL, rocksdb will not use a compressed block cache.
+  // Default: nullptr
+  shared_ptr<Cache> block_cache_compressed;
+
   // Approximate size of user data packed per block.  Note that the
   // block size specified here corresponds to uncompressed data.  The
   // actual size of the unit read from disk may be smaller if
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index e9302b973..44cf1d3ab 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -82,6 +82,9 @@ enum Tickers {
   // transaction log iterator refreshes
   GET_UPDATES_SINCE_CALLS,
 
+  BLOCK_CACHE_COMPRESSED_MISS,  // miss in the compressed block cache
+  BLOCK_CACHE_COMPRESSED_HIT,   // hit in the compressed block cache
+
   TICKER_ENUM_MAX
 };
 
@@ -116,7 +119,9 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
   { BLOOM_FILTER_PREFIX_CHECKED, "rocksdb.bloom.filter.prefix.checked" },
   { BLOOM_FILTER_PREFIX_USEFUL, "rocksdb.bloom.filter.prefix.useful" },
   { NUMBER_OF_RESEEKS_IN_ITERATION, "rocksdb.number.reseeks.iteration" },
-  { GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls" }
+  { GET_UPDATES_SINCE_CALLS, "rocksdb.getupdatessince.calls" },
+  { BLOCK_CACHE_COMPRESSED_MISS, "rocksdb.block.cachecompressed.miss" },
+  { BLOCK_CACHE_COMPRESSED_HIT, "rocksdb.block.cachecompressed.hit" }
 };
 
 /**
diff --git a/table/block.cc b/table/block.cc
index fbcf722c4..3f969fe2a 100644
--- a/table/block.cc
+++ b/table/block.cc
@@ -29,7 +29,8 @@ Block::Block(const BlockContents& contents)
     : data_(contents.data.data()),
       size_(contents.data.size()),
       owned_(contents.heap_allocated),
-      cachable_(contents.cachable) {
+      cachable_(contents.cachable),
+      compression_type_(contents.compression_type) {
   if (size_ < sizeof(uint32_t)) {
     size_ = 0;  // Error marker
   } else {
diff --git a/table/block.h b/table/block.h
index ed936f15f..7fac00657 100644
--- a/table/block.h
+++ b/table/block.h
@@ -11,6 +11,7 @@
 #include <stddef.h>
 #include <stdint.h>
 #include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
 
 namespace rocksdb {
 
@@ -26,7 +27,9 @@ class Block {
   size_t size() const { return size_; }
   bool isCachable() const { return cachable_; }
+  CompressionType compressionType() const { return compression_type_; }
   Iterator* NewIterator(const Comparator* comparator);
+  const char* data() { return data_; }
 
  private:
   uint32_t NumRestarts() const;
@@ -36,6 +39,7 @@ class Block {
   uint32_t restart_offset_;     // Offset in data_ of restart array
   bool owned_;                  // Block owns data_[]
   bool cachable_;
+  CompressionType compression_type_;
 
   // No copying allowed
   Block(const Block&);
diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc
index fde6c81e8..7b00d6708 100644
--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -12,12 +12,14 @@
 #include
 #include
 
+#include "rocksdb/cache.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/table.h"
 #include "rocksdb/env.h"
 #include "rocksdb/filter_policy.h"
 #include "rocksdb/options.h"
 #include "table/block_based_table_reader.h"
+#include "table/block.h"
 #include "table/block_builder.h"
 #include "table/filter_block.h"
 #include "table/format.h"
@@ -91,6 +93,8 @@ struct BlockBasedTableBuilder::Rep {
   bool closed = false;          // Either Finish() or Abandon() has been called.
   FilterBlockBuilder* filter_block;
+  char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
+  size_t compressed_cache_key_prefix_size;
 
   // We do not emit the index entry for a block until we have seen the
   // first key for the next data block.  This allows us to use shorter
@@ -126,6 +130,11 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(const Options& options,
   if (rep_->filter_block != nullptr) {
     rep_->filter_block->StartBlock(0);
   }
+  if (options.block_cache_compressed.get() != nullptr) {
+    BlockBasedTable::GenerateCachePrefix(options.block_cache_compressed, file,
+        &rep_->compressed_cache_key_prefix[0],
+        &rep_->compressed_cache_key_prefix_size);
+  }
 }
 
 BlockBasedTableBuilder::~BlockBasedTableBuilder() {
@@ -284,6 +293,9 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
     crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
     EncodeFixed32(trailer+1, crc32c::Mask(crc));
     r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
+    if (r->status.ok()) {
+      r->status = InsertBlockInCache(block_contents, type, handle);
+    }
     if (r->status.ok()) {
       r->offset += block_contents.size() + kBlockTrailerSize;
     }
@@ -294,6 +306,56 @@ Status BlockBasedTableBuilder::status() const {
   return rep_->status;
 }
 
+static void DeleteCachedBlock(const Slice& key, void* value) {
+  Block* block = reinterpret_cast<Block*>(value);
+  delete block;
+}
+
+//
+// Make a copy of the block contents and insert it into the compressed
+// block cache.
+//
+Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
+                                                  const CompressionType type,
+                                                  const BlockHandle* handle) {
+  Rep* r = rep_;
+  Cache* block_cache_compressed = r->options.block_cache_compressed.get();
+
+  if (type != kNoCompression && block_cache_compressed != nullptr) {
+
+    Cache::Handle* cache_handle = nullptr;
+    size_t size = block_contents.size();
+
+    char* ubuf = new char[size];             // make a new copy
+    memcpy(ubuf, block_contents.data(), size);
+
+    BlockContents results;
+    Slice sl(ubuf, size);
+    results.data = sl;
+    results.cachable = true; // XXX
+    results.heap_allocated = true;
+    results.compression_type = type;
+
+    Block* block = new Block(results);
+
+    // make cache key by appending the file offset to the cache prefix id
+    char* end = EncodeVarint64(
+                  r->compressed_cache_key_prefix +
+                  r->compressed_cache_key_prefix_size,
+                  handle->offset());
+    Slice key(r->compressed_cache_key_prefix, static_cast<size_t>
+              (end - r->compressed_cache_key_prefix));
+
+    // Insert into compressed block cache.
+    cache_handle = block_cache_compressed->Insert(key, block, block->size(),
+                                                  &DeleteCachedBlock);
+    block_cache_compressed->Release(cache_handle);
+
+    // Invalidate OS cache.
+    r->file->InvalidateCache(r->offset, size);
+  }
+  return Status::OK();
+}
+
 Status BlockBasedTableBuilder::Finish() {
   Rep* r = rep_;
   Flush();
diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h
index b7c82b68f..87307a0cb 100644
--- a/table/block_based_table_builder.h
+++ b/table/block_based_table_builder.h
@@ -62,7 +62,8 @@ class BlockBasedTableBuilder : public TableBuilder {
   bool ok() const { return status().ok(); }
   void WriteBlock(BlockBuilder* block, BlockHandle* handle);
   void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);
-
+  Status InsertBlockInCache(const Slice& block_contents,
+                  const CompressionType type, const BlockHandle* handle);
   struct Rep;
   Rep* rep_;
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index bec087033..a1c11e9d2 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -11,7 +11,6 @@
 
 #include "db/dbformat.h"
 
-#include "rocksdb/cache.h"
 #include "rocksdb/comparator.h"
 #include "rocksdb/env.h"
 #include "rocksdb/filter_policy.h"
@@ -51,6 +50,8 @@ struct BlockBasedTable::Rep {
   unique_ptr<RandomAccessFile> file;
   char cache_key_prefix[kMaxCacheKeyPrefixSize];
   size_t cache_key_prefix_size;
+  char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize];
+  size_t compressed_cache_key_prefix_size;
 
   FilterBlockReader* filter;
   const char* filter_data;
@@ -67,18 +68,44 @@ BlockBasedTable::~BlockBasedTable() {
 void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) {
   assert(kMaxCacheKeyPrefixSize >= 10);
   rep->cache_key_prefix_size = 0;
-  if (rep->options.block_cache) {
-    rep->cache_key_prefix_size = rep->file->GetUniqueId(rep->cache_key_prefix,
-                                                        kMaxCacheKeyPrefixSize);
-
-    if (rep->cache_key_prefix_size == 0) {
-      // If the prefix wasn't generated or was too long, we create one from the
-      // cache.
-      char* end = EncodeVarint64(rep->cache_key_prefix,
-                                 rep->options.block_cache->NewId());
-      rep->cache_key_prefix_size =
-            static_cast<size_t>(end - rep->cache_key_prefix);
-    }
+  rep->compressed_cache_key_prefix_size = 0;
+  if (rep->options.block_cache != nullptr) {
+    GenerateCachePrefix(rep->options.block_cache, rep->file.get(),
+                        &rep->cache_key_prefix[0],
+                        &rep->cache_key_prefix_size);
+  }
+  if (rep->options.block_cache_compressed != nullptr) {
+    GenerateCachePrefix(rep->options.block_cache_compressed, rep->file.get(),
+                        &rep->compressed_cache_key_prefix[0],
+                        &rep->compressed_cache_key_prefix_size);
+  }
+}
+
+void BlockBasedTable::GenerateCachePrefix(shared_ptr<Cache> cc,
+    RandomAccessFile* file, char* buffer, size_t* size) {
+
+  // generate an id from the file
+  *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
+
+  // If the prefix wasn't generated or was too long,
+  // create one from the cache.
+  if (*size == 0) {
+    char* end = EncodeVarint64(buffer, cc->NewId());
+    *size = static_cast<size_t>(end - buffer);
+  }
+}
+
+void BlockBasedTable::GenerateCachePrefix(shared_ptr<Cache> cc,
+    WritableFile* file, char* buffer, size_t* size) {
+
+  // generate an id from the file
+  *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
+
+  // If the prefix wasn't generated or was too long,
+  // create one from the cache.
+  if (*size == 0) {
+    char* end = EncodeVarint64(buffer, cc->NewId());
+    *size = static_cast<size_t>(end - buffer);
   }
 }
 
@@ -94,9 +121,11 @@ Status ReadBlock(RandomAccessFile* file,
                  const BlockHandle& handle,
                  Block** result,
                  Env* env,
-                 bool* didIO = nullptr) {
+                 bool* didIO = nullptr,
+                 bool do_uncompress = true) {
   BlockContents contents;
-  Status s = ReadBlockContents(file, options, handle, &contents, env);
+  Status s = ReadBlockContents(file, options, handle, &contents,
+                               env, do_uncompress);
   if (s.ok()) {
     *result = new Block(contents);
   }
@@ -143,6 +172,7 @@ Status BlockBasedTable::Open(const Options& options,
   if (s.ok()) {
     // We've successfully read the footer and the index block: we're
     // ready to serve requests.
+    assert(index_block->compressionType() == kNoCompression);
     BlockBasedTable::Rep* rep = new BlockBasedTable::Rep(soptions);
     rep->options = options;
     rep->file = std::move(file);
@@ -193,6 +223,7 @@ void BlockBasedTable::ReadMeta(const Footer& footer) {
     // Do not propagate errors since meta info is not needed for operation
     return;
   }
+  assert(meta->compressionType() == kNoCompression);
 
   Iterator* iter = meta->NewIterator(BytewiseComparator());
   // read filter
@@ -238,7 +269,7 @@ void BlockBasedTable::ReadFilter(const Slice& filter_handle_value) {
   ReadOptions opt;
   BlockContents block;
   if (!ReadBlockContents(rep_->file.get(), opt, filter_handle, &block,
-                         rep_->options.env).ok()) {
+                         rep_->options.env, false).ok()) {
     return;
   }
   if (block.heap_allocated) {
@@ -260,7 +291,8 @@ Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) {
       ReadOptions(),
       handle,
      &block_contents,
-      rep->options.env
+      rep->options.env,
+      false
   );
 
   if (!s.ok()) {
@@ -351,9 +383,13 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
   const bool no_io = (options.read_tier == kBlockCacheTier);
   BlockBasedTable* table = reinterpret_cast<BlockBasedTable*>(arg);
   Cache* block_cache = table->rep_->options.block_cache.get();
+  Cache* block_cache_compressed = table->rep_->options.
+                                    block_cache_compressed.get();
   std::shared_ptr<Statistics> statistics = table->rep_->options.statistics;
   Block* block = nullptr;
+  Block* cblock = nullptr;
   Cache::Handle* cache_handle = nullptr;
+  Cache::Handle* compressed_cache_handle = nullptr;
   BlockHandle handle;
   Slice input = index_value;
 
@@ -362,26 +398,88 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
   // can add more features in the future.
   if (s.ok()) {
-    if (block_cache != nullptr) {
+    if (block_cache != nullptr || block_cache_compressed != nullptr) {
       char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
-      const size_t cache_key_prefix_size = table->rep_->cache_key_prefix_size;
-      assert(cache_key_prefix_size != 0);
-      assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
-      memcpy(cache_key, table->rep_->cache_key_prefix,
-             cache_key_prefix_size);
-      char* end = EncodeVarint64(cache_key + cache_key_prefix_size,
-                                 handle.offset());
-      Slice key(cache_key, static_cast<size_t>(end-cache_key));
-      cache_handle = block_cache->Lookup(key);
-      if (cache_handle != nullptr) {
-        block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
+      char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
+      char* end = cache_key;
+
+      // create key for block cache
+      if (block_cache != nullptr) {
+        assert(table->rep_->cache_key_prefix_size != 0);
+        assert(table->rep_->cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
+        memcpy(cache_key, table->rep_->cache_key_prefix,
+               table->rep_->cache_key_prefix_size);
+        end = EncodeVarint64(cache_key + table->rep_->cache_key_prefix_size,
+                             handle.offset());
+      }
+      Slice key(cache_key, static_cast<size_t>(end - cache_key));
+
+      // create key for compressed block cache
+      end = compressed_cache_key;
+      if (block_cache_compressed != nullptr) {
+        assert(table->rep_->compressed_cache_key_prefix_size != 0);
+        assert(table->rep_->compressed_cache_key_prefix_size <=
+               kMaxCacheKeyPrefixSize);
+        memcpy(compressed_cache_key, table->rep_->compressed_cache_key_prefix,
+               table->rep_->compressed_cache_key_prefix_size);
+        end = EncodeVarint64(compressed_cache_key +
+                             table->rep_->compressed_cache_key_prefix_size,
+                             handle.offset());
+      }
+      Slice ckey(compressed_cache_key, static_cast<size_t>
+                 (end - compressed_cache_key));
+
+      // Lookup uncompressed cache first
+      if (block_cache != nullptr) {
+        cache_handle = block_cache->Lookup(key);
+        if (cache_handle != nullptr) {
+          block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
+          RecordTick(statistics, BLOCK_CACHE_HIT);
+        }
+      }
+
+      // If not found in uncompressed cache, lookup compressed cache
+      if (block == nullptr && block_cache_compressed != nullptr) {
+        compressed_cache_handle = block_cache_compressed->Lookup(ckey);
+
+        // if we found it in the compressed cache, then uncompress and
+        // insert into the uncompressed cache
+        if (compressed_cache_handle != nullptr) {
+          // found compressed block
+          cblock = reinterpret_cast<Block*>(block_cache_compressed->
+                                            Value(compressed_cache_handle));
+          assert(cblock->compressionType() != kNoCompression);
+
+          // Retrieve the uncompressed contents into a new buffer
+          BlockContents contents;
+          s = UncompressBlockContents(cblock->data(), cblock->size(),
+                                      &contents);
+
+          // Insert uncompressed block into block cache
+          if (s.ok()) {
+            block = new Block(contents); // uncompressed block
+            assert(block->compressionType() == kNoCompression);
+            if (block_cache != nullptr && block->isCachable() &&
+                options.fill_cache) {
+              cache_handle = block_cache->Insert(key, block, block->size(),
+                                                 &DeleteCachedBlock);
+              assert(reinterpret_cast<Block*>(block_cache->Value(cache_handle))
+                     == block);
+            }
+          }
+          // Release hold on compressed cache entry
+          block_cache_compressed->Release(compressed_cache_handle);
+          RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT);
+        }
+      }
+
+      if (block != nullptr) {
         BumpPerfCount(&perf_context.block_cache_hit_count);
-        RecordTick(statistics, BLOCK_CACHE_HIT);
       } else if (no_io) {
         // Did not find in block_cache and can't do IO
        return NewErrorIterator(Status::Incomplete("no blocking io"));
       } else {
+
         Histograms histogram = for_compaction ?
           READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
         {  // block for stop watch
@@ -390,19 +488,54 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
           s = ReadBlock(
             table->rep_->file.get(),
             options,
             handle,
-            &block,
+            &cblock,
             table->rep_->options.env,
-            didIO
+            didIO,
+            block_cache_compressed == nullptr
           );
         }
         if (s.ok()) {
-          if (block->isCachable() && options.fill_cache) {
-            cache_handle = block_cache->Insert(
-              key, block, block->size(), &DeleteCachedBlock);
+          assert(cblock->compressionType() == kNoCompression ||
+                 block_cache_compressed != nullptr);
+
+          // Retrieve the uncompressed contents into a new buffer
+          BlockContents contents;
+          if (cblock->compressionType() != kNoCompression) {
+            s = UncompressBlockContents(cblock->data(), cblock->size(),
+                                        &contents);
+          }
+          if (s.ok()) {
+            if (cblock->compressionType() != kNoCompression) {
+              block = new Block(contents); // uncompressed block
+            } else {
+              block = cblock;
+              cblock = nullptr;
+            }
+            if (block->isCachable() && options.fill_cache) {
+              // Insert compressed block into compressed block cache.
+              // Release the hold on the compressed cache entry immediately.
+              if (block_cache_compressed != nullptr && cblock != nullptr) {
+                compressed_cache_handle = block_cache_compressed->Insert(
+                  ckey, cblock, cblock->size(), &DeleteCachedBlock);
+                block_cache_compressed->Release(compressed_cache_handle);
+                RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS);
+                cblock = nullptr;
+              }
+              // insert into uncompressed block cache
+              assert((block->compressionType() == kNoCompression));
+              if (block_cache != nullptr) {
+                cache_handle = block_cache->Insert(
+                  key, block, block->size(), &DeleteCachedBlock);
+                RecordTick(statistics, BLOCK_CACHE_MISS);
+                assert(reinterpret_cast<Block*>(block_cache->Value(
+                       cache_handle)) == block);
+              }
+            }
           }
         }
-
-        RecordTick(statistics, BLOCK_CACHE_MISS);
+        if (cblock != nullptr) {
+          delete cblock;
+        }
       }
     } else if (no_io) {
       // Could not read from block_cache and can't do IO
@@ -416,10 +549,10 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
   Iterator* iter;
   if (block != nullptr) {
     iter = block->NewIterator(table->rep_->options.comparator);
-    if (cache_handle == nullptr) {
-      iter->RegisterCleanup(&DeleteBlock, block, nullptr);
-    } else {
+    if (cache_handle != nullptr) {
       iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
+    } else {
+      iter->RegisterCleanup(&DeleteBlock, block, nullptr);
     }
   } else {
     iter = NewErrorIterator(s);
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index 848c55655..b6e87b2f8 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -10,10 +10,12 @@
 #pragma once
 #include <memory>
 #include <stdint.h>
+#include "rocksdb/cache.h"
 #include "rocksdb/env.h"
 #include "rocksdb/iterator.h"
 #include "rocksdb/table_stats.h"
 #include "rocksdb/table.h"
+#include "util/coding.h"
 
 namespace rocksdb {
 
@@ -103,6 +105,7 @@ class BlockBasedTable : public TableReader {
   // after a call to Seek(key), until handle_result returns false.
   // May not make such a call if filter policy says that key is not present.
   friend class TableCache;
+  friend class BlockBasedTableBuilder;
 
   void ReadMeta(const Footer& footer);
   void ReadFilter(const Slice& filter_handle_value);
@@ -114,6 +117,15 @@ class BlockBasedTable : public TableReader {
         compaction_optimized_(false) {
     rep_ = rep;
   }
+  // Generate a cache key prefix from the file
+  static void GenerateCachePrefix(shared_ptr<Cache> cc,
+    RandomAccessFile* file, char* buffer, size_t* size);
+  static void GenerateCachePrefix(shared_ptr<Cache> cc,
+    WritableFile* file, char* buffer, size_t* size);
+
+  // The longest prefix of the cache key used to identify blocks.
+  // For Posix files the unique ID is three varints.
+  static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1;
 
   // No copying allowed
   explicit BlockBasedTable(const TableReader&) = delete;
diff --git a/table/format.cc b/table/format.cc
index fdda0c9ca..1eb50ef33 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -76,7 +76,8 @@ Status ReadBlockContents(RandomAccessFile* file,
                          const ReadOptions& options,
                          const BlockHandle& handle,
                          BlockContents* result,
-                         Env* env) {
+                         Env* env,
+                         bool do_uncompress) {
   result->data = Slice();
   result->cachable = false;
   result->heap_allocated = false;
@@ -116,41 +117,55 @@ Status ReadBlockContents(RandomAccessFile* file,
     BumpPerfTime(&perf_context.block_checksum_time, &timer);
   }
 
+  // If the caller has requested that the block not be uncompressed
+  if (!do_uncompress || data[n] == kNoCompression) {
+    if (data != buf) {
+      // File implementation gave us pointer to some other data.
+      // Use it directly under the assumption that it will be live
+      // while the file is open.
+      delete[] buf;
+      result->data = Slice(data, n);
+      result->heap_allocated = false;
+      result->cachable = false;  // Do not double-cache
+    } else {
+      result->data = Slice(buf, n);
+      result->heap_allocated = true;
+      result->cachable = true;
+    }
+    result->compression_type = (rocksdb::CompressionType)data[n];
+    s = Status::OK();
+  } else {
+    s = UncompressBlockContents(buf, n, result);
+    delete[] buf;
+  }
+  BumpPerfTime(&perf_context.block_decompress_time, &timer);
+  return s;
+}
+
+//
+// The 'data' points to the raw block contents that were read in from the
+// file. This method allocates a new heap buffer and the raw block
+// contents are uncompressed into this buffer. This
+// buffer is returned via 'result' and it is up to the caller to
+// free this buffer.
+Status UncompressBlockContents(const char* data, size_t n,
+                               BlockContents* result) {
   char* ubuf = nullptr;
   int decompress_size = 0;
+  assert(data[n] != kNoCompression);
   switch (data[n]) {
-    case kNoCompression:
-      if (data != buf) {
-        // File implementation gave us pointer to some other data.
-        // Use it directly under the assumption that it will be live
-        // while the file is open.
-        delete[] buf;
-        result->data = Slice(data, n);
-        result->heap_allocated = false;
-        result->cachable = false;  // Do not double-cache
-      } else {
-        result->data = Slice(buf, n);
-        result->heap_allocated = true;
-        result->cachable = true;
-      }
-
-      // Ok
-      break;
     case kSnappyCompression: {
       size_t ulength = 0;
       static char snappy_corrupt_msg[] =
         "Snappy not supported or corrupted Snappy compressed block contents";
       if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
-        delete[] buf;
        return Status::Corruption(snappy_corrupt_msg);
       }
       ubuf = new char[ulength];
       if (!port::Snappy_Uncompress(data, n, ubuf)) {
-        delete[] buf;
         delete[] ubuf;
         return Status::Corruption(snappy_corrupt_msg);
       }
-      delete[] buf;
       result->data = Slice(ubuf, ulength);
       result->heap_allocated = true;
       result->cachable = true;
@@ -161,10 +176,8 @@ Status ReadBlockContents(RandomAccessFile* file,
       static char zlib_corrupt_msg[] =
         "Zlib not supported or corrupted Zlib compressed block contents";
       if (!ubuf) {
-        delete[] buf;
         return Status::Corruption(zlib_corrupt_msg);
       }
-      delete[] buf;
       result->data = Slice(ubuf, decompress_size);
       result->heap_allocated = true;
       result->cachable = true;
@@ -174,21 +187,16 @@ Status ReadBlockContents(RandomAccessFile* file,
       static char bzip2_corrupt_msg[] =
         "Bzip2 not supported or corrupted Bzip2 compressed block contents";
       if (!ubuf) {
-        delete[] buf;
         return Status::Corruption(bzip2_corrupt_msg);
       }
-      delete[] buf;
       result->data = Slice(ubuf, decompress_size);
       result->heap_allocated = true;
       result->cachable = true;
       break;
     default:
-      delete[] buf;
       return Status::Corruption("bad block type");
   }
-
-  BumpPerfTime(&perf_context.block_decompress_time, &timer);
-
+  result->compression_type = kNoCompression; // not compressed any more
   return Status::OK();
 }
diff --git a/table/format.h b/table/format.h
index 82f86884e..2f1c1e8dc 100644
--- a/table/format.h
+++ b/table/format.h
@@ -12,6 +12,7 @@
 #include <string>
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
+#include "rocksdb/options.h"
 #include "rocksdb/table.h"
 
 namespace rocksdb {
 
@@ -90,6 +91,7 @@ struct BlockContents {
   Slice data;           // Actual contents of data
   bool cachable;        // True iff data can be cached
   bool heap_allocated;  // True iff caller should delete[] data.data()
+  CompressionType compression_type;
 };
 
 // Read the block identified by "handle" from "file".  On failure
@@ -98,7 +100,17 @@ extern Status ReadBlockContents(RandomAccessFile* file,
                                 const ReadOptions& options,
                                 const BlockHandle& handle,
                                 BlockContents* result,
-                                Env* env);
+                                Env* env,
+                                bool do_uncompress);
+
+// The 'data' points to the raw block contents read in from the file.
+// This method allocates a new heap buffer and the raw block
+// contents are uncompressed into this buffer. This buffer is
+// returned via 'result' and it is up to the caller to
+// free this buffer.
+extern Status UncompressBlockContents(const char* data,
+                                      size_t n,
+                                      BlockContents* result);
 
 // Implementation details follow.  Clients should ignore,
diff --git a/tools/db_stress.cc b/tools/db_stress.cc
index 352997d65..71eabef8f 100644
--- a/tools/db_stress.cc
+++ b/tools/db_stress.cc
@@ -115,6 +115,10 @@ DEFINE_int32(open_files, rocksdb::Options().max_open_files,
              "Maximum number of files to keep open at the same time "
              "(use default if == 0)");
 
+DEFINE_int64(compressed_cache_size, -1,
+             "Number of bytes to use as a cache of compressed data."
+ " Negative means use default settings."); + DEFINE_int32(compaction_style, rocksdb::Options().compaction_style, ""); DEFINE_int32(level0_file_num_compaction_trigger, @@ -672,6 +676,9 @@ class StressTest { public: StressTest() : cache_(NewLRUCache(FLAGS_cache_size)), + compressed_cache_(FLAGS_compressed_cache_size >= 0 ? + NewLRUCache(FLAGS_compressed_cache_size) : + nullptr), filter_policy_(FLAGS_bloom_bits >= 0 ? NewBloomFilterPolicy(FLAGS_bloom_bits) : nullptr), @@ -1341,6 +1348,7 @@ class StressTest { assert(db_ == nullptr); Options options; options.block_cache = cache_; + options.block_cache_compressed = compressed_cache_; options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.min_write_buffer_number_to_merge = @@ -1469,6 +1477,7 @@ class StressTest { private: shared_ptr cache_; + shared_ptr compressed_cache_; const FilterPolicy* filter_policy_; const SliceTransform* prefix_extractor_; DB* db_; diff --git a/util/env_posix.cc b/util/env_posix.cc index 994c45a2a..b85e0aff6 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -101,6 +101,36 @@ static void TestKillRandom(int odds, const std::string& srcfile, #endif +#if defined(OS_LINUX) +namespace { + static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) { + if (max_size < kMaxVarint64Length*3) { + return 0; + } + + struct stat buf; + int result = fstat(fd, &buf); + if (result == -1) { + return 0; + } + + long version = 0; + result = ioctl(fd, FS_IOC_GETVERSION, &version); + if (result == -1) { + return 0; + } + uint64_t uversion = (uint64_t)version; + + char* rid = id; + rid = EncodeVarint64(rid, buf.st_dev); + rid = EncodeVarint64(rid, buf.st_ino); + rid = EncodeVarint64(rid, uversion); + assert(rid >= id); + return static_cast(rid-id); + } +} +#endif + class PosixSequentialFile: public SequentialFile { private: std::string filename_; @@ -187,30 +217,7 @@ class PosixRandomAccessFile: public RandomAccessFile { #if defined(OS_LINUX) virtual size_t GetUniqueId(char* id, size_t max_size) const { - // TODO: possibly allow this function to handle tighter bounds. 
-    if (max_size < kMaxVarint64Length*3) {
-      return 0;
-    }
-
-    struct stat buf;
-    int result = fstat(fd_, &buf);
-    if (result == -1) {
-      return 0;
-    }
-
-    long version = 0;
-    result = ioctl(fd_, FS_IOC_GETVERSION, &version);
-    if (result == -1) {
-      return 0;
-    }
-    uint64_t uversion = (uint64_t)version;
-
-    char* rid = id;
-    rid = EncodeVarint64(rid, buf.st_dev);
-    rid = EncodeVarint64(rid, buf.st_ino);
-    rid = EncodeVarint64(rid, uversion);
-    assert(rid >= id);
-    return static_cast<size_t>(rid-id);
+    return GetUniqueIdFromFile(fd_, id, max_size);
   }
 #endif
 
@@ -711,6 +718,9 @@ class PosixWritableFile : public WritableFile {
       return IOError(filename_, errno);
     }
   }
+  virtual size_t GetUniqueId(char* id, size_t max_size) const {
+    return GetUniqueIdFromFile(fd_, id, max_size);
+  }
 #endif
 };
 
diff --git a/util/options.cc b/util/options.cc
index f828b026a..170a8ae55 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -34,6 +34,8 @@ Options::Options()
       max_write_buffer_number(2),
       min_write_buffer_number_to_merge(1),
       max_open_files(1000),
+      block_cache(nullptr),
+      block_cache_compressed(nullptr),
       block_size(4096),
       block_restart_interval(16),
       compression(kSnappyCompression),
@@ -129,10 +131,16 @@ Options::Dump(Logger* log) const
     Log(log,"          Options.max_write_buffer_number: %d", max_write_buffer_number);
     Log(log,"                  Options.max_open_files: %d", max_open_files);
     Log(log,"                      Options.block_cache: %p", block_cache.get());
+    Log(log,"           Options.block_cache_compressed: %p",
+        block_cache_compressed.get());
     if (block_cache) {
-      Log(log,"            Options.block_cache_size: %zd",
+      Log(log,"                 Options.block_cache_size: %zd",
          block_cache->GetCapacity());
     }
+    if (block_cache_compressed) {
+      Log(log,"      Options.block_cache_compressed_size: %zd",
+          block_cache_compressed->GetCapacity());
+    }
     Log(log,"                       Options.block_size: %zd", block_size);
     Log(log,"            Options.block_restart_interval: %d", block_restart_interval);
     if (!compression_per_level.empty()) {
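
Usage sketch (not part of the patch): the snippet below shows how a caller could enable the new compressed block cache alongside the regular block cache and read the new tickers. It uses only APIs that appear in this diff (Options::block_cache_compressed, NewLRUCache, CreateDBStatistics, getTickerCount, BLOCK_CACHE_COMPRESSED_HIT/MISS); the database path and cache sizes are illustrative examples, not recommendations.

#include <cassert>
#include <iostream>
#include <string>

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();

  // Uncompressed cache in front, compressed cache behind it: a block
  // evicted from the first tier can still be served from the second at
  // the cost of one decompression instead of one disk read.
  options.block_cache = rocksdb::NewLRUCache(8 * 1024 * 1024);              // 8MB
  options.block_cache_compressed = rocksdb::NewLRUCache(64 * 1024 * 1024);  // 64MB

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/ccache_demo", &db);
  assert(s.ok());

  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());
  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "key", &value);
  assert(s.ok() && value == "value");

  // The new tickers expose how effective the compressed cache is.
  std::cout << "compressed hit: "
            << options.statistics->getTickerCount(
                   rocksdb::BLOCK_CACHE_COMPRESSED_HIT)
            << " miss: "
            << options.statistics->getTickerCount(
                   rocksdb::BLOCK_CACHE_COMPRESSED_MISS)
            << std::endl;

  delete db;
  return 0;
}

Both caches key a block by a per-file unique ID (on Linux, three varints from fstat and FS_IOC_GETVERSION, falling back to a cache-generated ID) plus the block's file offset. That is why the table builder can pre-populate the compressed cache at write time and then call WritableFile::InvalidateCache to drop the now-redundant OS page-cache copy.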