Refactor and clean up the code that reads a blob from a file (#6093)

Summary:
This patch factors out the logic that reads a (potentially compressed) blob
from a file into a separate helper method `GetRawBlobFromFile`, and cleans
up the code a bit. Also, errors during decompression are now logged/propagated
to the user by returning a `Status` code of `Corruption`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6093

Test Plan: `make check`

Differential Revision: D18716673

Pulled By: ltamasi

fbshipit-source-id: 44144bc064cab616862d5643f34384f2bae6eb78
main
Levi Tamasi 5 years ago committed by Facebook Github Bot
parent 57f3032285
commit d9314a9214
  1. 150
      utilities/blob_db/blob_db_impl.cc
  2. 5
      utilities/blob_db/blob_db_impl.h

@ -1251,15 +1251,18 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value, uint64_t* expiration) {
assert(value != nullptr);
assert(value);
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(index_entry);
if (!s.ok()) {
return s;
}
if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
return Status::NotFound("Key expired");
}
if (expiration != nullptr) {
if (blob_index.HasTTL()) {
*expiration = blob_index.expiration();
@ -1267,13 +1270,65 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
*expiration = kNoExpiration;
}
}
if (blob_index.IsInlined()) {
// TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
// memory buffer to avoid extra copy.
value->PinSelf(blob_index.value());
return Status::OK();
}
if (blob_index.size() == 0) {
CompressionType compression_type = kNoCompression;
s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
blob_index.size(), value, &compression_type);
if (!s.ok()) {
return s;
}
if (compression_type != kNoCompression) {
BlockContents contents;
auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
{
StopWatch decompression_sw(env_, statistics_,
BLOB_DB_DECOMPRESSION_MICROS);
UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);
s = UncompressBlockContentsForCompressionType(
info, value->data(), value->size(), &contents,
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
}
if (!s.ok()) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Uncompression error during blob read from file: %" PRIu64
" blob_offset: %" PRIu64 " blob_size: %" PRIu64
" key: %s status: '%s'",
blob_index.file_number(), blob_index.offset(), blob_index.size(),
key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
}
return Status::Corruption("Unable to uncompress blob.");
}
value->PinSelf(contents.data);
}
return Status::OK();
}
Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
uint64_t offset, uint64_t size,
PinnableSlice* value,
CompressionType* compression_type) {
assert(value);
assert(compression_type);
assert(*compression_type == kNoCompression);
if (!size) {
value->PinSelf("");
return Status::OK();
}
@ -1281,47 +1336,46 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
// offset has to have certain min, as we will read CRC
// later from the Blob Header, which needs to be also a
// valid offset.
if (blob_index.offset() <
if (offset <
(BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Invalid blob index file_number: %" PRIu64
" blob_offset: %" PRIu64 " blob_size: %" PRIu64
" key: %s",
blob_index.file_number(), blob_index.offset(),
blob_index.size(), key.data());
file_number, offset, size,
key.ToString(/* output_hex */ true).c_str());
}
return Status::NotFound("Invalid blob offset");
}
std::shared_ptr<BlobFile> bfile;
std::shared_ptr<BlobFile> blob_file;
{
ReadLock rl(&mutex_);
auto hitr = blob_files_.find(blob_index.file_number());
auto it = blob_files_.find(file_number);
// file was deleted
if (hitr == blob_files_.end()) {
if (it == blob_files_.end()) {
return Status::NotFound("Blob Not Found as blob file missing");
}
bfile = hitr->second;
blob_file = it->second;
}
if (blob_index.size() == 0 && value != nullptr) {
value->PinSelf("");
return Status::OK();
}
*compression_type = blob_file->compression();
// takes locks when called
std::shared_ptr<RandomAccessFileReader> reader;
s = GetBlobFileReader(bfile, &reader);
Status s = GetBlobFileReader(blob_file, &reader);
if (!s.ok()) {
return s;
}
assert(blob_index.offset() > key.size() + sizeof(uint32_t));
uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);
uint64_t record_size = sizeof(uint32_t) + key.size() + blob_index.size();
assert(offset >= key.size() + sizeof(uint32_t));
const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
// Allocate the buffer. This is safe in C++11
std::string buffer_str(static_cast<size_t>(record_size), static_cast<char>(0));
@ -1329,42 +1383,44 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
// A partial blob record contain checksum, key and value.
Slice blob_record;
{
StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
s = reader->Read(record_offset, static_cast<size_t>(record_size), &blob_record, buffer);
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
}
if (!s.ok()) {
ROCKS_LOG_DEBUG(db_options_.info_log,
"Failed to read blob from blob file %" PRIu64
", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
bfile->BlobFileNumber(), blob_index.offset(),
blob_index.size(), key.size(), s.ToString().c_str());
ROCKS_LOG_DEBUG(
db_options_.info_log,
"Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
file_number, offset, size, key.size(), s.ToString().c_str());
return s;
}
if (blob_record.size() != record_size) {
ROCKS_LOG_DEBUG(
db_options_.info_log,
"Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
bfile->BlobFileNumber(), blob_index.offset(), blob_index.size(),
key.size(), blob_record.size(), record_size);
file_number, offset, size, key.size(), blob_record.size(), record_size);
return Status::Corruption("Failed to retrieve blob from blob index.");
}
Slice crc_slice(blob_record.data(), sizeof(uint32_t));
Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
static_cast<size_t>(blob_index.size()));
uint32_t crc_exp;
static_cast<size_t>(size));
uint32_t crc_exp = 0;
if (!GetFixed32(&crc_slice, &crc_exp)) {
ROCKS_LOG_DEBUG(db_options_.info_log,
"Unable to decode CRC from blob file %" PRIu64
", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
", key size: %" ROCKSDB_PRIszt ", status: '%s'",
bfile->BlobFileNumber(), blob_index.offset(),
blob_index.size(), key.size(), s.ToString().c_str());
ROCKS_LOG_DEBUG(
db_options_.info_log,
"Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
file_number, offset, size, key.size(), s.ToString().c_str());
return Status::Corruption("Unable to decode checksum.");
}
@ -1373,34 +1429,20 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
crc = crc32c::Mask(crc); // Adjust for storage
if (crc != crc_exp) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Blob crc mismatch file: %s blob_offset: %" PRIu64
ROCKS_LOG_ERROR(
db_options_.info_log,
"Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
bfile->PathName().c_str(), blob_index.offset(),
blob_index.size(), key.data(), s.ToString().c_str());
file_number, offset, size,
key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
}
return Status::Corruption("Corruption. Blob CRC mismatch");
}
if (bfile->compression() == kNoCompression) {
value->PinSelf(blob_value);
} else {
BlockContents contents;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
{
StopWatch decompression_sw(env_, statistics_,
BLOB_DB_DECOMPRESSION_MICROS);
UncompressionContext context(bfile->compression());
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
bfile->compression());
s = UncompressBlockContentsForCompressionType(
info, blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
}
value->PinSelf(contents.data);
}
return s;
return Status::OK();
}
Status BlobDBImpl::Get(const ReadOptions& read_options,

@ -237,6 +237,11 @@ class BlobDBImpl : public BlobDB {
Status GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value, uint64_t* expiration = nullptr);
Status GetRawBlobFromFile(const Slice& key, uint64_t file_number,
uint64_t offset, uint64_t size,
PinnableSlice* value,
CompressionType* compression_type);
Slice GetCompressedSlice(const Slice& raw,
std::string* compression_output) const;

Loading…
Cancel
Save