Implement a table-level row cache

Summary:
Implementation of a table-level row cache.
It only caches point queries done through the `DB::Get` interface, queries done through the `Iterator` interface will completely skip the cache.

Supports snapshots and merge operations.

Test Plan: Ran `make valgrind_check commit-prereq`

Reviewers: igor, philipp, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D39849
main
Giuseppe Ottaviano 10 years ago
parent de85e4cadf
commit 782a1590f9
  1. 1
      HISTORY.md
  2. 12
      db/db_bench.cc
  3. 1
      db/db_impl.cc
  4. 26
      db/db_test.cc
  5. 90
      db/table_cache.cc
  6. 1
      db/table_cache.h
  7. 2
      include/rocksdb/immutable_options.h
  8. 5
      include/rocksdb/options.h
  9. 7
      include/rocksdb/statistics.h
  10. 45
      table/get_context.cc
  11. 9
      table/get_context.h
  12. 13
      util/options.cc

@ -3,6 +3,7 @@
### New Features ### New Features
* Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info.
* Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds) * Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds)
* Added a cache for individual rows. See DBOptions::row_cache for more info.
### Public API changes ### Public API changes
* EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters. * EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters.

@ -322,6 +322,10 @@ DEFINE_int32(block_restart_interval,
DEFINE_int64(compressed_cache_size, -1, DEFINE_int64(compressed_cache_size, -1,
"Number of bytes to use as a cache of compressed data."); "Number of bytes to use as a cache of compressed data.");
DEFINE_int64(row_cache_size, 0,
"Number of bytes to use as a cache of individual rows"
" (0 = disabled).");
DEFINE_int32(open_files, rocksdb::Options().max_open_files, DEFINE_int32(open_files, rocksdb::Options().max_open_files,
"Maximum number of files to keep open at the same time" "Maximum number of files to keep open at the same time"
" (use default if == 0)"); " (use default if == 0)");
@ -2268,6 +2272,14 @@ class Benchmark {
options.max_bytes_for_level_multiplier = options.max_bytes_for_level_multiplier =
FLAGS_max_bytes_for_level_multiplier; FLAGS_max_bytes_for_level_multiplier;
options.filter_deletes = FLAGS_filter_deletes; options.filter_deletes = FLAGS_filter_deletes;
if (FLAGS_row_cache_size) {
if (FLAGS_cache_numshardbits >= 1) {
options.row_cache =
NewLRUCache(FLAGS_row_cache_size, FLAGS_cache_numshardbits);
} else {
options.row_cache = NewLRUCache(FLAGS_row_cache_size);
}
}
if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash || if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash ||
FLAGS_rep_factory == kHashLinkedList)) { FLAGS_rep_factory == kHashLinkedList)) {
fprintf(stderr, "prefix_size should be non-zero if PrefixHash or " fprintf(stderr, "prefix_size should be non-zero if PrefixHash or "

@ -253,7 +253,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
// Give a large number for setting of "infinite" open files. // Give a large number for setting of "infinite" open files.
const int table_cache_size = (db_options_.max_open_files == -1) ? const int table_cache_size = (db_options_.max_open_files == -1) ?
4194304 : db_options_.max_open_files - 10; 4194304 : db_options_.max_open_files - 10;
// Reserve ten files or so for other uses and give the rest to TableCache.
table_cache_ = table_cache_ =
NewLRUCache(table_cache_size, db_options_.table_cache_numshardbits); NewLRUCache(table_cache_size, db_options_.table_cache_numshardbits);

@ -417,7 +417,8 @@ class DBTest : public testing::Test {
kxxHashChecksum = 25, kxxHashChecksum = 25,
kFIFOCompaction = 26, kFIFOCompaction = 26,
kOptimizeFiltersForHits = 27, kOptimizeFiltersForHits = 27,
kEnd = 28 kRowCache = 28,
kEnd = 29
}; };
int option_config_; int option_config_;
@ -707,6 +708,10 @@ class DBTest : public testing::Test {
set_block_based_table_factory = true; set_block_based_table_factory = true;
break; break;
} }
case kRowCache: {
options.row_cache = NewLRUCache(1024 * 1024);
break;
}
default: default:
break; break;
@ -14017,6 +14022,25 @@ TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
} }
} }
TEST_F(DBTest, RowCache) {
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
options.row_cache = NewLRUCache(8192);
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 0);
ASSERT_EQ(Get("foo"), "bar");
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 0);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
ASSERT_EQ(Get("foo"), "bar");
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -9,6 +9,7 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/dbformat.h"
#include "db/filename.h" #include "db/filename.h"
#include "db/version_edit.h" #include "db/version_edit.h"
@ -21,9 +22,12 @@
namespace rocksdb { namespace rocksdb {
namespace {
template <class T>
static void DeleteEntry(const Slice& key, void* value) { static void DeleteEntry(const Slice& key, void* value) {
TableReader* table_reader = reinterpret_cast<TableReader*>(value); T* typed_value = reinterpret_cast<T*>(value);
delete table_reader; delete typed_value;
} }
static void UnrefEntry(void* arg1, void* arg2) { static void UnrefEntry(void* arg1, void* arg2) {
@ -37,11 +41,27 @@ static Slice GetSliceForFileNumber(const uint64_t* file_number) {
sizeof(*file_number)); sizeof(*file_number));
} }
#ifndef ROCKSDB_LITE
void AppendVarint64(IterKey* key, uint64_t v) {
char buf[10];
auto ptr = EncodeVarint64(buf, v);
key->TrimAppend(key->Size(), buf, ptr - buf);
}
#endif // ROCKSDB_LITE
} // namespace
TableCache::TableCache(const ImmutableCFOptions& ioptions, TableCache::TableCache(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options, Cache* const cache) const EnvOptions& env_options, Cache* const cache)
: ioptions_(ioptions), : ioptions_(ioptions), env_options_(env_options), cache_(cache) {
env_options_(env_options), if (ioptions_.row_cache) {
cache_(cache) {} // If the same cache is shared by multiple instances, we need to
// disambiguate its entries.
PutVarint64(&row_cache_id_, ioptions_.row_cache->NewId());
}
}
TableCache::~TableCache() { TableCache::~TableCache() {
} }
@ -88,7 +108,8 @@ Status TableCache::FindTable(const EnvOptions& env_options,
// We do not cache error results so that if the error is transient, // We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically. // or somebody repairs the file, we recover automatically.
} else { } else {
*handle = cache_->Insert(key, table_reader.release(), 1, &DeleteEntry); *handle = cache_->Insert(key, table_reader.release(), 1,
&DeleteEntry<TableReader>);
} }
} }
return s; return s;
@ -137,6 +158,46 @@ Status TableCache::Get(const ReadOptions& options,
TableReader* t = fd.table_reader; TableReader* t = fd.table_reader;
Status s; Status s;
Cache::Handle* handle = nullptr; Cache::Handle* handle = nullptr;
std::string* row_cache_entry = nullptr;
#ifndef ROCKSDB_LITE
IterKey row_cache_key;
std::string row_cache_entry_buffer;
if (ioptions_.row_cache) {
uint64_t fd_number = fd.GetNumber();
auto user_key = ExtractUserKey(k);
// We use the user key as cache key instead of the internal key,
// otherwise the whole cache would be invalidated every time the
// sequence key increases. However, to support caching snapshot
// reads, we append the sequence number (incremented by 1 to
// distinguish from 0) only in this case.
uint64_t seq_no =
options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
// Compute row cache key.
row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
row_cache_id_.size());
AppendVarint64(&row_cache_key, fd_number);
AppendVarint64(&row_cache_key, seq_no);
row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
user_key.size());
if (auto row_handle = ioptions_.row_cache->Lookup(row_cache_key.GetKey())) {
auto found_row_cache_entry = static_cast<const std::string*>(
ioptions_.row_cache->Value(row_handle));
replayGetContextLog(*found_row_cache_entry, user_key, get_context);
ioptions_.row_cache->Release(row_handle);
RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
return Status::OK();
}
// Not found, setting up the replay log.
RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
row_cache_entry = &row_cache_entry_buffer;
}
#endif // ROCKSDB_LITE
if (!t) { if (!t) {
s = FindTable(env_options_, internal_comparator, fd, &handle, s = FindTable(env_options_, internal_comparator, fd, &handle,
options.read_tier == kBlockCacheTier); options.read_tier == kBlockCacheTier);
@ -145,15 +206,30 @@ Status TableCache::Get(const ReadOptions& options,
} }
} }
if (s.ok()) { if (s.ok()) {
get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
s = t->Get(options, k, get_context); s = t->Get(options, k, get_context);
get_context->SetReplayLog(nullptr);
if (handle != nullptr) { if (handle != nullptr) {
ReleaseHandle(handle); ReleaseHandle(handle);
} }
} else if (options.read_tier && s.IsIncomplete()) { } else if (options.read_tier && s.IsIncomplete()) {
// Couldnt find Table in cache but treat as kFound if no_io set // Couldn't find Table in cache but treat as kFound if no_io set
get_context->MarkKeyMayExist(); get_context->MarkKeyMayExist();
return Status::OK(); return Status::OK();
} }
#ifndef ROCKSDB_LITE
// Put the replay log in row cache only if something was found.
if (s.ok() && row_cache_entry && !row_cache_entry->empty()) {
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
auto row_handle = ioptions_.row_cache->Insert(
row_cache_key.GetKey(), row_ptr, charge, &DeleteEntry<std::string>);
ioptions_.row_cache->Release(row_handle);
}
#endif // ROCKSDB_LITE
return s; return s;
} }

@ -94,6 +94,7 @@ class TableCache {
const ImmutableCFOptions& ioptions_; const ImmutableCFOptions& ioptions_;
const EnvOptions& env_options_; const EnvOptions& env_options_;
Cache* const cache_; Cache* const cache_;
std::string row_cache_id_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -98,6 +98,8 @@ struct ImmutableCFOptions {
// A vector of EventListeners which call-back functions will be called // A vector of EventListeners which call-back functions will be called
// when specific RocksDB event happens. // when specific RocksDB event happens.
std::vector<std::shared_ptr<EventListener>> listeners; std::vector<std::shared_ptr<EventListener>> listeners;
std::shared_ptr<Cache> row_cache;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -1055,6 +1055,11 @@ struct DBOptions {
// Recovery mode to control the consistency while replaying WAL // Recovery mode to control the consistency while replaying WAL
// Default: kTolerateCorruptedTailRecords // Default: kTolerateCorruptedTailRecords
WALRecoveryMode wal_recovery_mode; WALRecoveryMode wal_recovery_mode;
// A global cache for table-level rows.
// Default: nullptr (disabled)
// Not supported in ROCKSDB_LITE mode!
std::shared_ptr<Cache> row_cache;
}; };
// Options to control the behavior of a database (passed to DB::Open) // Options to control the behavior of a database (passed to DB::Open)

@ -141,6 +141,11 @@ enum Tickers : uint32_t {
NUMBER_BLOCK_NOT_COMPRESSED, NUMBER_BLOCK_NOT_COMPRESSED,
MERGE_OPERATION_TOTAL_TIME, MERGE_OPERATION_TOTAL_TIME,
FILTER_OPERATION_TOTAL_TIME, FILTER_OPERATION_TOTAL_TIME,
// Row cache.
ROW_CACHE_HIT,
ROW_CACHE_MISS,
TICKER_ENUM_MAX TICKER_ENUM_MAX
}; };
@ -209,6 +214,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"}, {NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"},
{MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"}, {MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"},
{FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"}, {FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"},
{ROW_CACHE_HIT, "rocksdb.row.cache.hit"},
{ROW_CACHE_MISS, "rocksdb.row.cache.miss"},
}; };
/** /**

@ -12,6 +12,24 @@
namespace rocksdb { namespace rocksdb {
namespace {
void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
#ifndef ROCKSDB_LITE
if (replay_log) {
if (replay_log->empty()) {
// Optimization: in the common case of only one operation in the
// log, we allocate the exact amount of space needed.
replay_log->reserve(1 + VarintLength(value.size()) + value.size());
}
replay_log->push_back(type);
PutLengthPrefixedSlice(replay_log, value);
}
#endif // ROCKSDB_LITE
}
} // namespace
GetContext::GetContext(const Comparator* ucmp, GetContext::GetContext(const Comparator* ucmp,
const MergeOperator* merge_operator, Logger* logger, const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state, Statistics* statistics, GetState init_state,
@ -26,7 +44,8 @@ GetContext::GetContext(const Comparator* ucmp,
value_(ret_value), value_(ret_value),
value_found_(value_found), value_found_(value_found),
merge_context_(merge_context), merge_context_(merge_context),
env_(env) {} env_(env),
replay_log_(nullptr) {}
// Called from TableCache::Get and Table::Get when file/block in which // Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this // key may exist are not there in TableCache/BlockCache respectively. In this
@ -41,6 +60,9 @@ void GetContext::MarkKeyMayExist() {
} }
void GetContext::SaveValue(const Slice& value) { void GetContext::SaveValue(const Slice& value) {
assert(state_ == kNotFound);
appendToReplayLog(replay_log_, kTypeValue, value);
state_ = kFound; state_ = kFound;
value_->assign(value.data(), value.size()); value_->assign(value.data(), value.size());
} }
@ -50,6 +72,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
assert((state_ != kMerge && parsed_key.type != kTypeMerge) || assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
merge_context_ != nullptr); merge_context_ != nullptr);
if (ucmp_->Compare(parsed_key.user_key, user_key_) == 0) { if (ucmp_->Compare(parsed_key.user_key, user_key_) == 0) {
appendToReplayLog(replay_log_, parsed_key.type, value);
// Key matches. Process it // Key matches. Process it
switch (parsed_key.type) { switch (parsed_key.type) {
case kTypeValue: case kTypeValue:
@ -116,4 +140,23 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
return false; return false;
} }
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context) {
#ifndef ROCKSDB_LITE
Slice s = replay_log;
while (s.size()) {
auto type = static_cast<ValueType>(*s.data());
s.remove_prefix(1);
Slice value;
bool ret = GetLengthPrefixedSlice(&s, &value);
assert(ret);
(void)ret;
// Sequence number is ignored in SaveValue, so we just pass 0.
get_context->SaveValue(ParsedInternalKey(user_key, 0, type), value);
}
#else // ROCKSDB_LITE
assert(false);
#endif // ROCKSDB_LITE
}
} // namespace rocksdb } // namespace rocksdb

@ -31,6 +31,11 @@ class GetContext {
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value); bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value);
GetState State() const { return state_; } GetState State() const { return state_; }
// If a non-null string is passed, all the SaveValue calls will be
// logged into the string. The operations can then be replayed on
// another GetContext with replayGetContextLog.
void SetReplayLog(std::string* replay_log) { replay_log_ = replay_log; }
private: private:
const Comparator* ucmp_; const Comparator* ucmp_;
const MergeOperator* merge_operator_; const MergeOperator* merge_operator_;
@ -44,6 +49,10 @@ class GetContext {
bool* value_found_; // Is value set correctly? Used by KeyMayExist bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_; MergeContext* merge_context_;
Env* env_; Env* env_;
std::string* replay_log_;
}; };
void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
GetContext* get_context);
} // namespace rocksdb } // namespace rocksdb

@ -71,8 +71,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
access_hint_on_compaction_start(options.access_hint_on_compaction_start), access_hint_on_compaction_start(options.access_hint_on_compaction_start),
num_levels(options.num_levels), num_levels(options.num_levels),
optimize_filters_for_hits(options.optimize_filters_for_hits), optimize_filters_for_hits(options.optimize_filters_for_hits),
listeners(options.listeners) { listeners(options.listeners),
} row_cache(options.row_cache) {}
ColumnFamilyOptions::ColumnFamilyOptions() ColumnFamilyOptions::ColumnFamilyOptions()
: comparator(BytewiseComparator()), : comparator(BytewiseComparator()),
@ -290,7 +290,8 @@ DBOptions::DBOptions(const Options& options)
listeners(options.listeners), listeners(options.listeners),
enable_thread_tracking(options.enable_thread_tracking), enable_thread_tracking(options.enable_thread_tracking),
delayed_write_rate(options.delayed_write_rate), delayed_write_rate(options.delayed_write_rate),
wal_recovery_mode(options.wal_recovery_mode) {} wal_recovery_mode(options.wal_recovery_mode),
row_cache(options.row_cache) {}
static const char* const access_hints[] = { static const char* const access_hints[] = {
"NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"
@ -360,6 +361,12 @@ void DBOptions::Dump(Logger* log) const {
wal_bytes_per_sync); wal_bytes_per_sync);
Warn(log, " Options.enable_thread_tracking: %d", Warn(log, " Options.enable_thread_tracking: %d",
enable_thread_tracking); enable_thread_tracking);
if (row_cache) {
Warn(log, " Options.row_cache: %" PRIu64,
row_cache->GetCapacity());
} else {
Warn(log, " Options.row_cache: None");
}
} // DBOptions::Dump } // DBOptions::Dump
void ColumnFamilyOptions::Dump(Logger* log) const { void ColumnFamilyOptions::Dump(Logger* log) const {

Loading…
Cancel
Save