diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 963dde6ce..e09f5fe8a 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1335,6 +1335,98 @@ INSTANTIATE_TEST_CASE_P( MultiGetPrefix, MultiGetPrefixExtractorTest, ::testing::Bool()); +class DBMultiGetRowCacheTest + : public DBBasicTest, + public ::testing::WithParamInterface {}; + +TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) { + do { + option_config_ = kRowCache; + Options options = CurrentOptions(); + options.statistics = rocksdb::CreateDBStatistics(); + CreateAndReopenWithCF({"pikachu"}, options); + SetPerfLevel(kEnableCount); + ASSERT_OK(Put(1, "k1", "v1")); + ASSERT_OK(Put(1, "k2", "v2")); + ASSERT_OK(Put(1, "k3", "v3")); + ASSERT_OK(Put(1, "k4", "v4")); + Flush(1); + ASSERT_OK(Put(1, "k5", "v5")); + const Snapshot* snap1 = dbfull()->GetSnapshot(); + ASSERT_OK(Delete(1, "k4")); + Flush(1); + const Snapshot* snap2 = dbfull()->GetSnapshot(); + + get_perf_context()->Reset(); + + std::vector keys({"no_key", "k5", "k4", "k3", "k1"}); + std::vector values(keys.size()); + std::vector cfs(keys.size(), handles_[1]); + std::vector s(keys.size()); + + ReadOptions ro; + bool use_snapshots = GetParam(); + if (use_snapshots) { + ro.snapshot = snap2; + } + db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), + values.data(), s.data(), false); + + ASSERT_EQ(values.size(), keys.size()); + ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1"); + ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3"); + ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5"); + // four kv pairs * two bytes per value + ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes); + + ASSERT_TRUE(s[0].IsNotFound()); + ASSERT_OK(s[1]); + ASSERT_TRUE(s[2].IsNotFound()); + ASSERT_OK(s[3]); + ASSERT_OK(s[4]); + + // Call MultiGet() again with some intersection with the previous set of + // keys. Those should already be in the row cache. + keys.assign({"no_key", "k5", "k3", "k2"}); + for (size_t i = 0; i < keys.size(); ++i) { + values[i].Reset(); + s[i] = Status::OK(); + } + get_perf_context()->Reset(); + + if (use_snapshots) { + ro.snapshot = snap1; + } + db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(), + values.data(), s.data(), false); + + ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2"); + ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3"); + ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5"); + // four kv pairs * two bytes per value + ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes); + + ASSERT_TRUE(s[0].IsNotFound()); + ASSERT_OK(s[1]); + ASSERT_OK(s[2]); + ASSERT_OK(s[3]); + if (use_snapshots) { + // Only reads from the first SST file would have been cached, since + // snapshot seq no is > fd.largest_seqno + ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT)); + } else { + ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT)); + } + + SetPerfLevel(kDisable); + dbfull()->ReleaseSnapshot(snap1); + dbfull()->ReleaseSnapshot(snap2); + } while (ChangeCompactOptions()); +} + +INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest, + testing::Values(true, false)); + #ifndef ROCKSDB_LITE TEST_F(DBBasicTest, GetAllKeyVersions) { Options options = CurrentOptions(); diff --git a/db/table_cache.cc b/db/table_cache.cc index f4de3b8fb..bd85fe0d3 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -277,6 +277,80 @@ Status TableCache::GetRangeTombstoneIterator( return s; } +#ifndef ROCKSDB_LITE +void TableCache::CreateRowCacheKeyPrefix( + const ReadOptions& options, + const FileDescriptor& fd, const Slice& internal_key, + GetContext* get_context, IterKey& row_cache_key) { + uint64_t fd_number = fd.GetNumber(); + // We use the user key as cache key instead of the internal key, + // otherwise the whole cache would be invalidated every time the + // sequence key increases. However, to support caching snapshot + // reads, we append the sequence number (incremented by 1 to + // distinguish from 0) only in this case. + // If the snapshot is larger than the largest seqno in the file, + // all data should be exposed to the snapshot, so we treat it + // the same as there is no snapshot. The exception is that if + // a seq-checking callback is registered, some internal keys + // may still be filtered out. + uint64_t seq_no = 0; + // Maybe we can include the whole file ifsnapshot == fd.largest_seqno. + if (options.snapshot != nullptr && + (get_context->has_callback() || + static_cast_with_check( + options.snapshot) + ->GetSequenceNumber() <= fd.largest_seqno)) { + // We should consider to use options.snapshot->GetSequenceNumber() + // instead of GetInternalKeySeqno(k), which will make the code + // easier to understand. + seq_no = 1 + GetInternalKeySeqno(internal_key); + } + + // Compute row cache key. + row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(), + row_cache_id_.size()); + AppendVarint64(&row_cache_key, fd_number); + AppendVarint64(&row_cache_key, seq_no); +} + +bool TableCache::GetFromRowCache( + const Slice& user_key, IterKey& row_cache_key, + size_t prefix_size, GetContext* get_context) { + bool found = false; + + row_cache_key.TrimAppend(prefix_size, user_key.data(), + user_key.size()); + if (auto row_handle = + ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) { + // Cleanable routine to release the cache entry + Cleanable value_pinner; + auto release_cache_entry_func = [](void* cache_to_clean, + void* cache_handle) { + ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle); + }; + auto found_row_cache_entry = static_cast( + ioptions_.row_cache->Value(row_handle)); + // If it comes here value is located on the cache. + // found_row_cache_entry points to the value on cache, + // and value_pinner has cleanup procedure for the cached entry. + // After replayGetContextLog() returns, get_context.pinnable_slice_ + // will point to cache entry buffer (or a copy based on that) and + // cleanup routine under value_pinner will be delegated to + // get_context.pinnable_slice_. Cache entry is released when + // get_context.pinnable_slice_ is reset. + value_pinner.RegisterCleanup(release_cache_entry_func, + ioptions_.row_cache.get(), row_handle); + replayGetContextLog(*found_row_cache_entry, user_key, get_context, + &value_pinner); + RecordTick(ioptions_.statistics, ROW_CACHE_HIT); + found = true; + } else { + RecordTick(ioptions_.statistics, ROW_CACHE_MISS); + } + return found; +} +#endif // ROCKSDB_LITE + Status TableCache::Get(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, const Slice& k, @@ -294,66 +368,11 @@ Status TableCache::Get(const ReadOptions& options, // Check row cache if enabled. Since row cache does not currently store // sequence numbers, we cannot use it if we need to fetch the sequence. if (ioptions_.row_cache && !get_context->NeedToReadSequence()) { - uint64_t fd_number = fd.GetNumber(); auto user_key = ExtractUserKey(k); - // We use the user key as cache key instead of the internal key, - // otherwise the whole cache would be invalidated every time the - // sequence key increases. However, to support caching snapshot - // reads, we append the sequence number (incremented by 1 to - // distinguish from 0) only in this case. - // If the snapshot is larger than the largest seqno in the file, - // all data should be exposed to the snapshot, so we treat it - // the same as there is no snapshot. The exception is that if - // a seq-checking callback is registered, some internal keys - // may still be filtered out. - uint64_t seq_no = 0; - // Maybe we can include the whole file ifsnapshot == fd.largest_seqno. - if (options.snapshot != nullptr && - (get_context->has_callback() || - static_cast_with_check( - options.snapshot) - ->GetSequenceNumber() <= fd.largest_seqno)) { - // We should consider to use options.snapshot->GetSequenceNumber() - // instead of GetInternalKeySeqno(k), which will make the code - // easier to understand. - seq_no = 1 + GetInternalKeySeqno(k); - } - - // Compute row cache key. - row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(), - row_cache_id_.size()); - AppendVarint64(&row_cache_key, fd_number); - AppendVarint64(&row_cache_key, seq_no); - row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(), - user_key.size()); - - if (auto row_handle = - ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) { - // Cleanable routine to release the cache entry - Cleanable value_pinner; - auto release_cache_entry_func = [](void* cache_to_clean, - void* cache_handle) { - ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle); - }; - auto found_row_cache_entry = static_cast( - ioptions_.row_cache->Value(row_handle)); - // If it comes here value is located on the cache. - // found_row_cache_entry points to the value on cache, - // and value_pinner has cleanup procedure for the cached entry. - // After replayGetContextLog() returns, get_context.pinnable_slice_ - // will point to cache entry buffer (or a copy based on that) and - // cleanup routine under value_pinner will be delegated to - // get_context.pinnable_slice_. Cache entry is released when - // get_context.pinnable_slice_ is reset. - value_pinner.RegisterCleanup(release_cache_entry_func, - ioptions_.row_cache.get(), row_handle); - replayGetContextLog(*found_row_cache_entry, user_key, get_context, - &value_pinner); - RecordTick(ioptions_.statistics, ROW_CACHE_HIT); - done = true; - } else { - // Not found, setting up the replay log. - RecordTick(ioptions_.statistics, ROW_CACHE_MISS); + CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key); + done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(), + get_context); + if (!done) { row_cache_entry = &row_cache_entry_buffer; } } @@ -413,8 +432,6 @@ Status TableCache::Get(const ReadOptions& options, } // Batched version of TableCache::MultiGet. -// TODO: Add support for row cache. As of now, this ignores the row cache -// and directly looks up in the table files Status TableCache::MultiGet(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, @@ -426,7 +443,41 @@ Status TableCache::MultiGet(const ReadOptions& options, Status s; TableReader* t = fd.table_reader; Cache::Handle* handle = nullptr; - if (s.ok()) { + MultiGetRange table_range(*mget_range, mget_range->begin(), mget_range->end()); +#ifndef ROCKSDB_LITE + autovector row_cache_entries; + IterKey row_cache_key; + size_t row_cache_key_prefix_size = 0; + KeyContext& first_key = *table_range.begin(); + bool lookup_row_cache = ioptions_.row_cache && + !first_key.get_context->NeedToReadSequence(); + + // Check row cache if enabled. Since row cache does not currently store + // sequence numbers, we cannot use it if we need to fetch the sequence. + if (lookup_row_cache) { + GetContext* first_context = first_key.get_context; + CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context, + row_cache_key); + row_cache_key_prefix_size = row_cache_key.Size(); + + for (auto miter = table_range.begin(); miter != table_range.end(); ++miter) { + const Slice& user_key = miter->ukey;; + GetContext* get_context = miter->get_context; + + if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size, + get_context)) { + table_range.SkipKey(miter); + } else { + row_cache_entries.emplace_back(); + get_context->SetReplayLog(&(row_cache_entries.back())); + } + } + } +#endif // ROCKSDB_LITE + + // Check that table_range is not empty. Its possible all keys may have been + // found in the row cache and thus the range may now be empty + if (s.ok() && !table_range.empty()) { if (t == nullptr) { s = FindTable( env_options_, internal_comparator, fd, &handle, prefix_extractor, @@ -441,21 +492,20 @@ Status TableCache::MultiGet(const ReadOptions& options, std::unique_ptr range_del_iter( t->NewRangeTombstoneIterator(options)); if (range_del_iter != nullptr) { - for (auto iter = mget_range->begin(); iter != mget_range->end(); + for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) { - const Slice& k = iter->ikey; SequenceNumber* max_covering_tombstone_seq = iter->get_context->max_covering_tombstone_seq(); - *max_covering_tombstone_seq = std::max( - *max_covering_tombstone_seq, - range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k))); + *max_covering_tombstone_seq = + std::max(*max_covering_tombstone_seq, + range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey)); } } } if (s.ok()) { - t->MultiGet(options, mget_range, prefix_extractor, skip_filters); + t->MultiGet(options, &table_range, prefix_extractor, skip_filters); } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) { - for (auto iter = mget_range->begin(); iter != mget_range->end(); ++iter) { + for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) { Status* status = iter->s; if (status->IsIncomplete()) { // Couldn't find Table in cache but treat as kFound if no_io set @@ -466,6 +516,31 @@ Status TableCache::MultiGet(const ReadOptions& options, } } +#ifndef ROCKSDB_LITE + if (lookup_row_cache) { + size_t row_idx = 0; + + for (auto miter = table_range.begin(); miter != table_range.end(); ++miter) { + std::string& row_cache_entry = row_cache_entries[row_idx++]; + const Slice& user_key = miter->ukey;; + GetContext* get_context = miter->get_context; + + get_context->SetReplayLog(nullptr); + // Compute row cache key. + row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(), + user_key.size()); + // Put the replay log in row cache only if something was found. + if (s.ok() && !row_cache_entry.empty()) { + size_t charge = + row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string); + void* row_ptr = new std::string(std::move(row_cache_entry)); + ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge, + &DeleteEntry); + } + } + } +#endif // ROCKSDB_LITE + if (handle != nullptr) { ReleaseHandle(handle); } diff --git a/db/table_cache.h b/db/table_cache.h index f7e0b0b35..85592858a 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -202,6 +202,20 @@ class TableCache { bool skip_filters = false, int level = -1, bool prefetch_index_and_filter_in_cache = true); + // Create a key prefix for looking up the row cache. The prefix is of the + // format row_cache_id + fd_number + seq_no. Later, the user key can be + // appended to form the full key + void CreateRowCacheKeyPrefix(const ReadOptions& options, + const FileDescriptor& fd, + const Slice& internal_key, + GetContext* get_context, + IterKey& row_cache_key); + + // Helper function to lookup the row cache for a key. It appends the + // user key to row_cache_key at offset prefix_size + bool GetFromRowCache(const Slice& user_key, IterKey& row_cache_key, + size_t prefix_size, GetContext* get_context); + const ImmutableCFOptions& ioptions_; const EnvOptions& env_options_; Cache* const cache_; diff --git a/db/version_set.cc b/db/version_set.cc index 7b4b9ccc1..ea6b0ab6a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1866,7 +1866,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, user_comparator(), merge_operator_, info_log_, db_statistics_, iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey, iter->value, nullptr, &(iter->merge_context), true, - &iter->max_covering_tombstone_seq, this->env_, &iter->seq, + &iter->max_covering_tombstone_seq, this->env_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob, tracing_mget_id); } diff --git a/table/multiget_context.h b/table/multiget_context.h index c9e682fad..88ec4dcc4 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -25,7 +25,6 @@ struct KeyContext { MergeContext merge_context; SequenceNumber max_covering_tombstone_seq; bool key_exists; - SequenceNumber seq; void* cb_arg; PinnableSlice* value; GetContext* get_context; @@ -36,7 +35,6 @@ struct KeyContext { s(stat), max_covering_tombstone_seq(0), key_exists(false), - seq(0), cb_arg(nullptr), value(val), get_context(nullptr) {}