Change the semantics of bytes_read in GetBlob/MultiGetBlob for consistency (#10248)

Summary:
The `bytes_read` returned by the current BlobSource interface is ambiguous. The uncompressed blob size is returned if the cache hits. The size of the blob read from disk, presumably the compressed version, is returned if the cache misses. Two differing semantics might cause ambiguity and consistency issues. For example, this inconsistency causes the assertion failure (T124246362 and its hot fix is https://github.com/facebook/rocksdb/issues/10249).

This goal is to require that the value of `byte read` always be an on-disk blob record size.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10248

Reviewed By: ltamasi

Differential Revision: D37470292

Pulled By: gangliao

fbshipit-source-id: fbca521b2791d3674dbf2484cea5fcae2fdd94d2
main
Gang Liao 2 years ago committed by Facebook GitHub Bot
parent 0d1e0722ef
commit a1eb02f089
  1. 46
      db/blob/blob_file_reader.cc
  2. 25
      db/blob/blob_source.cc
  3. 22
      db/blob/blob_source.h
  4. 203
      db/blob/blob_source_test.cc

@ -443,50 +443,40 @@ void BlobFileReader::MultiGetBlob(
}
assert(s.ok());
uint64_t total_bytes = 0;
for (size_t i = 0; i < num_blobs; ++i) {
auto& req = read_reqs[i];
const auto& record_slice = req.result;
assert(statuses[i]);
if (req.status.ok() && req.result.size() != req.len) {
if (req.status.ok() && record_slice.size() != req.len) {
req.status = IOStatus::Corruption("Failed to read data from blob file");
}
*statuses[i] = req.status;
}
if (!statuses[i]->ok()) {
continue;
}
if (read_options.verify_checksums) {
for (size_t i = 0; i < num_blobs; ++i) {
assert(statuses[i]);
// Verify checksums if enabled
if (read_options.verify_checksums) {
*statuses[i] = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
if (!statuses[i]->ok()) {
continue;
}
const Slice& record_slice = read_reqs[i].result;
s = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
if (!s.ok()) {
assert(statuses[i]);
*statuses[i] = s;
}
}
}
for (size_t i = 0; i < num_blobs; ++i) {
assert(statuses[i]);
if (!statuses[i]->ok()) {
continue;
}
const Slice& record_slice = read_reqs[i].result;
const Slice value_slice(record_slice.data() + adjustments[i],
value_sizes[i]);
s = UncompressBlobIfNeeded(value_slice, compression_type_, clock_,
statistics_, values[i]);
if (!s.ok()) {
*statuses[i] = s;
// Uncompress blob if needed
Slice value_slice(record_slice.data() + adjustments[i], value_sizes[i]);
*statuses[i] = UncompressBlobIfNeeded(value_slice, compression_type_,
clock_, statistics_, values[i]);
if (statuses[i]->ok()) {
total_bytes += record_slice.size();
}
}
if (bytes_read) {
uint64_t total_bytes = 0;
for (const auto& req : read_reqs) {
total_bytes += req.result.size();
}
*bytes_read = total_bytes;
}
}

@ -9,6 +9,7 @@
#include <string>
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "options/cf_options.h"
#include "table/multiget_context.h"
@ -99,8 +100,16 @@ Status BlobSource::GetBlob(const ReadOptions& read_options,
Slice key = cache_key.AsSlice();
s = GetBlobFromCache(key, &blob_entry);
if (s.ok() && blob_entry.GetValue()) {
// For consistency, the size of on-disk (possibly compressed) blob record
// is assigned to bytes_read.
if (bytes_read) {
*bytes_read = value_size;
uint64_t adjustment =
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(
user_key.size())
: 0;
assert(offset >= adjustment);
*bytes_read = value_size + adjustment;
}
value->PinSelf(*blob_entry.GetValue());
return s;
@ -191,13 +200,20 @@ void BlobSource::MultiGetBlob(
s = GetBlobFromCache(key, &blob_entry);
if (s.ok() && blob_entry.GetValue()) {
assert(statuses[i]);
*statuses[i] = s;
blobs[i]->PinSelf(*blob_entry.GetValue());
// Update the counter for the number of valid blobs read from the cache.
++cached_blob_count;
total_bytes += value_sizes[i];
// For consistency, the size of each on-disk (possibly compressed) blob
// record is accumulated to total_bytes.
uint64_t adjustment =
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(
user_keys[i].get().size())
: 0;
assert(offsets[i] >= adjustment);
total_bytes += value_sizes[i] + adjustment;
cache_hit_mask |= (Mask{1} << i); // cache hit
}
}
@ -266,7 +282,6 @@ void BlobSource::MultiGetBlob(
CachableEntry<std::string> blob_entry;
const CacheKey cache_key = base_cache_key.WithOffset(_offsets[i]);
const Slice key = cache_key.AsSlice();
s = PutBlobIntoCache(key, &blob_entry, _blobs[i]);
if (!s.ok()) {
*_statuses[i] = s;
@ -279,8 +294,6 @@ void BlobSource::MultiGetBlob(
if (bytes_read) {
*bytes_read = total_bytes;
}
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, _bytes_read);
}
}

@ -37,13 +37,33 @@ class BlobSource {
~BlobSource();
// Read a blob from the underlying cache or storage.
//
// If successful, returns ok and sets "*value" to the newly retrieved
// uncompressed blob. If there was an error while fetching the blob, sets
// "*value" to empty and returns a non-ok status.
//
// Note: For consistency, whether the blob is found in the cache or on disk,
// sets "*bytes_read" to the size of on-disk (possibly compressed) blob
// record.
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
uint64_t file_number, uint64_t offset, uint64_t file_size,
uint64_t value_size, CompressionType compression_type,
FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
uint64_t* bytes_read);
// Offsets must be sorted in ascending order by caller.
// Read multiple blobs from the underlying cache or storage.
//
// If successful, returns ok and sets the elements of blobs to the newly
// retrieved uncompressed blobs. If there was an error while fetching one of
// blobs, sets its corresponding "blobs[i]" to empty and sets "statuses[i]" to
// a non-ok status.
//
// Note:
// - Offsets must be sorted in ascending order by caller.
// - For consistency, whether the blob is found in the cache or on disk, sets
// "*bytes_read" to the total size of on-disk (possibly compressed) blob
// records.
void MultiGetBlob(
const ReadOptions& read_options,
const autovector<std::reference_wrapper<const Slice>>& user_keys,

@ -12,6 +12,7 @@
#include <string>
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
#include "db/db_test_util.h"
@ -108,33 +109,34 @@ class BlobSourceTest : public DBTestBase {
protected:
public:
explicit BlobSourceTest()
: DBTestBase("blob_source_test", /*env_do_fsync=*/true) {}
: DBTestBase("blob_source_test", /*env_do_fsync=*/true) {
options_.env = env_;
options_.enable_blob_files = true;
options_.create_if_missing = true;
LRUCacheOptions co;
co.capacity = 2048;
co.num_shard_bits = 2;
co.metadata_charge_policy = kDontChargeCacheMetadata;
options_.blob_cache = NewLRUCache(co);
options_.lowest_used_cache_tier = CacheTier::kVolatileTier;
assert(db_->GetDbIdentity(db_id_).ok());
assert(db_->GetDbSessionId(db_session_id_).ok());
}
Options options_;
std::string db_id_;
std::string db_session_id_;
};
TEST_F(BlobSourceTest, GetBlobsFromCache) {
Options options;
options.env = env_;
options.cf_paths.emplace_back(
options_.cf_paths.emplace_back(
test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0);
options.enable_blob_files = true;
options.create_if_missing = true;
LRUCacheOptions co;
co.capacity = 2048;
co.num_shard_bits = 2;
co.metadata_charge_policy = kDontChargeCacheMetadata;
options.blob_cache = NewLRUCache(co);
options.lowest_used_cache_tier = CacheTier::kVolatileTier;
DestroyAndReopen(options);
DestroyAndReopen(options_);
std::string db_id;
ASSERT_OK(db_->GetDbIdentity(db_id));
std::string db_session_id;
ASSERT_OK(db_->GetDbSessionId(db_session_id));
ImmutableOptions immutable_options(options);
ImmutableOptions immutable_options(options_);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
@ -179,7 +181,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
backing_cache.get(), &immutable_options, &file_options, column_family_id,
blob_file_read_hist, nullptr /*IOTracer*/));
BlobSource blob_source(&immutable_options, db_id, db_session_id,
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
ReadOptions read_options;
@ -204,7 +206,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
&bytes_read));
ASSERT_EQ(values[i], blobs[i]);
ASSERT_EQ(bytes_read,
blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
@ -222,7 +224,7 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
&bytes_read));
ASSERT_EQ(values[i], blobs[i]);
ASSERT_EQ(bytes_read,
blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
@ -239,7 +241,8 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
kNoCompression, prefetch_buffer, &values[i],
&bytes_read));
ASSERT_EQ(values[i], blobs[i]);
ASSERT_EQ(bytes_read, blob_sizes[i]);
ASSERT_EQ(bytes_read,
BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
@ -257,14 +260,15 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
kNoCompression, prefetch_buffer, &values[i],
&bytes_read));
ASSERT_EQ(values[i], blobs[i]);
ASSERT_EQ(bytes_read, blob_sizes[i]);
ASSERT_EQ(bytes_read,
BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
}
}
options.blob_cache->EraseUnRefEntries();
options_.blob_cache->EraseUnRefEntries();
{
// Cache-only GetBlob
@ -320,30 +324,131 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
}
}
TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
Options options;
options.env = env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);
options.enable_blob_files = true;
options.create_if_missing = true;
TEST_F(BlobSourceTest, GetCompressedBlobs) {
if (!Snappy_Supported()) {
return;
}
const CompressionType compression = kSnappyCompression;
options_.cf_paths.emplace_back(
test::PerThreadDBPath(env_, "BlobSourceTest_GetCompressedBlobs"), 0);
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr size_t num_blobs = 256;
std::vector<std::string> key_strs;
std::vector<std::string> blob_strs;
for (size_t i = 0; i < num_blobs; ++i) {
key_strs.push_back("key" + std::to_string(i));
blob_strs.push_back("blob" + std::to_string(i));
}
std::vector<Slice> keys;
std::vector<Slice> blobs;
for (size_t i = 0; i < num_blobs; ++i) {
keys.push_back({key_strs[i]});
blobs.push_back({blob_strs[i]});
}
std::vector<uint64_t> blob_offsets(keys.size());
std::vector<uint64_t> blob_sizes(keys.size());
constexpr size_t capacity = 1024;
auto backing_cache = NewLRUCache(capacity); // Blob file cache
FileOptions file_options;
std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
backing_cache.get(), &immutable_options, &file_options, column_family_id,
nullptr /*HistogramImpl*/, nullptr /*IOTracer*/));
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
ReadOptions read_options;
read_options.verify_checksums = true;
uint64_t bytes_read = 0;
std::vector<PinnableSlice> values(keys.size());
{
// Snappy Compression
const uint64_t file_number = 1;
read_options.read_tier = ReadTier::kReadAllTier;
WriteBlobFile(immutable_options, column_family_id, has_ttl,
expiration_range, expiration_range, file_number, keys, blobs,
compression, blob_offsets, blob_sizes);
CacheHandleGuard<BlobFileReader> blob_file_reader;
ASSERT_OK(blob_source.GetBlobFileReader(file_number, &blob_file_reader));
ASSERT_NE(blob_file_reader.GetValue(), nullptr);
const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
ASSERT_EQ(blob_file_reader.GetValue()->GetCompressionType(), compression);
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_NE(blobs[i].size() /*uncompressed size*/,
blob_sizes[i] /*compressed size*/);
}
read_options.fill_cache = true;
read_options.read_tier = ReadTier::kReadAllTier;
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
blob_offsets[i]));
ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
blob_offsets[i], file_size, blob_sizes[i],
compression, nullptr /*prefetch_buffer*/,
&values[i], &bytes_read));
ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
ASSERT_EQ(bytes_read,
BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
LRUCacheOptions co;
co.capacity = 2048;
co.num_shard_bits = 2;
co.metadata_charge_policy = kDontChargeCacheMetadata;
options.blob_cache = NewLRUCache(co);
options.lowest_used_cache_tier = CacheTier::kVolatileTier;
ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
blob_offsets[i]));
}
read_options.read_tier = ReadTier::kBlockCacheTier;
DestroyAndReopen(options);
for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
blob_offsets[i]));
std::string db_id;
ASSERT_OK(db_->GetDbIdentity(db_id));
// Compressed blob size is passed in GetBlob
ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
blob_offsets[i], file_size, blob_sizes[i],
compression, nullptr /*prefetch_buffer*/,
&values[i], &bytes_read));
ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
ASSERT_EQ(bytes_read,
BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
blob_offsets[i]));
}
}
}
TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
options_.cf_paths.emplace_back(
test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);
std::string db_session_id;
ASSERT_OK(db_->GetDbSessionId(db_session_id));
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options);
ImmutableOptions immutable_options(options_);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
@ -388,7 +493,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
backing_cache.get(), &immutable_options, &file_options, column_family_id,
blob_file_read_hist, nullptr /*IOTracer*/));
BlobSource blob_source(&immutable_options, db_id, db_session_id,
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
ReadOptions read_options;
@ -451,7 +556,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
&value_buf[i], &bytes_read));
ASSERT_EQ(value_buf[i], blobs[i]);
ASSERT_EQ(bytes_read,
blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize);
BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);
ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
blob_offsets[i]));
@ -485,7 +590,7 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
}
}
options.blob_cache->EraseUnRefEntries();
options_.blob_cache->EraseUnRefEntries();
{
// Cache-only MultiGetBlob

Loading…
Cancel
Save