Support row cache with batched MultiGet (#5706)

Summary:
This PR adds support for row cache in ```rocksdb::TableCache::MultiGet```.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5706

Test Plan:
1. Unit tests in db_basic_test
2. db_bench results with batch size of 2 (```Get``` is faster than ```MultiGet``` for single key) -
Get -
readrandom   :       3.935 micros/op 254116 ops/sec;   28.1 MB/s (22870998 of 22870999 found)
MultiGet -
multireadrandom :       3.743 micros/op 267190 ops/sec; (24047998 of 24047998 found)

Command used -
TEST_TMPDIR=/dev/shm/multiget numactl -C 10  ./db_bench -use_existing_db=true -use_existing_keys=false -benchmarks="readtorowcache,[read|multiread]random" -write_buffer_size=16777216 -target_file_size_base=4194304 -max_bytes_for_level_base=16777216 -num=12000000 -reads=12000000 -duration=90 -threads=1 -compression_type=none -cache_size=4194304000 -row_cache_size=4194304000 -batch_size=2 -disable_auto_compactions=true -bloom_bits=10 -cache_index_and_filter_blocks=true -pin_l0_filter_and_index_blocks_in_cache=true -multiread_batched=true -multiread_stride=131072

Differential Revision: D17086297

Pulled By: anand1976

fbshipit-source-id: 85784378da913e05f1baf31ec1b4e7c9345e7f57
main
anand76 5 years ago committed by Facebook Github Bot
parent 1daff8f85a
commit e10570331d
  1. 92
      db/db_basic_test.cc
  2. 213
      db/table_cache.cc
  3. 14
      db/table_cache.h
  4. 2
      db/version_set.cc
  5. 2
      table/multiget_context.h

@ -1335,6 +1335,98 @@ INSTANTIATE_TEST_CASE_P(
MultiGetPrefix, MultiGetPrefixExtractorTest,
::testing::Bool());
class DBMultiGetRowCacheTest
: public DBBasicTest,
public ::testing::WithParamInterface<bool> {};
TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
do {
option_config_ = kRowCache;
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
CreateAndReopenWithCF({"pikachu"}, options);
SetPerfLevel(kEnableCount);
ASSERT_OK(Put(1, "k1", "v1"));
ASSERT_OK(Put(1, "k2", "v2"));
ASSERT_OK(Put(1, "k3", "v3"));
ASSERT_OK(Put(1, "k4", "v4"));
Flush(1);
ASSERT_OK(Put(1, "k5", "v5"));
const Snapshot* snap1 = dbfull()->GetSnapshot();
ASSERT_OK(Delete(1, "k4"));
Flush(1);
const Snapshot* snap2 = dbfull()->GetSnapshot();
get_perf_context()->Reset();
std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
std::vector<PinnableSlice> values(keys.size());
std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
std::vector<Status> s(keys.size());
ReadOptions ro;
bool use_snapshots = GetParam();
if (use_snapshots) {
ro.snapshot = snap2;
}
db_->MultiGet(ro, handles_[1], keys.size(), keys.data(),
values.data(), s.data(), false);
ASSERT_EQ(values.size(), keys.size());
ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
// four kv pairs * two bytes per value
ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
ASSERT_TRUE(s[0].IsNotFound());
ASSERT_OK(s[1]);
ASSERT_TRUE(s[2].IsNotFound());
ASSERT_OK(s[3]);
ASSERT_OK(s[4]);
// Call MultiGet() again with some intersection with the previous set of
// keys. Those should already be in the row cache.
keys.assign({"no_key", "k5", "k3", "k2"});
for (size_t i = 0; i < keys.size(); ++i) {
values[i].Reset();
s[i] = Status::OK();
}
get_perf_context()->Reset();
if (use_snapshots) {
ro.snapshot = snap1;
}
db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
values.data(), s.data(), false);
ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
// four kv pairs * two bytes per value
ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
ASSERT_TRUE(s[0].IsNotFound());
ASSERT_OK(s[1]);
ASSERT_OK(s[2]);
ASSERT_OK(s[3]);
if (use_snapshots) {
// Only reads from the first SST file would have been cached, since
// snapshot seq no is > fd.largest_seqno
ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
} else {
ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
}
SetPerfLevel(kDisable);
dbfull()->ReleaseSnapshot(snap1);
dbfull()->ReleaseSnapshot(snap2);
} while (ChangeCompactOptions());
}
INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
testing::Values(true, false));
#ifndef ROCKSDB_LITE
TEST_F(DBBasicTest, GetAllKeyVersions) {
Options options = CurrentOptions();

@ -277,6 +277,80 @@ Status TableCache::GetRangeTombstoneIterator(
return s;
}
#ifndef ROCKSDB_LITE
void TableCache::CreateRowCacheKeyPrefix(
const ReadOptions& options,
const FileDescriptor& fd, const Slice& internal_key,
GetContext* get_context, IterKey& row_cache_key) {
uint64_t fd_number = fd.GetNumber();
// We use the user key as cache key instead of the internal key,
// otherwise the whole cache would be invalidated every time the
// sequence key increases. However, to support caching snapshot
// reads, we append the sequence number (incremented by 1 to
// distinguish from 0) only in this case.
// If the snapshot is larger than the largest seqno in the file,
// all data should be exposed to the snapshot, so we treat it
// the same as there is no snapshot. The exception is that if
// a seq-checking callback is registered, some internal keys
// may still be filtered out.
uint64_t seq_no = 0;
// Maybe we can include the whole file ifsnapshot == fd.largest_seqno.
if (options.snapshot != nullptr &&
(get_context->has_callback() ||
static_cast_with_check<const SnapshotImpl, const Snapshot>(
options.snapshot)
->GetSequenceNumber() <= fd.largest_seqno)) {
// We should consider to use options.snapshot->GetSequenceNumber()
// instead of GetInternalKeySeqno(k), which will make the code
// easier to understand.
seq_no = 1 + GetInternalKeySeqno(internal_key);
}
// Compute row cache key.
row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
row_cache_id_.size());
AppendVarint64(&row_cache_key, fd_number);
AppendVarint64(&row_cache_key, seq_no);
}
bool TableCache::GetFromRowCache(
const Slice& user_key, IterKey& row_cache_key,
size_t prefix_size, GetContext* get_context) {
bool found = false;
row_cache_key.TrimAppend(prefix_size, user_key.data(),
user_key.size());
if (auto row_handle =
ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
// Cleanable routine to release the cache entry
Cleanable value_pinner;
auto release_cache_entry_func = [](void* cache_to_clean,
void* cache_handle) {
((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
};
auto found_row_cache_entry = static_cast<const std::string*>(
ioptions_.row_cache->Value(row_handle));
// If it comes here value is located on the cache.
// found_row_cache_entry points to the value on cache,
// and value_pinner has cleanup procedure for the cached entry.
// After replayGetContextLog() returns, get_context.pinnable_slice_
// will point to cache entry buffer (or a copy based on that) and
// cleanup routine under value_pinner will be delegated to
// get_context.pinnable_slice_. Cache entry is released when
// get_context.pinnable_slice_ is reset.
value_pinner.RegisterCleanup(release_cache_entry_func,
ioptions_.row_cache.get(), row_handle);
replayGetContextLog(*found_row_cache_entry, user_key, get_context,
&value_pinner);
RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
found = true;
} else {
RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
}
return found;
}
#endif // ROCKSDB_LITE
Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const Slice& k,
@ -294,66 +368,11 @@ Status TableCache::Get(const ReadOptions& options,
// Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
uint64_t fd_number = fd.GetNumber();
auto user_key = ExtractUserKey(k);
// We use the user key as cache key instead of the internal key,
// otherwise the whole cache would be invalidated every time the
// sequence key increases. However, to support caching snapshot
// reads, we append the sequence number (incremented by 1 to
// distinguish from 0) only in this case.
// If the snapshot is larger than the largest seqno in the file,
// all data should be exposed to the snapshot, so we treat it
// the same as there is no snapshot. The exception is that if
// a seq-checking callback is registered, some internal keys
// may still be filtered out.
uint64_t seq_no = 0;
// Maybe we can include the whole file ifsnapshot == fd.largest_seqno.
if (options.snapshot != nullptr &&
(get_context->has_callback() ||
static_cast_with_check<const SnapshotImpl, const Snapshot>(
options.snapshot)
->GetSequenceNumber() <= fd.largest_seqno)) {
// We should consider to use options.snapshot->GetSequenceNumber()
// instead of GetInternalKeySeqno(k), which will make the code
// easier to understand.
seq_no = 1 + GetInternalKeySeqno(k);
}
// Compute row cache key.
row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
row_cache_id_.size());
AppendVarint64(&row_cache_key, fd_number);
AppendVarint64(&row_cache_key, seq_no);
row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
user_key.size());
if (auto row_handle =
ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
// Cleanable routine to release the cache entry
Cleanable value_pinner;
auto release_cache_entry_func = [](void* cache_to_clean,
void* cache_handle) {
((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
};
auto found_row_cache_entry = static_cast<const std::string*>(
ioptions_.row_cache->Value(row_handle));
// If it comes here value is located on the cache.
// found_row_cache_entry points to the value on cache,
// and value_pinner has cleanup procedure for the cached entry.
// After replayGetContextLog() returns, get_context.pinnable_slice_
// will point to cache entry buffer (or a copy based on that) and
// cleanup routine under value_pinner will be delegated to
// get_context.pinnable_slice_. Cache entry is released when
// get_context.pinnable_slice_ is reset.
value_pinner.RegisterCleanup(release_cache_entry_func,
ioptions_.row_cache.get(), row_handle);
replayGetContextLog(*found_row_cache_entry, user_key, get_context,
&value_pinner);
RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
done = true;
} else {
// Not found, setting up the replay log.
RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
get_context);
if (!done) {
row_cache_entry = &row_cache_entry_buffer;
}
}
@ -413,8 +432,6 @@ Status TableCache::Get(const ReadOptions& options,
}
// Batched version of TableCache::MultiGet.
// TODO: Add support for row cache. As of now, this ignores the row cache
// and directly looks up in the table files
Status TableCache::MultiGet(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
@ -426,7 +443,41 @@ Status TableCache::MultiGet(const ReadOptions& options,
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
if (s.ok()) {
MultiGetRange table_range(*mget_range, mget_range->begin(), mget_range->end());
#ifndef ROCKSDB_LITE
autovector<std::string, MultiGetContext::MAX_BATCH_SIZE> row_cache_entries;
IterKey row_cache_key;
size_t row_cache_key_prefix_size = 0;
KeyContext& first_key = *table_range.begin();
bool lookup_row_cache = ioptions_.row_cache &&
!first_key.get_context->NeedToReadSequence();
// Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
if (lookup_row_cache) {
GetContext* first_context = first_key.get_context;
CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context,
row_cache_key);
row_cache_key_prefix_size = row_cache_key.Size();
for (auto miter = table_range.begin(); miter != table_range.end(); ++miter) {
const Slice& user_key = miter->ukey;;
GetContext* get_context = miter->get_context;
if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
get_context)) {
table_range.SkipKey(miter);
} else {
row_cache_entries.emplace_back();
get_context->SetReplayLog(&(row_cache_entries.back()));
}
}
}
#endif // ROCKSDB_LITE
// Check that table_range is not empty. Its possible all keys may have been
// found in the row cache and thus the range may now be empty
if (s.ok() && !table_range.empty()) {
if (t == nullptr) {
s = FindTable(
env_options_, internal_comparator, fd, &handle, prefix_extractor,
@ -441,21 +492,20 @@ Status TableCache::MultiGet(const ReadOptions& options,
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
t->NewRangeTombstoneIterator(options));
if (range_del_iter != nullptr) {
for (auto iter = mget_range->begin(); iter != mget_range->end();
for (auto iter = table_range.begin(); iter != table_range.end();
++iter) {
const Slice& k = iter->ikey;
SequenceNumber* max_covering_tombstone_seq =
iter->get_context->max_covering_tombstone_seq();
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
*max_covering_tombstone_seq =
std::max(*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey));
}
}
}
if (s.ok()) {
t->MultiGet(options, mget_range, prefix_extractor, skip_filters);
t->MultiGet(options, &table_range, prefix_extractor, skip_filters);
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
for (auto iter = mget_range->begin(); iter != mget_range->end(); ++iter) {
for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
Status* status = iter->s;
if (status->IsIncomplete()) {
// Couldn't find Table in cache but treat as kFound if no_io set
@ -466,6 +516,31 @@ Status TableCache::MultiGet(const ReadOptions& options,
}
}
#ifndef ROCKSDB_LITE
if (lookup_row_cache) {
size_t row_idx = 0;
for (auto miter = table_range.begin(); miter != table_range.end(); ++miter) {
std::string& row_cache_entry = row_cache_entries[row_idx++];
const Slice& user_key = miter->ukey;;
GetContext* get_context = miter->get_context;
get_context->SetReplayLog(nullptr);
// Compute row cache key.
row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(),
user_key.size());
// Put the replay log in row cache only if something was found.
if (s.ok() && !row_cache_entry.empty()) {
size_t charge =
row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(row_cache_entry));
ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
&DeleteEntry<std::string>);
}
}
}
#endif // ROCKSDB_LITE
if (handle != nullptr) {
ReleaseHandle(handle);
}

@ -202,6 +202,20 @@ class TableCache {
bool skip_filters = false, int level = -1,
bool prefetch_index_and_filter_in_cache = true);
// Create a key prefix for looking up the row cache. The prefix is of the
// format row_cache_id + fd_number + seq_no. Later, the user key can be
// appended to form the full key
void CreateRowCacheKeyPrefix(const ReadOptions& options,
const FileDescriptor& fd,
const Slice& internal_key,
GetContext* get_context,
IterKey& row_cache_key);
// Helper function to lookup the row cache for a key. It appends the
// user key to row_cache_key at offset prefix_size
bool GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
size_t prefix_size, GetContext* get_context);
const ImmutableCFOptions& ioptions_;
const EnvOptions& env_options_;
Cache* const cache_;

@ -1866,7 +1866,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
user_comparator(), merge_operator_, info_log_, db_statistics_,
iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
iter->value, nullptr, &(iter->merge_context), true,
&iter->max_covering_tombstone_seq, this->env_, &iter->seq,
&iter->max_covering_tombstone_seq, this->env_, nullptr,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
tracing_mget_id);
}

@ -25,7 +25,6 @@ struct KeyContext {
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq;
bool key_exists;
SequenceNumber seq;
void* cb_arg;
PinnableSlice* value;
GetContext* get_context;
@ -36,7 +35,6 @@ struct KeyContext {
s(stat),
max_covering_tombstone_seq(0),
key_exists(false),
seq(0),
cb_arg(nullptr),
value(val),
get_context(nullptr) {}

Loading…
Cancel
Save