From 431e8afba7fc42c55d03a00da00f848560b20de0 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Mon, 25 Jan 2021 13:30:17 -0800 Subject: [PATCH] Do not explicitly flush blob files when using the integrated BlobDB (#7892) Summary: In the original stacked BlobDB implementation, which writes blobs to blob files immediately and treats blob files as logs, it makes sense to flush the file after writing each blob to protect against process crashes; however, in the integrated implementation, which builds blob files in background jobs, the per-blob flush unnecessarily reduces performance. This patch fixes the problem by adding a `do_flush` flag to `BlobLogWriter`, which is set to `true` by the stacked implementation and to `false` by the integrated implementation. Note: the change itself is trivial, but the tests needed some work. Because blobs are now buffered in the integrated implementation, adding a blob to `BlobFileBuilder` is no longer guaranteed to result in an actual I/O operation. Therefore, we can no longer rely on `FaultInjectionTestEnv` when testing failure cases; instead, we manipulate the return values of I/O methods directly using `SyncPoint`s. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7892 Test Plan: `make check` Reviewed By: jay-zhuang Differential Revision: D26022814 Pulled By: ltamasi fbshipit-source-id: b3dce419f312137fa70d84cdd9b908fd5d60d8cd --- db/blob/blob_file_builder.cc | 34 ++++++++++++++----------- db/blob/blob_file_builder_test.cc | 41 +++++++++++++++---------------- db/blob/blob_file_cache_test.cc | 3 ++- db/blob/blob_file_reader_test.cc | 6 +++-- db/blob/blob_log_writer.cc | 10 +++++--- db/blob/blob_log_writer.h | 3 ++- db/db_compaction_test.cc | 14 +++++------ db/db_flush_test.cc | 20 ++++++--------- db/db_wal_test.cc | 19 ++++++-------- utilities/blob_db/blob_db_impl.cc | 4 ++- 10 files changed, 78 insertions(+), 76 deletions(-) diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 57f05438c..6f2901739 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -157,11 +157,12 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { std::unique_ptr file; { - TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile"); - assert(file_options_); - const Status s = - NewWritableFile(fs_, blob_file_path, &file, *file_options_); + Status s = NewWritableFile(fs_, blob_file_path, &file, *file_options_); + + TEST_SYNC_POINT_CALLBACK( + "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s); + if (!s.ok()) { return s; } @@ -184,9 +185,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners, immutable_cf_options_->file_checksum_gen_factory)); - std::unique_ptr blob_log_writer( - new BlobLogWriter(std::move(file_writer), env_, statistics, - blob_file_number, immutable_cf_options_->use_fsync)); + constexpr bool do_flush = false; + + std::unique_ptr blob_log_writer(new BlobLogWriter( + std::move(file_writer), env_, statistics, blob_file_number, + immutable_cf_options_->use_fsync, do_flush)); constexpr bool has_ttl = false; constexpr ExpirationRange 
expiration_range; @@ -195,9 +198,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { expiration_range); { - TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader"); + Status s = blob_log_writer->WriteHeader(header); + + TEST_SYNC_POINT_CALLBACK( + "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s); - const Status s = blob_log_writer->WriteHeader(header); if (!s.ok()) { return s; } @@ -247,9 +252,10 @@ Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob, uint64_t key_offset = 0; - TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord"); + Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset); + + TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s); - const Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset); if (!s.ok()) { return s; } @@ -271,10 +277,10 @@ Status BlobFileBuilder::CloseBlobFile() { std::string checksum_method; std::string checksum_value; - TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter"); + Status s = writer_->AppendFooter(footer, &checksum_method, &checksum_value); + + TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s); - const Status s = - writer_->AppendFooter(footer, &checksum_method, &checksum_value); if (!s.ok()) { return s; } diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc index 2baa2470f..134ca3cb2 100644 --- a/db/blob/blob_file_builder_test.cc +++ b/db/blob/blob_file_builder_test.cc @@ -39,9 +39,8 @@ class TestFileNumberGenerator { class BlobFileBuilderTest : public testing::Test { protected: - BlobFileBuilderTest() : mock_env_(Env::Default()) { - fs_ = mock_env_.GetFileSystem(); - } + BlobFileBuilderTest() + : mock_env_(Env::Default()), fs_(mock_env_.GetFileSystem().get()) {} void VerifyBlobFile(uint64_t blob_file_number, const std::string& blob_file_path, @@ -109,7 +108,7 @@ class BlobFileBuilderTest : public testing::Test { } MockEnv mock_env_; - 
std::shared_ptr fs_; + FileSystem* fs_; FileOptions file_options_; }; @@ -139,7 +138,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -222,7 +221,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -307,7 +306,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -359,7 +358,7 @@ TEST_F(BlobFileBuilderTest, Compression) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -441,7 +440,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, 
&immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -518,7 +517,7 @@ TEST_F(BlobFileBuilderTest, Checksum) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -571,13 +570,11 @@ class BlobFileBuilderIOErrorTest protected: BlobFileBuilderIOErrorTest() : mock_env_(Env::Default()), - fault_injection_env_(&mock_env_), - fs_(fault_injection_env_.GetFileSystem()), + fs_(mock_env_.GetFileSystem().get()), sync_point_(GetParam()) {} MockEnv mock_env_; - FaultInjectionTestEnv fault_injection_env_; - std::shared_ptr fs_; + FileSystem* fs_; FileOptions file_options_; std::string sync_point_; }; @@ -598,11 +595,11 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { Options options; options.cf_paths.emplace_back( - test::PerThreadDBPath(&fault_injection_env_, - "BlobFileBuilderIOErrorTest_IOError"), + test::PerThreadDBPath(&mock_env_, "BlobFileBuilderIOErrorTest_IOError"), 0); options.enable_blob_files = true; options.blob_file_size = value_size; + options.env = &mock_env_; ImmutableCFOptions immutable_cf_options(options); MutableCFOptions mutable_cf_options(options); @@ -616,15 +613,17 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &fault_injection_env_, - fs_.get(), &immutable_cf_options, &mutable_cf_options, + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, + &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, &blob_file_paths, &blob_file_additions); - 
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, - Status::IOError(sync_point_)); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { + Status* const s = static_cast(arg); + assert(s); + + (*s) = Status::IOError(sync_point_); }); SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/blob/blob_file_cache_test.cc b/db/blob/blob_file_cache_test.cc index 214fe41c5..e1dd21d98 100644 --- a/db/blob/blob_file_cache_test.cc +++ b/db/blob/blob_file_cache_test.cc @@ -46,10 +46,11 @@ void WriteBlobFile(uint32_t column_family_id, constexpr Statistics* statistics = nullptr; constexpr bool use_fsync = false; + constexpr bool do_flush = false; BlobLogWriter blob_log_writer(std::move(file_writer), immutable_cf_options.env, statistics, - blob_file_number, use_fsync); + blob_file_number, use_fsync, do_flush); constexpr bool has_ttl = false; constexpr ExpirationRange expiration_range; diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc index 71d5eadcc..a7b7fc878 100644 --- a/db/blob/blob_file_reader_test.cc +++ b/db/blob/blob_file_reader_test.cc @@ -54,10 +54,11 @@ void WriteBlobFile(const ImmutableCFOptions& immutable_cf_options, constexpr Statistics* statistics = nullptr; constexpr bool use_fsync = false; + constexpr bool do_flush = false; BlobLogWriter blob_log_writer(std::move(file_writer), immutable_cf_options.env, statistics, - blob_file_number, use_fsync); + blob_file_number, use_fsync, do_flush); BlobLogHeader header(column_family_id, compression_type, has_ttl, expiration_range_header); @@ -263,10 +264,11 @@ TEST_F(BlobFileReaderTest, Malformed) { constexpr Statistics* statistics = nullptr; constexpr bool use_fsync = false; + constexpr bool do_flush = false; BlobLogWriter blob_log_writer(std::move(file_writer), immutable_cf_options.env, statistics, - blob_file_number, use_fsync); + blob_file_number, use_fsync, do_flush); BlobLogHeader 
header(column_family_id, kNoCompression, has_ttl, expiration_range); diff --git a/db/blob/blob_log_writer.cc b/db/blob/blob_log_writer.cc index 8b3d0e2c7..859cbfc12 100644 --- a/db/blob/blob_log_writer.cc +++ b/db/blob/blob_log_writer.cc @@ -20,13 +20,15 @@ namespace ROCKSDB_NAMESPACE { BlobLogWriter::BlobLogWriter(std::unique_ptr&& dest, Env* env, Statistics* statistics, - uint64_t log_number, bool use_fs, uint64_t boffset) + uint64_t log_number, bool use_fs, bool do_flush, + uint64_t boffset) : dest_(std::move(dest)), env_(env), statistics_(statistics), log_number_(log_number), block_offset_(boffset), use_fsync_(use_fs), + do_flush_(do_flush), last_elem_type_(kEtNone) {} BlobLogWriter::~BlobLogWriter() = default; @@ -49,7 +51,9 @@ Status BlobLogWriter::WriteHeader(BlobLogHeader& header) { Status s = dest_->Append(Slice(str)); if (s.ok()) { block_offset_ += str.size(); - s = dest_->Flush(); + if (do_flush_) { + s = dest_->Flush(); + } } last_elem_type_ = kEtFileHdr; RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, @@ -152,7 +156,7 @@ Status BlobLogWriter::EmitPhysicalRecord(const std::string& headerbuf, if (s.ok()) { s = dest_->Append(val); } - if (s.ok()) { + if (do_flush_ && s.ok()) { s = dest_->Flush(); } diff --git a/db/blob/blob_log_writer.h b/db/blob/blob_log_writer.h index 0f9ea2516..a2f28d901 100644 --- a/db/blob/blob_log_writer.h +++ b/db/blob/blob_log_writer.h @@ -34,7 +34,7 @@ class BlobLogWriter { // "*dest" must remain live while this BlobLogWriter is in use. 
BlobLogWriter(std::unique_ptr&& dest, Env* env, Statistics* statistics, uint64_t log_number, bool use_fsync, - uint64_t boffset = 0); + bool do_flush, uint64_t boffset = 0); // No copying allowed BlobLogWriter(const BlobLogWriter&) = delete; BlobLogWriter& operator=(const BlobLogWriter&) = delete; @@ -74,6 +74,7 @@ class BlobLogWriter { uint64_t log_number_; uint64_t block_offset_; // Current offset in block bool use_fsync_; + bool do_flush_; public: enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter }; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index c62a723af..ea1dc075a 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5957,11 +5957,8 @@ class DBCompactionTestBlobError : public DBCompactionTest, public testing::WithParamInterface { public: - DBCompactionTestBlobError() - : fault_injection_env_(env_), sync_point_(GetParam()) {} - ~DBCompactionTestBlobError() { Close(); } + DBCompactionTestBlobError() : sync_point_(GetParam()) {} - FaultInjectionTestEnv fault_injection_env_; std::string sync_point_; }; @@ -5996,13 +5993,14 @@ TEST_P(DBCompactionTestBlobError, CompactionError) { ASSERT_OK(Flush()); options.enable_blob_files = true; - options.env = &fault_injection_env_; Reopen(options); - SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, - Status::IOError(sync_point_)); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { + Status* const s = static_cast(arg); + assert(s); + + (*s) = Status::IOError(sync_point_); }); SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 496cedfa1..df8108463 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -451,7 +451,6 @@ TEST_F(DBFlushTest, FlushWithBlob) { constexpr uint64_t min_blob_size = 10; Options options; - options.env = CurrentOptions().env; options.enable_blob_files = true; options.min_blob_size = 
min_blob_size; options.disable_auto_compactions = true; @@ -528,11 +527,8 @@ TEST_F(DBFlushTest, FlushWithBlob) { class DBFlushTestBlobError : public DBFlushTest, public testing::WithParamInterface { public: - DBFlushTestBlobError() - : fault_injection_env_(env_), sync_point_(GetParam()) {} - ~DBFlushTestBlobError() { Close(); } + DBFlushTestBlobError() : sync_point_(GetParam()) {} - FaultInjectionTestEnv fault_injection_env_; std::string sync_point_; }; @@ -545,20 +541,18 @@ TEST_P(DBFlushTestBlobError, FlushError) { Options options; options.enable_blob_files = true; options.disable_auto_compactions = true; - options.env = &fault_injection_env_; + options.env = env_; Reopen(options); ASSERT_OK(Put("key", "blob")); - SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, - Status::IOError(sync_point_)); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { + Status* const s = static_cast(arg); + assert(s); + + (*s) = Status::IOError(sync_point_); }); - SyncPoint::GetInstance()->SetCallBack( - "BuildTable:BeforeDeleteFile", [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(true); - }); SyncPoint::GetInstance()->EnableProcessing(); ASSERT_NOK(Flush()); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 0d20b9b19..cc87e0569 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -441,11 +441,8 @@ class DBRecoveryTestBlobError : public DBWALTest, public testing::WithParamInterface { public: - DBRecoveryTestBlobError() - : fault_injection_env_(env_), sync_point_(GetParam()) {} - ~DBRecoveryTestBlobError() { Close(); } + DBRecoveryTestBlobError() : sync_point_(GetParam()) {} - FaultInjectionTestEnv fault_injection_env_; std::string sync_point_; }; @@ -460,21 +457,19 @@ TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) { // Reopen with blob files enabled but make blob file writing fail during // recovery. 
- SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, - Status::IOError(sync_point_)); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { + Status* const s = static_cast(arg); + assert(s); + + (*s) = Status::IOError(sync_point_); }); - SyncPoint::GetInstance()->SetCallBack( - "BuildTable:BeforeDeleteFile", [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(true); - }); SyncPoint::GetInstance()->EnableProcessing(); Options options; options.enable_blob_files = true; options.avoid_flush_during_recovery = false; options.disable_auto_compactions = true; - options.env = &fault_injection_env_; + options.env = env_; ASSERT_NOK(TryReopen(options)); diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 5cce0df6e..97f23270a 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -754,9 +754,11 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { return Status::Corruption("Invalid blob file size"); } + constexpr bool do_flush = true; + bfile->log_writer_ = std::make_shared( std::move(fwriter), env_, statistics_, bfile->file_number_, - db_options_.use_fsync, boffset); + db_options_.use_fsync, do_flush, boffset); bfile->log_writer_->last_elem_type_ = et; return s;