diff --git a/HISTORY.md b/HISTORY.md index 3add487c7..d8953b190 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,8 @@ # Rocksdb Change Log +## Unreleased +### New Features +* RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions. + ## 6.24.0 (2021-08-20) ### Bug Fixes * If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file. diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index fbbb3f842..7c3a98e32 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -984,9 +984,10 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", compaction_input.column_family.name.c_str(), job_id_, compaction_input.output_level, input_files_oss.str().c_str()); + CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_, + GetCompactionId(sub_compact)); CompactionServiceJobStatus compaction_status = - db_options_.compaction_service->Start(compaction_input_binary, - GetCompactionId(sub_compact)); + db_options_.compaction_service->StartV2(info, compaction_input_binary); if (compaction_status != CompactionServiceJobStatus::kSuccess) { sub_compact->status = Status::Incomplete("CompactionService failed to start compaction job."); @@ -994,8 +995,8 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( } std::string compaction_result_binary; - compaction_status = db_options_.compaction_service->WaitForComplete( - GetCompactionId(sub_compact), &compaction_result_binary); + compaction_status = db_options_.compaction_service->WaitForCompleteV2( + info, &compaction_result_binary); CompactionServiceResult compaction_result; s = CompactionServiceResult::Read(compaction_result_binary, diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index e20c6f865..e2c326be9 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -10,13 +10,45 @@ namespace ROCKSDB_NAMESPACE { -class MyTestCompactionService : public CompactionService { +class TestCompactionServiceBase { public: - MyTestCompactionService(const std::string& db_path, Options& options, - std::shared_ptr statistics = nullptr) - : db_path_(db_path), options_(options), statistics_(statistics) {} + virtual int GetCompactionNum() = 0; - static const char* kClassName() { return "MyTestCompactionService"; } + void OverrideStartStatus(CompactionServiceJobStatus s) { + is_override_start_status = true; + override_start_status = s; + } + + void OverrideWaitResult(std::string str) { + is_override_wait_result = true; + override_wait_result = std::move(str); + } + + void ResetOverride() { + is_override_wait_result = false; + is_override_start_status = false; + } + + virtual ~TestCompactionServiceBase() = default; + + protected: + bool is_override_start_status = false; + CompactionServiceJobStatus override_start_status = + CompactionServiceJobStatus::kFailure; + bool is_override_wait_result = false; + std::string override_wait_result; +}; + +class MyTestCompactionServiceLegacy : public CompactionService, + public TestCompactionServiceBase { + public: + MyTestCompactionServiceLegacy(std::string db_path, Options& options, + std::shared_ptr& statistics) + : db_path_(std::move(db_path)), + options_(options), + statistics_(statistics) {} + + static const char* kClassName() { return "MyTestCompactionServiceLegacy"; } const char* Name() const override { return kClassName(); } @@ -25,7 +57,9 @@ class MyTestCompactionService : public CompactionService { InstrumentedMutexLock l(&mutex_); jobs_.emplace(job_id, compaction_service_input); CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; - TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::Start::End", &s); + if (is_override_start_status) { + return override_start_status; + } return s; } @@ -59,8 +93,9 @@ class MyTestCompactionService : public CompactionService { Status s = DB::OpenAndCompact( db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id), compaction_input, compaction_service_result, options_override); - TEST_SYNC_POINT_CALLBACK("MyTestCompactionService::WaitForComplete::End", - compaction_service_result); + if (is_override_wait_result) { + *compaction_service_result = override_wait_result; + } compaction_num_.fetch_add(1); if (s.ok()) { return CompactionServiceJobStatus::kSuccess; @@ -69,7 +104,7 @@ class MyTestCompactionService : public CompactionService { } } - int GetCompactionNum() { return compaction_num_.load(); } + int GetCompactionNum() override { return compaction_num_.load(); } private: InstrumentedMutex mutex_; @@ -80,12 +115,140 @@ class MyTestCompactionService : public CompactionService { std::shared_ptr statistics_; }; -class CompactionServiceTest : public DBTestBase { +class MyTestCompactionService : public CompactionService, + public TestCompactionServiceBase { + public: + MyTestCompactionService(std::string db_path, Options& options, + std::shared_ptr& statistics) + : db_path_(std::move(db_path)), + options_(options), + statistics_(statistics), + start_info_("na", "na", "na", 0), + wait_info_("na", "na", "na", 0) {} + + static const char* kClassName() { return "MyTestCompactionService"; } + + const char* Name() const override { return kClassName(); } + + CompactionServiceJobStatus StartV2( + const CompactionServiceJobInfo& info, + const std::string& compaction_service_input) override { + InstrumentedMutexLock l(&mutex_); + start_info_ = info; + assert(info.db_name == db_path_); + jobs_.emplace(info.job_id, compaction_service_input); + CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; + if (is_override_start_status) { + return override_start_status; + } + return s; + } + + CompactionServiceJobStatus WaitForCompleteV2( + const CompactionServiceJobInfo& info, + std::string* compaction_service_result) override { + std::string compaction_input; + assert(info.db_name == db_path_); + { + InstrumentedMutexLock l(&mutex_); + wait_info_ = info; + auto i = jobs_.find(info.job_id); + if (i == jobs_.end()) { + return CompactionServiceJobStatus::kFailure; + } + compaction_input = std::move(i->second); + jobs_.erase(i); + } + + CompactionServiceOptionsOverride options_override; + options_override.env = options_.env; + options_override.file_checksum_gen_factory = + options_.file_checksum_gen_factory; + options_override.comparator = options_.comparator; + options_override.merge_operator = options_.merge_operator; + options_override.compaction_filter = options_.compaction_filter; + options_override.compaction_filter_factory = + options_.compaction_filter_factory; + options_override.prefix_extractor = options_.prefix_extractor; + options_override.table_factory = options_.table_factory; + options_override.sst_partitioner_factory = options_.sst_partitioner_factory; + options_override.statistics = statistics_; + + Status s = DB::OpenAndCompact( + db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id), + compaction_input, compaction_service_result, options_override); + if (is_override_wait_result) { + *compaction_service_result = override_wait_result; + } + compaction_num_.fetch_add(1); + if (s.ok()) { + return CompactionServiceJobStatus::kSuccess; + } else { + return CompactionServiceJobStatus::kFailure; + } + } + + int GetCompactionNum() override { return compaction_num_.load(); } + + CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; } + CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; } + + private: + InstrumentedMutex mutex_; + std::atomic_int compaction_num_{0}; + std::map jobs_; + const std::string db_path_; + Options options_; + std::shared_ptr statistics_; + CompactionServiceJobInfo start_info_; + CompactionServiceJobInfo wait_info_; +}; + +// This is only for listing test classes +enum TestCompactionServiceType { + MyTestCompactionServiceType, + MyTestCompactionServiceLegacyType, +}; + +class CompactionServiceTest + : public DBTestBase, + public testing::WithParamInterface { public: explicit CompactionServiceTest() : DBTestBase("compaction_service_test", true) {} protected: + void ReopenWithCompactionService(Options* options) { + options->env = env_; + primary_statistics_ = CreateDBStatistics(); + options->statistics = primary_statistics_; + compactor_statistics_ = CreateDBStatistics(); + TestCompactionServiceType cs_type = GetParam(); + switch (cs_type) { + case MyTestCompactionServiceType: + compaction_service_ = std::make_shared( + dbname_, *options, compactor_statistics_); + break; + case MyTestCompactionServiceLegacyType: + compaction_service_ = std::make_shared( + dbname_, *options, compactor_statistics_); + break; + default: + assert(false); + } + options->compaction_service = compaction_service_; + DestroyAndReopen(*options); + } + + Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); } + + Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); } + + TestCompactionServiceBase* GetCompactionService() { + CompactionService* cs = compaction_service_.get(); + return dynamic_cast(cs); + } + void GenerateTestData() { // Generate 20 files @ L2 for (int i = 0; i < 20; i++) { @@ -119,17 +282,16 @@ class CompactionServiceTest : public DBTestBase { } } } + + private: + std::shared_ptr compactor_statistics_; + std::shared_ptr primary_statistics_; + std::shared_ptr compaction_service_; }; -TEST_F(CompactionServiceTest, BasicCompactions) { +TEST_P(CompactionServiceTest, BasicCompactions) { Options options = CurrentOptions(); - options.env = env_; - options.statistics = CreateDBStatistics(); - std::shared_ptr compactor_statistics = CreateDBStatistics(); - options.compaction_service = std::make_shared( - dbname_, options, compactor_statistics); - - DestroyAndReopen(options); + ReopenWithCompactionService(&options); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { @@ -157,21 +319,22 @@ TEST_F(CompactionServiceTest, BasicCompactions) { ASSERT_EQ(result, "value_new" + ToString(i)); } } - auto my_cs = - dynamic_cast(options.compaction_service.get()); + auto my_cs = GetCompactionService(); + Statistics* compactor_statistics = GetCompactorStatistics(); ASSERT_GE(my_cs->GetCompactionNum(), 1); // make sure the compaction statistics is only recorded on remote side ASSERT_GE( compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1); - ASSERT_EQ(options.statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), + Statistics* primary_statistics = GetPrimaryStatistics(); + ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 0); // Test failed compaction SyncPoint::GetInstance()->SetCallBack( "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) { // override job status - Status* s = static_cast(status); + auto s = static_cast(status); *s = Status::Aborted("MyTestCompactionService failed to compact!"); }); SyncPoint::GetInstance()->EnableProcessing(); @@ -200,17 +363,13 @@ TEST_F(CompactionServiceTest, BasicCompactions) { ASSERT_TRUE(s.IsAborted()); } -TEST_F(CompactionServiceTest, ManualCompaction) { +TEST_P(CompactionServiceTest, ManualCompaction) { Options options = CurrentOptions(); - options.env = env_; options.disable_auto_compactions = true; - options.compaction_service = - std::make_shared(dbname_, options); - DestroyAndReopen(options); + ReopenWithCompactionService(&options); GenerateTestData(); - auto my_cs = - dynamic_cast(options.compaction_service.get()); + auto my_cs = GetCompactionService(); std::string start_str = Key(15); std::string end_str = Key(45); @@ -241,22 +400,15 @@ TEST_F(CompactionServiceTest, ManualCompaction) { VerifyTestData(); } -TEST_F(CompactionServiceTest, FailedToStart) { +TEST_P(CompactionServiceTest, FailedToStart) { Options options = CurrentOptions(); - options.env = env_; options.disable_auto_compactions = true; - options.compaction_service = - std::make_shared(dbname_, options); - DestroyAndReopen(options); + ReopenWithCompactionService(&options); + GenerateTestData(); - SyncPoint::GetInstance()->SetCallBack( - "MyTestCompactionService::Start::End", [&](void* status) { - // override job status - auto s = static_cast(status); - *s = CompactionServiceJobStatus::kFailure; - }); - SyncPoint::GetInstance()->EnableProcessing(); + auto my_cs = GetCompactionService(); + my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure); std::string start_str = Key(15); std::string end_str = Key(45); @@ -266,22 +418,15 @@ TEST_F(CompactionServiceTest, FailedToStart) { ASSERT_TRUE(s.IsIncomplete()); } -TEST_F(CompactionServiceTest, InvalidResult) { +TEST_P(CompactionServiceTest, InvalidResult) { Options options = CurrentOptions(); - options.env = env_; options.disable_auto_compactions = true; - options.compaction_service = - std::make_shared(dbname_, options); - DestroyAndReopen(options); + ReopenWithCompactionService(&options); + GenerateTestData(); - SyncPoint::GetInstance()->SetCallBack( - "MyTestCompactionService::WaitForComplete::End", [&](void* result) { - // override job status - auto result_str = static_cast(result); - *result_str = "Invalid Str"; - }); - SyncPoint::GetInstance()->EnableProcessing(); + auto my_cs = GetCompactionService(); + my_cs->OverrideWaitResult("Invalid Str"); std::string start_str = Key(15); std::string end_str = Key(45); @@ -291,21 +436,17 @@ TEST_F(CompactionServiceTest, InvalidResult) { ASSERT_FALSE(s.ok()); } -TEST_F(CompactionServiceTest, SubCompaction) { +TEST_P(CompactionServiceTest, SubCompaction) { Options options = CurrentOptions(); - options.env = env_; options.max_subcompactions = 10; options.target_file_size_base = 1 << 10; // 1KB options.disable_auto_compactions = true; - options.compaction_service = - std::make_shared(dbname_, options); + ReopenWithCompactionService(&options); - DestroyAndReopen(options); GenerateTestData(); VerifyTestData(); - auto my_cs = - dynamic_cast(options.compaction_service.get()); + auto my_cs = GetCompactionService(); int compaction_num_before = my_cs->GetCompactionNum(); auto cro = CompactRangeOptions(); @@ -334,16 +475,12 @@ class PartialDeleteCompactionFilter : public CompactionFilter { const char* Name() const override { return "PartialDeleteCompactionFilter"; } }; -TEST_F(CompactionServiceTest, CompactionFilter) { +TEST_P(CompactionServiceTest, CompactionFilter) { Options options = CurrentOptions(); - options.env = env_; std::unique_ptr delete_comp_filter( new PartialDeleteCompactionFilter()); options.compaction_filter = delete_comp_filter.get(); - options.compaction_service = - std::make_shared(dbname_, options); - - DestroyAndReopen(options); + ReopenWithCompactionService(&options); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { @@ -375,18 +512,13 @@ TEST_F(CompactionServiceTest, CompactionFilter) { ASSERT_EQ(result, "value_new" + ToString(i)); } } - auto my_cs = - dynamic_cast(options.compaction_service.get()); + auto my_cs = GetCompactionService(); ASSERT_GE(my_cs->GetCompactionNum(), 1); } -TEST_F(CompactionServiceTest, Snapshot) { +TEST_P(CompactionServiceTest, Snapshot) { Options options = CurrentOptions(); - options.env = env_; - options.compaction_service = - std::make_shared(dbname_, options); - - DestroyAndReopen(options); + ReopenWithCompactionService(&options); ASSERT_OK(Put(Key(1), "value1")); ASSERT_OK(Put(Key(2), "value1")); @@ -398,23 +530,18 @@ TEST_F(CompactionServiceTest, Snapshot) { ASSERT_OK(Flush()); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - auto my_cs = - dynamic_cast(options.compaction_service.get()); + auto my_cs = GetCompactionService(); ASSERT_GE(my_cs->GetCompactionNum(), 1); ASSERT_EQ("value1", Get(Key(1), s1)); ASSERT_EQ("value2", Get(Key(1))); db_->ReleaseSnapshot(s1); } -TEST_F(CompactionServiceTest, ConcurrentCompaction) { +TEST_P(CompactionServiceTest, ConcurrentCompaction) { Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = 100; - options.env = env_; - options.compaction_service = - std::make_shared(dbname_, options); options.max_background_jobs = 20; - - DestroyAndReopen(options); + ReopenWithCompactionService(&options); GenerateTestData(); ColumnFamilyMetaData meta; @@ -442,12 +569,50 @@ TEST_F(CompactionServiceTest, ConcurrentCompaction) { ASSERT_EQ(result, "value_new" + ToString(i)); } } - auto my_cs = - dynamic_cast(options.compaction_service.get()); + auto my_cs = GetCompactionService(); ASSERT_EQ(my_cs->GetCompactionNum(), 10); ASSERT_EQ(FilesPerLevel(), "0,0,10"); } +TEST_P(CompactionServiceTest, CompactionInfo) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + ReopenWithCompactionService(&options); + GenerateTestData(); + + auto my_cs = GetCompactionService(); + + std::string start_str = Key(15); + std::string end_str = Key(45); + Slice start(start_str); + Slice end(end_str); + uint64_t comp_num = my_cs->GetCompactionNum(); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); + ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); + VerifyTestData(); + // only test compaction info for new compaction service interface + if (GetParam() == MyTestCompactionServiceType) { + auto cs = static_cast_with_check(my_cs); + CompactionServiceJobInfo info = cs->GetCompactionInfoForStart(); + ASSERT_EQ(dbname_, info.db_name); + std::string db_id, db_session_id; + ASSERT_OK(db_->GetDbIdentity(db_id)); + ASSERT_EQ(db_id, info.db_id); + ASSERT_OK(db_->GetDbSessionId(db_session_id)); + ASSERT_EQ(db_session_id, info.db_session_id); + info = cs->GetCompactionInfoForWait(); + ASSERT_EQ(dbname_, info.db_name); + ASSERT_EQ(db_id, info.db_id); + ASSERT_EQ(db_session_id, info.db_session_id); + } +} + +INSTANTIATE_TEST_CASE_P( + CompactionServiceTest, CompactionServiceTest, + ::testing::Values( + TestCompactionServiceType::MyTestCompactionServiceType, + TestCompactionServiceType::MyTestCompactionServiceLegacyType)); + } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 224f6616c..3160fec7f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -375,27 +375,70 @@ enum class CompactionServiceJobStatus : char { kUseLocal, // TODO: Add support for use local compaction }; +struct CompactionServiceJobInfo { + std::string db_name; + std::string db_id; + std::string db_session_id; + uint64_t job_id; // job_id is only unique within the current DB and session, + // restart DB will reset the job_id. `db_id` and + // `db_session_id` could help you build unique id across + // different DBs and sessions. + + // TODO: Add priority information + CompactionServiceJobInfo(std::string db_name_, std::string db_id_, + std::string db_session_id_, uint64_t job_id_) + : db_name(std::move(db_name_)), + db_id(std::move(db_id_)), + db_session_id(std::move(db_session_id_)), + job_id(job_id_) {} +}; + class CompactionService : public Customizable { public: static const char* Type() { return "CompactionService"; } // Returns the name of this compaction service. - virtual const char* Name() const = 0; + const char* Name() const override = 0; // Start the compaction with input information, which can be passed to // `DB::OpenAndCompact()`. // job_id is pre-assigned, it will be reset after DB re-open. - // TODO: sub-compaction is not supported, as they will have the same job_id, a - // sub-compaction id might be added + // Warning: deprecated, please use the new interface + // `StartV2(CompactionServiceJobInfo, ...)` instead. virtual CompactionServiceJobStatus Start( - const std::string& compaction_service_input, uint64_t job_id) = 0; + const std::string& /*compaction_service_input*/, uint64_t /*job_id*/) { + return CompactionServiceJobStatus::kUseLocal; + } + + // Start the remote compaction with `compaction_service_input`, which can be + // passed to `DB::OpenAndCompact()` on the remote side. `info` provides the + // information the user might want to know, which includes `job_id`. + virtual CompactionServiceJobStatus StartV2( + const CompactionServiceJobInfo& info, + const std::string& compaction_service_input) { + // Default implementation to call legacy interface, please override and + // replace the legacy implementation + return Start(compaction_service_input, info.job_id); + } // Wait compaction to be finish. - // TODO: Add output path override + // Warning: deprecated, please use the new interface + // `WaitForCompleteV2(CompactionServiceJobInfo, ...)` instead. virtual CompactionServiceJobStatus WaitForComplete( - uint64_t job_id, std::string* compaction_service_result) = 0; - - virtual ~CompactionService() {} + uint64_t /*job_id*/, std::string* /*compaction_service_result*/) { + return CompactionServiceJobStatus::kUseLocal; + } + + // Wait for remote compaction to finish. + virtual CompactionServiceJobStatus WaitForCompleteV2( + const CompactionServiceJobInfo& info, + std::string* compaction_service_result) { + // Default implementation to call legacy interface, please override and + // replace the legacy implementation + return WaitForComplete(info.job_id, compaction_service_result); + } + + ~CompactionService() override = default; }; struct DBOptions {