Pin the newly cached blob after insert (#10625)

Summary:
Previously, when a blob was not found in the cache, it was read from the blob
file and inserted into the cache, but the application was handed the
self-contained `PinnableSlice` resulting from the blob file read. With this
patch, the `PinnableSlice` pins the newly inserted cache entry instead in
this case, matching the behavior of the cache hit path.
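Note (editor's sketch): `PinnableSlice` is a `Cleanable`, i.e. it can carry cleanup callbacks that run when the slice is `Reset()` or destroyed. "Pinning" a cache entry means registering the cache's release function as such a callback, so the entry stays referenced exactly as long as the slice is alive, and no copy of the blob is made. The following self-contained sketch models the mechanism with simplified stand-in types; `ToyCache` and `PinnedValue` are hypothetical illustrations, not RocksDB classes:

    #include <cassert>
    #include <functional>
    #include <string>
    #include <unordered_map>
    #include <vector>

    // Hypothetical minimal model of the pinning mechanism; not RocksDB code.
    struct ToyCache {
      struct Handle { std::string value; int refs = 0; };
      std::unordered_map<std::string, Handle> entries;

      Handle* Lookup(const std::string& key) {
        auto it = entries.find(key);
        if (it == entries.end()) return nullptr;
        ++it->second.refs;  // pin: entry cannot be freed while refs > 0
        return &it->second;
      }
      void Release(Handle* h) { --h->refs; }
    };

    // Simplified PinnableSlice: points at external data, owns cleanups.
    class PinnedValue {
     public:
      ~PinnedValue() { Reset(); }
      void PinSlice(const std::string* data, std::function<void()> cleanup) {
        Reset();
        data_ = data;
        cleanups_.push_back(std::move(cleanup));
      }
      void Reset() {
        for (auto& c : cleanups_) c();  // e.g. release the cache handle
        cleanups_.clear();
        data_ = nullptr;
      }
      bool IsPinned() const { return !cleanups_.empty(); }
      const std::string* data_ = nullptr;
     private:
      std::vector<std::function<void()>> cleanups_;
    };

    int main() {
      ToyCache cache;
      cache.entries["blob"] = {"large blob payload"};

      ToyCache::Handle* h = cache.Lookup("blob");
      PinnedValue value;
      // No copy of the payload: the slice points into the cache entry and
      // holds a reference that is dropped only on Reset()/destruction.
      value.PinSlice(&h->value, [&cache, h] { cache.Release(h); });

      assert(value.IsPinned());
      assert(h->refs == 1);
      value.Reset();  // releases the cache handle
      assert(h->refs == 0);
      return 0;
    }

Judging by the diff below, the patch itself achieves this with `value->PinSlice(..., nullptr)` to set the data pointer, followed by `cached_blob->TransferTo(value)` to hand the cache handle's release over to the slice's cleanup list, so the `CacheHandleGuard` no longer releases the handle itself.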

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10625

Test Plan: `make check`

Reviewed By: pdillinger

Differential Revision: D39220904

Pulled By: ltamasi

fbshipit-source-id: cb9c62881e3523b1e9f614e00bf503bac2fe3b0a
Branch: main
Author: Levi Tamasi (committed by Facebook GitHub Bot)
Parent: 4cd16d65ae
Commit: b07217da04
3 changed files:
  db/blob/blob_source.cc      (72 lines changed)
  db/blob/blob_source.h       (3 lines changed)
  db/blob/blob_source_test.cc (40 lines changed)

--- a/db/blob/blob_source.cc
+++ b/db/blob/blob_source.cc
@@ -122,6 +122,25 @@ Cache::Handle* BlobSource::GetEntryFromCache(const Slice& key) const {
   return cache_handle;
 }
 
+void BlobSource::PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob,
+                               PinnableSlice* value) {
+  assert(cached_blob);
+  assert(cached_blob->GetValue());
+  assert(value);
+
+  // To avoid copying the cached blob into the buffer provided by the
+  // application, we can simply transfer ownership of the cache handle to
+  // the target PinnableSlice. This has the potential to save a lot of
+  // CPU, especially with large blob values.
+
+  value->Reset();
+
+  constexpr Cleanable* cleanable = nullptr;
+  value->PinSlice(cached_blob->GetValue()->data(), cleanable);
+
+  cached_blob->TransferTo(value);
+}
+
 Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
                                         size_t charge,
                                         Cache::Handle** cache_handle,
@@ -165,23 +184,7 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
     Slice key = cache_key.AsSlice();
     s = GetBlobFromCache(key, &blob_handle);
     if (s.ok() && blob_handle.GetValue()) {
-      {
-        value->Reset();
-        // To avoid copying the cached blob into the buffer provided by the
-        // application, we can simply transfer ownership of the cache handle to
-        // the target PinnableSlice. This has the potential to save a lot of
-        // CPU, especially with large blob values.
-        value->PinSlice(
-            blob_handle.GetValue()->data(),
-            [](void* arg1, void* arg2) {
-              Cache* const cache = static_cast<Cache*>(arg1);
-              Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
-              cache->Release(handle);
-            },
-            blob_handle.GetCache(), blob_handle.GetCacheHandle());
-        // Make the CacheHandleGuard relinquish ownership of the handle.
-        blob_handle.TransferTo(nullptr);
-      }
+      PinCachedBlob(&blob_handle, value);
 
       // For consistency, the size of on-disk (possibly compressed) blob record
       // is assigned to bytes_read.
@@ -243,6 +246,8 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
       if (!s.ok()) {
         return s;
       }
+
+      PinCachedBlob(&blob_handle, value);
     }
 
     assert(s.ok());
@@ -312,23 +317,7 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
         assert(req.status);
         *req.status = s;
 
-        {
-          req.result->Reset();
-          // To avoid copying the cached blob into the buffer provided by the
-          // application, we can simply transfer ownership of the cache handle
-          // to the target PinnableSlice. This has the potential to save a lot
-          // of CPU, especially with large blob values.
-          req.result->PinSlice(
-              blob_handle.GetValue()->data(),
-              [](void* arg1, void* arg2) {
-                Cache* const cache = static_cast<Cache*>(arg1);
-                Cache::Handle* const handle = static_cast<Cache::Handle*>(arg2);
-                cache->Release(handle);
-              },
-              blob_handle.GetCache(), blob_handle.GetCacheHandle());
-          // Make the CacheHandleGuard relinquish ownership of the handle.
-          blob_handle.TransferTo(nullptr);
-        }
+        PinCachedBlob(&blob_handle, req.result);
 
         // Update the counter for the number of valid blobs read from the cache.
         ++cached_blob_count;
@@ -397,15 +386,18 @@ void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
     if (blob_cache_ && read_options.fill_cache) {
       // If filling cache is allowed and a cache is configured, try to put
       // the blob(s) to the cache.
-      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
-        if (_blob_reqs[i]->status->ok()) {
+      for (BlobReadRequest* req : _blob_reqs) {
+        assert(req);
+
+        if (req->status->ok()) {
           CacheHandleGuard<BlobContents> blob_handle;
-          const CacheKey cache_key =
-              base_cache_key.WithOffset(_blob_reqs[i]->offset);
+          const CacheKey cache_key = base_cache_key.WithOffset(req->offset);
           const Slice key = cache_key.AsSlice();
-          s = PutBlobIntoCache(key, &blob_handle, _blob_reqs[i]->result);
+          s = PutBlobIntoCache(key, &blob_handle, req->result);
           if (!s.ok()) {
-            *_blob_reqs[i]->status = s;
+            *req->status = s;
+          } else {
+            PinCachedBlob(&blob_handle, req->result);
           }
         }
       }

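Note (editor's sketch): the caller-visible effect of the change, mirroring the `GetBlob` calls in `blob_source_test.cc` below; the argument list is copied from the tests and is illustrative rather than normative:

    // Illustrative only; names and arguments follow blob_source_test.cc.
    PinnableSlice value;
    ASSERT_OK(blob_source.GetBlob(read_options, keys[0], blob_file_number,
                                  blob_offsets[0], file_size, blob_sizes[0],
                                  kNoCompression, nullptr /* prefetch_buffer */,
                                  &value, nullptr /* bytes_read */));
    // With read_options.fill_cache == true, the slice now pins the cache
    // entry that was just inserted, instead of owning a private copy:
    ASSERT_TRUE(value.IsPinned());
    value.Reset();  // release the cache handle so the entry can be evicted
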
--- a/db/blob/blob_source.h
+++ b/db/blob/blob_source.h
@@ -113,6 +113,9 @@ class BlobSource {
                         CacheHandleGuard<BlobContents>* cached_blob,
                         PinnableSlice* blob) const;
 
+  static void PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob,
+                            PinnableSlice* value);
+
   Cache::Handle* GetEntryFromCache(const Slice& key) const;
 
   Status InsertEntryIntoCache(const Slice& key, BlobContents* value,

--- a/db/blob/blob_source_test.cc
+++ b/db/blob/blob_source_test.cc
@@ -219,6 +219,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                       kNoCompression, prefetch_buffer, &values[i],
                                       &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
+      ASSERT_FALSE(values[i].IsPinned());
       ASSERT_EQ(bytes_read,
                 BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
@@ -256,6 +257,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                       kNoCompression, prefetch_buffer, &values[i],
                                       &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
+      ASSERT_TRUE(values[i].IsPinned());
       ASSERT_EQ(bytes_read,
                 BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
@@ -299,6 +301,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                       kNoCompression, prefetch_buffer, &values[i],
                                       &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
+      ASSERT_TRUE(values[i].IsPinned());
       ASSERT_EQ(bytes_read,
                 BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
@@ -337,6 +340,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                       kNoCompression, prefetch_buffer, &values[i],
                                       &bytes_read));
       ASSERT_EQ(values[i], blobs[i]);
+      ASSERT_TRUE(values[i].IsPinned());
       ASSERT_EQ(bytes_read,
                 BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
@@ -383,6 +387,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                       &bytes_read)
                       .IsIncomplete());
       ASSERT_TRUE(values[i].empty());
+      ASSERT_FALSE(values[i].IsPinned());
       ASSERT_EQ(bytes_read, 0);
 
       ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
@@ -424,6 +429,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
                                       &bytes_read)
                       .IsIOError());
       ASSERT_TRUE(values[i].empty());
+      ASSERT_FALSE(values[i].IsPinned());
       ASSERT_EQ(bytes_read, 0);
 
       ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
@@ -856,6 +862,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
       if (i % 2 == 0) {
         ASSERT_OK(statuses_buf[i]);
         ASSERT_EQ(value_buf[i], blobs[i]);
+        ASSERT_TRUE(value_buf[i].IsPinned());
         fs_read_bytes +=
             blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize;
         ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
@@ -864,6 +871,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
       } else {
         statuses_buf[i].PermitUncheckedError();
         ASSERT_TRUE(value_buf[i].empty());
+        ASSERT_FALSE(value_buf[i].IsPinned());
         ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                   blob_offsets[i]));
       }
@@ -896,6 +904,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
                                       kNoCompression, prefetch_buffer,
                                       &value_buf[i], &bytes_read));
       ASSERT_EQ(value_buf[i], blobs[i]);
+      ASSERT_TRUE(value_buf[i].IsPinned());
       ASSERT_EQ(bytes_read,
                 BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
@@ -921,6 +930,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
     for (size_t i = 0; i < num_blobs; ++i) {
       ASSERT_OK(statuses_buf[i]);
       ASSERT_EQ(value_buf[i], blobs[i]);
+      ASSERT_TRUE(value_buf[i].IsPinned());
       ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
       blob_bytes += blob_sizes[i];
@@ -969,6 +979,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
     for (size_t i = 0; i < num_blobs; ++i) {
       ASSERT_TRUE(statuses_buf[i].IsIncomplete());
       ASSERT_TRUE(value_buf[i].empty());
+      ASSERT_FALSE(value_buf[i].IsPinned());
       ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                 blob_offsets[i]));
     }
@@ -1012,6 +1023,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
     for (size_t i = 0; i < num_blobs; ++i) {
       ASSERT_TRUE(statuses_buf[i].IsIOError());
       ASSERT_TRUE(value_buf[i].empty());
+      ASSERT_FALSE(value_buf[i].IsPinned());
       ASSERT_FALSE(blob_source.TEST_BlobInCache(non_existing_file_number,
                                                 file_size, blob_offsets[i]));
     }
@@ -1153,6 +1165,9 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
     ASSERT_TRUE(
         blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[0]));
 
+    // Release cache handle
+    values[0].Reset();
+
     // key0 should be demoted to the secondary cache, and key1 should be filled
     // to the primary cache from the blob file.
     ASSERT_OK(blob_source.GetBlob(read_options, keys[1], file_number,
@@ -1163,6 +1178,9 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
     ASSERT_TRUE(
         blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[1]));
 
+    // Release cache handle
+    values[1].Reset();
+
     OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);
 
     // blob_cache here only looks at the primary cache since we didn't provide
@@ -1219,6 +1237,9 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
                                     &values[0], nullptr /* bytes_read */));
       ASSERT_EQ(values[0], blobs[0]);
 
+      // Release cache handle
+      values[0].Reset();
+
       // key0 should be in the primary cache.
       CacheKey cache_key0 = base_cache_key.WithOffset(blob_offsets[0]);
       const Slice key0 = cache_key0.AsSlice();
@@ -1379,11 +1400,11 @@ TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
   ReadOptions read_options;
   read_options.verify_checksums = true;
 
-  std::vector<PinnableSlice> values(keys_.size());
-
   {
     read_options.fill_cache = false;
 
+    std::vector<PinnableSlice> values(keys_.size());
+
     for (size_t i = 0; i < kNumBlobs; ++i) {
       ASSERT_OK(blob_source.GetBlob(
           read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
@@ -1397,6 +1418,8 @@ TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
   {
     read_options.fill_cache = true;
 
+    std::vector<PinnableSlice> values(keys_.size());
+
     // num_blobs is 16, so the total blob cache usage is less than a single
     // dummy entry. Therefore, cache reservation manager only reserves one dummy
     // entry here.
@@ -1434,8 +1457,8 @@ TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
       // cache usage after erasing the cache entry.
       blob_source.GetBlobCache()->Erase(cache_key.AsSlice());
       if (i == kNumBlobs - 1) {
-        // The last blob is not in the cache. cache_res_mgr should not reserve
-        // any space for it.
+        // All the blobs got removed from the cache. cache_res_mgr should not
+        // reserve any space for them.
         ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0);
       } else {
         ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry);
@@ -1498,11 +1521,11 @@ TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservationOnFullCache) {
   ReadOptions read_options;
   read_options.verify_checksums = true;
 
-  std::vector<PinnableSlice> values(keys_.size());
-
   {
     read_options.fill_cache = false;
 
+    std::vector<PinnableSlice> values(keys_.size());
+
     for (size_t i = 0; i < kNumBlobs; ++i) {
       ASSERT_OK(blob_source.GetBlob(
           read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
@@ -1516,6 +1539,8 @@ TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservationOnFullCache) {
   {
     read_options.fill_cache = true;
 
+    std::vector<PinnableSlice> values(keys_.size());
+
     // Since we resized each blob to be kSizeDummyEntry / (num_blobs / 2), we
     // can't fit all the blobs in the cache at the same time, which means we
    // should observe cache evictions once we reach the cache's capacity.
@@ -1528,6 +1553,9 @@ TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservationOnFullCache) {
           blob_file_size_, blob_sizes[i], kNoCompression,
           nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));
 
+      // Release cache handle
+      values[i].Reset();
+
      if (i < kNumBlobs / 2 - 1) {
        size_t charge = 0;
        ASSERT_TRUE(blob_source.TEST_BlobInCache(
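Note (editor's sketch): the test changes above scope the `values` vectors inside each block and `Reset()` slices after use because a pinned `PinnableSlice` keeps a reference on its cache entry, and an erased entry's charge is only freed once its last reference is dropped; held handles would therefore skew the cache usage these reservation tests assert. The pattern, with names taken from the tests above:

    {
      std::vector<PinnableSlice> values(keys_.size());
      // ... GetBlob calls fill `values`; with fill_cache == true each
      // slice now pins the corresponding cache entry ...
    }  // `values` destroyed: handles released, entries can be evicted/freed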
