diff --git a/CMakeLists.txt b/CMakeLists.txt index 7face2c3d..b17b8fb9f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -183,6 +183,7 @@ set(SOURCES table/plain_table_index.cc table/plain_table_key_coding.cc table/plain_table_reader.cc + persistent_cache_helper.cc table/table_properties.cc table/two_level_iterator.cc tools/sst_dump_tool.cc diff --git a/db/db_test2.cc b/db/db_test2.cc index 8657c3036..4c80bb446 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -7,8 +7,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include + #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "rocksdb/persistent_cache.h" #include "rocksdb/wal_filter.h" namespace rocksdb { @@ -1024,7 +1026,131 @@ TEST_P(PinL0IndexAndFilterBlocksTest, INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest, PinL0IndexAndFilterBlocksTest, ::testing::Bool()); +#ifndef ROCKSDB_LITE +static void UniqueIdCallback(void* arg) { + int* result = reinterpret_cast(arg); + if (*result == -1) { + *result = 0; + } + + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback); +} + +class MockPersistentCache : public PersistentCache { + public: + explicit MockPersistentCache(const bool is_compressed, const size_t max_size) + : is_compressed_(is_compressed), max_size_(max_size) { + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback); + } + + virtual ~MockPersistentCache() {} + + Status Insert(const Slice& page_key, const char* data, + const size_t size) override { + MutexLock _(&lock_); + + if (size_ > max_size_) { + size_ -= data_.begin()->second.size(); + data_.erase(data_.begin()); + } + + data_.insert(std::make_pair(page_key.ToString(), std::string(data, size))); + size_ += size; + return Status::OK(); + } + + Status Lookup(const Slice& page_key, std::unique_ptr* data, + size_t* size) override { + MutexLock _(&lock_); + auto it = data_.find(page_key.ToString()); + if (it == data_.end()) { + return Status::NotFound(); + } + + assert(page_key.ToString() == it->first); + data->reset(new char[it->second.size()]); + memcpy(data->get(), it->second.c_str(), it->second.size()); + *size = it->second.size(); + return Status::OK(); + } + + bool IsCompressed() override { return is_compressed_; } + + port::Mutex lock_; + std::map data_; + const bool is_compressed_ = true; + size_t size_ = 0; + const size_t max_size_ = 10 * 1024; // 10KiB +}; + +TEST_F(DBTest2, PersistentCache) { + int num_iter = 80; + Options options; + options.write_buffer_size = 64 * 1024; // small write buffer + options.statistics = rocksdb::CreateDBStatistics(); + options = CurrentOptions(options); + + auto bsizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024}; + auto types = {/*compressed*/ 1, /*uncompressed*/ 0}; + for (auto bsize : bsizes) { + for (auto type : types) { + BlockBasedTableOptions table_options; + table_options.persistent_cache.reset( + new MockPersistentCache(type, 10 * 1024)); + table_options.no_block_cache = true; + table_options.block_cache = bsize ? NewLRUCache(bsize) : nullptr; + table_options.block_cache_compressed = nullptr; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + // default column family doesn't have block cache + Options no_block_cache_opts; + no_block_cache_opts.statistics = options.statistics; + no_block_cache_opts = CurrentOptions(no_block_cache_opts); + BlockBasedTableOptions table_options_no_bc; + table_options_no_bc.no_block_cache = true; + no_block_cache_opts.table_factory.reset( + NewBlockBasedTableFactory(table_options_no_bc)); + ReopenWithColumnFamilies( + {"default", "pikachu"}, + std::vector({no_block_cache_opts, options})); + + Random rnd(301); + + // Write 8MB (80 values, each 100K) + ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); + std::vector values; + std::string str; + for (int i = 0; i < num_iter; i++) { + if (i % 4 == 0) { // high compression ratio + str = RandomString(&rnd, 1000); + } + values.push_back(str); + ASSERT_OK(Put(1, Key(i), values[i])); + } + + // flush all data from memtable so that reads are from block cache + ASSERT_OK(Flush(1)); + + for (int i = 0; i < num_iter; i++) { + ASSERT_EQ(Get(1, Key(i)), values[i]); + } + + auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT); + auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS); + + ASSERT_GT(hit, 0); + ASSERT_GT(miss, 0); + } + } +} +#endif } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 538671824..c7afd1a84 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1613,6 +1613,7 @@ struct CompactRangeOptions { BottommostLevelCompaction bottommost_level_compaction = BottommostLevelCompaction::kIfHaveCompactionFilter; }; + } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_ diff --git a/include/rocksdb/persistent_cache.h b/include/rocksdb/persistent_cache.h new file mode 100644 index 000000000..ef49da5ab --- /dev/null +++ b/include/rocksdb/persistent_cache.h @@ -0,0 +1,49 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include +#include + +#include "rocksdb/slice.h" +#include "rocksdb/statistics.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +// PersistentCache +// +// Persistent cache interface for caching IO pages on a persistent medium. The +// cache interface is specifically designed for persistent read cache. +class PersistentCache { + public: + virtual ~PersistentCache() {} + + // Insert to page cache + // + // page_key Identifier to identify a page uniquely across restarts + // data Page data + // size Size of the page + virtual Status Insert(const Slice& key, const char* data, + const size_t size) = 0; + + // Lookup page cache by page identifier + // + // page_key Page identifier + // buf Buffer where the data should be copied + // size Size of the page + virtual Status Lookup(const Slice& key, std::unique_ptr* data, + size_t* size) = 0; + + // Is cache storing uncompressed data ? + // + // True if the cache is configured to store uncompressed data else false + virtual bool IsCompressed() = 0; +}; + +} // namespace rocksdb diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index c832516da..2f14c444b 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -54,6 +54,11 @@ enum Tickers : uint32_t { // # of times bloom filter has avoided file reads. BLOOM_FILTER_USEFUL, + // # persistent cache hit + PERSISTENT_CACHE_HIT, + // # persistent cache miss + PERSISTENT_CACHE_MISS, + // # of memtable hits. MEMTABLE_HIT, // # of memtable misses. diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index b066b67d9..ed9060bd5 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -21,15 +21,16 @@ #include #include "rocksdb/env.h" +#include "rocksdb/immutable_options.h" #include "rocksdb/iterator.h" #include "rocksdb/options.h" -#include "rocksdb/immutable_options.h" #include "rocksdb/status.h" namespace rocksdb { // -- Block-based Table class FlushBlockPolicyFactory; +class PersistentCache; class RandomAccessFile; struct TableReaderOptions; struct TableBuilderOptions; @@ -103,6 +104,10 @@ struct BlockBasedTableOptions { // If NULL, rocksdb will automatically create and use an 8MB internal cache. std::shared_ptr block_cache = nullptr; + // If non-NULL use the specified cache for pages read from device + // IF NULL, no page cache is used + std::shared_ptr persistent_cache = nullptr; + // If non-NULL use the specified cache for compressed blocks. // If NULL, rocksdb will not use a compressed block cache. std::shared_ptr block_cache_compressed = nullptr; diff --git a/src.mk b/src.mk index db6551241..d18a0f67a 100644 --- a/src.mk +++ b/src.mk @@ -82,6 +82,7 @@ LIB_SOURCES = \ table/plain_table_index.cc \ table/plain_table_key_coding.cc \ table/plain_table_reader.cc \ + table/persistent_cache_helper.cc \ table/table_properties.cc \ table/two_level_iterator.cc \ tools/dump/db_dump_tool.cc \ @@ -103,7 +104,7 @@ LIB_SOURCES = \ util/io_posix.cc \ util/threadpool.cc \ util/transaction_test_util.cc \ - util/sst_file_manager_impl.cc \ + util/sst_file_manager_impl.cc \ util/file_util.cc \ util/file_reader_writer.cc \ util/filter_policy.cc \ diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 82c9903fa..fa5c83966 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -25,17 +25,18 @@ #include "rocksdb/table_properties.h" #include "table/block.h" -#include "table/filter_block.h" #include "table/block_based_filter_block.h" #include "table/block_based_table_factory.h" -#include "table/full_filter_block.h" #include "table/block_hash_index.h" #include "table/block_prefix_index.h" +#include "table/filter_block.h" #include "table/format.h" +#include "table/full_filter_block.h" +#include "table/get_context.h" #include "table/internal_iterator.h" #include "table/meta_blocks.h" +#include "table/persistent_cache_helper.h" #include "table/two_level_iterator.h" -#include "table/get_context.h" #include "util/coding.h" #include "util/file_reader_writer.h" @@ -53,13 +54,6 @@ using std::unique_ptr; typedef BlockBasedTable::IndexReader IndexReader; namespace { -// The longest the prefix of the cache key used to identify blocks can be. -// We are using the fact that we know for Posix files the unique ID is three -// varints. -// For some reason, compiling for iOS complains that this variable is unused -const size_t kMaxCacheKeyPrefixSize __attribute__((unused)) = - kMaxVarint64Length * 3 + 1; - // Read the block identified by "handle" from "file". // The only relevant option is options.verify_checksums for now. // On failure return non-OK. @@ -69,11 +63,13 @@ const size_t kMaxCacheKeyPrefixSize __attribute__((unused)) = Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer, const ReadOptions& options, const BlockHandle& handle, std::unique_ptr* result, Env* env, - bool do_uncompress = true, - const Slice& compression_dict = Slice()) { + bool do_uncompress, const Slice& compression_dict, + const PersistentCacheOptions& cache_options, + Logger* info_log) { BlockContents contents; Status s = ReadBlockContents(file, footer, options, handle, &contents, env, - do_uncompress, compression_dict); + do_uncompress, compression_dict, cache_options, + info_log); if (s.ok()) { result->reset(new Block(std::move(contents))); } @@ -106,18 +102,12 @@ Slice GetCacheKeyFromOffset(const char* cache_key_prefix, char* cache_key) { assert(cache_key != nullptr); assert(cache_key_prefix_size != 0); - assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + assert(cache_key_prefix_size <= BlockBasedTable::kMaxCacheKeyPrefixSize); memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); char* end = EncodeVarint64(cache_key + cache_key_prefix_size, offset); return Slice(cache_key, static_cast(end - cache_key)); } -Slice GetCacheKey(const char* cache_key_prefix, size_t cache_key_prefix_size, - const BlockHandle& handle, char* cache_key) { - return GetCacheKeyFromOffset(cache_key_prefix, cache_key_prefix_size, - handle.offset(), cache_key); -} - Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, Tickers block_cache_miss_ticker, Tickers block_cache_hit_ticker, @@ -183,11 +173,13 @@ class BinarySearchIndexReader : public IndexReader { // unmodified. static Status Create(RandomAccessFileReader* file, const Footer& footer, const BlockHandle& index_handle, Env* env, - const Comparator* comparator, - IndexReader** index_reader) { + const Comparator* comparator, IndexReader** index_reader, + const PersistentCacheOptions& cache_options) { std::unique_ptr index_block; auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle, - &index_block, env); + &index_block, env, true /* decompress */, + Slice() /*compression dict*/, cache_options, + /*info_log*/ nullptr); if (s.ok()) { *index_reader = @@ -231,10 +223,13 @@ class HashIndexReader : public IndexReader { const BlockHandle& index_handle, InternalIterator* meta_index_iter, IndexReader** index_reader, - bool hash_index_allow_collision) { + bool hash_index_allow_collision, + const PersistentCacheOptions& cache_options) { std::unique_ptr index_block; auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle, - &index_block, env); + &index_block, env, true /* decompress */, + Slice() /*compression dict*/, cache_options, + /*info_log*/ nullptr); if (!s.ok()) { return s; @@ -269,14 +264,15 @@ class HashIndexReader : public IndexReader { // Read contents for the blocks BlockContents prefixes_contents; s = ReadBlockContents(file, footer, ReadOptions(), prefixes_handle, - &prefixes_contents, env, true /* do decompression */); + &prefixes_contents, env, true /* decompress */, + Slice() /*compression dict*/, cache_options); if (!s.ok()) { return s; } BlockContents prefixes_meta_contents; s = ReadBlockContents(file, footer, ReadOptions(), prefixes_meta_handle, - &prefixes_meta_contents, env, - true /* do decompression */); + &prefixes_meta_contents, env, true /* decompress */, + Slice() /*compression dict*/, cache_options); if (!s.ok()) { // TODO: log error return Status::OK(); @@ -388,10 +384,13 @@ struct BlockBasedTable::Rep { unique_ptr file; char cache_key_prefix[kMaxCacheKeyPrefixSize]; size_t cache_key_prefix_size = 0; + char persistent_cache_key_prefix[kMaxCacheKeyPrefixSize]; + size_t persistent_cache_key_prefix_size = 0; char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize]; size_t compressed_cache_key_prefix_size = 0; uint64_t dummy_index_reader_offset = 0; // ID that is unique for the block cache. + PersistentCacheOptions persistent_cache_options; // Footer contains the fixed table information Footer footer; @@ -451,6 +450,11 @@ void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) { rep->dummy_index_reader_offset = file_size + rep->table_options.block_cache->NewId(); } + if (rep->table_options.persistent_cache != nullptr) { + GenerateCachePrefix(/*cache=*/nullptr, rep->file->file(), + &rep->persistent_cache_key_prefix[0], + &rep->persistent_cache_key_prefix_size); + } if (rep->table_options.block_cache_compressed != nullptr) { GenerateCachePrefix(rep->table_options.block_cache_compressed.get(), rep->file->file(), &rep->compressed_cache_key_prefix[0], @@ -466,7 +470,7 @@ void BlockBasedTable::GenerateCachePrefix(Cache* cc, // If the prefix wasn't generated or was too long, // create one from the cache. - if (*size == 0) { + if (cc && *size == 0) { char* end = EncodeVarint64(buffer, cc->NewId()); *size = static_cast(end - buffer); } @@ -507,6 +511,18 @@ bool IsFeatureSupported(const TableProperties& table_properties, } } // namespace +Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix, + size_t cache_key_prefix_size, + const BlockHandle& handle, char* cache_key) { + assert(cache_key != nullptr); + assert(cache_key_prefix_size != 0); + assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); + char* end = + EncodeVarint64(cache_key + cache_key_prefix_size, handle.offset()); + return Slice(cache_key, static_cast(end - cache_key)); +} + Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, const EnvOptions& env_options, const BlockBasedTableOptions& table_options, @@ -541,6 +557,13 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, SetupCacheKeyPrefix(rep, file_size); unique_ptr new_table(new BlockBasedTable(rep)); + // page cache options + rep->persistent_cache_options = + PersistentCacheOptions(rep->table_options.persistent_cache, + std::string(rep->persistent_cache_key_prefix, + rep->persistent_cache_key_prefix_size), + rep->ioptions.statistics); + // Read meta index std::unique_ptr meta; std::unique_ptr meta_iter; @@ -736,12 +759,10 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep, // TODO: we never really verify check sum for meta index block std::unique_ptr meta; Status s = ReadBlockFromFile( - rep->file.get(), - rep->footer, - ReadOptions(), - rep->footer.metaindex_handle(), - &meta, - rep->ioptions.env); + rep->file.get(), rep->footer, ReadOptions(), + rep->footer.metaindex_handle(), &meta, rep->ioptions.env, + true /* decompress */, Slice() /*compression dict*/, + rep->persistent_cache_options, rep->ioptions.info_log); if (!s.ok()) { Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log, @@ -908,7 +929,9 @@ FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep, size_t* filter_size) { BlockContents block; if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(), rep->filter_handle, &block, rep->ioptions.env, - false).ok()) { + false /* decompress */, Slice() /*compression dict*/, + rep->persistent_cache_options) + .ok()) { // Error reading the block return nullptr; } @@ -1148,7 +1171,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions.env, block_cache_compressed == nullptr, - compression_dict); + compression_dict, rep->persistent_cache_options, + rep->ioptions.info_log); } if (s.ok()) { @@ -1173,8 +1197,9 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } std::unique_ptr block_value; s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, - &block_value, rep->ioptions.env, - true /* do_uncompress */, compression_dict); + &block_value, rep->ioptions.env, true /* compress */, + compression_dict, rep->persistent_cache_options, + rep->ioptions.info_log); if (s.ok()) { block.value = block_value.release(); } @@ -1546,7 +1571,8 @@ Status BlockBasedTable::CreateIndexReader( switch (index_type_on_file) { case BlockBasedTableOptions::kBinarySearch: { return BinarySearchIndexReader::Create( - file, footer, footer.index_handle(), env, comparator, index_reader); + file, footer, footer.index_handle(), env, comparator, index_reader, + rep_->persistent_cache_options); } case BlockBasedTableOptions::kHashSearch: { std::unique_ptr meta_guard; @@ -1561,7 +1587,8 @@ Status BlockBasedTable::CreateIndexReader( "Unable to read the metaindex block." " Fall back to binary search index."); return BinarySearchIndexReader::Create( - file, footer, footer.index_handle(), env, comparator, index_reader); + file, footer, footer.index_handle(), env, comparator, + index_reader, rep_->persistent_cache_options); } meta_index_iter = meta_iter_guard.get(); } @@ -1573,7 +1600,7 @@ Status BlockBasedTable::CreateIndexReader( return HashIndexReader::Create( rep_->internal_prefix_transform.get(), footer, file, env, comparator, footer.index_handle(), meta_index_iter, index_reader, - rep_->hash_index_allow_collision); + rep_->hash_index_allow_collision, rep_->persistent_cache_options); } default: { std::string error_message = @@ -1691,8 +1718,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { BlockHandle handle; if (FindMetaBlock(meta_iter.get(), filter_block_key, &handle).ok()) { BlockContents block; - if (ReadBlockContents(rep_->file.get(), rep_->footer, ReadOptions(), - handle, &block, rep_->ioptions.env, false).ok()) { + if (ReadBlockContents( + rep_->file.get(), rep_->footer, ReadOptions(), handle, &block, + rep_->ioptions.env, false /*decompress*/, + Slice() /*compression dict*/, rep_->persistent_cache_options) + .ok()) { rep_->filter.reset(new BlockBasedFilterBlockReader( rep_->ioptions.prefix_extractor, table_options, table_options.whole_key_filtering, std::move(block))); diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 45b303e0e..37d760e01 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -15,11 +15,12 @@ #include #include "rocksdb/options.h" +#include "rocksdb/persistent_cache.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table.h" -#include "table/table_reader.h" #include "table/table_properties_internal.h" +#include "table/table_reader.h" #include "util/coding.h" #include "util/file_reader_writer.h" @@ -54,6 +55,9 @@ class BlockBasedTable : public TableReader { public: static const std::string kFilterBlockPrefix; static const std::string kFullFilterBlockPrefix; + // The longest prefix of the cache key used to identify blocks. + // For Posix files the unique ID is three varints. + static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length * 3 + 1; // Attempt to open the table that is stored in bytes [0..file_size) // of "file", and read the metadata entries necessary to allow @@ -128,6 +132,10 @@ class BlockBasedTable : public TableReader { // Implementation of IndexReader will be exposed to internal cc file only. class IndexReader; + static Slice GetCacheKey(const char* cache_key_prefix, + size_t cache_key_prefix_size, + const BlockHandle& handle, char* cache_key); + private: template struct CachableEntry; @@ -229,10 +237,6 @@ class BlockBasedTable : public TableReader { static void GenerateCachePrefix(Cache* cc, WritableFile* file, char* buffer, size_t* size); - // The longest prefix of the cache key used to identify blocks. - // For Posix files the unique ID is three varints. - static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1; - // Helper functions for DumpTable() Status DumpIndexBlock(WritableFile* out_file); Status DumpDataBlocks(WritableFile* out_file); diff --git a/table/format.cc b/table/format.cc index 628e08af1..7a62f66bb 100644 --- a/table/format.cc +++ b/table/format.cc @@ -14,6 +14,8 @@ #include "rocksdb/env.h" #include "table/block.h" +#include "table/block_based_table_reader.h" +#include "table/persistent_cache_helper.h" #include "util/coding.h" #include "util/compression.h" #include "util/crc32c.h" @@ -294,10 +296,12 @@ Status ReadBlock(RandomAccessFileReader* file, const Footer& footer, } // namespace Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, - const ReadOptions& options, const BlockHandle& handle, - BlockContents* contents, Env* env, - bool decompression_requested, - const Slice& compression_dict) { + const ReadOptions& read_options, + const BlockHandle& handle, BlockContents* contents, + Env* env, bool decompression_requested, + const Slice& compression_dict, + const PersistentCacheOptions& cache_options, + Logger* info_log) { Status status; Slice slice; size_t n = static_cast(handle.size()); @@ -306,17 +310,63 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, char* used_buf = nullptr; rocksdb::CompressionType compression_type; - if (decompression_requested && - n + kBlockTrailerSize < DefaultStackBufferSize) { - // If we've got a small enough hunk of data, read it in to the - // trivially allocated stack buffer instead of needing a full malloc() - used_buf = &stack_buf[0]; + if (cache_options.persistent_cache && + !cache_options.persistent_cache->IsCompressed()) { + status = PersistentCacheHelper::LookupUncompressedPage(cache_options, + handle, contents); + if (status.ok()) { + // uncompressed page is found for the block handle + return status; + } else { + // uncompressed page is not found + if (info_log && !status.IsNotFound()) { + assert(!status.ok()); + Log(InfoLogLevel::INFO_LEVEL, info_log, + "Error reading from persistent cache. %s", + status.ToString().c_str()); + } + } + } + + if (cache_options.persistent_cache && + cache_options.persistent_cache->IsCompressed()) { + // lookup uncompressed cache mode p-cache + status = PersistentCacheHelper::LookupRawPage( + cache_options, handle, &heap_buf, n + kBlockTrailerSize); } else { - heap_buf = std::unique_ptr(new char[n + kBlockTrailerSize]); - used_buf = heap_buf.get(); + status = Status::NotFound(); } - status = ReadBlock(file, footer, options, handle, &slice, used_buf); + if (status.ok()) { + // cache hit + used_buf = heap_buf.get(); + slice = Slice(heap_buf.get(), n); + } else { + if (info_log && !status.IsNotFound()) { + assert(!status.ok()); + Log(InfoLogLevel::INFO_LEVEL, info_log, + "Error reading from persistent cache. %s", status.ToString().c_str()); + } + // cache miss read from device + if (decompression_requested && + n + kBlockTrailerSize < DefaultStackBufferSize) { + // If we've got a small enough hunk of data, read it in to the + // trivially allocated stack buffer instead of needing a full malloc() + used_buf = &stack_buf[0]; + } else { + heap_buf = std::unique_ptr(new char[n + kBlockTrailerSize]); + used_buf = heap_buf.get(); + } + + status = ReadBlock(file, footer, read_options, handle, &slice, used_buf); + if (status.ok() && read_options.fill_cache && + cache_options.persistent_cache && + cache_options.persistent_cache->IsCompressed()) { + // insert to raw cache + PersistentCacheHelper::InsertRawPage(cache_options, handle, used_buf, + n + kBlockTrailerSize); + } + } if (!status.ok()) { return status; @@ -327,21 +377,29 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, compression_type = static_cast(slice.data()[n]); if (decompression_requested && compression_type != kNoCompression) { - return UncompressBlockContents(slice.data(), n, contents, footer.version(), - compression_dict); - } - - if (slice.data() != used_buf) { + // compressed page, uncompress, update cache + status = UncompressBlockContents(slice.data(), n, contents, + footer.version(), compression_dict); + } else if (slice.data() != used_buf) { + // the slice content is not the buffer provided *contents = BlockContents(Slice(slice.data(), n), false, compression_type); - return status; + } else { + // page is uncompressed, the buffer either stack or heap provided + if (used_buf == &stack_buf[0]) { + heap_buf = std::unique_ptr(new char[n]); + memcpy(heap_buf.get(), stack_buf, n); + } + *contents = BlockContents(std::move(heap_buf), n, true, compression_type); } - if (used_buf == &stack_buf[0]) { - heap_buf = std::unique_ptr(new char[n]); - memcpy(heap_buf.get(), stack_buf, n); + if (status.ok() && read_options.fill_cache && + cache_options.persistent_cache && + !cache_options.persistent_cache->IsCompressed()) { + // insert to uncompressed cache + PersistentCacheHelper::InsertUncompressedPage(cache_options, handle, + *contents); } - *contents = BlockContents(std::move(heap_buf), n, true, compression_type); return status; } @@ -447,6 +505,7 @@ Status UncompressBlockContents(const char* data, size_t n, default: return Status::Corruption("bad block type"); } + return Status::OK(); } diff --git a/table/format.h b/table/format.h index d9846d529..a488f8dd8 100644 --- a/table/format.h +++ b/table/format.h @@ -16,6 +16,7 @@ #include "rocksdb/table.h" #include "port/port.h" // noexcept +#include "table/persistent_cache_helper.h" namespace rocksdb { @@ -208,13 +209,13 @@ struct BlockContents { // Read the block identified by "handle" from "file". On failure // return non-OK. On success fill *result and return OK. -extern Status ReadBlockContents(RandomAccessFileReader* file, - const Footer& footer, - const ReadOptions& options, - const BlockHandle& handle, - BlockContents* contents, Env* env, - bool do_uncompress, - const Slice& compression_dict = Slice()); +extern Status ReadBlockContents( + RandomAccessFileReader* file, const Footer& footer, + const ReadOptions& options, const BlockHandle& handle, + BlockContents* contents, Env* env, bool do_uncompress = true, + const Slice& compression_dict = Slice(), + const PersistentCacheOptions& cache_options = PersistentCacheOptions(), + Logger* info_log = nullptr); // The 'data' points to the raw block contents read in from file. // This method allocates a new heap buffer and the raw block diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 7028d106c..2cbe9eca7 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -13,6 +13,7 @@ #include "table/block.h" #include "table/format.h" #include "table/internal_iterator.h" +#include "table/persistent_cache_helper.h" #include "table/table_properties_internal.h" #include "util/coding.h" @@ -164,7 +165,7 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, read_options.verify_checksums = false; Status s; s = ReadBlockContents(file, footer, read_options, handle, &block_contents, - env, false); + env, false /* decompress */); if (!s.ok()) { return s; @@ -264,7 +265,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, ReadOptions read_options; read_options.verify_checksums = false; s = ReadBlockContents(file, footer, read_options, metaindex_handle, - &metaindex_contents, env, false); + &metaindex_contents, env, false /* decompress */); if (!s.ok()) { return s; } @@ -318,7 +319,7 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, ReadOptions read_options; read_options.verify_checksums = false; s = ReadBlockContents(file, footer, read_options, metaindex_handle, - &metaindex_contents, env, false); + &metaindex_contents, env, false /* do decompression */); if (!s.ok()) { return s; } @@ -347,7 +348,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, ReadOptions read_options; read_options.verify_checksums = false; status = ReadBlockContents(file, footer, read_options, metaindex_handle, - &metaindex_contents, env, false); + &metaindex_contents, env, false /* decompress */); if (!status.ok()) { return status; } @@ -367,7 +368,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, // Reading metablock return ReadBlockContents(file, footer, read_options, block_handle, contents, - env, false); + env, false /* decompress */); } } // namespace rocksdb diff --git a/table/persistent_cache_helper.cc b/table/persistent_cache_helper.cc new file mode 100644 index 000000000..d68c80735 --- /dev/null +++ b/table/persistent_cache_helper.cc @@ -0,0 +1,112 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#include "table/persistent_cache_helper.h" +#include "table/format.h" + +namespace rocksdb { + +void PersistentCacheHelper::InsertRawPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + const char* data, const size_t size) { + assert(cache_options.persistent_cache); + assert(cache_options.persistent_cache->IsCompressed()); + + // construct the page key + char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(), + cache_options.key_prefix.size(), + handle, cache_key); + // insert content to cache + cache_options.persistent_cache->Insert(key, data, size); +} + +void PersistentCacheHelper::InsertUncompressedPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + const BlockContents& contents) { + assert(cache_options.persistent_cache); + assert(!cache_options.persistent_cache->IsCompressed()); + if (!contents.cachable || contents.compression_type != kNoCompression) { + // We shouldn't cache this. Either + // (1) content is not cacheable + // (2) content is compressed + return; + } + + // construct the page key + char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(), + cache_options.key_prefix.size(), + handle, cache_key); + // insert block contents to page cache + cache_options.persistent_cache->Insert(key, contents.data.data(), + contents.data.size()); +} + +Status PersistentCacheHelper::LookupRawPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + std::unique_ptr* raw_data, const size_t raw_data_size) { + assert(cache_options.persistent_cache); + assert(cache_options.persistent_cache->IsCompressed()); + + // construct the page key + char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(), + cache_options.key_prefix.size(), + handle, cache_key); + // Lookup page + size_t size; + Status s = cache_options.persistent_cache->Lookup(key, raw_data, &size); + if (!s.ok()) { + // cache miss + RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS); + return s; + } + + // cache hit + assert(raw_data_size == handle.size() + kBlockTrailerSize); + assert(size == raw_data_size); + RecordTick(cache_options.statistics, PERSISTENT_CACHE_HIT); + return Status::OK(); +} + +Status PersistentCacheHelper::LookupUncompressedPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + BlockContents* contents) { + assert(cache_options.persistent_cache); + assert(!cache_options.persistent_cache->IsCompressed()); + if (!contents) { + // We shouldn't lookup in the cache. Either + // (1) Nowhere to store + return Status::NotFound(); + } + + // construct the page key + char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(), + cache_options.key_prefix.size(), + handle, cache_key); + // Lookup page + std::unique_ptr data; + size_t size; + Status s = cache_options.persistent_cache->Lookup(key, &data, &size); + if (!s.ok()) { + // cache miss + RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS); + return s; + } + + // please note we are potentially comparing compressed data size with + // uncompressed data size + assert(handle.size() <= size); + + // update stats + RecordTick(cache_options.statistics, PERSISTENT_CACHE_HIT); + // construct result and return + *contents = + BlockContents(std::move(data), size, false /*cacheable*/, kNoCompression); + return Status::OK(); +} + +} // namespace rocksdb diff --git a/table/persistent_cache_helper.h b/table/persistent_cache_helper.h new file mode 100644 index 000000000..45a1f87d2 --- /dev/null +++ b/table/persistent_cache_helper.h @@ -0,0 +1,63 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#include + +#include "table/block_based_table_reader.h" +#include "util/statistics.h" + +namespace rocksdb { + +struct BlockContents; + +// PersistentCacheOptions +// +// This describe the caching behavior for page cache +// This is used to pass the context for caching and the cache handle +struct PersistentCacheOptions { + PersistentCacheOptions() {} + explicit PersistentCacheOptions( + const std::shared_ptr& _persistent_cache, + const std::string _key_prefix, Statistics* const _statistics) + : persistent_cache(_persistent_cache), + key_prefix(_key_prefix), + statistics(_statistics) {} + + virtual ~PersistentCacheOptions() {} + + std::shared_ptr persistent_cache; + std::string key_prefix; + Statistics* statistics = nullptr; +}; + +// PersistentCacheHelper +// +// Encapsulates some of the helper logic for read and writing from the cache +class PersistentCacheHelper { + public: + // insert block into raw page cache + static void InsertRawPage(const PersistentCacheOptions& cache_options, + const BlockHandle& handle, const char* data, + const size_t size); + + // insert block into uncompressed cache + static void InsertUncompressedPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + const BlockContents& contents); + + // lookup block from raw page cacge + static Status LookupRawPage(const PersistentCacheOptions& cache_options, + const BlockHandle& handle, + std::unique_ptr* raw_data, + const size_t raw_data_size); + + // lookup block from uncompressed cache + static Status LookupUncompressedPage( + const PersistentCacheOptions& cache_options, const BlockHandle& handle, + BlockContents* contents); +}; + +} // namespace rocksdb diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 242d5f4c6..df288fdee 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -2099,7 +2099,7 @@ class Benchmark { } } if (FLAGS_statistics) { - fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); + fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); } } diff --git a/util/io_posix.cc b/util/io_posix.cc index 05a7f2788..18238f900 100644 --- a/util/io_posix.cc +++ b/util/io_posix.cc @@ -119,6 +119,7 @@ static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) { long version = 0; result = ioctl(fd, FS_IOC_GETVERSION, &version); + TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result); if (result == -1) { return 0; } diff --git a/util/options_settable_test.cc b/util/options_settable_test.cc index c05e3b803..f83ed9886 100644 --- a/util/options_settable_test.cc +++ b/util/options_settable_test.cc @@ -102,6 +102,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) { sizeof(std::shared_ptr)}, {offsetof(struct BlockBasedTableOptions, block_cache), sizeof(std::shared_ptr)}, + {offsetof(struct BlockBasedTableOptions, persistent_cache), + sizeof(std::shared_ptr)}, {offsetof(struct BlockBasedTableOptions, block_cache_compressed), sizeof(std::shared_ptr)}, {offsetof(struct BlockBasedTableOptions, filter_policy),