Added PersistentCache abstraction

Summary:
Added a new page-cache abstraction to RocksDB, designed for use as a
persistent read cache.

RocksDB's current block cache is more of an object cache. For the persistent read cache
project, what we need is a page cache equivalent. This change adds a cache
abstraction to RocksDB for caching pages, called PersistentCache. PersistentCache can cache
uncompressed pages or raw pages (content as in the filesystem). The user can
choose to operate PersistentCache in either COMPRESSED or UNCOMPRESSED mode.

Blame Rev:

Test Plan: Run unit tests

Reviewers: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D55707
main
krad 9 years ago
parent 5c06e0814c
commit a08c8c851a
  1. 1
      CMakeLists.txt
  2. 126
      db/db_test2.cc
  3. 1
      include/rocksdb/options.h
  4. 49
      include/rocksdb/persistent_cache.h
  5. 5
      include/rocksdb/statistics.h
  6. 7
      include/rocksdb/table.h
  7. 3
      src.mk
  8. 118
      table/block_based_table_reader.cc
  9. 14
      table/block_based_table_reader.h
  10. 103
      table/format.cc
  11. 15
      table/format.h
  12. 11
      table/meta_blocks.cc
  13. 112
      table/persistent_cache_helper.cc
  14. 63
      table/persistent_cache_helper.h
  15. 2
      tools/db_bench_tool.cc
  16. 1
      util/io_posix.cc
  17. 2
      util/options_settable_test.cc

@ -183,6 +183,7 @@ set(SOURCES
table/plain_table_index.cc
table/plain_table_key_coding.cc
table/plain_table_reader.cc
table/persistent_cache_helper.cc
table/table_properties.cc
table/two_level_iterator.cc
tools/sst_dump_tool.cc

@ -7,8 +7,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cstdlib>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/wal_filter.h"
namespace rocksdb {
@ -1024,7 +1026,131 @@ TEST_P(PinL0IndexAndFilterBlocksTest,
INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
PinL0IndexAndFilterBlocksTest, ::testing::Bool());
#ifndef ROCKSDB_LITE
// SyncPoint callback registered for "GetUniqueIdFromFile:FS_IOC_GETVERSION".
//
// `arg` is interpreted as a pointer to an int. A value of -1 is overwritten
// with 0; any other value is left untouched. Afterwards the sync point trace
// is cleared and this callback is registered again for the same sync point.
// NOTE(review): presumably -1 is the failure result of the ioctl and this
// makes it look successful to the caller -- confirm against
// GetUniqueIdFromFile.
static void UniqueIdCallback(void* arg) {
  int* const ioctl_result = reinterpret_cast<int*>(arg);
  if (*ioctl_result == -1) {
    *ioctl_result = 0;
  }

  auto* const sync_point = rocksdb::SyncPoint::GetInstance();
  sync_point->ClearTrace();
  sync_point->SetCallBack("GetUniqueIdFromFile:FS_IOC_GETVERSION",
                          UniqueIdCallback);
}
// In-memory PersistentCache implementation used by the PersistentCache test.
//
// Pages are stored in a std::map keyed by the page key. `size_` tracks the
// total number of payload bytes currently held in the map. Eviction is
// deliberately simple: when an insert finds the cache already above
// `max_size_`, the entry with the smallest key is dropped (at most one entry
// per insert), so `max_size_` is a soft limit.
//
// The constructor enables SyncPoint processing and installs UniqueIdCallback
// on "GetUniqueIdFromFile:FS_IOC_GETVERSION".
class MockPersistentCache : public PersistentCache {
 public:
  // is_compressed  value reported by IsCompressed()
  // max_size       soft limit, in bytes, on the cached payload
  explicit MockPersistentCache(const bool is_compressed, const size_t max_size)
      : is_compressed_(is_compressed), max_size_(max_size) {
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
  }

  virtual ~MockPersistentCache() {}

  // Stores a copy of data[0, size) under page_key. If page_key is already
  // present the existing page is kept. Always returns OK.
  Status Insert(const Slice& page_key, const char* data,
                const size_t size) override {
    MutexLock _(&lock_);

    if (size_ > max_size_) {
      // size_ > 0 implies the map is non-empty because size_ only counts
      // bytes that are actually stored.
      size_ -= data_.begin()->second.size();
      data_.erase(data_.begin());
    }

    const bool inserted =
        data_.insert(std::make_pair(page_key.ToString(),
                                    std::string(data, size)))
            .second;
    if (inserted) {
      // std::map::insert does not overwrite an existing key; only account for
      // bytes that were really added so size_ does not drift upwards.
      size_ += size;
    }
    return Status::OK();
  }

  // Copies the page stored under page_key into a freshly allocated buffer.
  // Returns NotFound if the key is absent; otherwise *data owns the buffer,
  // *size is its length and OK is returned.
  Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
                size_t* size) override {
    MutexLock _(&lock_);
    auto it = data_.find(page_key.ToString());
    if (it == data_.end()) {
      return Status::NotFound();
    }

    assert(page_key.ToString() == it->first);
    data->reset(new char[it->second.size()]);
    memcpy(data->get(), it->second.c_str(), it->second.size());
    *size = it->second.size();
    return Status::OK();
  }

  bool IsCompressed() override { return is_compressed_; }

  port::Mutex lock_;                         // guards data_ and size_
  std::map<std::string, std::string> data_;  // page key -> page bytes
  const bool is_compressed_ = true;
  size_t size_ = 0;                          // total bytes stored in data_
  const size_t max_size_ = 10 * 1024;        // 10KiB
};
// Exercises the persistent cache through a MockPersistentCache.
//
// The body runs once for every combination of block cache size (none / 1MB)
// and persistent cache mode (compressed / uncompressed). Each round writes
// num_iter keys to column family "pikachu", flushes, reads every key back and
// then checks that both PERSISTENT_CACHE_HIT and PERSISTENT_CACHE_MISS are
// non-zero.
// NOTE(review): options.statistics is created once outside the loops, so the
// ticker values checked at the end of a round include earlier rounds.
TEST_F(DBTest2, PersistentCache) {
int num_iter = 80;
Options options;
options.write_buffer_size = 64 * 1024; // small write buffer
options.statistics = rocksdb::CreateDBStatistics();
options = CurrentOptions(options);
auto bsizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024};
auto types = {/*compressed*/ 1, /*uncompressed*/ 0};
for (auto bsize : bsizes) {
for (auto type : types) {
// table options for the "pikachu" column family: a 10KiB mock persistent
// cache in the mode selected by `type`
BlockBasedTableOptions table_options;
table_options.persistent_cache.reset(
new MockPersistentCache(type, 10 * 1024));
// NOTE(review): no_block_cache is set to true even when a block cache is
// supplied on the next line (bsize != 0) -- confirm this is intended
table_options.no_block_cache = true;
table_options.block_cache = bsize ? NewLRUCache(bsize) : nullptr;
table_options.block_cache_compressed = nullptr;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// default column family doesn't have block cache
Options no_block_cache_opts;
no_block_cache_opts.statistics = options.statistics;
no_block_cache_opts = CurrentOptions(no_block_cache_opts);
BlockBasedTableOptions table_options_no_bc;
table_options_no_bc.no_block_cache = true;
no_block_cache_opts.table_factory.reset(
NewBlockBasedTableFactory(table_options_no_bc));
ReopenWithColumnFamilies(
{"default", "pikachu"},
std::vector<Options>({no_block_cache_opts, options}));
Random rnd(301);
// Write num_iter (80) values produced by RandomString(&rnd, 1000);
// a new random value is generated only every 4th key, so each value is
// written under 4 consecutive keys
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
std::vector<std::string> values;
std::string str;
for (int i = 0; i < num_iter; i++) {
if (i % 4 == 0) { // high compression ratio
str = RandomString(&rnd, 1000);
}
values.push_back(str);
ASSERT_OK(Put(1, Key(i), values[i]));
}
// flush all data from memtable so that the reads below are served from
// the table file rather than the memtable
ASSERT_OK(Flush(1));
for (int i = 0; i < num_iter; i++) {
ASSERT_EQ(Get(1, Key(i)), values[i]);
}
// the reads above must have produced both persistent cache hits and misses
auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT);
auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS);
ASSERT_GT(hit, 0);
ASSERT_GT(miss, 0);
}
}
}
#endif
} // namespace rocksdb
int main(int argc, char** argv) {

@ -1613,6 +1613,7 @@ struct CompactRangeOptions {
BottommostLevelCompaction bottommost_level_compaction =
BottommostLevelCompaction::kIfHaveCompactionFilter;
};
} // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_

@ -0,0 +1,49 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <stdint.h>
#include <memory>
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
namespace rocksdb {
// PersistentCache
//
// Persistent cache interface for caching IO pages on a persistent medium. The
// cache interface is specifically designed for persistent read cache.
class PersistentCache {
public:
virtual ~PersistentCache() {}
// Insert to page cache
//
// key Identifier to identify a page uniquely across restarts
// data Page data
// size Size of the page
virtual Status Insert(const Slice& key, const char* data,
const size_t size) = 0;
// Lookup page cache by page identifier
//
// key Page identifier
// data Out parameter; on success receives a buffer holding the page data
// size Out parameter; on success receives the size of the page
//
// Callers treat any non-OK status as a cache miss.
virtual Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
size_t* size) = 0;
// Is cache storing compressed (raw) data ?
//
// True if the cache is configured to store raw pages (content as in the
// file, which may be compressed); false if it is configured to store
// uncompressed pages
virtual bool IsCompressed() = 0;
};
} // namespace rocksdb

@ -54,6 +54,11 @@ enum Tickers : uint32_t {
// # of times bloom filter has avoided file reads.
BLOOM_FILTER_USEFUL,
// # persistent cache hit
PERSISTENT_CACHE_HIT,
// # persistent cache miss
PERSISTENT_CACHE_MISS,
// # of memtable hits.
MEMTABLE_HIT,
// # of memtable misses.

@ -21,15 +21,16 @@
#include <unordered_map>
#include "rocksdb/env.h"
#include "rocksdb/immutable_options.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h"
#include "rocksdb/status.h"
namespace rocksdb {
// -- Block-based Table
class FlushBlockPolicyFactory;
class PersistentCache;
class RandomAccessFile;
struct TableReaderOptions;
struct TableBuilderOptions;
@ -103,6 +104,10 @@ struct BlockBasedTableOptions {
// If NULL, rocksdb will automatically create and use an 8MB internal cache.
std::shared_ptr<Cache> block_cache = nullptr;
// If non-NULL use the specified cache for pages read from device
// IF NULL, no page cache is used
std::shared_ptr<PersistentCache> persistent_cache = nullptr;
// If non-NULL use the specified cache for compressed blocks.
// If NULL, rocksdb will not use a compressed block cache.
std::shared_ptr<Cache> block_cache_compressed = nullptr;

@ -82,6 +82,7 @@ LIB_SOURCES = \
table/plain_table_index.cc \
table/plain_table_key_coding.cc \
table/plain_table_reader.cc \
table/persistent_cache_helper.cc \
table/table_properties.cc \
table/two_level_iterator.cc \
tools/dump/db_dump_tool.cc \
@ -103,7 +104,7 @@ LIB_SOURCES = \
util/io_posix.cc \
util/threadpool.cc \
util/transaction_test_util.cc \
util/sst_file_manager_impl.cc \
util/sst_file_manager_impl.cc \
util/file_util.cc \
util/file_reader_writer.cc \
util/filter_policy.cc \

@ -25,17 +25,18 @@
#include "rocksdb/table_properties.h"
#include "table/block.h"
#include "table/filter_block.h"
#include "table/block_based_filter_block.h"
#include "table/block_based_table_factory.h"
#include "table/full_filter_block.h"
#include "table/block_hash_index.h"
#include "table/block_prefix_index.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/full_filter_block.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/meta_blocks.h"
#include "table/persistent_cache_helper.h"
#include "table/two_level_iterator.h"
#include "table/get_context.h"
#include "util/coding.h"
#include "util/file_reader_writer.h"
@ -53,13 +54,6 @@ using std::unique_ptr;
typedef BlockBasedTable::IndexReader IndexReader;
namespace {
// The longest the prefix of the cache key used to identify blocks can be.
// We are using the fact that we know for Posix files the unique ID is three
// varints.
// For some reason, compiling for iOS complains that this variable is unused
const size_t kMaxCacheKeyPrefixSize __attribute__((unused)) =
kMaxVarint64Length * 3 + 1;
// Read the block identified by "handle" from "file".
// The only relevant option is options.verify_checksums for now.
// On failure return non-OK.
@ -69,11 +63,13 @@ const size_t kMaxCacheKeyPrefixSize __attribute__((unused)) =
Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer,
const ReadOptions& options, const BlockHandle& handle,
std::unique_ptr<Block>* result, Env* env,
bool do_uncompress = true,
const Slice& compression_dict = Slice()) {
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
Logger* info_log) {
BlockContents contents;
Status s = ReadBlockContents(file, footer, options, handle, &contents, env,
do_uncompress, compression_dict);
do_uncompress, compression_dict, cache_options,
info_log);
if (s.ok()) {
result->reset(new Block(std::move(contents)));
}
@ -106,18 +102,12 @@ Slice GetCacheKeyFromOffset(const char* cache_key_prefix,
char* cache_key) {
assert(cache_key != nullptr);
assert(cache_key_prefix_size != 0);
assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize);
assert(cache_key_prefix_size <= BlockBasedTable::kMaxCacheKeyPrefixSize);
memcpy(cache_key, cache_key_prefix, cache_key_prefix_size);
char* end = EncodeVarint64(cache_key + cache_key_prefix_size, offset);
return Slice(cache_key, static_cast<size_t>(end - cache_key));
}
Slice GetCacheKey(const char* cache_key_prefix, size_t cache_key_prefix_size,
const BlockHandle& handle, char* cache_key) {
return GetCacheKeyFromOffset(cache_key_prefix, cache_key_prefix_size,
handle.offset(), cache_key);
}
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
Tickers block_cache_miss_ticker,
Tickers block_cache_hit_ticker,
@ -183,11 +173,13 @@ class BinarySearchIndexReader : public IndexReader {
// unmodified.
static Status Create(RandomAccessFileReader* file, const Footer& footer,
const BlockHandle& index_handle, Env* env,
const Comparator* comparator,
IndexReader** index_reader) {
const Comparator* comparator, IndexReader** index_reader,
const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
&index_block, env);
&index_block, env, true /* decompress */,
Slice() /*compression dict*/, cache_options,
/*info_log*/ nullptr);
if (s.ok()) {
*index_reader =
@ -231,10 +223,13 @@ class HashIndexReader : public IndexReader {
const BlockHandle& index_handle,
InternalIterator* meta_index_iter,
IndexReader** index_reader,
bool hash_index_allow_collision) {
bool hash_index_allow_collision,
const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
&index_block, env);
&index_block, env, true /* decompress */,
Slice() /*compression dict*/, cache_options,
/*info_log*/ nullptr);
if (!s.ok()) {
return s;
@ -269,14 +264,15 @@ class HashIndexReader : public IndexReader {
// Read contents for the blocks
BlockContents prefixes_contents;
s = ReadBlockContents(file, footer, ReadOptions(), prefixes_handle,
&prefixes_contents, env, true /* do decompression */);
&prefixes_contents, env, true /* decompress */,
Slice() /*compression dict*/, cache_options);
if (!s.ok()) {
return s;
}
BlockContents prefixes_meta_contents;
s = ReadBlockContents(file, footer, ReadOptions(), prefixes_meta_handle,
&prefixes_meta_contents, env,
true /* do decompression */);
&prefixes_meta_contents, env, true /* decompress */,
Slice() /*compression dict*/, cache_options);
if (!s.ok()) {
// TODO: log error
return Status::OK();
@ -388,10 +384,13 @@ struct BlockBasedTable::Rep {
unique_ptr<RandomAccessFileReader> file;
char cache_key_prefix[kMaxCacheKeyPrefixSize];
size_t cache_key_prefix_size = 0;
char persistent_cache_key_prefix[kMaxCacheKeyPrefixSize];
size_t persistent_cache_key_prefix_size = 0;
char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize];
size_t compressed_cache_key_prefix_size = 0;
uint64_t dummy_index_reader_offset =
0; // ID that is unique for the block cache.
PersistentCacheOptions persistent_cache_options;
// Footer contains the fixed table information
Footer footer;
@ -451,6 +450,11 @@ void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) {
rep->dummy_index_reader_offset =
file_size + rep->table_options.block_cache->NewId();
}
if (rep->table_options.persistent_cache != nullptr) {
GenerateCachePrefix(/*cache=*/nullptr, rep->file->file(),
&rep->persistent_cache_key_prefix[0],
&rep->persistent_cache_key_prefix_size);
}
if (rep->table_options.block_cache_compressed != nullptr) {
GenerateCachePrefix(rep->table_options.block_cache_compressed.get(),
rep->file->file(), &rep->compressed_cache_key_prefix[0],
@ -466,7 +470,7 @@ void BlockBasedTable::GenerateCachePrefix(Cache* cc,
// If the prefix wasn't generated or was too long,
// create one from the cache.
if (*size == 0) {
if (cc && *size == 0) {
char* end = EncodeVarint64(buffer, cc->NewId());
*size = static_cast<size_t>(end - buffer);
}
@ -507,6 +511,18 @@ bool IsFeatureSupported(const TableProperties& table_properties,
}
} // namespace
// Builds the cache key for the block identified by `handle`.
//
// The key is the given prefix followed by the varint64 encoding of the
// handle's offset. It is written into `cache_key`, which the caller must size
// to hold the prefix plus the encoded offset; the returned Slice points into
// that buffer.
//
// cache_key_prefix       prefix bytes (not required to be NUL terminated)
// cache_key_prefix_size  prefix length; must be non-zero and at most
//                        kMaxCacheKeyPrefixSize
// handle                 block whose offset is appended to the prefix
// cache_key              caller provided output buffer
Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix,
                                   size_t cache_key_prefix_size,
                                   const BlockHandle& handle, char* cache_key) {
  assert(cache_key != nullptr);
  assert(cache_key_prefix_size != 0);
  assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize);

  // prefix first ...
  memcpy(cache_key, cache_key_prefix, cache_key_prefix_size);

  // ... then the block offset as a varint
  char* const offset_begin = cache_key + cache_key_prefix_size;
  char* const key_end = EncodeVarint64(offset_begin, handle.offset());

  const size_t key_size = static_cast<size_t>(key_end - cache_key);
  return Slice(cache_key, key_size);
}
Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
const BlockBasedTableOptions& table_options,
@ -541,6 +557,13 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
SetupCacheKeyPrefix(rep, file_size);
unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep));
// page cache options
rep->persistent_cache_options =
PersistentCacheOptions(rep->table_options.persistent_cache,
std::string(rep->persistent_cache_key_prefix,
rep->persistent_cache_key_prefix_size),
rep->ioptions.statistics);
// Read meta index
std::unique_ptr<Block> meta;
std::unique_ptr<InternalIterator> meta_iter;
@ -736,12 +759,10 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
// TODO: we never really verify check sum for meta index block
std::unique_ptr<Block> meta;
Status s = ReadBlockFromFile(
rep->file.get(),
rep->footer,
ReadOptions(),
rep->footer.metaindex_handle(),
&meta,
rep->ioptions.env);
rep->file.get(), rep->footer, ReadOptions(),
rep->footer.metaindex_handle(), &meta, rep->ioptions.env,
true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options, rep->ioptions.info_log);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log,
@ -908,7 +929,9 @@ FilterBlockReader* BlockBasedTable::ReadFilter(Rep* rep, size_t* filter_size) {
BlockContents block;
if (!ReadBlockContents(rep->file.get(), rep->footer, ReadOptions(),
rep->filter_handle, &block, rep->ioptions.env,
false).ok()) {
false /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options)
.ok()) {
// Error reading the block
return nullptr;
}
@ -1148,7 +1171,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&raw_block, rep->ioptions.env,
block_cache_compressed == nullptr,
compression_dict);
compression_dict, rep->persistent_cache_options,
rep->ioptions.info_log);
}
if (s.ok()) {
@ -1173,8 +1197,9 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
}
std::unique_ptr<Block> block_value;
s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
&block_value, rep->ioptions.env,
true /* do_uncompress */, compression_dict);
&block_value, rep->ioptions.env, true /* compress */,
compression_dict, rep->persistent_cache_options,
rep->ioptions.info_log);
if (s.ok()) {
block.value = block_value.release();
}
@ -1546,7 +1571,8 @@ Status BlockBasedTable::CreateIndexReader(
switch (index_type_on_file) {
case BlockBasedTableOptions::kBinarySearch: {
return BinarySearchIndexReader::Create(
file, footer, footer.index_handle(), env, comparator, index_reader);
file, footer, footer.index_handle(), env, comparator, index_reader,
rep_->persistent_cache_options);
}
case BlockBasedTableOptions::kHashSearch: {
std::unique_ptr<Block> meta_guard;
@ -1561,7 +1587,8 @@ Status BlockBasedTable::CreateIndexReader(
"Unable to read the metaindex block."
" Fall back to binary search index.");
return BinarySearchIndexReader::Create(
file, footer, footer.index_handle(), env, comparator, index_reader);
file, footer, footer.index_handle(), env, comparator,
index_reader, rep_->persistent_cache_options);
}
meta_index_iter = meta_iter_guard.get();
}
@ -1573,7 +1600,7 @@ Status BlockBasedTable::CreateIndexReader(
return HashIndexReader::Create(
rep_->internal_prefix_transform.get(), footer, file, env, comparator,
footer.index_handle(), meta_index_iter, index_reader,
rep_->hash_index_allow_collision);
rep_->hash_index_allow_collision, rep_->persistent_cache_options);
}
default: {
std::string error_message =
@ -1691,8 +1718,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
BlockHandle handle;
if (FindMetaBlock(meta_iter.get(), filter_block_key, &handle).ok()) {
BlockContents block;
if (ReadBlockContents(rep_->file.get(), rep_->footer, ReadOptions(),
handle, &block, rep_->ioptions.env, false).ok()) {
if (ReadBlockContents(
rep_->file.get(), rep_->footer, ReadOptions(), handle, &block,
rep_->ioptions.env, false /*decompress*/,
Slice() /*compression dict*/, rep_->persistent_cache_options)
.ok()) {
rep_->filter.reset(new BlockBasedFilterBlockReader(
rep_->ioptions.prefix_extractor, table_options,
table_options.whole_key_filtering, std::move(block)));

@ -15,11 +15,12 @@
#include <string>
#include "rocksdb/options.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/table_reader.h"
#include "table/table_properties_internal.h"
#include "table/table_reader.h"
#include "util/coding.h"
#include "util/file_reader_writer.h"
@ -54,6 +55,9 @@ class BlockBasedTable : public TableReader {
public:
static const std::string kFilterBlockPrefix;
static const std::string kFullFilterBlockPrefix;
// The longest prefix of the cache key used to identify blocks.
// For Posix files the unique ID is three varints.
static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length * 3 + 1;
// Attempt to open the table that is stored in bytes [0..file_size)
// of "file", and read the metadata entries necessary to allow
@ -128,6 +132,10 @@ class BlockBasedTable : public TableReader {
// Implementation of IndexReader will be exposed to internal cc file only.
class IndexReader;
static Slice GetCacheKey(const char* cache_key_prefix,
size_t cache_key_prefix_size,
const BlockHandle& handle, char* cache_key);
private:
template <class TValue>
struct CachableEntry;
@ -229,10 +237,6 @@ class BlockBasedTable : public TableReader {
static void GenerateCachePrefix(Cache* cc,
WritableFile* file, char* buffer, size_t* size);
// The longest prefix of the cache key used to identify blocks.
// For Posix files the unique ID is three varints.
static const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1;
// Helper functions for DumpTable()
Status DumpIndexBlock(WritableFile* out_file);
Status DumpDataBlocks(WritableFile* out_file);

@ -14,6 +14,8 @@
#include "rocksdb/env.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "table/persistent_cache_helper.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -294,10 +296,12 @@ Status ReadBlock(RandomAccessFileReader* file, const Footer& footer,
} // namespace
Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
const ReadOptions& options, const BlockHandle& handle,
BlockContents* contents, Env* env,
bool decompression_requested,
const Slice& compression_dict) {
const ReadOptions& read_options,
const BlockHandle& handle, BlockContents* contents,
Env* env, bool decompression_requested,
const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
Logger* info_log) {
Status status;
Slice slice;
size_t n = static_cast<size_t>(handle.size());
@ -306,17 +310,63 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
char* used_buf = nullptr;
rocksdb::CompressionType compression_type;
if (decompression_requested &&
n + kBlockTrailerSize < DefaultStackBufferSize) {
// If we've got a small enough hunk of data, read it in to the
// trivially allocated stack buffer instead of needing a full malloc()
used_buf = &stack_buf[0];
if (cache_options.persistent_cache &&
!cache_options.persistent_cache->IsCompressed()) {
status = PersistentCacheHelper::LookupUncompressedPage(cache_options,
handle, contents);
if (status.ok()) {
// uncompressed page is found for the block handle
return status;
} else {
// uncompressed page is not found
if (info_log && !status.IsNotFound()) {
assert(!status.ok());
Log(InfoLogLevel::INFO_LEVEL, info_log,
"Error reading from persistent cache. %s",
status.ToString().c_str());
}
}
}
if (cache_options.persistent_cache &&
cache_options.persistent_cache->IsCompressed()) {
// lookup uncompressed cache mode p-cache
status = PersistentCacheHelper::LookupRawPage(
cache_options, handle, &heap_buf, n + kBlockTrailerSize);
} else {
heap_buf = std::unique_ptr<char[]>(new char[n + kBlockTrailerSize]);
used_buf = heap_buf.get();
status = Status::NotFound();
}
status = ReadBlock(file, footer, options, handle, &slice, used_buf);
if (status.ok()) {
// cache hit
used_buf = heap_buf.get();
slice = Slice(heap_buf.get(), n);
} else {
if (info_log && !status.IsNotFound()) {
assert(!status.ok());
Log(InfoLogLevel::INFO_LEVEL, info_log,
"Error reading from persistent cache. %s", status.ToString().c_str());
}
// cache miss read from device
if (decompression_requested &&
n + kBlockTrailerSize < DefaultStackBufferSize) {
// If we've got a small enough hunk of data, read it in to the
// trivially allocated stack buffer instead of needing a full malloc()
used_buf = &stack_buf[0];
} else {
heap_buf = std::unique_ptr<char[]>(new char[n + kBlockTrailerSize]);
used_buf = heap_buf.get();
}
status = ReadBlock(file, footer, read_options, handle, &slice, used_buf);
if (status.ok() && read_options.fill_cache &&
cache_options.persistent_cache &&
cache_options.persistent_cache->IsCompressed()) {
// insert to raw cache
PersistentCacheHelper::InsertRawPage(cache_options, handle, used_buf,
n + kBlockTrailerSize);
}
}
if (!status.ok()) {
return status;
@ -327,21 +377,29 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
compression_type = static_cast<rocksdb::CompressionType>(slice.data()[n]);
if (decompression_requested && compression_type != kNoCompression) {
return UncompressBlockContents(slice.data(), n, contents, footer.version(),
compression_dict);
}
if (slice.data() != used_buf) {
// compressed page, uncompress, update cache
status = UncompressBlockContents(slice.data(), n, contents,
footer.version(), compression_dict);
} else if (slice.data() != used_buf) {
// the slice content is not the buffer provided
*contents = BlockContents(Slice(slice.data(), n), false, compression_type);
return status;
} else {
// page is uncompressed, the buffer either stack or heap provided
if (used_buf == &stack_buf[0]) {
heap_buf = std::unique_ptr<char[]>(new char[n]);
memcpy(heap_buf.get(), stack_buf, n);
}
*contents = BlockContents(std::move(heap_buf), n, true, compression_type);
}
if (used_buf == &stack_buf[0]) {
heap_buf = std::unique_ptr<char[]>(new char[n]);
memcpy(heap_buf.get(), stack_buf, n);
if (status.ok() && read_options.fill_cache &&
cache_options.persistent_cache &&
!cache_options.persistent_cache->IsCompressed()) {
// insert to uncompressed cache
PersistentCacheHelper::InsertUncompressedPage(cache_options, handle,
*contents);
}
*contents = BlockContents(std::move(heap_buf), n, true, compression_type);
return status;
}
@ -447,6 +505,7 @@ Status UncompressBlockContents(const char* data, size_t n,
default:
return Status::Corruption("bad block type");
}
return Status::OK();
}

@ -16,6 +16,7 @@
#include "rocksdb/table.h"
#include "port/port.h" // noexcept
#include "table/persistent_cache_helper.h"
namespace rocksdb {
@ -208,13 +209,13 @@ struct BlockContents {
// Read the block identified by "handle" from "file". On failure
// return non-OK. On success fill *result and return OK.
extern Status ReadBlockContents(RandomAccessFileReader* file,
const Footer& footer,
const ReadOptions& options,
const BlockHandle& handle,
BlockContents* contents, Env* env,
bool do_uncompress,
const Slice& compression_dict = Slice());
extern Status ReadBlockContents(
RandomAccessFileReader* file, const Footer& footer,
const ReadOptions& options, const BlockHandle& handle,
BlockContents* contents, Env* env, bool do_uncompress = true,
const Slice& compression_dict = Slice(),
const PersistentCacheOptions& cache_options = PersistentCacheOptions(),
Logger* info_log = nullptr);
// The 'data' points to the raw block contents read in from file.
// This method allocates a new heap buffer and the raw block

@ -13,6 +13,7 @@
#include "table/block.h"
#include "table/format.h"
#include "table/internal_iterator.h"
#include "table/persistent_cache_helper.h"
#include "table/table_properties_internal.h"
#include "util/coding.h"
@ -164,7 +165,7 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
read_options.verify_checksums = false;
Status s;
s = ReadBlockContents(file, footer, read_options, handle, &block_contents,
env, false);
env, false /* decompress */);
if (!s.ok()) {
return s;
@ -264,7 +265,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
ReadOptions read_options;
read_options.verify_checksums = false;
s = ReadBlockContents(file, footer, read_options, metaindex_handle,
&metaindex_contents, env, false);
&metaindex_contents, env, false /* decompress */);
if (!s.ok()) {
return s;
}
@ -318,7 +319,7 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
ReadOptions read_options;
read_options.verify_checksums = false;
s = ReadBlockContents(file, footer, read_options, metaindex_handle,
&metaindex_contents, env, false);
&metaindex_contents, env, false /* do decompression */);
if (!s.ok()) {
return s;
}
@ -347,7 +348,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
ReadOptions read_options;
read_options.verify_checksums = false;
status = ReadBlockContents(file, footer, read_options, metaindex_handle,
&metaindex_contents, env, false);
&metaindex_contents, env, false /* decompress */);
if (!status.ok()) {
return status;
}
@ -367,7 +368,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
// Reading metablock
return ReadBlockContents(file, footer, read_options, block_handle, contents,
env, false);
env, false /* decompress */);
}
} // namespace rocksdb

@ -0,0 +1,112 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "table/persistent_cache_helper.h"
#include "table/format.h"
namespace rocksdb {
void PersistentCacheHelper::InsertRawPage(
const PersistentCacheOptions& cache_options, const BlockHandle& handle,
const char* data, const size_t size) {
assert(cache_options.persistent_cache);
assert(cache_options.persistent_cache->IsCompressed());
// construct the page key
char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(),
cache_options.key_prefix.size(),
handle, cache_key);
// insert content to cache
cache_options.persistent_cache->Insert(key, data, size);
}
void PersistentCacheHelper::InsertUncompressedPage(
const PersistentCacheOptions& cache_options, const BlockHandle& handle,
const BlockContents& contents) {
assert(cache_options.persistent_cache);
assert(!cache_options.persistent_cache->IsCompressed());
if (!contents.cachable || contents.compression_type != kNoCompression) {
// We shouldn't cache this. Either
// (1) content is not cacheable
// (2) content is compressed
return;
}
// construct the page key
char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(),
cache_options.key_prefix.size(),
handle, cache_key);
// insert block contents to page cache
cache_options.persistent_cache->Insert(key, contents.data.data(),
contents.data.size());
}
// Looks up the raw page for `handle` in the persistent cache.
//
// Requires a persistent cache that reports IsCompressed() == true.
//
// cache_options  persistent cache handle, key prefix and statistics
// handle         block handle identifying the page
// raw_data       out parameter; on success owns the page bytes
// raw_data_size  size the caller expects the page to have; must equal
//                handle.size() + kBlockTrailerSize
//
// Returns OK and records PERSISTENT_CACHE_HIT when a page of exactly
// raw_data_size bytes is found. Otherwise records PERSISTENT_CACHE_MISS and
// returns a non-OK status: the status from the cache's Lookup(), or
// Corruption when the cache returned a page of a different size.
Status PersistentCacheHelper::LookupRawPage(
    const PersistentCacheOptions& cache_options, const BlockHandle& handle,
    std::unique_ptr<char[]>* raw_data, const size_t raw_data_size) {
  assert(cache_options.persistent_cache);
  assert(cache_options.persistent_cache->IsCompressed());

  // construct the page key
  char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
  auto key = BlockBasedTable::GetCacheKey(cache_options.key_prefix.c_str(),
                                          cache_options.key_prefix.size(),
                                          handle, cache_key);
  // Lookup page
  size_t size = 0;
  Status s = cache_options.persistent_cache->Lookup(key, raw_data, &size);
  if (!s.ok()) {
    // cache miss
    RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS);
    return s;
  }

  // caller contract: the expected size is block size plus trailer
  assert(raw_data_size == handle.size() + kBlockTrailerSize);

  if (size != raw_data_size) {
    // The caller reads raw_data_size bytes from the returned buffer, so a
    // page of any other size must not be reported as a hit. Drop it and let
    // the caller fall back to reading from the file.
    raw_data->reset();
    RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS);
    return Status::Corruption("persistent cache returned page of wrong size");
  }

  // cache hit
  RecordTick(cache_options.statistics, PERSISTENT_CACHE_HIT);
  return Status::OK();
}
// Looks up the uncompressed page for `handle` in the persistent cache.
//
// Requires a persistent cache that reports IsCompressed() == false.
//
// cache_options  persistent cache handle, key prefix and statistics
// handle         block handle identifying the page
// contents       out parameter; on a hit it is replaced by a BlockContents
//                that owns the cached bytes, is marked not cachable and has
//                compression type kNoCompression
//
// Returns NotFound without touching the cache when `contents` is null.
// Otherwise returns the non-OK status of the cache's Lookup() on a miss
// (recording PERSISTENT_CACHE_MISS) or OK on a hit (recording
// PERSISTENT_CACHE_HIT).
Status PersistentCacheHelper::LookupUncompressedPage(
    const PersistentCacheOptions& cache_options, const BlockHandle& handle,
    BlockContents* contents) {
  assert(cache_options.persistent_cache);
  assert(!cache_options.persistent_cache->IsCompressed());

  if (contents == nullptr) {
    // There is nowhere to store a result, so skip the cache altogether.
    return Status::NotFound();
  }

  // page key = key prefix + varint encoded block offset
  const std::string& prefix = cache_options.key_prefix;
  char key_buf[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
  auto page_key = BlockBasedTable::GetCacheKey(prefix.c_str(), prefix.size(),
                                               handle, key_buf);

  // ask the cache for the page
  std::unique_ptr<char[]> page;
  size_t page_size;
  Status lookup_status =
      cache_options.persistent_cache->Lookup(page_key, &page, &page_size);
  if (!lookup_status.ok()) {
    // cache miss
    RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS);
    return lookup_status;
  }

  // handle.size() may be the compressed size while page_size is the
  // uncompressed size, hence <= rather than ==
  assert(handle.size() <= page_size);

  // cache hit: account for it and hand the buffer over to the caller
  RecordTick(cache_options.statistics, PERSISTENT_CACHE_HIT);
  *contents = BlockContents(std::move(page), page_size, false /*cacheable*/,
                            kNoCompression);
  return Status::OK();
}
} // namespace rocksdb

@ -0,0 +1,63 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include "table/block_based_table_reader.h"
#include "util/statistics.h"
namespace rocksdb {
struct BlockContents;
// PersistentCacheOptions
//
// Describes the caching behavior for the page cache. It carries the context
// needed for caching: the cache handle, the key prefix used when building
// page keys, and an optional statistics object.
struct PersistentCacheOptions {
  // Default state: no cache, empty key prefix, no statistics.
  PersistentCacheOptions() {}

  // @param _persistent_cache  cache handle (shared ownership)
  // @param _key_prefix        prefix used when constructing page keys; taken
  //                           by const reference so the only copy made is the
  //                           one into the member
  // @param _statistics        statistics object, may be nullptr
  explicit PersistentCacheOptions(
      const std::shared_ptr<PersistentCache>& _persistent_cache,
      const std::string& _key_prefix, Statistics* const _statistics)
      : persistent_cache(_persistent_cache),
        key_prefix(_key_prefix),
        statistics(_statistics) {}

  virtual ~PersistentCacheOptions() {}

  std::shared_ptr<PersistentCache> persistent_cache;
  std::string key_prefix;
  Statistics* statistics = nullptr;
};
// PersistentCacheHelper
//
// Encapsulates some of the helper logic for reading from and writing to the
// persistent cache. All members are static.
class PersistentCacheHelper {
public:
// Insert a block into the raw page cache: `size` bytes starting at `data` are
// stored under a key built from the cache key prefix and `handle`.
static void InsertRawPage(const PersistentCacheOptions& cache_options,
const BlockHandle& handle, const char* data,
const size_t size);
// Insert a block into the uncompressed cache. Nothing is inserted when
// `contents` is not cachable or is still compressed.
static void InsertUncompressedPage(
const PersistentCacheOptions& cache_options, const BlockHandle& handle,
const BlockContents& contents);
// Look up a block in the raw page cache. `raw_data` is passed to the cache
// lookup as the output buffer. `raw_data_size` is the size the caller expects
// (block size plus block trailer) and is verified with assert() only.
// Returns OK on a hit, otherwise the status of the failed lookup.
static Status LookupRawPage(const PersistentCacheOptions& cache_options,
const BlockHandle& handle,
std::unique_ptr<char[]>* raw_data,
const size_t raw_data_size);
// Look up a block in the uncompressed cache. Returns NotFound when `contents`
// is null. On a hit, *contents is replaced by the cached data, marked as not
// cachable and uncompressed.
static Status LookupUncompressedPage(
const PersistentCacheOptions& cache_options, const BlockHandle& handle,
BlockContents* contents);
};
} // namespace rocksdb

@ -2099,7 +2099,7 @@ class Benchmark {
}
}
if (FLAGS_statistics) {
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
}
}

@ -119,6 +119,7 @@ static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
long version = 0;
result = ioctl(fd, FS_IOC_GETVERSION, &version);
TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
if (result == -1) {
return 0;
}

@ -102,6 +102,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
sizeof(std::shared_ptr<FlushBlockPolicyFactory>)},
{offsetof(struct BlockBasedTableOptions, block_cache),
sizeof(std::shared_ptr<Cache>)},
{offsetof(struct BlockBasedTableOptions, persistent_cache),
sizeof(std::shared_ptr<PersistentCache>)},
{offsetof(struct BlockBasedTableOptions, block_cache_compressed),
sizeof(std::shared_ptr<Cache>)},
{offsetof(struct BlockBasedTableOptions, filter_policy),

Loading…
Cancel
Save