diff --git a/HISTORY.md b/HISTORY.md index e4e7bed3b..55d922902 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -65,6 +65,8 @@ * Remove deprecated option DBOptions::new_table_reader_for_compaction_inputs. * Add Transaction::SetReadTimestampForValidation() and Transaction::SetCommitTimestamp(). Default impl returns NotSupported(). * Add support for decimal patterns to ObjectLibrary::PatternEntry +* Remove deprecated remote compaction APIs `CompactionService::Start()` and `CompactionService::WaitForComplete()`. Please use `CompactionService::StartV2()`, `CompactionService::WaitForCompleteV2()` instead, which provides the same information plus extra data like priority, db_id, etc. + ### Behavior Changes * Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO. * `ReadOptions::total_order_seek` no longer affects `DB::Get()`. The original motivation for this interaction has been obsolete since RocksDB has been able to detect whether the current prefix extractor is compatible with that used to generate table files, probably RocksDB 5.14.0. diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 5a4ff7799..867cb08ef 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -10,126 +10,7 @@ namespace ROCKSDB_NAMESPACE { -class TestCompactionServiceBase { - public: - virtual int GetCompactionNum() = 0; - - void OverrideStartStatus(CompactionServiceJobStatus s) { - is_override_start_status = true; - override_start_status = s; - } - - void OverrideWaitStatus(CompactionServiceJobStatus s) { - is_override_wait_status = true; - override_wait_status = s; - } - - void OverrideWaitResult(std::string str) { - is_override_wait_result = true; - override_wait_result = std::move(str); - } - - void ResetOverride() { - is_override_wait_result = false; - is_override_start_status = false; - is_override_wait_status = false; - } - - virtual ~TestCompactionServiceBase() = default; - - protected: - bool is_override_start_status = false; - CompactionServiceJobStatus override_start_status = - CompactionServiceJobStatus::kFailure; - bool is_override_wait_status = false; - CompactionServiceJobStatus override_wait_status = - CompactionServiceJobStatus::kFailure; - bool is_override_wait_result = false; - std::string override_wait_result; -}; - -class MyTestCompactionServiceLegacy : public CompactionService, - public TestCompactionServiceBase { - public: - MyTestCompactionServiceLegacy(std::string db_path, Options& options, - std::shared_ptr& statistics) - : db_path_(std::move(db_path)), - options_(options), - statistics_(statistics) {} - - static const char* kClassName() { return "MyTestCompactionServiceLegacy"; } - - const char* Name() const override { return kClassName(); } - - CompactionServiceJobStatus Start(const std::string& compaction_service_input, - uint64_t job_id) override { - InstrumentedMutexLock l(&mutex_); - jobs_.emplace(job_id, compaction_service_input); - CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; - if (is_override_start_status) { - return override_start_status; - } - return s; - } - - CompactionServiceJobStatus WaitForComplete( - uint64_t job_id, std::string* compaction_service_result) override { - std::string compaction_input; - { - InstrumentedMutexLock l(&mutex_); - auto i = jobs_.find(job_id); - if (i == jobs_.end()) { - return CompactionServiceJobStatus::kFailure; - } - compaction_input = std::move(i->second); - jobs_.erase(i); - } - - if (is_override_wait_status) { - return override_wait_status; - } - - CompactionServiceOptionsOverride options_override; - options_override.env = options_.env; - options_override.file_checksum_gen_factory = - options_.file_checksum_gen_factory; - options_override.comparator = options_.comparator; - options_override.merge_operator = options_.merge_operator; - options_override.compaction_filter = options_.compaction_filter; - options_override.compaction_filter_factory = - options_.compaction_filter_factory; - options_override.prefix_extractor = options_.prefix_extractor; - options_override.table_factory = options_.table_factory; - options_override.sst_partitioner_factory = options_.sst_partitioner_factory; - options_override.statistics = statistics_; - - Status s = DB::OpenAndCompact( - db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id), - compaction_input, compaction_service_result, options_override); - if (is_override_wait_result) { - *compaction_service_result = override_wait_result; - } - compaction_num_.fetch_add(1); - if (s.ok()) { - return CompactionServiceJobStatus::kSuccess; - } else { - return CompactionServiceJobStatus::kFailure; - } - } - - int GetCompactionNum() override { return compaction_num_.load(); } - - private: - InstrumentedMutex mutex_; - std::atomic_int compaction_num_{0}; - std::map jobs_; - const std::string db_path_; - Options options_; - std::shared_ptr statistics_; -}; - -class MyTestCompactionService : public CompactionService, - public TestCompactionServiceBase { +class MyTestCompactionService : public CompactionService { public: MyTestCompactionService(std::string db_path, Options& options, std::shared_ptr& statistics) @@ -151,8 +32,8 @@ class MyTestCompactionService : public CompactionService, assert(info.db_name == db_path_); jobs_.emplace(info.job_id, compaction_service_input); CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; - if (is_override_start_status) { - return override_start_status; + if (is_override_start_status_) { + return override_start_status_; } return s; } @@ -173,8 +54,8 @@ class MyTestCompactionService : public CompactionService, jobs_.erase(i); } - if (is_override_wait_status) { - return override_wait_status; + if (is_override_wait_status_) { + return override_wait_status_; } CompactionServiceOptionsOverride options_override; @@ -194,8 +75,8 @@ class MyTestCompactionService : public CompactionService, Status s = DB::OpenAndCompact( db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id), compaction_input, compaction_service_result, options_override); - if (is_override_wait_result) { - *compaction_service_result = override_wait_result; + if (is_override_wait_result_) { + *compaction_service_result = override_wait_result_; } compaction_num_.fetch_add(1); if (s.ok()) { @@ -205,11 +86,32 @@ class MyTestCompactionService : public CompactionService, } } - int GetCompactionNum() override { return compaction_num_.load(); } + int GetCompactionNum() { return compaction_num_.load(); } CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; } CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; } + void OverrideStartStatus(CompactionServiceJobStatus s) { + is_override_start_status_ = true; + override_start_status_ = s; + } + + void OverrideWaitStatus(CompactionServiceJobStatus s) { + is_override_wait_status_ = true; + override_wait_status_ = s; + } + + void OverrideWaitResult(std::string str) { + is_override_wait_result_ = true; + override_wait_result_ = std::move(str); + } + + void ResetOverride() { + is_override_wait_result_ = false; + is_override_start_status_ = false; + is_override_wait_status_ = false; + } + private: InstrumentedMutex mutex_; std::atomic_int compaction_num_{0}; @@ -219,17 +121,17 @@ class MyTestCompactionService : public CompactionService, std::shared_ptr statistics_; CompactionServiceJobInfo start_info_; CompactionServiceJobInfo wait_info_; + bool is_override_start_status_ = false; + CompactionServiceJobStatus override_start_status_ = + CompactionServiceJobStatus::kFailure; + bool is_override_wait_status_ = false; + CompactionServiceJobStatus override_wait_status_ = + CompactionServiceJobStatus::kFailure; + bool is_override_wait_result_ = false; + std::string override_wait_result_; }; -// This is only for listing test classes -enum TestCompactionServiceType { - MyTestCompactionServiceType, - MyTestCompactionServiceLegacyType, -}; - -class CompactionServiceTest - : public DBTestBase, - public testing::WithParamInterface { +class CompactionServiceTest : public DBTestBase { public: explicit CompactionServiceTest() : DBTestBase("compaction_service_test", true) {} @@ -240,19 +142,9 @@ class CompactionServiceTest primary_statistics_ = CreateDBStatistics(); options->statistics = primary_statistics_; compactor_statistics_ = CreateDBStatistics(); - TestCompactionServiceType cs_type = GetParam(); - switch (cs_type) { - case MyTestCompactionServiceType: - compaction_service_ = std::make_shared( - dbname_, *options, compactor_statistics_); - break; - case MyTestCompactionServiceLegacyType: - compaction_service_ = std::make_shared( - dbname_, *options, compactor_statistics_); - break; - default: - assert(false); - } + + compaction_service_ = std::make_shared( + dbname_, *options, compactor_statistics_); options->compaction_service = compaction_service_; DestroyAndReopen(*options); } @@ -261,9 +153,9 @@ class CompactionServiceTest Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); } - TestCompactionServiceBase* GetCompactionService() { + MyTestCompactionService* GetCompactionService() { CompactionService* cs = compaction_service_.get(); - return dynamic_cast(cs); + return static_cast_with_check(cs); } void GenerateTestData() { @@ -306,7 +198,7 @@ class CompactionServiceTest std::shared_ptr compaction_service_; }; -TEST_P(CompactionServiceTest, BasicCompactions) { +TEST_F(CompactionServiceTest, BasicCompactions) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); @@ -393,7 +285,7 @@ TEST_P(CompactionServiceTest, BasicCompactions) { ASSERT_TRUE(s.IsAborted()); } -TEST_P(CompactionServiceTest, ManualCompaction) { +TEST_F(CompactionServiceTest, ManualCompaction) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); @@ -430,7 +322,7 @@ TEST_P(CompactionServiceTest, ManualCompaction) { VerifyTestData(); } -TEST_P(CompactionServiceTest, FailedToStart) { +TEST_F(CompactionServiceTest, FailedToStart) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); @@ -448,7 +340,7 @@ TEST_P(CompactionServiceTest, FailedToStart) { ASSERT_TRUE(s.IsIncomplete()); } -TEST_P(CompactionServiceTest, InvalidResult) { +TEST_F(CompactionServiceTest, InvalidResult) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); @@ -466,7 +358,7 @@ TEST_P(CompactionServiceTest, InvalidResult) { ASSERT_FALSE(s.ok()); } -TEST_P(CompactionServiceTest, SubCompaction) { +TEST_F(CompactionServiceTest, SubCompaction) { Options options = CurrentOptions(); options.max_subcompactions = 10; options.target_file_size_base = 1 << 10; // 1KB @@ -505,7 +397,7 @@ class PartialDeleteCompactionFilter : public CompactionFilter { const char* Name() const override { return "PartialDeleteCompactionFilter"; } }; -TEST_P(CompactionServiceTest, CompactionFilter) { +TEST_F(CompactionServiceTest, CompactionFilter) { Options options = CurrentOptions(); std::unique_ptr delete_comp_filter( new PartialDeleteCompactionFilter()); @@ -546,7 +438,7 @@ TEST_P(CompactionServiceTest, CompactionFilter) { ASSERT_GE(my_cs->GetCompactionNum(), 1); } -TEST_P(CompactionServiceTest, Snapshot) { +TEST_F(CompactionServiceTest, Snapshot) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); @@ -567,7 +459,7 @@ TEST_P(CompactionServiceTest, Snapshot) { db_->ReleaseSnapshot(s1); } -TEST_P(CompactionServiceTest, ConcurrentCompaction) { +TEST_F(CompactionServiceTest, ConcurrentCompaction) { Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = 100; options.max_background_jobs = 20; @@ -579,7 +471,7 @@ TEST_P(CompactionServiceTest, ConcurrentCompaction) { std::vector threads; for (const auto& file : meta.levels[1].files) { - threads.push_back(std::thread([&]() { + threads.emplace_back(std::thread([&]() { std::string fname = file.db_path + "/" + file.name; ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2)); })); @@ -604,12 +496,7 @@ TEST_P(CompactionServiceTest, ConcurrentCompaction) { ASSERT_EQ(FilesPerLevel(), "0,0,10"); } -TEST_P(CompactionServiceTest, CompactionInfo) { - // only test compaction info for new compaction service interface - if (GetParam() != MyTestCompactionServiceType) { - return; - } - +TEST_F(CompactionServiceTest, CompactionInfo) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); @@ -688,7 +575,7 @@ TEST_P(CompactionServiceTest, CompactionInfo) { ASSERT_EQ(Env::BOTTOM, info.priority); } -TEST_P(CompactionServiceTest, FallbackLocalAuto) { +TEST_F(CompactionServiceTest, FallbackLocalAuto) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); @@ -740,7 +627,7 @@ TEST_P(CompactionServiceTest, FallbackLocalAuto) { ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0); } -TEST_P(CompactionServiceTest, FallbackLocalManual) { +TEST_F(CompactionServiceTest, FallbackLocalManual) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); @@ -798,12 +685,6 @@ TEST_P(CompactionServiceTest, FallbackLocalManual) { VerifyTestData(); } -INSTANTIATE_TEST_CASE_P( - CompactionServiceTest, CompactionServiceTest, - ::testing::Values( - TestCompactionServiceType::MyTestCompactionServiceType, - TestCompactionServiceType::MyTestCompactionServiceLegacyType)); - } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 94dc4ce07..2bc80f007 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -406,42 +406,20 @@ class CompactionService : public Customizable { // Returns the name of this compaction service. const char* Name() const override = 0; - // Start the compaction with input information, which can be passed to - // `DB::OpenAndCompact()`. - // job_id is pre-assigned, it will be reset after DB re-open. - // Warning: deprecated, please use the new interface - // `StartV2(CompactionServiceJobInfo, ...)` instead. - virtual CompactionServiceJobStatus Start( - const std::string& /*compaction_service_input*/, uint64_t /*job_id*/) { - return CompactionServiceJobStatus::kUseLocal; - } - // Start the remote compaction with `compaction_service_input`, which can be // passed to `DB::OpenAndCompact()` on the remote side. `info` provides the // information the user might want to know, which includes `job_id`. virtual CompactionServiceJobStatus StartV2( - const CompactionServiceJobInfo& info, - const std::string& compaction_service_input) { - // Default implementation to call legacy interface, please override and - // replace the legacy implementation - return Start(compaction_service_input, info.job_id); - } - - // Wait compaction to be finish. - // Warning: deprecated, please use the new interface - // `WaitForCompleteV2(CompactionServiceJobInfo, ...)` instead. - virtual CompactionServiceJobStatus WaitForComplete( - uint64_t /*job_id*/, std::string* /*compaction_service_result*/) { + const CompactionServiceJobInfo& /*info*/, + const std::string& /*compaction_service_input*/) { return CompactionServiceJobStatus::kUseLocal; } // Wait for remote compaction to finish. virtual CompactionServiceJobStatus WaitForCompleteV2( - const CompactionServiceJobInfo& info, - std::string* compaction_service_result) { - // Default implementation to call legacy interface, please override and - // replace the legacy implementation - return WaitForComplete(info.job_id, compaction_service_result); + const CompactionServiceJobInfo& /*info*/, + std::string* /*compaction_service_result*/) { + return CompactionServiceJobStatus::kUseLocal; } ~CompactionService() override = default;