diff --git a/db/builder.cc b/db/builder.cc index 90cfbbffb..206ff257f 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -212,6 +212,7 @@ Status BuildTable( } else if (!c_iter.status().ok()) { s = c_iter.status(); } + if (s.ok()) { auto range_del_it = range_del_agg->NewIterator(); for (range_del_it->SeekToFirst(); range_del_it->Valid(); @@ -222,10 +223,6 @@ Status BuildTable( meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(), tombstone.seq_, internal_comparator); } - - if (blob_file_builder) { - s = blob_file_builder->Finish(); - } } TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable"); @@ -273,6 +270,14 @@ Status BuildTable( s = *io_status; } + if (blob_file_builder) { + if (s.ok()) { + s = blob_file_builder->Finish(); + } + + blob_file_builder.reset(); + } + // TODO Also check the IO status when create the Iterator. if (s.ok() && !empty) { @@ -318,6 +323,8 @@ Status BuildTable( } if (!s.ok() || meta->fd.GetFileSize() == 0) { + TEST_SYNC_POINT("BuildTable:BeforeDeleteFile"); + constexpr IODebugContext* dbg = nullptr; Status ignored = fs->DeleteFile(fname, IOOptions(), dbg); @@ -330,8 +337,6 @@ Status BuildTable( ignored = fs->DeleteFile(blob_file_path, IOOptions(), dbg); ignored.PermitUncheckedError(); } - - blob_file_additions->clear(); } } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index bcb90a2d1..6485c3de2 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -20,6 +20,8 @@ #include #include +#include "db/blob/blob_file_addition.h" +#include "db/blob/blob_file_builder.h" #include "db/builder.h" #include "db/db_impl/db_impl.h" #include "db/db_iter.h" @@ -138,6 +140,7 @@ struct CompactionJob::SubcompactionState { // State kept for output being generated std::vector outputs; + std::vector blob_file_additions; std::unique_ptr outfile; std::unique_ptr builder; @@ -231,21 +234,13 @@ struct CompactionJob::CompactionState { std::vector sub_compact_states; Status status; - uint64_t total_bytes; - uint64_t num_output_records; - - explicit CompactionState(Compaction* c) - : compaction(c), - total_bytes(0), - num_output_records(0) {} + size_t num_output_files = 0; + uint64_t total_bytes = 0; + size_t num_blob_output_files = 0; + uint64_t total_blob_bytes = 0; + uint64_t num_output_records = 0; - size_t NumOutputFiles() { - size_t total = 0; - for (auto& s : sub_compact_states) { - total += s.outputs.size(); - } - return total; - } + explicit CompactionState(Compaction* c) : compaction(c) {} Slice SmallestUserKey() { for (const auto& sub_compact_state : sub_compact_states) { @@ -272,11 +267,29 @@ struct CompactionJob::CompactionState { }; void CompactionJob::AggregateStatistics() { + assert(compact_); + for (SubcompactionState& sc : compact_->sub_compact_states) { + auto& outputs = sc.outputs; + + if (!outputs.empty() && !outputs.back().meta.fd.file_size) { + // An error occurred, so ignore the last output. + outputs.pop_back(); + } + + compact_->num_output_files += outputs.size(); compact_->total_bytes += sc.total_bytes; + + const auto& blobs = sc.blob_file_additions; + + compact_->num_blob_output_files += blobs.size(); + + for (const auto& blob : blobs) { + compact_->total_blob_bytes += blob.GetTotalBlobBytes(); + } + compact_->num_output_records += sc.num_output_records; - } - for (SubcompactionState& sc : compact_->sub_compact_states) { + compaction_job_stats_->Add(sc.compaction_job_stats); } } @@ -286,7 +299,8 @@ CompactionJob::CompactionJob( const FileOptions& file_options, VersionSet* versions, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, - FSDirectory* db_directory, FSDirectory* output_directory, Statistics* stats, + FSDirectory* db_directory, FSDirectory* output_directory, + FSDirectory* blob_output_directory, Statistics* stats, InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, @@ -317,6 +331,7 @@ CompactionJob::CompactionJob( log_buffer_(log_buffer), db_directory_(db_directory), output_directory_(output_directory), + blob_output_directory_(blob_output_directory), stats_(stats), db_mutex_(db_mutex), db_error_handler_(db_error_handler), @@ -604,18 +619,34 @@ Status CompactionJob::Run() { // Check if any thread encountered an error during execution Status status; IOStatus io_s; + bool wrote_new_blob_files = false; + for (const auto& state : compact_->sub_compact_states) { if (!state.status.ok()) { status = state.status; io_s = state.io_status; break; } + + if (!state.blob_file_additions.empty()) { + wrote_new_blob_files = true; + } } + if (io_status_.ok()) { io_status_ = io_s; } - if (status.ok() && output_directory_) { - io_s = output_directory_->Fsync(IOOptions(), nullptr); + if (status.ok()) { + constexpr IODebugContext* dbg = nullptr; + + if (output_directory_) { + io_s = output_directory_->Fsync(IOOptions(), dbg); + } + + if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ && + blob_output_directory_ != output_directory_) { + io_s = blob_output_directory_->Fsync(IOOptions(), dbg); + } } if (io_status_.ok()) { io_status_ = io_s; @@ -721,6 +752,7 @@ Status CompactionJob::Run() { // Finish up all book-keeping to unify the subcompaction results AggregateStatistics(); UpdateCompactionStats(); + RecordCompactionIOStats(); LogFlush(db_options_.info_log); TEST_SYNC_POINT("CompactionJob::Run():End"); @@ -730,11 +762,16 @@ Status CompactionJob::Run() { } Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { + assert(compact_); + AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_INSTALL); db_mutex_->AssertHeld(); Status status = compact_->status; + ColumnFamilyData* cfd = compact_->compaction->column_family_data(); + assert(cfd); + cfd->internal_stats()->AddCompactionStats( compact_->compaction->output_level(), thread_pri_, compaction_stats_); @@ -744,6 +781,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { if (!versions_->io_status().ok()) { io_status_ = versions_->io_status(); } + VersionStorageInfo::LevelSummaryStorage tmp; auto vstorage = cfd->current()->storage_info(); const auto& stats = compaction_stats_; @@ -768,6 +806,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { stats.bytes_written / static_cast(stats.micros); } + const std::string& column_family_name = cfd->GetName(); + ROCKS_LOG_BUFFER( log_buffer_, "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " @@ -775,8 +815,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "write-amplify(%.1f) %s, records in: %" PRIu64 ", records dropped: %" PRIu64 " output_compression: %s\n", - cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec, - bytes_written_per_sec, compact_->compaction->output_level(), + column_family_name.c_str(), vstorage->LevelSummary(&tmp), + bytes_read_per_sec, bytes_written_per_sec, + compact_->compaction->output_level(), stats.num_input_files_in_non_output_levels, stats.num_input_files_in_output_level, stats.num_output_files, stats.bytes_read_non_output_levels / 1048576.0, @@ -787,6 +828,15 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { CompressionTypeToString(compact_->compaction->output_compression()) .c_str()); + const auto& blob_files = vstorage->GetBlobFiles(); + if (!blob_files.empty()) { + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 + "\n", + column_family_name.c_str(), blob_files.begin()->first, + blob_files.rbegin()->first); + } + UpdateCompactionJobStats(stats); auto stream = event_logger_->LogToBuffer(log_buffer_); @@ -795,11 +845,18 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { << "compaction_time_micros" << stats.micros << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level" << compact_->compaction->output_level() << "num_output_files" - << compact_->NumOutputFiles() << "total_output_size" - << compact_->total_bytes << "num_input_records" - << stats.num_input_records << "num_output_records" - << compact_->num_output_records << "num_subcompactions" - << compact_->sub_compact_states.size() << "output_compression" + << compact_->num_output_files << "total_output_size" + << compact_->total_bytes; + + if (compact_->num_blob_output_files > 0) { + stream << "num_blob_output_files" << compact_->num_blob_output_files + << "total_blob_output_size" << compact_->total_blob_bytes; + } + + stream << "num_input_records" << stats.num_input_records + << "num_output_records" << compact_->num_output_records + << "num_subcompactions" << compact_->sub_compact_states.size() + << "output_compression" << CompressionTypeToString(compact_->compaction->output_compression()); stream << "num_single_delete_mismatches" @@ -823,12 +880,18 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { } stream.EndArray(); + if (!blob_files.empty()) { + stream << "blob_file_head" << blob_files.begin()->first; + stream << "blob_file_tail" << blob_files.rbegin()->first; + } + CleanupCompaction(); return status; } void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { - assert(sub_compact != nullptr); + assert(sub_compact); + assert(sub_compact->compaction); uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000; @@ -899,6 +962,22 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { snapshot_checker_, compact_->compaction->level(), db_options_.statistics.get()); + const MutableCFOptions* mutable_cf_options = + sub_compact->compaction->mutable_cf_options(); + assert(mutable_cf_options); + + std::vector blob_file_paths; + + std::unique_ptr blob_file_builder( + mutable_cf_options->enable_blob_files + ? new BlobFileBuilder( + versions_, env_, fs_.get(), + sub_compact->compaction->immutable_cf_options(), + mutable_cf_options, &file_options_, job_id_, cfd->GetID(), + cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_, + &blob_file_paths, &sub_compact->blob_file_additions) + : nullptr); + TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); TEST_SYNC_POINT_CALLBACK( "CompactionJob::Run():PausingManualCompaction:1", @@ -921,7 +1000,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, &range_del_agg, - /* blob_file_builder */ nullptr, db_options_.allow_data_in_errors, + blob_file_builder.get(), db_options_.allow_data_in_errors, sub_compact->compaction, compaction_filter, shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log)); @@ -1093,6 +1172,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); } + if (blob_file_builder) { + if (status.ok()) { + status = blob_file_builder->Finish(); + } + + blob_file_builder.reset(); + } + sub_compact->compaction_job_stats.cpu_micros = env_->NowCPUNanos() / 1000 - prev_cpu_micros; @@ -1479,9 +1566,13 @@ Status CompactionJob::FinishCompactionOutputFile( Status CompactionJob::InstallCompactionResults( const MutableCFOptions& mutable_cf_options) { + assert(compact_); + db_mutex_->AssertHeld(); auto* compaction = compact_->compaction; + assert(compaction); + // paranoia: verify that the files that we started with // still exist in the current version and in the same original level. // This ensures that a concurrent compaction did not erroneously @@ -1497,23 +1588,32 @@ Status CompactionJob::InstallCompactionResults( { Compaction::InputLevelSummaryBuffer inputs_summary; - ROCKS_LOG_INFO( - db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes", - compaction->column_family_data()->GetName().c_str(), job_id_, - compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes); + ROCKS_LOG_INFO(db_options_.info_log, + "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes", + compaction->column_family_data()->GetName().c_str(), job_id_, + compaction->InputLevelSummary(&inputs_summary), + compact_->total_bytes + compact_->total_blob_bytes); } + VersionEdit* const edit = compaction->edit(); + assert(edit); + // Add compaction inputs - compaction->AddInputDeletions(compact_->compaction->edit()); + compaction->AddInputDeletions(edit); for (const auto& sub_compact : compact_->sub_compact_states) { for (const auto& out : sub_compact.outputs) { - compaction->edit()->AddFile(compaction->output_level(), out.meta); + edit->AddFile(compaction->output_level(), out.meta); + } + + for (const auto& blob : sub_compact.blob_file_additions) { + edit->AddBlobFile(blob); } } + return versions_->LogAndApply(compaction->column_family_data(), - mutable_cf_options, compaction->edit(), - db_mutex_, db_directory_); + mutable_cf_options, edit, db_mutex_, + db_directory_); } void CompactionJob::RecordCompactionIOStats() { @@ -1689,6 +1789,8 @@ void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) { #endif // !ROCKSDB_LITE void CompactionJob::UpdateCompactionStats() { + assert(compact_); + Compaction* compaction = compact_->compaction; compaction_stats_.num_input_files_in_non_output_levels = 0; compaction_stats_.num_input_files_in_output_level = 0; @@ -1706,27 +1808,15 @@ void CompactionJob::UpdateCompactionStats() { } } - uint64_t num_output_records = 0; - - for (const auto& sub_compact : compact_->sub_compact_states) { - size_t num_output_files = sub_compact.outputs.size(); - if (sub_compact.builder != nullptr) { - // An error occurred so ignore the last output. - assert(num_output_files > 0); - --num_output_files; - } - compaction_stats_.num_output_files += static_cast(num_output_files); - - num_output_records += sub_compact.num_output_records; - - for (const auto& out : sub_compact.outputs) { - compaction_stats_.bytes_written += out.meta.fd.file_size; - } - } + compaction_stats_.num_output_files = + static_cast(compact_->num_output_files) + + static_cast(compact_->num_blob_output_files); + compaction_stats_.bytes_written = + compact_->total_bytes + compact_->total_blob_bytes; - if (compaction_stats_.num_input_records > num_output_records) { + if (compaction_stats_.num_input_records > compact_->num_output_records) { compaction_stats_.num_dropped_records = - compaction_stats_.num_input_records - num_output_records; + compaction_stats_.num_input_records - compact_->num_output_records; } } @@ -1765,7 +1855,7 @@ void CompactionJob::UpdateCompactionJobStats( compaction_job_stats_->num_output_records = compact_->num_output_records; compaction_job_stats_->num_output_files = stats.num_output_files; - if (compact_->NumOutputFiles() > 0U) { + if (stats.num_output_files > 0) { CopyPrefix(compact_->SmallestUserKey(), CompactionJobStats::kMaxPrefixLength, &compaction_job_stats_->smallest_output_key_prefix); diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index aafad8d3a..2c36b408d 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -68,8 +68,8 @@ class CompactionJob { const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, FSDirectory* db_directory, FSDirectory* output_directory, - Statistics* stats, InstrumentedMutex* db_mutex, - ErrorHandler* db_error_handler, + FSDirectory* blob_output_directory, Statistics* stats, + InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, @@ -169,6 +169,7 @@ class CompactionJob { LogBuffer* log_buffer_; FSDirectory* db_directory_; FSDirectory* output_directory_; + FSDirectory* blob_output_directory_; Statistics* stats_; InstrumentedMutex* db_mutex_; ErrorHandler* db_error_handler_; diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 210042ca0..8fbefa676 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -340,7 +340,7 @@ class CompactionJobTest : public testing::Test { CompactionJob compaction_job( 0, &compaction, db_options_, env_options_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr, - nullptr, nullptr, &mutex_, &error_handler_, snapshots, + nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */); diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index aac39d980..d90c31bda 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5845,6 +5845,181 @@ TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) { ASSERT_EQ("0,5", FilesPerLevel(0)); } +TEST_F(DBCompactionTest, CompactionWithBlob) { + Options options; + options.env = env_; + options.disable_auto_compactions = true; + + Reopen(options); + + constexpr char first_key[] = "first_key"; + constexpr char second_key[] = "second_key"; + constexpr char first_value[] = "first_value"; + constexpr char second_value[] = "second_value"; + constexpr char third_value[] = "third_value"; + + ASSERT_OK(Put(first_key, first_value)); + ASSERT_OK(Put(second_key, first_value)); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(first_key, second_value)); + ASSERT_OK(Put(second_key, second_value)); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(first_key, third_value)); + ASSERT_OK(Put(second_key, third_value)); + ASSERT_OK(Flush()); + + options.enable_blob_files = true; + + Reopen(options); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + ASSERT_EQ(Get(first_key), third_value); + ASSERT_EQ(Get(second_key), third_value); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l1_files = storage_info->LevelFiles(1); + ASSERT_EQ(l1_files.size(), 1); + + const FileMetaData* const table_file = l1_files[0]; + assert(table_file); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_EQ(blob_files.size(), 1); + + const auto& blob_file = blob_files.begin()->second; + assert(blob_file); + + ASSERT_EQ(table_file->smallest.user_key(), first_key); + ASSERT_EQ(table_file->largest.user_key(), second_key); + ASSERT_EQ(table_file->oldest_blob_file_number, + blob_file->GetBlobFileNumber()); + + ASSERT_EQ(blob_file->GetTotalBlobCount(), 2); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const uint64_t expected_bytes = + table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes(); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + ASSERT_EQ(compaction_stats[1].bytes_written, expected_bytes); + ASSERT_EQ(compaction_stats[1].num_output_files, 2); +} + +class DBCompactionTestBlobError + : public DBCompactionTest, + public testing::WithParamInterface { + public: + DBCompactionTestBlobError() + : fault_injection_env_(env_), sync_point_(GetParam()) {} + ~DBCompactionTestBlobError() { Close(); } + + FaultInjectionTestEnv fault_injection_env_; + std::string sync_point_; +}; + +INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobError, DBCompactionTestBlobError, + ::testing::ValuesIn(std::vector{ + "BlobFileBuilder::WriteBlobToFile:AddRecord", + "BlobFileBuilder::WriteBlobToFile:AppendFooter"})); + +TEST_P(DBCompactionTestBlobError, CompactionError) { + Options options; + options.disable_auto_compactions = true; + options.env = env_; + + Reopen(options); + + constexpr char first_key[] = "first_key"; + constexpr char second_key[] = "second_key"; + constexpr char first_value[] = "first_value"; + constexpr char second_value[] = "second_value"; + constexpr char third_value[] = "third_value"; + + ASSERT_OK(Put(first_key, first_value)); + ASSERT_OK(Put(second_key, first_value)); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(first_key, second_value)); + ASSERT_OK(Put(second_key, second_value)); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(first_key, third_value)); + ASSERT_OK(Put(second_key, third_value)); + ASSERT_OK(Flush()); + + options.enable_blob_files = true; + options.env = &fault_injection_env_; + + Reopen(options); + + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, + Status::IOError(sync_point_)); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), begin, end).IsIOError()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l1_files = storage_info->LevelFiles(1); + ASSERT_TRUE(l1_files.empty()); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_TRUE(blob_files.empty()); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { + ASSERT_EQ(compaction_stats[1].bytes_written, 0); + ASSERT_EQ(compaction_stats[1].num_output_files, 0); + } else { + // SST file writing succeeded; blob file writing failed (during Finish) + ASSERT_GT(compaction_stats[1].bytes_written, 0); + ASSERT_EQ(compaction_stats[1].num_output_files, 1); + } +} + #endif // !defined(ROCKSDB_LITE) } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 7edc6738c..1789ce44e 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -453,6 +453,7 @@ TEST_F(DBFlushTest, FlushWithBlob) { options.enable_blob_files = true; options.min_blob_size = min_blob_size; options.disable_auto_compactions = true; + options.env = env_; Reopen(options); @@ -525,10 +526,12 @@ TEST_F(DBFlushTest, FlushWithBlob) { class DBFlushTestBlobError : public DBFlushTest, public testing::WithParamInterface { public: - DBFlushTestBlobError() : fault_injection_env_(env_) {} + DBFlushTestBlobError() + : fault_injection_env_(env_), sync_point_(GetParam()) {} ~DBFlushTestBlobError() { Close(); } FaultInjectionTestEnv fault_injection_env_; + std::string sync_point_; }; INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError, @@ -546,11 +549,12 @@ TEST_P(DBFlushTestBlobError, FlushError) { ASSERT_OK(Put("key", "blob")); - SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, Status::IOError()); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, + Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->SetCallBack( - "BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) { + "BuildTable:BeforeDeleteFile", [this](void* /* arg */) { fault_injection_env_.SetFilesystemActive(true); }); SyncPoint::GetInstance()->EnableProcessing(); @@ -599,11 +603,19 @@ TEST_P(DBFlushTestBlobError, FlushError) { const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); ASSERT_FALSE(compaction_stats.empty()); - ASSERT_EQ(compaction_stats[0].bytes_written, 0); - ASSERT_EQ(compaction_stats[0].num_output_files, 0); + + if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { + ASSERT_EQ(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].num_output_files, 0); + } else { + // SST file writing succeeded; blob file writing failed (during Finish) + ASSERT_GT(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].num_output_files, 1); + } const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); - ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], 0); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], + compaction_stats[0].bytes_written); #endif // ROCKSDB_LITE } diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 95855e4c7..3c8a3a454 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1147,9 +1147,10 @@ Status DBImpl::CompactFilesImpl( job_context->job_id, c.get(), immutable_db_options_, file_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), - GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_, - &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker, table_cache_, &event_logger_, + GetDataDir(c->column_family_data(), c->output_path_id()), + GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_, + snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, + table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, @@ -2954,10 +2955,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, job_context->job_id, c.get(), immutable_db_options_, file_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), - GetDataDir(c->column_family_data(), c->output_path_id()), stats_, - &mutex_, &error_handler_, snapshot_seqs, - earliest_write_conflict_snapshot, snapshot_checker, table_cache_, - &event_logger_, c->mutable_cf_options()->paranoid_file_checks, + GetDataDir(c->column_family_data(), c->output_path_id()), + GetDataDir(c->column_family_data(), 0), stats_, &mutex_, + &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, + snapshot_checker, table_cache_, &event_logger_, + c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats, thread_pri, io_tracer_, is_manual ? &manual_compaction_paused_ : nullptr, db_id_, diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index bd3770588..8f660924e 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1393,7 +1393,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. const bool has_output = meta.fd.GetFileSize() > 0; - assert(has_output || blob_file_additions.empty()); constexpr int level = 0; @@ -1413,15 +1412,16 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, if (has_output) { stats.bytes_written = meta.fd.GetFileSize(); + stats.num_output_files = 1; + } - const auto& blobs = edit->GetBlobFileAdditions(); - for (const auto& blob : blobs) { - stats.bytes_written += blob.GetTotalBlobBytes(); - } - - stats.num_output_files = static_cast(blobs.size()) + 1; + const auto& blobs = edit->GetBlobFileAdditions(); + for (const auto& blob : blobs) { + stats.bytes_written += blob.GetTotalBlobBytes(); } + stats.num_output_files += static_cast(blobs.size()); + cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats); cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, stats.bytes_written); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index f17861b3f..56d9b35ae 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -380,6 +380,7 @@ TEST_F(DBWALTest, RecoverWithBlob) { options.min_blob_size = min_blob_size; options.avoid_flush_during_recovery = false; options.disable_auto_compactions = true; + options.env = env_; Reopen(options); @@ -440,10 +441,12 @@ class DBRecoveryTestBlobError : public DBWALTest, public testing::WithParamInterface { public: - DBRecoveryTestBlobError() : fault_injection_env_(env_) {} + DBRecoveryTestBlobError() + : fault_injection_env_(env_), sync_point_(GetParam()) {} ~DBRecoveryTestBlobError() { Close(); } FaultInjectionTestEnv fault_injection_env_; + std::string sync_point_; }; INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError, @@ -457,11 +460,12 @@ TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) { // Reopen with blob files enabled but make blob file writing fail during // recovery. - SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) { - fault_injection_env_.SetFilesystemActive(false, Status::IOError()); + SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, + Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->SetCallBack( - "BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) { + "BuildTable:BeforeDeleteFile", [this](void* /* arg */) { fault_injection_env_.SetFilesystemActive(true); }); SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/flush_job.cc b/db/flush_job.cc index 6e2a60ff9..504a232b6 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -438,7 +438,6 @@ Status FlushJob::WriteLevel0Table() { // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. const bool has_output = meta_.fd.GetFileSize() > 0; - assert(has_output || blob_file_additions.empty()); if (s.ok() && has_output) { // if we have more than 1 background thread, then we cannot @@ -467,15 +466,16 @@ Status FlushJob::WriteLevel0Table() { if (has_output) { stats.bytes_written = meta_.fd.GetFileSize(); + stats.num_output_files = 1; + } - const auto& blobs = edit_->GetBlobFileAdditions(); - for (const auto& blob : blobs) { - stats.bytes_written += blob.GetTotalBlobBytes(); - } - - stats.num_output_files = static_cast(blobs.size()) + 1; + const auto& blobs = edit_->GetBlobFileAdditions(); + for (const auto& blob : blobs) { + stats.bytes_written += blob.GetTotalBlobBytes(); } + stats.num_output_files += static_cast(blobs.size()); + RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,