Introduce a blob file reader class (#7461)

Summary:
The patch adds a class called `BlobFileReader` that can be used to retrieve blobs
using the information available in blob references (e.g. blob file number, offset, and
size). This will come in handy when implementing blob support for `Get`, `MultiGet`,
and iterators, and also for compaction/garbage collection.

When a `BlobFileReader` object is created (using the factory method `Create`),
it first checks whether the specified file is potentially valid by comparing the file
size against the combined size of the blob file header and footer (files smaller than
the threshold are considered malformed). Then, it opens the file, and reads and verifies
the header and footer. The verification involves magic number/CRC checks
as well as checking for unexpected header/footer fields, e.g. incorrect column family ID
or TTL blob files.

Blobs can be retrieved using `GetBlob`. `GetBlob` validates the offset and compression
type passed by the caller (because of the presence of the header and footer, the
specified offset cannot be too close to the start/end of the file; also, the compression type
has to match the one in the blob file header), and retrieves and potentially verifies and
uncompresses the blob. In particular, when `ReadOptions::verify_checksums` is set,
`BlobFileReader` reads the blob record header as well (as opposed to just the blob itself)
and verifies the key/value size, the key itself, as well as the CRC of the blob record header
and the key/value pair.

In addition, the patch exposes the compression type from `BlobIndex` (both using an
accessor and via `DebugString`), and adds a blob file read latency histogram to
`InternalStats` that can be used with `BlobFileReader`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7461

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D23999219

Pulled By: ltamasi

fbshipit-source-id: deb6b1160d251258b308d5156e2ec063c3e12e5e
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent 38d0a365e3
commit 22655a398b
  1. 2
      CMakeLists.txt
  2. 4
      Makefile
  3. 9
      TARGETS
  4. 423
      db/blob/blob_file_reader.cc
  5. 81
      db/blob/blob_file_reader.h
  6. 768
      db/blob/blob_file_reader_test.cc
  7. 11
      db/blob/blob_index.h
  8. 4
      db/blob/blob_log_format.cc
  9. 20
      db/blob/blob_log_format.h
  10. 26
      db/internal_stats.cc
  11. 4
      db/internal_stats.h
  12. 2
      src.mk

@ -573,6 +573,7 @@ set(SOURCES
db/blob/blob_file_builder.cc
db/blob/blob_file_garbage.cc
db/blob/blob_file_meta.cc
db/blob/blob_file_reader.cc
db/blob/blob_log_format.cc
db/blob/blob_log_reader.cc
db/blob/blob_log_writer.cc
@ -1044,6 +1045,7 @@ if(WITH_TESTS)
db/blob/blob_file_addition_test.cc
db/blob/blob_file_builder_test.cc
db/blob/blob_file_garbage_test.cc
db/blob/blob_file_reader_test.cc
db/blob/db_blob_index_test.cc
db/column_family_test.cc
db/compact_files_test.cc

@ -578,6 +578,7 @@ ifdef ASSERT_STATUS_CHECKED
blob_file_addition_test \
blob_file_builder_test \
blob_file_garbage_test \
blob_file_reader_test \
bloom_test \
cassandra_format_test \
cassandra_row_merge_test \
@ -1910,6 +1911,9 @@ blob_file_builder_test: $(OBJ_DIR)/db/blob/blob_file_builder_test.o $(TEST_LIBRA
blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
blob_file_reader_test: $(OBJ_DIR)/db/blob/blob_file_reader_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

@ -137,6 +137,7 @@ cpp_library(
"db/blob/blob_file_builder.cc",
"db/blob/blob_file_garbage.cc",
"db/blob/blob_file_meta.cc",
"db/blob/blob_file_reader.cc",
"db/blob/blob_log_format.cc",
"db/blob/blob_log_reader.cc",
"db/blob/blob_log_writer.cc",
@ -424,6 +425,7 @@ cpp_library(
"db/blob/blob_file_builder.cc",
"db/blob/blob_file_garbage.cc",
"db/blob/blob_file_meta.cc",
"db/blob/blob_file_reader.cc",
"db/blob/blob_log_format.cc",
"db/blob/blob_log_reader.cc",
"db/blob/blob_log_writer.cc",
@ -862,6 +864,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"blob_file_reader_test",
"db/blob/blob_file_reader_test.cc",
"serial",
[],
[],
],
[
"block_based_filter_block_test",
"table/block_based/block_based_filter_block_test.cc",

@ -0,0 +1,423 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_file_reader.h"
#include <cassert>
#include <string>
#include "db/blob/blob_log_format.h"
#include "file/filename.h"
#include "options/cf_options.h"
#include "rocksdb/file_system.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "test_util/sync_point.h"
#include "util/compression.h"
#include "util/crc32c.h"
namespace ROCKSDB_NAMESPACE {
Status BlobFileReader::Create(
const ImmutableCFOptions& immutable_cf_options,
const FileOptions& file_options, uint32_t column_family_id,
HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
std::unique_ptr<BlobFileReader>* blob_file_reader) {
assert(blob_file_reader);
assert(!*blob_file_reader);
uint64_t file_size = 0;
std::unique_ptr<RandomAccessFileReader> file_reader;
{
const Status s =
OpenFile(immutable_cf_options, file_options, blob_file_read_hist,
blob_file_number, &file_size, &file_reader);
if (!s.ok()) {
return s;
}
}
assert(file_reader);
CompressionType compression_type = kNoCompression;
{
const Status s =
ReadHeader(file_reader.get(), column_family_id, &compression_type);
if (!s.ok()) {
return s;
}
}
{
const Status s = ReadFooter(file_size, file_reader.get());
if (!s.ok()) {
return s;
}
}
blob_file_reader->reset(
new BlobFileReader(std::move(file_reader), file_size, compression_type));
return Status::OK();
}
Status BlobFileReader::OpenFile(
const ImmutableCFOptions& immutable_cf_options,
const FileOptions& file_opts, HistogramImpl* blob_file_read_hist,
uint64_t blob_file_number, uint64_t* file_size,
std::unique_ptr<RandomAccessFileReader>* file_reader) {
assert(file_size);
assert(file_reader);
const auto& cf_paths = immutable_cf_options.cf_paths;
assert(!cf_paths.empty());
const std::string blob_file_path =
BlobFileName(cf_paths.front().path, blob_file_number);
FileSystem* const fs = immutable_cf_options.fs;
assert(fs);
constexpr IODebugContext* dbg = nullptr;
{
TEST_SYNC_POINT("BlobFileReader::OpenFile:GetFileSize");
const Status s =
fs->GetFileSize(blob_file_path, IOOptions(), file_size, dbg);
if (!s.ok()) {
return s;
}
}
if (*file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
return Status::Corruption("Malformed blob file");
}
std::unique_ptr<FSRandomAccessFile> file;
{
TEST_SYNC_POINT("BlobFileReader::OpenFile:NewRandomAccessFile");
const Status s =
fs->NewRandomAccessFile(blob_file_path, file_opts, &file, dbg);
if (!s.ok()) {
return s;
}
}
assert(file);
if (immutable_cf_options.advise_random_on_open) {
file->Hint(FSRandomAccessFile::kRandom);
}
file_reader->reset(new RandomAccessFileReader(
std::move(file), blob_file_path, immutable_cf_options.env,
std::shared_ptr<IOTracer>(), immutable_cf_options.statistics,
BLOB_DB_BLOB_FILE_READ_MICROS, blob_file_read_hist,
immutable_cf_options.rate_limiter, immutable_cf_options.listeners));
return Status::OK();
}
Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
uint32_t column_family_id,
CompressionType* compression_type) {
assert(file_reader);
assert(compression_type);
Slice header_slice;
Buffer buf;
AlignedBuf aligned_buf;
{
TEST_SYNC_POINT("BlobFileReader::ReadHeader:ReadFromFile");
constexpr uint64_t read_offset = 0;
constexpr size_t read_size = BlobLogHeader::kSize;
const Status s = ReadFromFile(file_reader, read_offset, read_size,
&header_slice, &buf, &aligned_buf);
if (!s.ok()) {
return s;
}
TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadHeader:TamperWithResult",
&header_slice);
}
BlobLogHeader header;
{
const Status s = header.DecodeFrom(header_slice);
if (!s.ok()) {
return s;
}
}
constexpr ExpirationRange no_expiration_range;
if (header.has_ttl || header.expiration_range != no_expiration_range) {
return Status::Corruption("Unexpected TTL blob file");
}
if (header.column_family_id != column_family_id) {
return Status::Corruption("Column family ID mismatch");
}
*compression_type = header.compression;
return Status::OK();
}
Status BlobFileReader::ReadFooter(uint64_t file_size,
const RandomAccessFileReader* file_reader) {
assert(file_size >= BlobLogHeader::kSize + BlobLogFooter::kSize);
assert(file_reader);
Slice footer_slice;
Buffer buf;
AlignedBuf aligned_buf;
{
TEST_SYNC_POINT("BlobFileReader::ReadFooter:ReadFromFile");
const uint64_t read_offset = file_size - BlobLogFooter::kSize;
constexpr size_t read_size = BlobLogFooter::kSize;
const Status s = ReadFromFile(file_reader, read_offset, read_size,
&footer_slice, &buf, &aligned_buf);
if (!s.ok()) {
return s;
}
TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadFooter:TamperWithResult",
&footer_slice);
}
BlobLogFooter footer;
{
const Status s = footer.DecodeFrom(footer_slice);
if (!s.ok()) {
return s;
}
}
constexpr ExpirationRange no_expiration_range;
if (footer.expiration_range != no_expiration_range) {
return Status::Corruption("Unexpected TTL blob file");
}
return Status::OK();
}
Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
uint64_t read_offset, size_t read_size,
Slice* slice, Buffer* buf,
AlignedBuf* aligned_buf) {
assert(slice);
assert(buf);
assert(aligned_buf);
assert(file_reader);
Status s;
if (file_reader->use_direct_io()) {
constexpr char* scratch = nullptr;
s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch,
aligned_buf);
} else {
buf->reset(new char[read_size]);
constexpr AlignedBuf* aligned_scratch = nullptr;
s = file_reader->Read(IOOptions(), read_offset, read_size, slice,
buf->get(), aligned_scratch);
}
if (!s.ok()) {
return s;
}
if (slice->size() != read_size) {
return Status::Corruption("Failed to read data from blob file");
}
return Status::OK();
}
BlobFileReader::BlobFileReader(
std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
CompressionType compression_type)
: file_reader_(std::move(file_reader)),
file_size_(file_size),
compression_type_(compression_type) {
assert(file_reader_);
}
BlobFileReader::~BlobFileReader() = default;
Status BlobFileReader::GetBlob(const ReadOptions& read_options,
const Slice& user_key, uint64_t offset,
uint64_t value_size,
CompressionType compression_type,
PinnableSlice* value) const {
assert(value);
const uint64_t key_size = user_key.size();
if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
return Status::Corruption("Invalid blob offset");
}
if (compression_type != compression_type_) {
return Status::Corruption("Compression type mismatch when reading blob");
}
// Note: if verify_checksum is set, we read the entire blob record to be able
// to perform the verification; otherwise, we just read the blob itself. Since
// the offset in BlobIndex actually points to the blob value, we need to make
// an adjustment in the former case.
const uint64_t adjustment =
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
: 0;
assert(offset >= adjustment);
Slice record_slice;
Buffer buf;
AlignedBuf aligned_buf;
{
TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
const uint64_t record_offset = offset - adjustment;
const uint64_t record_size = value_size + adjustment;
const Status s = ReadFromFile(file_reader_.get(), record_offset,
static_cast<size_t>(record_size),
&record_slice, &buf, &aligned_buf);
if (!s.ok()) {
return s;
}
TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult",
&record_slice);
}
if (read_options.verify_checksums) {
const Status s = VerifyBlob(record_slice, user_key, value_size);
if (!s.ok()) {
return s;
}
}
const Slice value_slice(record_slice.data() + adjustment, value_size);
{
const Status s =
UncompressBlobIfNeeded(value_slice, compression_type, value);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status BlobFileReader::VerifyBlob(const Slice& record_slice,
const Slice& user_key, uint64_t value_size) {
BlobLogRecord record;
const Slice header_slice(record_slice.data(), BlobLogRecord::kHeaderSize);
{
const Status s = record.DecodeHeaderFrom(header_slice);
if (!s.ok()) {
return s;
}
}
if (record.key_size != user_key.size()) {
return Status::Corruption("Key size mismatch when reading blob");
}
if (record.value_size != value_size) {
return Status::Corruption("Value size mismatch when reading blob");
}
record.key =
Slice(record_slice.data() + BlobLogRecord::kHeaderSize, record.key_size);
if (record.key != user_key) {
return Status::Corruption("Key mismatch when reading blob");
}
record.value = Slice(record.key.data() + record.key_size, value_size);
{
TEST_SYNC_POINT_CALLBACK("BlobFileReader::VerifyBlob:CheckBlobCRC",
&record);
const Status s = record.CheckBlobCRC();
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice,
CompressionType compression_type,
PinnableSlice* value) {
assert(value);
if (compression_type == kNoCompression) {
SaveValue(value_slice, value);
return Status::OK();
}
UncompressionContext context(compression_type);
UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
compression_type);
size_t uncompressed_size = 0;
constexpr uint32_t compression_format_version = 2;
constexpr MemoryAllocator* allocator = nullptr;
CacheAllocationPtr output =
UncompressData(info, value_slice.data(), value_slice.size(),
&uncompressed_size, compression_format_version, allocator);
TEST_SYNC_POINT_CALLBACK(
"BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", &output);
if (!output) {
return Status::Corruption("Unable to uncompress blob");
}
SaveValue(Slice(output.get(), uncompressed_size), value);
return Status::OK();
}
void BlobFileReader::SaveValue(const Slice& src, PinnableSlice* dst) {
assert(dst);
if (dst->IsPinned()) {
dst->Reset();
}
dst->PinSelf(src);
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,81 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cinttypes>
#include <memory>
#include "file/random_access_file_reader.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
class Status;
struct ImmutableCFOptions;
struct FileOptions;
class HistogramImpl;
struct ReadOptions;
class Slice;
class PinnableSlice;
class BlobFileReader {
public:
static Status Create(const ImmutableCFOptions& immutable_cf_options,
const FileOptions& file_options,
uint32_t column_family_id,
HistogramImpl* blob_file_read_hist,
uint64_t blob_file_number,
std::unique_ptr<BlobFileReader>* reader);
BlobFileReader(const BlobFileReader&) = delete;
BlobFileReader& operator=(const BlobFileReader&) = delete;
~BlobFileReader();
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
uint64_t offset, uint64_t value_size,
CompressionType compression_type, PinnableSlice* value) const;
private:
BlobFileReader(std::unique_ptr<RandomAccessFileReader>&& file_reader,
uint64_t file_size, CompressionType compression_type);
static Status OpenFile(const ImmutableCFOptions& immutable_cf_options,
const FileOptions& file_opts,
HistogramImpl* blob_file_read_hist,
uint64_t blob_file_number, uint64_t* file_size,
std::unique_ptr<RandomAccessFileReader>* file_reader);
static Status ReadHeader(const RandomAccessFileReader* file_reader,
uint32_t column_family_id,
CompressionType* compression_type);
static Status ReadFooter(uint64_t file_size,
const RandomAccessFileReader* file_reader);
using Buffer = std::unique_ptr<char[]>;
static Status ReadFromFile(const RandomAccessFileReader* file_reader,
uint64_t read_offset, size_t read_size,
Slice* slice, Buffer* buf,
AlignedBuf* aligned_buf);
static Status VerifyBlob(const Slice& record_slice, const Slice& user_key,
uint64_t value_size);
static Status UncompressBlobIfNeeded(const Slice& value_slice,
CompressionType compression_type,
PinnableSlice* value);
static void SaveValue(const Slice& src, PinnableSlice* dst);
std::unique_ptr<RandomAccessFileReader> file_reader_;
uint64_t file_size_;
CompressionType compression_type_;
};
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,768 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_file_reader.h"
#include <cassert>
#include <string>
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/options.h"
#include "test_util/testharness.h"
#include "util/compression.h"
#include "utilities/fault_injection_env.h"
namespace ROCKSDB_NAMESPACE {
namespace {
// Creates a test blob file with a single blob in it. Note: this method
// makes it possible to test various corner cases by allowing the caller
// to specify the contents of various blob file header/footer fields.
void WriteBlobFile(const ImmutableCFOptions& immutable_cf_options,
uint32_t column_family_id, bool has_ttl,
const ExpirationRange& expiration_range_header,
const ExpirationRange& expiration_range_footer,
uint64_t blob_file_number, const Slice& key,
const Slice& blob, CompressionType compression_type,
uint64_t* blob_offset, uint64_t* blob_size) {
assert(!immutable_cf_options.cf_paths.empty());
assert(blob_offset);
assert(blob_size);
const std::string blob_file_path = BlobFileName(
immutable_cf_options.cf_paths.front().path, blob_file_number);
std::unique_ptr<FSWritableFile> file;
ASSERT_OK(NewWritableFile(immutable_cf_options.fs, blob_file_path, &file,
FileOptions()));
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
immutable_cf_options.env));
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.env, statistics,
blob_file_number, use_fsync);
BlobLogHeader header(column_family_id, compression_type, has_ttl,
expiration_range_header);
ASSERT_OK(blob_log_writer.WriteHeader(header));
std::string compressed_blob;
Slice blob_to_write;
if (compression_type == kNoCompression) {
blob_to_write = blob;
*blob_size = blob.size();
} else {
CompressionOptions opts;
CompressionContext context(compression_type);
constexpr uint64_t sample_for_compression = 0;
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
compression_type, sample_for_compression);
constexpr uint32_t compression_format_version = 2;
ASSERT_TRUE(
CompressData(blob, info, compression_format_version, &compressed_blob));
blob_to_write = compressed_blob;
*blob_size = compressed_blob.size();
}
uint64_t key_offset = 0;
ASSERT_OK(
blob_log_writer.AddRecord(key, blob_to_write, &key_offset, blob_offset));
BlobLogFooter footer;
footer.blob_count = 1;
footer.expiration_range = expiration_range_footer;
std::string checksum_method;
std::string checksum_value;
ASSERT_OK(
blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value));
}
} // anonymous namespace
class BlobFileReaderTest : public testing::Test {
protected:
BlobFileReaderTest() : mock_env_(Env::Default()) {}
MockEnv mock_env_;
};
TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_,
"BlobFileReaderTest_CreateReaderAndGetBlob"),
0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range, expiration_range, blob_file_number, key, blob,
kNoCompression, &blob_offset, &blob_size);
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
ASSERT_OK(BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader));
// Make sure the blob can be retrieved with and without checksum verification
ReadOptions read_options;
read_options.verify_checksums = false;
{
PinnableSlice value;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kNoCompression, &value));
ASSERT_EQ(value, blob);
}
read_options.verify_checksums = true;
{
PinnableSlice value;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kNoCompression, &value));
ASSERT_EQ(value, blob);
}
// Invalid offset (too close to start of file)
{
PinnableSlice value;
ASSERT_TRUE(reader
->GetBlob(read_options, key, blob_offset - 1, blob_size,
kNoCompression, &value)
.IsCorruption());
}
// Invalid offset (too close to end of file)
{
PinnableSlice value;
ASSERT_TRUE(reader
->GetBlob(read_options, key, blob_offset + 1, blob_size,
kNoCompression, &value)
.IsCorruption());
}
// Incorrect compression type
{
PinnableSlice value;
ASSERT_TRUE(
reader
->GetBlob(read_options, key, blob_offset, blob_size, kZSTD, &value)
.IsCorruption());
}
// Incorrect key size
{
constexpr char shorter_key[] = "k";
PinnableSlice value;
ASSERT_TRUE(reader
->GetBlob(read_options, shorter_key,
blob_offset - (sizeof(key) - sizeof(shorter_key)),
blob_size, kNoCompression, &value)
.IsCorruption());
}
// Incorrect key
{
constexpr char incorrect_key[] = "foo";
PinnableSlice value;
ASSERT_TRUE(reader
->GetBlob(read_options, incorrect_key, blob_offset,
blob_size, kNoCompression, &value)
.IsCorruption());
}
// Incorrect value size
{
PinnableSlice value;
ASSERT_TRUE(reader
->GetBlob(read_options, key, blob_offset, blob_size + 1,
kNoCompression, &value)
.IsCorruption());
}
}
TEST_F(BlobFileReaderTest, Malformed) {
// Write a blob file consisting of nothing but a header, and make sure we
// detect the error when we open it for reading
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_, "BlobFileReaderTest_Malformed"), 0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr uint64_t blob_file_number = 1;
{
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
const std::string blob_file_path = BlobFileName(
immutable_cf_options.cf_paths.front().path, blob_file_number);
std::unique_ptr<FSWritableFile> file;
ASSERT_OK(NewWritableFile(immutable_cf_options.fs, blob_file_path, &file,
FileOptions()));
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
immutable_cf_options.env));
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.env, statistics,
blob_file_number, use_fsync);
BlobLogHeader header(column_family_id, kNoCompression, has_ttl,
expiration_range);
ASSERT_OK(blob_log_writer.WriteHeader(header));
}
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader)
.IsCorruption());
}
TEST_F(BlobFileReaderTest, TTL) {
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_, "BlobFileReaderTest_TTL"), 0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = true;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range, expiration_range, blob_file_number, key, blob,
kNoCompression, &blob_offset, &blob_size);
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader)
.IsCorruption());
}
TEST_F(BlobFileReaderTest, ExpirationRangeInHeader) {
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_,
"BlobFileReaderTest_ExpirationRangeInHeader"),
0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range_header(1, 2);
constexpr ExpirationRange expiration_range_footer;
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range_header, expiration_range_footer,
blob_file_number, key, blob, kNoCompression, &blob_offset,
&blob_size);
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader)
.IsCorruption());
}
TEST_F(BlobFileReaderTest, ExpirationRangeInFooter) {
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_,
"BlobFileReaderTest_ExpirationRangeInFooter"),
0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range_header;
constexpr ExpirationRange expiration_range_footer(1, 2);
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range_header, expiration_range_footer,
blob_file_number, key, blob, kNoCompression, &blob_offset,
&blob_size);
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader)
.IsCorruption());
}
TEST_F(BlobFileReaderTest, IncorrectColumnFamily) {
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_,
"BlobFileReaderTest_IncorrectColumnFamily"),
0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range, expiration_range, blob_file_number, key, blob,
kNoCompression, &blob_offset, &blob_size);
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
constexpr uint32_t incorrect_column_family_id = 2;
ASSERT_TRUE(BlobFileReader::Create(immutable_cf_options, FileOptions(),
incorrect_column_family_id,
blob_file_read_hist, blob_file_number,
&reader)
.IsCorruption());
}
TEST_F(BlobFileReaderTest, BlobCRCError) {
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_, "BlobFileReaderTest_BlobCRCError"), 0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range, expiration_range, blob_file_number, key, blob,
kNoCompression, &blob_offset, &blob_size);
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
ASSERT_OK(BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader));
SyncPoint::GetInstance()->SetCallBack(
"BlobFileReader::VerifyBlob:CheckBlobCRC", [](void* arg) {
BlobLogRecord* const record = static_cast<BlobLogRecord*>(arg);
assert(record);
record->blob_crc = 0xfaceb00c;
});
SyncPoint::GetInstance()->EnableProcessing();
PinnableSlice value;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, &value)
.IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(BlobFileReaderTest, Compression) {
if (!Snappy_Supported()) {
return;
}
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_, "BlobFileReaderTest_Compression"), 0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range, expiration_range, blob_file_number, key, blob,
kSnappyCompression, &blob_offset, &blob_size);
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
ASSERT_OK(BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader));
// Make sure the blob can be retrieved with and without checksum verification
ReadOptions read_options;
read_options.verify_checksums = false;
{
PinnableSlice value;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kSnappyCompression, &value));
ASSERT_EQ(value, blob);
}
read_options.verify_checksums = true;
{
PinnableSlice value;
ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
kSnappyCompression, &value));
ASSERT_EQ(value, blob);
}
}
TEST_F(BlobFileReaderTest, UncompressionError) {
if (!Snappy_Supported()) {
return;
}
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_,
"BlobFileReaderTest_UncompressionError"),
0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range, expiration_range, blob_file_number, key, blob,
kSnappyCompression, &blob_offset, &blob_size);
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
ASSERT_OK(BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader));
SyncPoint::GetInstance()->SetCallBack(
"BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", [](void* arg) {
CacheAllocationPtr* const output =
static_cast<CacheAllocationPtr*>(arg);
assert(output);
output->reset();
});
SyncPoint::GetInstance()->EnableProcessing();
PinnableSlice value;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kSnappyCompression, &value)
.IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
class BlobFileReaderIOErrorTest
: public testing::Test,
public testing::WithParamInterface<std::string> {
protected:
BlobFileReaderIOErrorTest()
: mock_env_(Env::Default()),
fault_injection_env_(&mock_env_),
sync_point_(GetParam()) {}
MockEnv mock_env_;
FaultInjectionTestEnv fault_injection_env_;
std::string sync_point_;
};
INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderIOErrorTest,
::testing::ValuesIn(std::vector<std::string>{
"BlobFileReader::OpenFile:GetFileSize",
"BlobFileReader::OpenFile:NewRandomAccessFile",
"BlobFileReader::ReadHeader:ReadFromFile",
"BlobFileReader::ReadFooter:ReadFromFile",
"BlobFileReader::GetBlob:ReadFromFile"}));
TEST_P(BlobFileReaderIOErrorTest, IOError) {
// Simulates an I/O error during the specified step
Options options;
options.env = &fault_injection_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&fault_injection_env_,
"BlobFileReaderIOErrorTest_IOError"),
0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range, expiration_range, blob_file_number, key, blob,
kNoCompression, &blob_offset, &blob_size);
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
fault_injection_env_.SetFilesystemActive(false,
Status::IOError(sync_point_));
});
SyncPoint::GetInstance()->EnableProcessing();
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
const Status s = BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader);
const bool fail_during_create =
(sync_point_ != "BlobFileReader::GetBlob:ReadFromFile");
if (fail_during_create) {
ASSERT_TRUE(s.IsIOError());
} else {
ASSERT_OK(s);
PinnableSlice value;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, &value)
.IsIOError());
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
class BlobFileReaderDecodingErrorTest
: public testing::Test,
public testing::WithParamInterface<std::string> {
protected:
BlobFileReaderDecodingErrorTest()
: mock_env_(Env::Default()), sync_point_(GetParam()) {}
MockEnv mock_env_;
std::string sync_point_;
};
INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderDecodingErrorTest,
::testing::ValuesIn(std::vector<std::string>{
"BlobFileReader::ReadHeader:TamperWithResult",
"BlobFileReader::ReadFooter:TamperWithResult",
"BlobFileReader::GetBlob:TamperWithResult"}));
TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) {
Options options;
options.env = &mock_env_;
options.cf_paths.emplace_back(
test::PerThreadDBPath(&mock_env_,
"BlobFileReaderDecodingErrorTest_DecodingError"),
0);
options.enable_blob_files = true;
ImmutableCFOptions immutable_cf_options(options);
constexpr uint32_t column_family_id = 1;
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
constexpr uint64_t blob_file_number = 1;
constexpr char key[] = "key";
constexpr char blob[] = "blob";
uint64_t blob_offset = 0;
uint64_t blob_size = 0;
WriteBlobFile(immutable_cf_options, column_family_id, has_ttl,
expiration_range, expiration_range, blob_file_number, key, blob,
kNoCompression, &blob_offset, &blob_size);
SyncPoint::GetInstance()->SetCallBack(sync_point_, [](void* arg) {
Slice* const slice = static_cast<Slice*>(arg);
assert(slice);
assert(!slice->empty());
slice->remove_prefix(1);
});
SyncPoint::GetInstance()->EnableProcessing();
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileReader> reader;
const Status s = BlobFileReader::Create(immutable_cf_options, FileOptions(),
column_family_id, blob_file_read_hist,
blob_file_number, &reader);
const bool fail_during_create =
sync_point_ != "BlobFileReader::GetBlob:TamperWithResult";
if (fail_during_create) {
ASSERT_TRUE(s.IsCorruption());
} else {
ASSERT_OK(s);
PinnableSlice value;
ASSERT_TRUE(reader
->GetBlob(ReadOptions(), key, blob_offset, blob_size,
kNoCompression, &value)
.IsCorruption());
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -7,8 +7,9 @@
#include <sstream>
#include <string>
#include "rocksdb/options.h"
#include "rocksdb/compression_type.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
@ -82,6 +83,11 @@ class BlobIndex {
return size_;
}
CompressionType compression() const {
assert(!IsInlined());
return compression_;
}
Status DecodeFrom(Slice slice) {
static const std::string kErrorMessage = "Error while decoding blob index";
assert(slice.size() > 0);
@ -117,7 +123,8 @@ class BlobIndex {
oss << "[inlined blob] value:" << value_.ToString(output_hex);
} else {
oss << "[blob ref] file:" << file_number_ << " offset:" << offset_
<< " size:" << size_;
<< " size:" << size_
<< " compression: " << CompressionTypeToString(compression_);
}
if (HasTTL()) {

@ -95,6 +95,10 @@ Status BlobLogFooter::DecodeFrom(Slice src) {
return Status::OK();
}
uint64_t BlobLogRecord::CalculateAdjustmentForRecordHeader(uint64_t key_size) {
return key_size + kHeaderSize;
}
void BlobLogRecord::EncodeHeaderTo(std::string* dst) {
assert(dst != nullptr);
dst->clear();

@ -104,6 +104,11 @@ struct BlobLogRecord {
// header include fields up to blob CRC
static constexpr size_t kHeaderSize = 32;
// Note that the offset field of BlobIndex actually points to the blob value
// as opposed to the start of the blob record. The following method can
// be used to calculate the adjustment needed to read the blob record header.
static uint64_t CalculateAdjustmentForRecordHeader(uint64_t key_size);
uint64_t key_size = 0;
uint64_t value_size = 0;
uint64_t expiration = 0;
@ -123,4 +128,19 @@ struct BlobLogRecord {
Status CheckBlobCRC() const;
};
// Checks whether a blob offset is potentially valid or not.
inline bool IsValidBlobOffset(uint64_t value_offset, uint64_t key_size,
uint64_t value_size, uint64_t file_size) {
if (value_offset <
BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key_size) {
return false;
}
if (value_offset + value_size + BlobLogFooter::kSize > file_size) {
return false;
}
return true;
}
} // namespace ROCKSDB_NAMESPACE

@ -13,6 +13,7 @@
#include <algorithm>
#include <cinttypes>
#include <limits>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
@ -1386,21 +1387,26 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
}
void InternalStats::DumpCFFileHistogram(std::string* value) {
char buf[2000];
snprintf(buf, sizeof(buf),
"\n** File Read Latency Histogram By Level [%s] **\n",
cfd_->GetName().c_str());
value->append(buf);
assert(value);
assert(cfd_);
std::ostringstream oss;
oss << "\n** File Read Latency Histogram By Level [" << cfd_->GetName()
<< "] **\n";
for (int level = 0; level < number_levels_; level++) {
if (!file_read_latency_[level].Empty()) {
char buf2[5000];
snprintf(buf2, sizeof(buf2),
"** Level %d read latency histogram (micros):\n%s\n", level,
file_read_latency_[level].ToString().c_str());
value->append(buf2);
oss << "** Level " << level << " read latency histogram (micros):\n"
<< file_read_latency_[level].ToString() << '\n';
}
}
if (!blob_file_read_latency_.Empty()) {
oss << "** Blob file read latency histogram (micros):\n"
<< blob_file_read_latency_.ToString() << '\n';
}
*value = oss.str();
}
#else

@ -335,6 +335,7 @@ class InternalStats {
for (auto& h : file_read_latency_) {
h.Clear();
}
blob_file_read_latency_.Clear();
cf_stats_snapshot_.Clear();
db_stats_snapshot_.Clear();
bg_error_count_ = 0;
@ -375,6 +376,8 @@ class InternalStats {
return &file_read_latency_[level];
}
HistogramImpl* GetBlobFileReadHist() { return &blob_file_read_latency_; }
uint64_t GetBackgroundErrorCount() const { return bg_error_count_; }
uint64_t BumpAndGetBackgroundErrorCount() { return ++bg_error_count_; }
@ -426,6 +429,7 @@ class InternalStats {
std::vector<CompactionStats> comp_stats_;
std::vector<CompactionStats> comp_stats_by_pri_;
std::vector<HistogramImpl> file_read_latency_;
HistogramImpl blob_file_read_latency_;
// Used to compute per-interval statistics
struct CFStatsSnapshot {

@ -9,6 +9,7 @@ LIB_SOURCES = \
db/blob/blob_file_builder.cc \
db/blob/blob_file_garbage.cc \
db/blob/blob_file_meta.cc \
db/blob/blob_file_reader.cc \
db/blob/blob_log_format.cc \
db/blob/blob_log_reader.cc \
db/blob/blob_log_writer.cc \
@ -356,6 +357,7 @@ TEST_MAIN_SOURCES = \
db/blob/blob_file_addition_test.cc \
db/blob/blob_file_builder_test.cc \
db/blob/blob_file_garbage_test.cc \
db/blob/blob_file_reader_test.cc \
db/blob/db_blob_index_test.cc \
db/column_family_test.cc \
db/compact_files_test.cc \

Loading…
Cancel
Save