// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #ifndef ROCKSDB_LITE #include "db/db_test_util.h" #include "port/stack_trace.h" namespace ROCKSDB_NAMESPACE { class TestCompactionServiceBase { public: virtual int GetCompactionNum() = 0; void OverrideStartStatus(CompactionServiceJobStatus s) { is_override_start_status = true; override_start_status = s; } void OverrideWaitResult(std::string str) { is_override_wait_result = true; override_wait_result = std::move(str); } void ResetOverride() { is_override_wait_result = false; is_override_start_status = false; } virtual ~TestCompactionServiceBase() = default; protected: bool is_override_start_status = false; CompactionServiceJobStatus override_start_status = CompactionServiceJobStatus::kFailure; bool is_override_wait_result = false; std::string override_wait_result; }; class MyTestCompactionServiceLegacy : public CompactionService, public TestCompactionServiceBase { public: MyTestCompactionServiceLegacy(std::string db_path, Options& options, std::shared_ptr& statistics) : db_path_(std::move(db_path)), options_(options), statistics_(statistics) {} static const char* kClassName() { return "MyTestCompactionServiceLegacy"; } const char* Name() const override { return kClassName(); } CompactionServiceJobStatus Start(const std::string& compaction_service_input, uint64_t job_id) override { InstrumentedMutexLock l(&mutex_); jobs_.emplace(job_id, compaction_service_input); CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; if (is_override_start_status) { return override_start_status; } return s; } CompactionServiceJobStatus WaitForComplete( uint64_t job_id, std::string* compaction_service_result) override { std::string compaction_input; { InstrumentedMutexLock l(&mutex_); auto i = jobs_.find(job_id); if (i == jobs_.end()) { return CompactionServiceJobStatus::kFailure; } compaction_input = std::move(i->second); jobs_.erase(i); } CompactionServiceOptionsOverride options_override; options_override.env = options_.env; options_override.file_checksum_gen_factory = options_.file_checksum_gen_factory; options_override.comparator = options_.comparator; options_override.merge_operator = options_.merge_operator; options_override.compaction_filter = options_.compaction_filter; options_override.compaction_filter_factory = options_.compaction_filter_factory; options_override.prefix_extractor = options_.prefix_extractor; options_override.table_factory = options_.table_factory; options_override.sst_partitioner_factory = options_.sst_partitioner_factory; options_override.statistics = statistics_; Status s = DB::OpenAndCompact( db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id), compaction_input, compaction_service_result, options_override); if (is_override_wait_result) { *compaction_service_result = override_wait_result; } compaction_num_.fetch_add(1); if (s.ok()) { return CompactionServiceJobStatus::kSuccess; } else { return CompactionServiceJobStatus::kFailure; } } int GetCompactionNum() override { return compaction_num_.load(); } private: InstrumentedMutex mutex_; std::atomic_int compaction_num_{0}; std::map jobs_; const std::string db_path_; Options options_; std::shared_ptr statistics_; }; class MyTestCompactionService : public CompactionService, public TestCompactionServiceBase { public: MyTestCompactionService(std::string db_path, Options& options, std::shared_ptr& statistics) : db_path_(std::move(db_path)), options_(options), statistics_(statistics), start_info_("na", "na", "na", 0, Env::TOTAL), wait_info_("na", "na", "na", 0, Env::TOTAL) {} static const char* kClassName() { return "MyTestCompactionService"; } const char* Name() const override { return kClassName(); } CompactionServiceJobStatus StartV2( const CompactionServiceJobInfo& info, const std::string& compaction_service_input) override { InstrumentedMutexLock l(&mutex_); start_info_ = info; assert(info.db_name == db_path_); jobs_.emplace(info.job_id, compaction_service_input); CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; if (is_override_start_status) { return override_start_status; } return s; } CompactionServiceJobStatus WaitForCompleteV2( const CompactionServiceJobInfo& info, std::string* compaction_service_result) override { std::string compaction_input; assert(info.db_name == db_path_); { InstrumentedMutexLock l(&mutex_); wait_info_ = info; auto i = jobs_.find(info.job_id); if (i == jobs_.end()) { return CompactionServiceJobStatus::kFailure; } compaction_input = std::move(i->second); jobs_.erase(i); } CompactionServiceOptionsOverride options_override; options_override.env = options_.env; options_override.file_checksum_gen_factory = options_.file_checksum_gen_factory; options_override.comparator = options_.comparator; options_override.merge_operator = options_.merge_operator; options_override.compaction_filter = options_.compaction_filter; options_override.compaction_filter_factory = options_.compaction_filter_factory; options_override.prefix_extractor = options_.prefix_extractor; options_override.table_factory = options_.table_factory; options_override.sst_partitioner_factory = options_.sst_partitioner_factory; options_override.statistics = statistics_; Status s = DB::OpenAndCompact( db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id), compaction_input, compaction_service_result, options_override); if (is_override_wait_result) { *compaction_service_result = override_wait_result; } compaction_num_.fetch_add(1); if (s.ok()) { return CompactionServiceJobStatus::kSuccess; } else { return CompactionServiceJobStatus::kFailure; } } int GetCompactionNum() override { return compaction_num_.load(); } CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; } CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; } private: InstrumentedMutex mutex_; std::atomic_int compaction_num_{0}; std::map jobs_; const std::string db_path_; Options options_; std::shared_ptr statistics_; CompactionServiceJobInfo start_info_; CompactionServiceJobInfo wait_info_; }; // This is only for listing test classes enum TestCompactionServiceType { MyTestCompactionServiceType, MyTestCompactionServiceLegacyType, }; class CompactionServiceTest : public DBTestBase, public testing::WithParamInterface { public: explicit CompactionServiceTest() : DBTestBase("compaction_service_test", true) {} protected: void ReopenWithCompactionService(Options* options) { options->env = env_; primary_statistics_ = CreateDBStatistics(); options->statistics = primary_statistics_; compactor_statistics_ = CreateDBStatistics(); TestCompactionServiceType cs_type = GetParam(); switch (cs_type) { case MyTestCompactionServiceType: compaction_service_ = std::make_shared( dbname_, *options, compactor_statistics_); break; case MyTestCompactionServiceLegacyType: compaction_service_ = std::make_shared( dbname_, *options, compactor_statistics_); break; default: assert(false); } options->compaction_service = compaction_service_; DestroyAndReopen(*options); } Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); } Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); } TestCompactionServiceBase* GetCompactionService() { CompactionService* cs = compaction_service_.get(); return dynamic_cast(cs); } void GenerateTestData() { // Generate 20 files @ L2 for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); } ASSERT_OK(Flush()); } MoveFilesToLevel(2); // Generate 10 files @ L1 overlap with all 20 files @ L2 for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); } ASSERT_OK(Flush()); } MoveFilesToLevel(1); ASSERT_EQ(FilesPerLevel(), "0,10,20"); } void VerifyTestData() { for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i % 2) { ASSERT_EQ(result, "value" + ToString(i)); } else { ASSERT_EQ(result, "value_new" + ToString(i)); } } } private: std::shared_ptr compactor_statistics_; std::shared_ptr primary_statistics_; std::shared_ptr compaction_service_; }; TEST_P(CompactionServiceTest, BasicCompactions) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // verify result for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i % 2) { ASSERT_EQ(result, "value" + ToString(i)); } else { ASSERT_EQ(result, "value_new" + ToString(i)); } } auto my_cs = GetCompactionService(); Statistics* compactor_statistics = GetCompactorStatistics(); ASSERT_GE(my_cs->GetCompactionNum(), 1); // make sure the compaction statistics is only recorded on remote side ASSERT_GE( compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1); Statistics* primary_statistics = GetPrimaryStatistics(); ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 0); // Test failed compaction SyncPoint::GetInstance()->SetCallBack( "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) { // override job status auto s = static_cast(status); *s = Status::Aborted("MyTestCompactionService failed to compact!"); }); SyncPoint::GetInstance()->EnableProcessing(); Status s; for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; s = Put(Key(key_id), "value_new" + ToString(key_id)); if (s.IsAborted()) { break; } } if (s.IsAborted()) { break; } s = Flush(); if (s.IsAborted()) { break; } s = dbfull()->TEST_WaitForCompact(); if (s.IsAborted()) { break; } } ASSERT_TRUE(s.IsAborted()); } TEST_P(CompactionServiceTest, ManualCompaction) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); auto my_cs = GetCompactionService(); std::string start_str = Key(15); std::string end_str = Key(45); Slice start(start_str); Slice end(end_str); uint64_t comp_num = my_cs->GetCompactionNum(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); VerifyTestData(); start_str = Key(120); start = start_str; comp_num = my_cs->GetCompactionNum(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); VerifyTestData(); end_str = Key(92); end = end_str; comp_num = my_cs->GetCompactionNum(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); VerifyTestData(); comp_num = my_cs->GetCompactionNum(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); VerifyTestData(); } TEST_P(CompactionServiceTest, FailedToStart) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); auto my_cs = GetCompactionService(); my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure); std::string start_str = Key(15); std::string end_str = Key(45); Slice start(start_str); Slice end(end_str); Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); ASSERT_TRUE(s.IsIncomplete()); } TEST_P(CompactionServiceTest, InvalidResult) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); auto my_cs = GetCompactionService(); my_cs->OverrideWaitResult("Invalid Str"); std::string start_str = Key(15); std::string end_str = Key(45); Slice start(start_str); Slice end(end_str); Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); ASSERT_FALSE(s.ok()); } TEST_P(CompactionServiceTest, SubCompaction) { Options options = CurrentOptions(); options.max_subcompactions = 10; options.target_file_size_base = 1 << 10; // 1KB options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); VerifyTestData(); auto my_cs = GetCompactionService(); int compaction_num_before = my_cs->GetCompactionNum(); auto cro = CompactRangeOptions(); cro.max_subcompactions = 10; Status s = db_->CompactRange(cro, nullptr, nullptr); ASSERT_OK(s); VerifyTestData(); int compaction_num = my_cs->GetCompactionNum() - compaction_num_before; // make sure there's sub-compaction by checking the compaction number ASSERT_GE(compaction_num, 2); } class PartialDeleteCompactionFilter : public CompactionFilter { public: CompactionFilter::Decision FilterV2( int /*level*/, const Slice& key, ValueType /*value_type*/, const Slice& /*existing_value*/, std::string* /*new_value*/, std::string* /*skip_until*/) const override { int i = std::stoi(key.ToString().substr(3)); if (i > 5 && i <= 105) { return CompactionFilter::Decision::kRemove; } return CompactionFilter::Decision::kKeep; } const char* Name() const override { return "PartialDeleteCompactionFilter"; } }; TEST_P(CompactionServiceTest, CompactionFilter) { Options options = CurrentOptions(); std::unique_ptr delete_comp_filter( new PartialDeleteCompactionFilter()); options.compaction_filter = delete_comp_filter.get(); ReopenWithCompactionService(&options); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // verify result for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i > 5 && i <= 105) { ASSERT_EQ(result, "NOT_FOUND"); } else if (i % 2) { ASSERT_EQ(result, "value" + ToString(i)); } else { ASSERT_EQ(result, "value_new" + ToString(i)); } } auto my_cs = GetCompactionService(); ASSERT_GE(my_cs->GetCompactionNum(), 1); } TEST_P(CompactionServiceTest, Snapshot) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); ASSERT_OK(Put(Key(1), "value1")); ASSERT_OK(Put(Key(2), "value1")); const Snapshot* s1 = db_->GetSnapshot(); ASSERT_OK(Flush()); ASSERT_OK(Put(Key(1), "value2")); ASSERT_OK(Put(Key(3), "value2")); ASSERT_OK(Flush()); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); auto my_cs = GetCompactionService(); ASSERT_GE(my_cs->GetCompactionNum(), 1); ASSERT_EQ("value1", Get(Key(1), s1)); ASSERT_EQ("value2", Get(Key(1))); db_->ReleaseSnapshot(s1); } TEST_P(CompactionServiceTest, ConcurrentCompaction) { Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = 100; options.max_background_jobs = 20; ReopenWithCompactionService(&options); GenerateTestData(); ColumnFamilyMetaData meta; db_->GetColumnFamilyMetaData(&meta); std::vector threads; for (const auto& file : meta.levels[1].files) { threads.push_back(std::thread([&]() { std::string fname = file.db_path + "/" + file.name; ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2)); })); } for (auto& thread : threads) { thread.join(); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // verify result for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i % 2) { ASSERT_EQ(result, "value" + ToString(i)); } else { ASSERT_EQ(result, "value_new" + ToString(i)); } } auto my_cs = GetCompactionService(); ASSERT_EQ(my_cs->GetCompactionNum(), 10); ASSERT_EQ(FilesPerLevel(), "0,0,10"); } TEST_P(CompactionServiceTest, CompactionInfo) { // only test compaction info for new compaction service interface if (GetParam() != MyTestCompactionServiceType) { return; } Options options = CurrentOptions(); ReopenWithCompactionService(&options); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); auto my_cs = static_cast_with_check(GetCompactionService()); uint64_t comp_num = my_cs->GetCompactionNum(); ASSERT_GE(comp_num, 1); CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart(); ASSERT_EQ(dbname_, info.db_name); std::string db_id, db_session_id; ASSERT_OK(db_->GetDbIdentity(db_id)); ASSERT_EQ(db_id, info.db_id); ASSERT_OK(db_->GetDbSessionId(db_session_id)); ASSERT_EQ(db_session_id, info.db_session_id); ASSERT_EQ(Env::LOW, info.priority); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(dbname_, info.db_name); ASSERT_EQ(db_id, info.db_id); ASSERT_EQ(db_session_id, info.db_session_id); ASSERT_EQ(Env::LOW, info.priority); // Test priority USER ColumnFamilyMetaData meta; db_->GetColumnFamilyMetaData(&meta); SstFileMetaData file = meta.levels[1].files[0]; ASSERT_OK(db_->CompactFiles(CompactionOptions(), {file.db_path + "/" + file.name}, 2)); info = my_cs->GetCompactionInfoForStart(); ASSERT_EQ(Env::USER, info.priority); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(Env::USER, info.priority); // Test priority BOTTOM env_->SetBackgroundThreads(1, Env::BOTTOM); options.num_levels = 2; ReopenWithCompactionService(&options); my_cs = static_cast_with_check(GetCompactionService()); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 4; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); info = my_cs->GetCompactionInfoForStart(); ASSERT_EQ(Env::BOTTOM, info.priority); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(Env::BOTTOM, info.priority); } INSTANTIATE_TEST_CASE_P( CompactionServiceTest, CompactionServiceTest, ::testing::Values( TestCompactionServiceType::MyTestCompactionServiceType, TestCompactionServiceType::MyTestCompactionServiceLegacyType)); } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS extern "C" { void RegisterCustomObjects(int argc, char** argv); } #else void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {} #endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); RegisterCustomObjects(argc, argv); return RUN_ALL_TESTS(); } #else #include int main(int /*argc*/, char** /*argv*/) { fprintf(stderr, "SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n"); return 0; } #endif // ROCKSDB_LITE