diff --git a/HISTORY.md b/HISTORY.md index fa21fa585..0cbf74ab4 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -24,6 +24,7 @@ * Add compaction priority information in RemoteCompaction, which can be used to schedule high priority job first. * Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file. * Batch blob read requests for `DB::MultiGet` using `MultiRead`. +* Add support for fallback to local compaction. The user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result. ### Public API change * Remove obsolete implementation details FullKey and ParseFullKey from public API diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index a89fccb99..1bdc4125f 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -932,7 +932,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { } #ifndef ROCKSDB_LITE -void CompactionJob::ProcessKeyValueCompactionWithCompactionService( +CompactionServiceJobStatus +CompactionJob::ProcessKeyValueCompactionWithCompactionService( SubcompactionState* sub_compact) { assert(sub_compact); assert(sub_compact->compaction); @@ -969,7 +970,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( Status s = compaction_input.Write(&compaction_input_binary); if (!s.ok()) { sub_compact->status = s; - return; + return CompactionServiceJobStatus::kFailure; } std::ostringstream input_files_oss; @@ -988,36 +989,73 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( GetCompactionId(sub_compact), thread_pri_); CompactionServiceJobStatus compaction_status = db_options_.compaction_service->StartV2(info, 
compaction_input_binary); - if (compaction_status != CompactionServiceJobStatus::kSuccess) { - sub_compact->status = - Status::Incomplete("CompactionService failed to start compaction job."); - return; + switch (compaction_status) { + case CompactionServiceJobStatus::kSuccess: + break; + case CompactionServiceJobStatus::kFailure: + sub_compact->status = Status::Incomplete( + "CompactionService failed to start compaction job."); + ROCKS_LOG_WARN(db_options_.info_log, + "[%s] [JOB %d] Remote compaction failed to start.", + compaction_input.column_family.name.c_str(), job_id_); + return compaction_status; + case CompactionServiceJobStatus::kUseLocal: + ROCKS_LOG_INFO( + db_options_.info_log, + "[%s] [JOB %d] Remote compaction fallback to local by API Start.", + compaction_input.column_family.name.c_str(), job_id_); + return compaction_status; + default: + assert(false); // unknown status + break; } + ROCKS_LOG_INFO(db_options_.info_log, + "[%s] [JOB %d] Waiting for remote compaction...", + compaction_input.column_family.name.c_str(), job_id_); std::string compaction_result_binary; compaction_status = db_options_.compaction_service->WaitForCompleteV2( info, &compaction_result_binary); + if (compaction_status == CompactionServiceJobStatus::kUseLocal) { + ROCKS_LOG_INFO(db_options_.info_log, + "[%s] [JOB %d] Remote compaction fallback to local by API " + "WaitForComplete.", + compaction_input.column_family.name.c_str(), job_id_); + return compaction_status; + } + CompactionServiceResult compaction_result; s = CompactionServiceResult::Read(compaction_result_binary, &compaction_result); - if (compaction_status != CompactionServiceJobStatus::kSuccess) { - sub_compact->status = - s.ok() ? 
compaction_result.status - : Status::Incomplete( - "CompactionService failed to run compaction job."); - compaction_result.status.PermitUncheckedError(); + + if (compaction_status == CompactionServiceJobStatus::kFailure) { + if (s.ok()) { + if (compaction_result.status.ok()) { + sub_compact->status = Status::Incomplete( + "CompactionService failed to run the compaction job (even though " + "the internal status is okay)."); + } else { + // set the current sub compaction status with the status returned from + // remote + sub_compact->status = compaction_result.status; + } + } else { + sub_compact->status = Status::Incomplete( + "CompactionService failed to run the compaction job (and no valid " + "result is returned)."); + compaction_result.status.PermitUncheckedError(); + } ROCKS_LOG_WARN(db_options_.info_log, - "[%s] [JOB %d] Remote compaction failed, status: %s", - compaction_input.column_family.name.c_str(), job_id_, - s.ToString().c_str()); - return; + "[%s] [JOB %d] Remote compaction failed.", + compaction_input.column_family.name.c_str(), job_id_); + return compaction_status; } if (!s.ok()) { sub_compact->status = s; compaction_result.status.PermitUncheckedError(); - return; + return CompactionServiceJobStatus::kFailure; } sub_compact->status = compaction_result.status; @@ -1037,7 +1075,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( if (!s.ok()) { sub_compact->status = s; - return; + return CompactionServiceJobStatus::kFailure; } for (const auto& file : compaction_result.output_files) { @@ -1048,7 +1086,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr); if (!s.ok()) { sub_compact->status = s; - return; + return CompactionServiceJobStatus::kFailure; } FileMetaData meta; @@ -1056,7 +1094,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr); if (!s.ok()) { 
sub_compact->status = s; - return; + return CompactionServiceJobStatus::kFailure; } meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size, file.smallest_seqno, file.largest_seqno); @@ -1077,6 +1115,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( sub_compact->total_bytes = compaction_result.total_bytes; IOSTATS_ADD(bytes_written, compaction_result.bytes_written); IOSTATS_ADD(bytes_read, compaction_result.bytes_read); + return CompactionServiceJobStatus::kSuccess; } #endif // !ROCKSDB_LITE @@ -1086,7 +1125,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { #ifndef ROCKSDB_LITE if (db_options_.compaction_service) { - return ProcessKeyValueCompactionWithCompactionService(sub_compact); + CompactionServiceJobStatus comp_status = + ProcessKeyValueCompactionWithCompactionService(sub_compact); + if (comp_status == CompactionServiceJobStatus::kSuccess || + comp_status == CompactionServiceJobStatus::kFailure) { + return; + } + // fallback to local compaction + assert(comp_status == CompactionServiceJobStatus::kUseLocal); } #endif // !ROCKSDB_LITE diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 586bf86de..4f4563991 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -146,7 +146,7 @@ class CompactionJob { // consecutive groups such that each group has a similar size. void GenSubcompactionBoundaries(); - void ProcessKeyValueCompactionWithCompactionService( + CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService( SubcompactionState* sub_compact); // update the thread status for starting a compaction. 
diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 4555dba39..d68838182 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -19,6 +19,11 @@ class TestCompactionServiceBase { override_start_status = s; } + void OverrideWaitStatus(CompactionServiceJobStatus s) { + is_override_wait_status = true; + override_wait_status = s; + } + void OverrideWaitResult(std::string str) { is_override_wait_result = true; override_wait_result = std::move(str); @@ -27,6 +32,7 @@ class TestCompactionServiceBase { void ResetOverride() { is_override_wait_result = false; is_override_start_status = false; + is_override_wait_status = false; } virtual ~TestCompactionServiceBase() = default; @@ -35,6 +41,9 @@ class TestCompactionServiceBase { bool is_override_start_status = false; CompactionServiceJobStatus override_start_status = CompactionServiceJobStatus::kFailure; + bool is_override_wait_status = false; + CompactionServiceJobStatus override_wait_status = + CompactionServiceJobStatus::kFailure; bool is_override_wait_result = false; std::string override_wait_result; }; @@ -76,6 +85,10 @@ class MyTestCompactionServiceLegacy : public CompactionService, jobs_.erase(i); } + if (is_override_wait_status) { + return override_wait_status; + } + CompactionServiceOptionsOverride options_override; options_override.env = options_.env; options_override.file_checksum_gen_factory = @@ -160,6 +173,10 @@ class MyTestCompactionService : public CompactionService, jobs_.erase(i); } + if (is_override_wait_status) { + return override_wait_status; + } + CompactionServiceOptionsOverride options_override; options_override.env = options_.env; options_override.file_checksum_gen_factory = @@ -323,7 +340,7 @@ TEST_P(CompactionServiceTest, BasicCompactions) { Statistics* compactor_statistics = GetCompactorStatistics(); ASSERT_GE(my_cs->GetCompactionNum(), 1); - // make sure the compaction statistics is only recorded on 
remote side + // make sure the compaction statistics is only recorded on the remote side ASSERT_GE( compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1); Statistics* primary_statistics = GetPrimaryStatistics(); @@ -658,6 +675,114 @@ TEST_P(CompactionServiceTest, CompactionInfo) { ASSERT_EQ(Env::BOTTOM, info.priority); } +TEST_P(CompactionServiceTest, FallbackLocalAuto) { + Options options = CurrentOptions(); + ReopenWithCompactionService(&options); + + auto my_cs = GetCompactionService(); + Statistics* compactor_statistics = GetCompactorStatistics(); + Statistics* primary_statistics = GetPrimaryStatistics(); + uint64_t compactor_new_key = + compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); + uint64_t primary_new_key = + primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); + + my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal); + + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 20 + j * 2; + ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // verify result + for (int i = 0; i < 200; i++) { + auto result = Get(Key(i)); + if (i % 2) { + ASSERT_EQ(result, "value" + ToString(i)); + } else { + ASSERT_EQ(result, "value_new" + ToString(i)); + } + } + + ASSERT_EQ(my_cs->GetCompactionNum(), 0); + + // make sure the compaction statistics is only recorded on the local side + ASSERT_EQ( + compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), + compactor_new_key); + ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), + primary_new_key); +} + +TEST_P(CompactionServiceTest, FallbackLocalManual) { + Options options = CurrentOptions(); + 
options.disable_auto_compactions = true; + ReopenWithCompactionService(&options); + + GenerateTestData(); + VerifyTestData(); + + auto my_cs = GetCompactionService(); + Statistics* compactor_statistics = GetCompactorStatistics(); + Statistics* primary_statistics = GetPrimaryStatistics(); + uint64_t compactor_new_key = + compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); + uint64_t primary_new_key = + primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); + + // re-enable remote compaction + my_cs->ResetOverride(); + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + uint64_t comp_num = my_cs->GetCompactionNum(); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + // make sure the compaction statistics is only recorded on the remote side + ASSERT_GT( + compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), + compactor_new_key); + ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), + primary_new_key); + + // fall back to local compaction by returning kUseLocal from API WaitForComplete + my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal); + start_str = Key(120); + start = start_str; + comp_num = my_cs->GetCompactionNum(); + compactor_new_key = + compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); + primary_new_key = + primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr)); + ASSERT_EQ(my_cs->GetCompactionNum(), + comp_num); // no remote compaction is run + // make sure the compaction statistics is only recorded on the local side + ASSERT_EQ( + compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), + compactor_new_key); + ASSERT_GT(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), + primary_new_key); + + // verify result after 2 
manual compactions + VerifyTestData(); +} + INSTANTIATE_TEST_CASE_P( CompactionServiceTest, CompactionServiceTest, ::testing::Values( diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 055060832..968abc57a 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -372,7 +372,7 @@ extern const char* kHostnameForDbHostId; enum class CompactionServiceJobStatus : char { kSuccess, kFailure, - kUseLocal, // TODO: Add support for use local compaction + kUseLocal, }; struct CompactionServiceJobInfo {