diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 57f05438c..6f2901739 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -157,11 +157,12 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { std::unique_ptr file; { - TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile"); - assert(file_options_); - const Status s = - NewWritableFile(fs_, blob_file_path, &file, *file_options_); + Status s = NewWritableFile(fs_, blob_file_path, &file, *file_options_); + + TEST_SYNC_POINT_CALLBACK( + "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s); + if (!s.ok()) { return s; } @@ -184,9 +185,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners, immutable_cf_options_->file_checksum_gen_factory)); - std::unique_ptr blob_log_writer( - new BlobLogWriter(std::move(file_writer), env_, statistics, - blob_file_number, immutable_cf_options_->use_fsync)); + constexpr bool do_flush = false; + + std::unique_ptr blob_log_writer(new BlobLogWriter( + std::move(file_writer), env_, statistics, blob_file_number, + immutable_cf_options_->use_fsync, do_flush)); constexpr bool has_ttl = false; constexpr ExpirationRange expiration_range; @@ -195,9 +198,11 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { expiration_range); { - TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader"); + Status s = blob_log_writer->WriteHeader(header); + + TEST_SYNC_POINT_CALLBACK( + "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s); - const Status s = blob_log_writer->WriteHeader(header); if (!s.ok()) { return s; } @@ -247,9 +252,10 @@ Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob, uint64_t key_offset = 0; - TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord"); + Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset); + + TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s); - const Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset); if (!s.ok()) { return s; } @@ -271,10 +277,10 @@ Status BlobFileBuilder::CloseBlobFile() { std::string checksum_method; std::string checksum_value; - TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter"); + Status s = writer_->AppendFooter(footer, &checksum_method, &checksum_value); + + TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s); - const Status s = - writer_->AppendFooter(footer, &checksum_method, &checksum_value); if (!s.ok()) { return s; } diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc index 2baa2470f..134ca3cb2 100644 --- a/db/blob/blob_file_builder_test.cc +++ b/db/blob/blob_file_builder_test.cc @@ -39,9 +39,8 @@ class TestFileNumberGenerator { class BlobFileBuilderTest : public testing::Test { protected: - BlobFileBuilderTest() : mock_env_(Env::Default()) { - fs_ = mock_env_.GetFileSystem(); - } + BlobFileBuilderTest() + : mock_env_(Env::Default()), fs_(mock_env_.GetFileSystem().get()) {} void VerifyBlobFile(uint64_t blob_file_number, const std::string& blob_file_path, @@ -109,7 +108,7 @@ class BlobFileBuilderTest : public testing::Test { } MockEnv mock_env_; - std::shared_ptr fs_; + FileSystem* fs_; FileOptions file_options_; }; @@ -139,7 +138,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -222,7 +221,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -307,7 +306,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -359,7 +358,7 @@ TEST_F(BlobFileBuilderTest, Compression) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -441,7 +440,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -518,7 +517,7 @@ TEST_F(BlobFileBuilderTest, Checksum) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_.get(), + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, @@ -571,13 +570,11 @@ class BlobFileBuilderIOErrorTest protected: BlobFileBuilderIOErrorTest() : mock_env_(Env::Default()), - fault_injection_env_(&mock_env_), - fs_(fault_injection_env_.GetFileSystem()), + fs_(mock_env_.GetFileSystem().get()), sync_point_(GetParam()) {} MockEnv mock_env_; - FaultInjectionTestEnv fault_injection_env_; - std::shared_ptr fs_; + FileSystem* fs_; FileOptions file_options_; std::string sync_point_; }; @@ -598,11 +595,11 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { Options options; options.cf_paths.emplace_back( - test::PerThreadDBPath(&fault_injection_env_, - "BlobFileBuilderIOErrorTest_IOError"), + test::PerThreadDBPath(&mock_env_, "BlobFileBuilderIOErrorTest_IOError"), 0); options.enable_blob_files = true; options.blob_file_size = value_size; + options.env = &mock_env_; ImmutableCFOptions immutable_cf_options(options); MutableCFOptions mutable_cf_options(options); @@ -616,15 +613,17 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder(TestFileNumberGenerator(), &fault_injection_env_, - fs_.get(), &immutable_cf_options, &mutable_cf_options, + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, fs_, + &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, &blob_file_paths, &blob_file_additions); - SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, - Status::IOError(sync_point_)); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { + Status* const s = static_cast(arg); + assert(s); + + (*s) = Status::IOError(sync_point_); }); SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/blob/blob_file_cache_test.cc b/db/blob/blob_file_cache_test.cc index 214fe41c5..e1dd21d98 100644 --- a/db/blob/blob_file_cache_test.cc +++ b/db/blob/blob_file_cache_test.cc @@ -46,10 +46,11 @@ void WriteBlobFile(uint32_t column_family_id, constexpr Statistics* statistics = nullptr; constexpr bool use_fsync = false; + constexpr bool do_flush = false; BlobLogWriter blob_log_writer(std::move(file_writer), immutable_cf_options.env, statistics, - blob_file_number, use_fsync); + blob_file_number, use_fsync, do_flush); constexpr bool has_ttl = false; constexpr ExpirationRange expiration_range; diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc index 71d5eadcc..a7b7fc878 100644 --- a/db/blob/blob_file_reader_test.cc +++ b/db/blob/blob_file_reader_test.cc @@ -54,10 +54,11 @@ void WriteBlobFile(const ImmutableCFOptions& immutable_cf_options, constexpr Statistics* statistics = nullptr; constexpr bool use_fsync = false; + constexpr bool do_flush = false; BlobLogWriter blob_log_writer(std::move(file_writer), immutable_cf_options.env, statistics, - blob_file_number, use_fsync); + blob_file_number, use_fsync, do_flush); BlobLogHeader header(column_family_id, compression_type, has_ttl, expiration_range_header); @@ -263,10 +264,11 @@ TEST_F(BlobFileReaderTest, Malformed) { constexpr Statistics* statistics = nullptr; constexpr bool use_fsync = false; + constexpr bool do_flush = false; BlobLogWriter blob_log_writer(std::move(file_writer), immutable_cf_options.env, statistics, - blob_file_number, use_fsync); + blob_file_number, use_fsync, do_flush); BlobLogHeader header(column_family_id, kNoCompression, has_ttl, expiration_range); diff --git a/db/blob/blob_log_writer.cc b/db/blob/blob_log_writer.cc index 8b3d0e2c7..859cbfc12 100644 --- a/db/blob/blob_log_writer.cc +++ b/db/blob/blob_log_writer.cc @@ -20,13 +20,15 @@ namespace ROCKSDB_NAMESPACE { BlobLogWriter::BlobLogWriter(std::unique_ptr&& dest, Env* env, Statistics* statistics, - uint64_t log_number, bool use_fs, uint64_t boffset) + uint64_t log_number, bool use_fs, bool do_flush, + uint64_t boffset) : dest_(std::move(dest)), env_(env), statistics_(statistics), log_number_(log_number), block_offset_(boffset), use_fsync_(use_fs), + do_flush_(do_flush), last_elem_type_(kEtNone) {} BlobLogWriter::~BlobLogWriter() = default; @@ -49,7 +51,9 @@ Status BlobLogWriter::WriteHeader(BlobLogHeader& header) { Status s = dest_->Append(Slice(str)); if (s.ok()) { block_offset_ += str.size(); - s = dest_->Flush(); + if (do_flush_) { + s = dest_->Flush(); + } } last_elem_type_ = kEtFileHdr; RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, @@ -152,7 +156,7 @@ Status BlobLogWriter::EmitPhysicalRecord(const std::string& headerbuf, if (s.ok()) { s = dest_->Append(val); } - if (s.ok()) { + if (do_flush_ && s.ok()) { s = dest_->Flush(); } diff --git a/db/blob/blob_log_writer.h b/db/blob/blob_log_writer.h index 0f9ea2516..a2f28d901 100644 --- a/db/blob/blob_log_writer.h +++ b/db/blob/blob_log_writer.h @@ -34,7 +34,7 @@ class BlobLogWriter { // "*dest" must remain live while this BlobLogWriter is in use. BlobLogWriter(std::unique_ptr&& dest, Env* env, Statistics* statistics, uint64_t log_number, bool use_fsync, - uint64_t boffset = 0); + bool do_flush, uint64_t boffset = 0); // No copying allowed BlobLogWriter(const BlobLogWriter&) = delete; BlobLogWriter& operator=(const BlobLogWriter&) = delete; @@ -74,6 +74,7 @@ class BlobLogWriter { uint64_t log_number_; uint64_t block_offset_; // Current offset in block bool use_fsync_; + bool do_flush_; public: enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter }; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index c62a723af..ea1dc075a 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5957,11 +5957,8 @@ class DBCompactionTestBlobError : public DBCompactionTest, public testing::WithParamInterface { public: - DBCompactionTestBlobError() - : fault_injection_env_(env_), sync_point_(GetParam()) {} - ~DBCompactionTestBlobError() { Close(); } + DBCompactionTestBlobError() : sync_point_(GetParam()) {} - FaultInjectionTestEnv fault_injection_env_; std::string sync_point_; }; @@ -5996,13 +5993,14 @@ TEST_P(DBCompactionTestBlobError, CompactionError) { ASSERT_OK(Flush()); options.enable_blob_files = true; - options.env = &fault_injection_env_; Reopen(options); - SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, - Status::IOError(sync_point_)); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { + Status* const s = static_cast(arg); + assert(s); + + (*s) = Status::IOError(sync_point_); }); SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 496cedfa1..df8108463 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -451,7 +451,6 @@ TEST_F(DBFlushTest, FlushWithBlob) { constexpr uint64_t min_blob_size = 10; Options options; - options.env = CurrentOptions().env; options.enable_blob_files = true; options.min_blob_size = min_blob_size; options.disable_auto_compactions = true; @@ -528,11 +527,8 @@ TEST_F(DBFlushTest, FlushWithBlob) { class DBFlushTestBlobError : public DBFlushTest, public testing::WithParamInterface { public: - DBFlushTestBlobError() - : fault_injection_env_(env_), sync_point_(GetParam()) {} - ~DBFlushTestBlobError() { Close(); } + DBFlushTestBlobError() : sync_point_(GetParam()) {} - FaultInjectionTestEnv fault_injection_env_; std::string sync_point_; }; @@ -545,20 +541,18 @@ TEST_P(DBFlushTestBlobError, FlushError) { Options options; options.enable_blob_files = true; options.disable_auto_compactions = true; - options.env = &fault_injection_env_; + options.env = env_; Reopen(options); ASSERT_OK(Put("key", "blob")); - SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, - Status::IOError(sync_point_)); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { + Status* const s = static_cast(arg); + assert(s); + + (*s) = Status::IOError(sync_point_); }); - SyncPoint::GetInstance()->SetCallBack( - "BuildTable:BeforeDeleteFile", [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(true); - }); SyncPoint::GetInstance()->EnableProcessing(); ASSERT_NOK(Flush()); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 0d20b9b19..cc87e0569 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -441,11 +441,8 @@ class DBRecoveryTestBlobError : public DBWALTest, public testing::WithParamInterface { public: - DBRecoveryTestBlobError() - : fault_injection_env_(env_), sync_point_(GetParam()) {} - ~DBRecoveryTestBlobError() { Close(); } + DBRecoveryTestBlobError() : sync_point_(GetParam()) {} - FaultInjectionTestEnv fault_injection_env_; std::string sync_point_; }; @@ -460,21 +457,19 @@ TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) { // Reopen with blob files enabled but make blob file writing fail during // recovery. - SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, - Status::IOError(sync_point_)); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { + Status* const s = static_cast(arg); + assert(s); + + (*s) = Status::IOError(sync_point_); }); - SyncPoint::GetInstance()->SetCallBack( - "BuildTable:BeforeDeleteFile", [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(true); - }); SyncPoint::GetInstance()->EnableProcessing(); Options options; options.enable_blob_files = true; options.avoid_flush_during_recovery = false; options.disable_auto_compactions = true; - options.env = &fault_injection_env_; + options.env = env_; ASSERT_NOK(TryReopen(options)); diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 5cce0df6e..97f23270a 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -754,9 +754,11 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { return Status::Corruption("Invalid blob file size"); } + constexpr bool do_flush = true; + bfile->log_writer_ = std::make_shared( std::move(fwriter), env_, statistics_, bfile->file_number_, - db_options_.use_fsync, boffset); + db_options_.use_fsync, do_flush, boffset); bfile->log_writer_->last_elem_type_ = et; return s;