diff --git a/CMakeLists.txt b/CMakeLists.txt index 3c11b83d8..46209c7d1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -546,6 +546,7 @@ set(SOURCES cache/sharded_cache.cc db/arena_wrapped_db_iter.cc db/blob/blob_file_addition.cc + db/blob/blob_file_builder.cc db/blob/blob_file_garbage.cc db/blob/blob_file_meta.cc db/blob/blob_log_format.cc @@ -1012,6 +1013,7 @@ if(WITH_TESTS) cache/cache_test.cc cache/lru_cache_test.cc db/blob/blob_file_addition_test.cc + db/blob/blob_file_builder_test.cc db/blob/blob_file_garbage_test.cc db/blob/db_blob_index_test.cc db/column_family_test.cc diff --git a/Makefile b/Makefile index b15a895e4..044b3b4fa 100644 --- a/Makefile +++ b/Makefile @@ -566,6 +566,7 @@ ifdef ASSERT_STATUS_CHECKED cache_test \ lru_cache_test \ blob_file_addition_test \ + blob_file_builder_test \ blob_file_garbage_test \ bloom_test \ cassandra_format_test \ @@ -1806,6 +1807,9 @@ defer_test: $(OBJ_DIR)/util/defer_test.o $(TEST_LIBRARY) $(LIBRARY) blob_file_addition_test: $(OBJ_DIR)/db/blob/blob_file_addition_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +blob_file_builder_test: $(OBJ_DIR)/db/blob/blob_file_builder_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 0ffe1cbb2..ab0ce85e4 100644 --- a/TARGETS +++ b/TARGETS @@ -122,6 +122,7 @@ cpp_library( "cache/sharded_cache.cc", "db/arena_wrapped_db_iter.cc", "db/blob/blob_file_addition.cc", + "db/blob/blob_file_builder.cc", "db/blob/blob_file_garbage.cc", "db/blob/blob_file_meta.cc", "db/blob/blob_log_format.cc", @@ -533,6 +534,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "blob_file_builder_test", + "db/blob/blob_file_builder_test.cc", + "serial", + [], + [], + ], [ "blob_file_garbage_test", "db/blob/blob_file_garbage_test.cc", diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc new file mode 100644 index 000000000..282f2e712 --- /dev/null +++ b/db/blob/blob_file_builder.cc @@ -0,0 +1,291 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_file_builder.h" + +#include + +#include "db/blob/blob_file_addition.h" +#include "db/blob/blob_index.h" +#include "db/blob/blob_log_format.h" +#include "db/blob/blob_log_writer.h" +#include "db/version_set.h" +#include "file/filename.h" +#include "file/read_write_util.h" +#include "file/writable_file_writer.h" +#include "options/cf_options.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "test_util/sync_point.h" +#include "util/compression.h" + +namespace ROCKSDB_NAMESPACE { + +BlobFileBuilder::BlobFileBuilder( + VersionSet* versions, Env* env, FileSystem* fs, + const ImmutableCFOptions* immutable_cf_options, + const MutableCFOptions* mutable_cf_options, const FileOptions* file_options, + uint32_t column_family_id, Env::IOPriority io_priority, + Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_additions) + : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env, + fs, immutable_cf_options, mutable_cf_options, + file_options, column_family_id, io_priority, write_hint, + blob_file_additions) {} + +BlobFileBuilder::BlobFileBuilder( + std::function file_number_generator, Env* env, FileSystem* fs, + const ImmutableCFOptions* immutable_cf_options, + const MutableCFOptions* mutable_cf_options, const FileOptions* file_options, + uint32_t column_family_id, Env::IOPriority io_priority, + Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_additions) + : file_number_generator_(std::move(file_number_generator)), + env_(env), + fs_(fs), + immutable_cf_options_(immutable_cf_options), + min_blob_size_(mutable_cf_options->min_blob_size), + blob_file_size_(mutable_cf_options->blob_file_size), + blob_compression_type_(mutable_cf_options->blob_compression_type), + file_options_(file_options), + column_family_id_(column_family_id), + io_priority_(io_priority), + write_hint_(write_hint), + blob_file_additions_(blob_file_additions), + blob_count_(0), + blob_bytes_(0) { + assert(file_number_generator_); + assert(env_); + assert(fs_); + assert(immutable_cf_options_); + assert(file_options_); + assert(blob_file_additions_); +} + +BlobFileBuilder::~BlobFileBuilder() = default; + +Status BlobFileBuilder::Add(const Slice& key, const Slice& value, + std::string* blob_index) { + assert(blob_index); + assert(blob_index->empty()); + + if (value.size() < min_blob_size_) { + return Status::OK(); + } + + { + const Status s = OpenBlobFileIfNeeded(); + if (!s.ok()) { + return s; + } + } + + Slice blob = value; + std::string compressed_blob; + + { + const Status s = CompressBlobIfNeeded(&blob, &compressed_blob); + if (!s.ok()) { + return s; + } + } + + uint64_t blob_file_number = 0; + uint64_t blob_offset = 0; + + { + const Status s = + WriteBlobToFile(key, blob, &blob_file_number, &blob_offset); + if (!s.ok()) { + return s; + } + } + + { + const Status s = CloseBlobFileIfNeeded(); + if (!s.ok()) { + return s; + } + } + + BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(), + blob_compression_type_); + + return Status::OK(); +} + +Status BlobFileBuilder::Finish() { + if (!IsBlobFileOpen()) { + return Status::OK(); + } + + return CloseBlobFile(); +} + +bool BlobFileBuilder::IsBlobFileOpen() const { return !!writer_; } + +Status BlobFileBuilder::OpenBlobFileIfNeeded() { + if (IsBlobFileOpen()) { + return Status::OK(); + } + + assert(!blob_count_); + assert(!blob_bytes_); + + assert(file_number_generator_); + const uint64_t blob_file_number = file_number_generator_(); + + assert(immutable_cf_options_); + assert(!immutable_cf_options_->cf_paths.empty()); + const std::string blob_file_path = BlobFileName( + immutable_cf_options_->cf_paths.front().path, blob_file_number); + + std::unique_ptr file; + + { + TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile"); + + assert(file_options_); + const Status s = + NewWritableFile(fs_, blob_file_path, &file, *file_options_); + if (!s.ok()) { + return s; + } + } + + assert(file); + file->SetIOPriority(io_priority_); + file->SetWriteLifeTimeHint(write_hint_); + + Statistics* const statistics = immutable_cf_options_->statistics; + + std::unique_ptr file_writer( + new WritableFileWriter(std::move(file), blob_file_path, *file_options_, + env_, statistics, immutable_cf_options_->listeners, + immutable_cf_options_->file_checksum_gen_factory)); + + std::unique_ptr blob_log_writer( + new BlobLogWriter(std::move(file_writer), env_, statistics, + blob_file_number, immutable_cf_options_->use_fsync)); + + constexpr bool has_ttl = false; + constexpr ExpirationRange expiration_range; + + BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl, + expiration_range); + + { + TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader"); + + const Status s = blob_log_writer->WriteHeader(header); + if (!s.ok()) { + return s; + } + } + + writer_ = std::move(blob_log_writer); + + assert(IsBlobFileOpen()); + + return Status::OK(); +} + +Status BlobFileBuilder::CompressBlobIfNeeded( + Slice* blob, std::string* compressed_blob) const { + assert(blob); + assert(compressed_blob); + assert(compressed_blob->empty()); + + if (blob_compression_type_ == kNoCompression) { + return Status::OK(); + } + + CompressionOptions opts; + CompressionContext context(blob_compression_type_); + constexpr uint64_t sample_for_compression = 0; + + CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), + blob_compression_type_, sample_for_compression); + + constexpr uint32_t compression_format_version = 2; + + if (!CompressData(*blob, info, compression_format_version, compressed_blob)) { + return Status::Corruption("Error compressing blob"); + } + + *blob = Slice(*compressed_blob); + + return Status::OK(); +} + +Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob, + uint64_t* blob_file_number, + uint64_t* blob_offset) { + assert(IsBlobFileOpen()); + assert(blob_file_number); + assert(blob_offset); + + uint64_t key_offset = 0; + + TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord"); + + const Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset); + if (!s.ok()) { + return s; + } + + *blob_file_number = writer_->get_log_number(); + + ++blob_count_; + blob_bytes_ += BlobLogRecord::kHeaderSize + key.size() + blob.size(); + + return Status::OK(); +} + +Status BlobFileBuilder::CloseBlobFile() { + assert(IsBlobFileOpen()); + + BlobLogFooter footer; + footer.blob_count = blob_count_; + + std::string checksum_method; + std::string checksum_value; + + TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter"); + + const Status s = + writer_->AppendFooter(footer, &checksum_method, &checksum_value); + if (!s.ok()) { + return s; + } + + const uint64_t blob_file_number = writer_->get_log_number(); + + assert(blob_file_additions_); + blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_, + std::move(checksum_method), + std::move(checksum_value)); + + writer_.reset(); + blob_count_ = 0; + blob_bytes_ = 0; + + return Status::OK(); +} + +Status BlobFileBuilder::CloseBlobFileIfNeeded() { + assert(IsBlobFileOpen()); + + const WritableFileWriter* const file_writer = writer_->file(); + assert(file_writer); + + if (file_writer->GetFileSize() < blob_file_size_) { + return Status::OK(); + } + + return CloseBlobFile(); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_file_builder.h b/db/blob/blob_file_builder.h new file mode 100644 index 000000000..4a0370b89 --- /dev/null +++ b/db/blob/blob_file_builder.h @@ -0,0 +1,82 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include +#include +#include +#include +#include + +#include "rocksdb/compression_type.h" +#include "rocksdb/env.h" +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { + +class VersionSet; +class FileSystem; +struct ImmutableCFOptions; +struct MutableCFOptions; +struct FileOptions; +class BlobFileAddition; +class Status; +class Slice; +class BlobLogWriter; + +class BlobFileBuilder { + public: + BlobFileBuilder(VersionSet* versions, Env* env, FileSystem* fs, + const ImmutableCFOptions* immutable_cf_options, + const MutableCFOptions* mutable_cf_options, + const FileOptions* file_options, uint32_t column_family_id, + Env::IOPriority io_priority, + Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_additions); + + BlobFileBuilder(std::function file_number_generator, Env* env, + FileSystem* fs, + const ImmutableCFOptions* immutable_cf_options, + const MutableCFOptions* mutable_cf_options, + const FileOptions* file_options, uint32_t column_family_id, + Env::IOPriority io_priority, + Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_additions); + + BlobFileBuilder(const BlobFileBuilder&) = delete; + BlobFileBuilder& operator=(const BlobFileBuilder&) = delete; + + ~BlobFileBuilder(); + + Status Add(const Slice& key, const Slice& value, std::string* blob_index); + Status Finish(); + + private: + bool IsBlobFileOpen() const; + Status OpenBlobFileIfNeeded(); + Status CompressBlobIfNeeded(Slice* blob, std::string* compressed_blob) const; + Status WriteBlobToFile(const Slice& key, const Slice& blob, + uint64_t* blob_file_number, uint64_t* blob_offset); + Status CloseBlobFile(); + Status CloseBlobFileIfNeeded(); + + std::function file_number_generator_; + Env* env_; + FileSystem* fs_; + const ImmutableCFOptions* immutable_cf_options_; + uint64_t min_blob_size_; + uint64_t blob_file_size_; + CompressionType blob_compression_type_; + const FileOptions* file_options_; + uint32_t column_family_id_; + Env::IOPriority io_priority_; + Env::WriteLifeTimeHint write_hint_; + std::vector* blob_file_additions_; + std::unique_ptr writer_; + uint64_t blob_count_; + uint64_t blob_bytes_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc new file mode 100644 index 000000000..bafac4a41 --- /dev/null +++ b/db/blob/blob_file_builder_test.cc @@ -0,0 +1,579 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/blob/blob_file_builder.h" + +#include +#include +#include +#include +#include + +#include "db/blob/blob_file_addition.h" +#include "db/blob/blob_index.h" +#include "db/blob/blob_log_format.h" +#include "db/blob/blob_log_reader.h" +#include "env/composite_env_wrapper.h" +#include "env/mock_env.h" +#include "file/filename.h" +#include "file/random_access_file_reader.h" +#include "options/cf_options.h" +#include "rocksdb/env.h" +#include "rocksdb/file_checksum.h" +#include "rocksdb/options.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "util/compression.h" +#include "utilities/fault_injection_env.h" + +namespace ROCKSDB_NAMESPACE { + +class TestFileNumberGenerator { + public: + uint64_t operator()() { return ++next_file_number_; } + + private: + uint64_t next_file_number_ = 1; +}; + +class BlobFileBuilderTest : public testing::Test { + protected: + BlobFileBuilderTest() : mock_env_(Env::Default()), fs_(&mock_env_) {} + + void VerifyBlobFile(const ImmutableCFOptions& immutable_cf_options, + uint64_t blob_file_number, uint32_t column_family_id, + CompressionType blob_compression_type, + const std::vector>& + expected_key_value_pairs, + const std::vector& blob_indexes) { + assert(expected_key_value_pairs.size() == blob_indexes.size()); + + const std::string blob_file_path = BlobFileName( + immutable_cf_options.cf_paths.front().path, blob_file_number); + + std::unique_ptr file; + constexpr IODebugContext* dbg = nullptr; + ASSERT_OK( + fs_.NewRandomAccessFile(blob_file_path, file_options_, &file, dbg)); + + std::unique_ptr file_reader( + new RandomAccessFileReader(std::move(file), blob_file_path, + &mock_env_)); + + constexpr Statistics* statistics = nullptr; + BlobLogReader blob_log_reader(std::move(file_reader), &mock_env_, + statistics); + + BlobLogHeader header; + ASSERT_OK(blob_log_reader.ReadHeader(&header)); + ASSERT_EQ(header.version, kVersion1); + ASSERT_EQ(header.column_family_id, column_family_id); + ASSERT_EQ(header.compression, blob_compression_type); + ASSERT_FALSE(header.has_ttl); + ASSERT_EQ(header.expiration_range, ExpirationRange()); + + for (size_t i = 0; i < expected_key_value_pairs.size(); ++i) { + BlobLogRecord record; + uint64_t blob_offset = 0; + + ASSERT_OK(blob_log_reader.ReadRecord( + &record, BlobLogReader::kReadHeaderKeyBlob, &blob_offset)); + + // Check the contents of the blob file + const auto& expected_key_value = expected_key_value_pairs[i]; + const auto& key = expected_key_value.first; + const auto& value = expected_key_value.second; + + ASSERT_EQ(record.key_size, key.size()); + ASSERT_EQ(record.value_size, value.size()); + ASSERT_EQ(record.expiration, 0); + ASSERT_EQ(record.key, key); + ASSERT_EQ(record.value, value); + + // Make sure the blob reference returned by the builder points to the + // right place + BlobIndex blob_index; + ASSERT_OK(blob_index.DecodeFrom(blob_indexes[i])); + ASSERT_FALSE(blob_index.IsInlined()); + ASSERT_FALSE(blob_index.HasTTL()); + ASSERT_EQ(blob_index.file_number(), blob_file_number); + ASSERT_EQ(blob_index.offset(), blob_offset); + ASSERT_EQ(blob_index.size(), value.size()); + } + + BlobLogFooter footer; + ASSERT_OK(blob_log_reader.ReadFooter(&footer)); + ASSERT_EQ(footer.blob_count, expected_key_value_pairs.size()); + ASSERT_EQ(footer.expiration_range, ExpirationRange()); + } + + MockEnv mock_env_; + LegacyFileSystemWrapper fs_; + FileOptions file_options_; +}; + +TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { + // Build a single blob file + constexpr size_t number_of_blobs = 10; + constexpr size_t key_size = 1; + constexpr size_t value_size = 4; + constexpr size_t value_offset = 1234; + + Options options; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, + "BlobFileBuilderTest_BuildAndCheckOneFile"), + 0); + options.enable_blob_files = true; + + ImmutableCFOptions immutable_cf_options(options); + MutableCFOptions mutable_cf_options(options); + + constexpr uint32_t column_family_id = 123; + constexpr Env::IOPriority io_priority = Env::IO_HIGH; + constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + + std::vector blob_file_additions; + + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, column_family_id, io_priority, + write_hint, &blob_file_additions); + + std::vector> expected_key_value_pairs( + number_of_blobs); + std::vector blob_indexes(number_of_blobs); + + for (size_t i = 0; i < number_of_blobs; ++i) { + auto& expected_key_value = expected_key_value_pairs[i]; + + auto& key = expected_key_value.first; + key = std::to_string(i); + assert(key.size() == key_size); + + auto& value = expected_key_value.second; + value = std::to_string(i + value_offset); + assert(value.size() == value_size); + + auto& blob_index = blob_indexes[i]; + + ASSERT_OK(builder.Add(key, value, &blob_index)); + ASSERT_FALSE(blob_index.empty()); + } + + ASSERT_OK(builder.Finish()); + + // Check the metadata generated + ASSERT_EQ(blob_file_additions.size(), 1); + + const auto& blob_file_addition = blob_file_additions[0]; + + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); + ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), number_of_blobs); + ASSERT_EQ( + blob_file_addition.GetTotalBlobBytes(), + number_of_blobs * (BlobLogRecord::kHeaderSize + key_size + value_size)); + + // Verify the contents of the new blob file as well as the blob references + VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id, + kNoCompression, expected_key_value_pairs, blob_indexes); +} + +TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { + // Build multiple blob files: file size limit is set to the size of a single + // value, so each blob ends up in a file of its own + constexpr size_t number_of_blobs = 10; + constexpr size_t key_size = 1; + constexpr size_t value_size = 10; + constexpr size_t value_offset = 1234567890; + + Options options; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, + "BlobFileBuilderTest_BuildAndCheckMultipleFiles"), + 0); + options.enable_blob_files = true; + options.blob_file_size = value_size; + + ImmutableCFOptions immutable_cf_options(options); + MutableCFOptions mutable_cf_options(options); + + constexpr uint32_t column_family_id = 123; + constexpr Env::IOPriority io_priority = Env::IO_HIGH; + constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + + std::vector blob_file_additions; + + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, column_family_id, io_priority, + write_hint, &blob_file_additions); + + std::vector> expected_key_value_pairs( + number_of_blobs); + std::vector blob_indexes(number_of_blobs); + + for (size_t i = 0; i < number_of_blobs; ++i) { + auto& expected_key_value = expected_key_value_pairs[i]; + + auto& key = expected_key_value.first; + key = std::to_string(i); + assert(key.size() == key_size); + + auto& value = expected_key_value.second; + value = std::to_string(i + value_offset); + assert(value.size() == value_size); + + auto& blob_index = blob_indexes[i]; + + ASSERT_OK(builder.Add(key, value, &blob_index)); + ASSERT_FALSE(blob_index.empty()); + } + + ASSERT_OK(builder.Finish()); + + // Check the metadata generated + ASSERT_EQ(blob_file_additions.size(), number_of_blobs); + + for (size_t i = 0; i < number_of_blobs; ++i) { + const auto& blob_file_addition = blob_file_additions[i]; + + ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), i + 2); + ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1); + ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(), + BlobLogRecord::kHeaderSize + key_size + value_size); + } + + // Verify the contents of the new blob files as well as the blob references + for (size_t i = 0; i < number_of_blobs; ++i) { + std::vector> expected_key_value_pair{ + expected_key_value_pairs[i]}; + std::vector blob_index{blob_indexes[i]}; + + VerifyBlobFile(immutable_cf_options, i + 2, column_family_id, + kNoCompression, expected_key_value_pair, blob_index); + } +} + +TEST_F(BlobFileBuilderTest, InlinedValues) { + // All values are below the min_blob_size threshold; no blob files get written + constexpr size_t number_of_blobs = 10; + constexpr size_t key_size = 1; + constexpr size_t value_size = 10; + constexpr size_t value_offset = 1234567890; + + Options options; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, "BlobFileBuilderTest_InlinedValues"), + 0); + options.enable_blob_files = true; + options.min_blob_size = 1024; + + ImmutableCFOptions immutable_cf_options(options); + MutableCFOptions mutable_cf_options(options); + + constexpr uint32_t column_family_id = 123; + constexpr Env::IOPriority io_priority = Env::IO_HIGH; + constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + + std::vector blob_file_additions; + + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, column_family_id, io_priority, + write_hint, &blob_file_additions); + + for (size_t i = 0; i < number_of_blobs; ++i) { + const std::string key = std::to_string(i); + assert(key.size() == key_size); + + const std::string value = std::to_string(i + value_offset); + assert(value.size() == value_size); + + std::string blob_index; + ASSERT_OK(builder.Add(key, value, &blob_index)); + ASSERT_TRUE(blob_index.empty()); + } + + ASSERT_OK(builder.Finish()); + + // Check the metadata generated + ASSERT_TRUE(blob_file_additions.empty()); +} + +TEST_F(BlobFileBuilderTest, Compression) { + // Build a blob file with a compressed blob + if (!Snappy_Supported()) { + return; + } + + constexpr size_t key_size = 1; + constexpr size_t value_size = 100; + + Options options; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, "BlobFileBuilderTest_Compression"), 0); + options.enable_blob_files = true; + options.blob_compression_type = kSnappyCompression; + + ImmutableCFOptions immutable_cf_options(options); + MutableCFOptions mutable_cf_options(options); + + constexpr uint32_t column_family_id = 123; + constexpr Env::IOPriority io_priority = Env::IO_HIGH; + constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + + std::vector blob_file_additions; + + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, column_family_id, io_priority, + write_hint, &blob_file_additions); + + const std::string key("1"); + const std::string uncompressed_value(value_size, 'x'); + + std::string blob_index; + + ASSERT_OK(builder.Add(key, uncompressed_value, &blob_index)); + ASSERT_FALSE(blob_index.empty()); + + ASSERT_OK(builder.Finish()); + + // Check the metadata generated + ASSERT_EQ(blob_file_additions.size(), 1); + + const auto& blob_file_addition = blob_file_additions[0]; + + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); + ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1); + + CompressionOptions opts; + CompressionContext context(kSnappyCompression); + constexpr uint64_t sample_for_compression = 0; + + CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), + kSnappyCompression, sample_for_compression); + + std::string compressed_value; + ASSERT_TRUE(Snappy_Compress(info, uncompressed_value.data(), + uncompressed_value.size(), &compressed_value)); + + ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(), + BlobLogRecord::kHeaderSize + key_size + compressed_value.size()); + + // Verify the contents of the new blob file as well as the blob reference + std::vector> expected_key_value_pairs{ + {key, compressed_value}}; + std::vector blob_indexes{blob_index}; + + VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id, + kSnappyCompression, expected_key_value_pairs, blob_indexes); +} + +TEST_F(BlobFileBuilderTest, CompressionError) { + // Simulate an error during compression + if (!Snappy_Supported()) { + return; + } + + Options options; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, "BlobFileBuilderTest_CompressionError"), + 0); + options.enable_blob_files = true; + options.blob_compression_type = kSnappyCompression; + + ImmutableCFOptions immutable_cf_options(options); + MutableCFOptions mutable_cf_options(options); + + constexpr uint32_t column_family_id = 123; + constexpr Env::IOPriority io_priority = Env::IO_HIGH; + constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + + std::vector blob_file_additions; + + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, column_family_id, io_priority, + write_hint, &blob_file_additions); + + SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue", + [](void* arg) { + bool* ret = static_cast(arg); + *ret = false; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr char key[] = "1"; + constexpr char value[] = "deadbeef"; + + std::string blob_index; + + ASSERT_TRUE(builder.Add(key, value, &blob_index).IsCorruption()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(BlobFileBuilderTest, Checksum) { + // Build a blob file with checksum + + class DummyFileChecksumGenerator : public FileChecksumGenerator { + public: + void Update(const char* /* data */, size_t /* n */) override {} + + void Finalize() override {} + + std::string GetChecksum() const override { return std::string("dummy"); } + + const char* Name() const override { return "DummyFileChecksum"; } + }; + + class DummyFileChecksumGenFactory : public FileChecksumGenFactory { + public: + std::unique_ptr CreateFileChecksumGenerator( + const FileChecksumGenContext& /* context */) override { + return std::unique_ptr( + new DummyFileChecksumGenerator); + } + + const char* Name() const override { return "DummyFileChecksumGenFactory"; } + }; + + Options options; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&mock_env_, "BlobFileBuilderTest_Checksum"), 0); + options.enable_blob_files = true; + options.file_checksum_gen_factory = + std::make_shared(); + + ImmutableCFOptions immutable_cf_options(options); + MutableCFOptions mutable_cf_options(options); + + constexpr uint32_t column_family_id = 123; + constexpr Env::IOPriority io_priority = Env::IO_HIGH; + constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + + std::vector blob_file_additions; + + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, column_family_id, io_priority, + write_hint, &blob_file_additions); + + const std::string key("1"); + const std::string value("deadbeef"); + + std::string blob_index; + + ASSERT_OK(builder.Add(key, value, &blob_index)); + ASSERT_FALSE(blob_index.empty()); + + ASSERT_OK(builder.Finish()); + + // Check the metadata generated + ASSERT_EQ(blob_file_additions.size(), 1); + + const auto& blob_file_addition = blob_file_additions[0]; + + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); + ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1); + ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(), + BlobLogRecord::kHeaderSize + key.size() + value.size()); + ASSERT_EQ(blob_file_addition.GetChecksumMethod(), "DummyFileChecksum"); + ASSERT_EQ(blob_file_addition.GetChecksumValue(), "dummy"); + + // Verify the contents of the new blob file as well as the blob reference + std::vector> expected_key_value_pairs{ + {key, value}}; + std::vector blob_indexes{blob_index}; + + VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id, + kNoCompression, expected_key_value_pairs, blob_indexes); +} + +class BlobFileBuilderIOErrorTest + : public testing::Test, + public testing::WithParamInterface { + protected: + BlobFileBuilderIOErrorTest() + : mock_env_(Env::Default()), + fault_injection_env_(&mock_env_), + fs_(&fault_injection_env_), + sync_point_(GetParam()) {} + + MockEnv mock_env_; + FaultInjectionTestEnv fault_injection_env_; + LegacyFileSystemWrapper fs_; + FileOptions file_options_; + std::string sync_point_; +}; + +INSTANTIATE_TEST_CASE_P( + BlobFileBuilderTest, BlobFileBuilderIOErrorTest, + ::testing::ValuesIn(std::vector{ + "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", + "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", + "BlobFileBuilder::WriteBlobToFile:AddRecord", + "BlobFileBuilder::WriteBlobToFile:AppendFooter"})); + +TEST_P(BlobFileBuilderIOErrorTest, IOError) { + // Simulate an I/O error during the specified step of Add() + // Note: blob_file_size will be set to value_size in order for the first blob + // to trigger close + constexpr size_t value_size = 8; + + Options options; + options.cf_paths.emplace_back( + test::PerThreadDBPath(&fault_injection_env_, + "BlobFileBuilderIOErrorTest_IOError"), + 0); + options.enable_blob_files = true; + options.blob_file_size = value_size; + + ImmutableCFOptions immutable_cf_options(options); + MutableCFOptions mutable_cf_options(options); + + constexpr uint32_t column_family_id = 123; + constexpr Env::IOPriority io_priority = Env::IO_HIGH; + constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + + std::vector blob_file_additions; + + BlobFileBuilder builder(TestFileNumberGenerator(), &fault_injection_env_, + &fs_, &immutable_cf_options, &mutable_cf_options, + &file_options_, column_family_id, io_priority, + write_hint, &blob_file_additions); + + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, + Status::IOError(sync_point_)); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr char key[] = "1"; + constexpr char value[] = "deadbeef"; + + std::string blob_index; + + ASSERT_TRUE(builder.Add(key, value, &blob_index).IsIOError()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/blob/blob_log_reader.cc b/db/blob/blob_log_reader.cc index e5fe3fb5b..991616e1b 100644 --- a/db/blob/blob_log_reader.cc +++ b/db/blob/blob_log_reader.cc @@ -24,6 +24,8 @@ BlobLogReader::BlobLogReader( next_byte_(0) {} Status BlobLogReader::ReadSlice(uint64_t size, Slice* slice, char* buf) { + assert(file_); + StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); Status s = file_->Read(IOOptions(), next_byte_, static_cast(size), slice, buf, nullptr); @@ -39,8 +41,11 @@ Status BlobLogReader::ReadSlice(uint64_t size, Slice* slice, char* buf) { } Status BlobLogReader::ReadHeader(BlobLogHeader* header) { - assert(file_.get() != nullptr); assert(next_byte_ == 0); + + static_assert(BlobLogHeader::kSize <= sizeof(header_buf_), + "Buffer is smaller than BlobLogHeader::kSize"); + Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_); if (!s.ok()) { return s; @@ -55,6 +60,9 @@ Status BlobLogReader::ReadHeader(BlobLogHeader* header) { Status BlobLogReader::ReadRecord(BlobLogRecord* record, ReadLevel level, uint64_t* blob_offset) { + static_assert(BlobLogRecord::kHeaderSize <= sizeof(header_buf_), + "Buffer is smaller than BlobLogRecord::kHeaderSize"); + Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_); if (!s.ok()) { return s; @@ -100,4 +108,20 @@ Status BlobLogReader::ReadRecord(BlobLogRecord* record, ReadLevel level, return s; } +Status BlobLogReader::ReadFooter(BlobLogFooter* footer) { + static_assert(BlobLogFooter::kSize <= sizeof(header_buf_), + "Buffer is smaller than BlobLogFooter::kSize"); + + Status s = ReadSlice(BlobLogFooter::kSize, &buffer_, header_buf_); + if (!s.ok()) { + return s; + } + + if (buffer_.size() != BlobLogFooter::kSize) { + return Status::Corruption("EOF reached before file footer"); + } + + return footer->DecodeFrom(buffer_); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_log_reader.h b/db/blob/blob_log_reader.h index 057cfffc5..9ce0e64c0 100644 --- a/db/blob/blob_log_reader.h +++ b/db/blob/blob_log_reader.h @@ -56,6 +56,8 @@ class BlobLogReader { Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader, uint64_t* blob_offset = nullptr); + Status ReadFooter(BlobLogFooter* footer); + void ResetNextByte() { next_byte_ = 0; } uint64_t GetNextByte() const { return next_byte_; } diff --git a/db/blob/blob_log_writer.cc b/db/blob/blob_log_writer.cc index 52c4d0402..8b3d0e2c7 100644 --- a/db/blob/blob_log_writer.cc +++ b/db/blob/blob_log_writer.cc @@ -29,6 +29,8 @@ BlobLogWriter::BlobLogWriter(std::unique_ptr&& dest, use_fsync_(use_fs), last_elem_type_(kEtNone) {} +BlobLogWriter::~BlobLogWriter() = default; + Status BlobLogWriter::Sync() { TEST_SYNC_POINT("BlobLogWriter::Sync"); @@ -55,7 +57,9 @@ Status BlobLogWriter::WriteHeader(BlobLogHeader& header) { return s; } -Status BlobLogWriter::AppendFooter(BlobLogFooter& footer) { +Status BlobLogWriter::AppendFooter(BlobLogFooter& footer, + std::string* checksum_method, + std::string* checksum_value) { assert(block_offset_ != 0); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); @@ -65,10 +69,34 @@ Status BlobLogWriter::AppendFooter(BlobLogFooter& footer) { Status s = dest_->Append(Slice(str)); if (s.ok()) { block_offset_ += str.size(); + s = Sync(); + if (s.ok()) { s = dest_->Close(); + + if (s.ok()) { + assert(!!checksum_method == !!checksum_value); + + if (checksum_method) { + assert(checksum_method->empty()); + + std::string method = dest_->GetFileChecksumFuncName(); + if (method != kUnknownFileChecksumFuncName) { + *checksum_method = std::move(method); + } + } + if (checksum_value) { + assert(checksum_value->empty()); + + std::string value = dest_->GetFileChecksum(); + if (value != kUnknownFileChecksum) { + *checksum_value = std::move(value); + } + } + } } + dest_.reset(); } diff --git a/db/blob/blob_log_writer.h b/db/blob/blob_log_writer.h index a95808bd4..0f9ea2516 100644 --- a/db/blob/blob_log_writer.h +++ b/db/blob/blob_log_writer.h @@ -39,7 +39,7 @@ class BlobLogWriter { BlobLogWriter(const BlobLogWriter&) = delete; BlobLogWriter& operator=(const BlobLogWriter&) = delete; - ~BlobLogWriter() = default; + ~BlobLogWriter(); static void ConstructBlobHeader(std::string* buf, const Slice& key, const Slice& val, uint64_t expiration); @@ -54,7 +54,8 @@ class BlobLogWriter { const Slice& val, uint64_t* key_offset, uint64_t* blob_offset); - Status AppendFooter(BlobLogFooter& footer); + Status AppendFooter(BlobLogFooter& footer, std::string* checksum_method, + std::string* checksum_value); Status WriteHeader(BlobLogHeader& header); diff --git a/src.mk b/src.mk index c7bd65fcc..49d6132a3 100644 --- a/src.mk +++ b/src.mk @@ -6,6 +6,7 @@ LIB_SOURCES = \ cache/sharded_cache.cc \ db/arena_wrapped_db_iter.cc \ db/blob/blob_file_addition.cc \ + db/blob/blob_file_builder.cc \ db/blob/blob_file_garbage.cc \ db/blob/blob_file_meta.cc \ db/blob/blob_log_format.cc \ @@ -340,6 +341,7 @@ TEST_MAIN_SOURCES = \ cache/cache_test.cc \ cache/lru_cache_test.cc \ db/blob/blob_file_addition_test.cc \ + db/blob/blob_file_builder_test.cc \ db/blob/blob_file_garbage_test.cc \ db/blob/db_blob_index_test.cc \ db/column_family_test.cc \ diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index b7e4f2442..1d94f3b71 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -111,7 +111,8 @@ Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) { } // this will close the file and reset the Writable File Pointer. - Status s = log_writer_->AppendFooter(footer); + Status s = log_writer_->AppendFooter(footer, /* checksum_method */ nullptr, + /* checksum_value */ nullptr); if (s.ok()) { closed_ = true; immutable_sequence_ = sequence;