fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
				
			
			
		
			You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							642 lines
						
					
					
						
							20 KiB
						
					
					
				
			
		
		
	
	
							642 lines
						
					
					
						
							20 KiB
						
					
					
				| //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
 | |
| //  This source code is licensed under both the GPLv2 (found in the
 | |
| //  COPYING file in the root directory) and Apache 2.0 License
 | |
| //  (found in the LICENSE.Apache file in the root directory).
 | |
| 
 | |
| #ifndef ROCKSDB_LITE
 | |
| 
 | |
| #include "db/db_test_util.h"
 | |
| #include "port/stack_trace.h"
 | |
| 
 | |
| namespace ROCKSDB_NAMESPACE {
 | |
| 
 | |
| class TestCompactionServiceBase {
 | |
|  public:
 | |
|   virtual int GetCompactionNum() = 0;
 | |
| 
 | |
|   void OverrideStartStatus(CompactionServiceJobStatus s) {
 | |
|     is_override_start_status = true;
 | |
|     override_start_status = s;
 | |
|   }
 | |
| 
 | |
|   void OverrideWaitResult(std::string str) {
 | |
|     is_override_wait_result = true;
 | |
|     override_wait_result = std::move(str);
 | |
|   }
 | |
| 
 | |
|   void ResetOverride() {
 | |
|     is_override_wait_result = false;
 | |
|     is_override_start_status = false;
 | |
|   }
 | |
| 
 | |
|   virtual ~TestCompactionServiceBase() = default;
 | |
| 
 | |
|  protected:
 | |
|   bool is_override_start_status = false;
 | |
|   CompactionServiceJobStatus override_start_status =
 | |
|       CompactionServiceJobStatus::kFailure;
 | |
|   bool is_override_wait_result = false;
 | |
|   std::string override_wait_result;
 | |
| };
 | |
| 
 | |
| class MyTestCompactionServiceLegacy : public CompactionService,
 | |
|                                       public TestCompactionServiceBase {
 | |
|  public:
 | |
|   MyTestCompactionServiceLegacy(std::string db_path, Options& options,
 | |
|                                 std::shared_ptr<Statistics>& statistics)
 | |
|       : db_path_(std::move(db_path)),
 | |
|         options_(options),
 | |
|         statistics_(statistics) {}
 | |
| 
 | |
|   static const char* kClassName() { return "MyTestCompactionServiceLegacy"; }
 | |
| 
 | |
|   const char* Name() const override { return kClassName(); }
 | |
| 
 | |
|   CompactionServiceJobStatus Start(const std::string& compaction_service_input,
 | |
|                                    uint64_t job_id) override {
 | |
|     InstrumentedMutexLock l(&mutex_);
 | |
|     jobs_.emplace(job_id, compaction_service_input);
 | |
|     CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
 | |
|     if (is_override_start_status) {
 | |
|       return override_start_status;
 | |
|     }
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   CompactionServiceJobStatus WaitForComplete(
 | |
|       uint64_t job_id, std::string* compaction_service_result) override {
 | |
|     std::string compaction_input;
 | |
|     {
 | |
|       InstrumentedMutexLock l(&mutex_);
 | |
|       auto i = jobs_.find(job_id);
 | |
|       if (i == jobs_.end()) {
 | |
|         return CompactionServiceJobStatus::kFailure;
 | |
|       }
 | |
|       compaction_input = std::move(i->second);
 | |
|       jobs_.erase(i);
 | |
|     }
 | |
| 
 | |
|     CompactionServiceOptionsOverride options_override;
 | |
|     options_override.env = options_.env;
 | |
|     options_override.file_checksum_gen_factory =
 | |
|         options_.file_checksum_gen_factory;
 | |
|     options_override.comparator = options_.comparator;
 | |
|     options_override.merge_operator = options_.merge_operator;
 | |
|     options_override.compaction_filter = options_.compaction_filter;
 | |
|     options_override.compaction_filter_factory =
 | |
|         options_.compaction_filter_factory;
 | |
|     options_override.prefix_extractor = options_.prefix_extractor;
 | |
|     options_override.table_factory = options_.table_factory;
 | |
|     options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
 | |
|     options_override.statistics = statistics_;
 | |
| 
 | |
|     Status s = DB::OpenAndCompact(
 | |
|         db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id),
 | |
|         compaction_input, compaction_service_result, options_override);
 | |
|     if (is_override_wait_result) {
 | |
|       *compaction_service_result = override_wait_result;
 | |
|     }
 | |
|     compaction_num_.fetch_add(1);
 | |
|     if (s.ok()) {
 | |
|       return CompactionServiceJobStatus::kSuccess;
 | |
|     } else {
 | |
|       return CompactionServiceJobStatus::kFailure;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   int GetCompactionNum() override { return compaction_num_.load(); }
 | |
| 
 | |
|  private:
 | |
|   InstrumentedMutex mutex_;
 | |
|   std::atomic_int compaction_num_{0};
 | |
|   std::map<uint64_t, std::string> jobs_;
 | |
|   const std::string db_path_;
 | |
|   Options options_;
 | |
|   std::shared_ptr<Statistics> statistics_;
 | |
| };
 | |
| 
 | |
| class MyTestCompactionService : public CompactionService,
 | |
|                                 public TestCompactionServiceBase {
 | |
|  public:
 | |
|   MyTestCompactionService(std::string db_path, Options& options,
 | |
|                           std::shared_ptr<Statistics>& statistics)
 | |
|       : db_path_(std::move(db_path)),
 | |
|         options_(options),
 | |
|         statistics_(statistics),
 | |
|         start_info_("na", "na", "na", 0),
 | |
|         wait_info_("na", "na", "na", 0) {}
 | |
| 
 | |
|   static const char* kClassName() { return "MyTestCompactionService"; }
 | |
| 
 | |
|   const char* Name() const override { return kClassName(); }
 | |
| 
 | |
|   CompactionServiceJobStatus StartV2(
 | |
|       const CompactionServiceJobInfo& info,
 | |
|       const std::string& compaction_service_input) override {
 | |
|     InstrumentedMutexLock l(&mutex_);
 | |
|     start_info_ = info;
 | |
|     assert(info.db_name == db_path_);
 | |
|     jobs_.emplace(info.job_id, compaction_service_input);
 | |
|     CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
 | |
|     if (is_override_start_status) {
 | |
|       return override_start_status;
 | |
|     }
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   CompactionServiceJobStatus WaitForCompleteV2(
 | |
|       const CompactionServiceJobInfo& info,
 | |
|       std::string* compaction_service_result) override {
 | |
|     std::string compaction_input;
 | |
|     assert(info.db_name == db_path_);
 | |
|     {
 | |
|       InstrumentedMutexLock l(&mutex_);
 | |
|       wait_info_ = info;
 | |
|       auto i = jobs_.find(info.job_id);
 | |
|       if (i == jobs_.end()) {
 | |
|         return CompactionServiceJobStatus::kFailure;
 | |
|       }
 | |
|       compaction_input = std::move(i->second);
 | |
|       jobs_.erase(i);
 | |
|     }
 | |
| 
 | |
|     CompactionServiceOptionsOverride options_override;
 | |
|     options_override.env = options_.env;
 | |
|     options_override.file_checksum_gen_factory =
 | |
|         options_.file_checksum_gen_factory;
 | |
|     options_override.comparator = options_.comparator;
 | |
|     options_override.merge_operator = options_.merge_operator;
 | |
|     options_override.compaction_filter = options_.compaction_filter;
 | |
|     options_override.compaction_filter_factory =
 | |
|         options_.compaction_filter_factory;
 | |
|     options_override.prefix_extractor = options_.prefix_extractor;
 | |
|     options_override.table_factory = options_.table_factory;
 | |
|     options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
 | |
|     options_override.statistics = statistics_;
 | |
| 
 | |
|     Status s = DB::OpenAndCompact(
 | |
|         db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id),
 | |
|         compaction_input, compaction_service_result, options_override);
 | |
|     if (is_override_wait_result) {
 | |
|       *compaction_service_result = override_wait_result;
 | |
|     }
 | |
|     compaction_num_.fetch_add(1);
 | |
|     if (s.ok()) {
 | |
|       return CompactionServiceJobStatus::kSuccess;
 | |
|     } else {
 | |
|       return CompactionServiceJobStatus::kFailure;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   int GetCompactionNum() override { return compaction_num_.load(); }
 | |
| 
 | |
|   CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
 | |
|   CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
 | |
| 
 | |
|  private:
 | |
|   InstrumentedMutex mutex_;
 | |
|   std::atomic_int compaction_num_{0};
 | |
|   std::map<uint64_t, std::string> jobs_;
 | |
|   const std::string db_path_;
 | |
|   Options options_;
 | |
|   std::shared_ptr<Statistics> statistics_;
 | |
|   CompactionServiceJobInfo start_info_;
 | |
|   CompactionServiceJobInfo wait_info_;
 | |
| };
 | |
| 
 | |
// Enumerates the fake compaction service implementations; used as the
// parameter type of the value-parameterized tests below.
enum TestCompactionServiceType {
  MyTestCompactionServiceType = 0,
  MyTestCompactionServiceLegacyType = 1,
};
 | |
| 
 | |
| class CompactionServiceTest
 | |
|     : public DBTestBase,
 | |
|       public testing::WithParamInterface<TestCompactionServiceType> {
 | |
|  public:
 | |
|   explicit CompactionServiceTest()
 | |
|       : DBTestBase("compaction_service_test", true) {}
 | |
| 
 | |
|  protected:
 | |
|   void ReopenWithCompactionService(Options* options) {
 | |
|     options->env = env_;
 | |
|     primary_statistics_ = CreateDBStatistics();
 | |
|     options->statistics = primary_statistics_;
 | |
|     compactor_statistics_ = CreateDBStatistics();
 | |
|     TestCompactionServiceType cs_type = GetParam();
 | |
|     switch (cs_type) {
 | |
|       case MyTestCompactionServiceType:
 | |
|         compaction_service_ = std::make_shared<MyTestCompactionService>(
 | |
|             dbname_, *options, compactor_statistics_);
 | |
|         break;
 | |
|       case MyTestCompactionServiceLegacyType:
 | |
|         compaction_service_ = std::make_shared<MyTestCompactionServiceLegacy>(
 | |
|             dbname_, *options, compactor_statistics_);
 | |
|         break;
 | |
|       default:
 | |
|         assert(false);
 | |
|     }
 | |
|     options->compaction_service = compaction_service_;
 | |
|     DestroyAndReopen(*options);
 | |
|   }
 | |
| 
 | |
|   Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
 | |
| 
 | |
|   Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
 | |
| 
 | |
|   TestCompactionServiceBase* GetCompactionService() {
 | |
|     CompactionService* cs = compaction_service_.get();
 | |
|     return dynamic_cast<TestCompactionServiceBase*>(cs);
 | |
|   }
 | |
| 
 | |
|   void GenerateTestData() {
 | |
|     // Generate 20 files @ L2
 | |
|     for (int i = 0; i < 20; i++) {
 | |
|       for (int j = 0; j < 10; j++) {
 | |
|         int key_id = i * 10 + j;
 | |
|         ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
 | |
|       }
 | |
|       ASSERT_OK(Flush());
 | |
|     }
 | |
|     MoveFilesToLevel(2);
 | |
| 
 | |
|     // Generate 10 files @ L1 overlap with all 20 files @ L2
 | |
|     for (int i = 0; i < 10; i++) {
 | |
|       for (int j = 0; j < 10; j++) {
 | |
|         int key_id = i * 20 + j * 2;
 | |
|         ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
 | |
|       }
 | |
|       ASSERT_OK(Flush());
 | |
|     }
 | |
|     MoveFilesToLevel(1);
 | |
|     ASSERT_EQ(FilesPerLevel(), "0,10,20");
 | |
|   }
 | |
| 
 | |
|   void VerifyTestData() {
 | |
|     for (int i = 0; i < 200; i++) {
 | |
|       auto result = Get(Key(i));
 | |
|       if (i % 2) {
 | |
|         ASSERT_EQ(result, "value" + ToString(i));
 | |
|       } else {
 | |
|         ASSERT_EQ(result, "value_new" + ToString(i));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|  private:
 | |
|   std::shared_ptr<Statistics> compactor_statistics_;
 | |
|   std::shared_ptr<Statistics> primary_statistics_;
 | |
|   std::shared_ptr<CompactionService> compaction_service_;
 | |
| };
 | |
| 
 | |
| TEST_P(CompactionServiceTest, BasicCompactions) {
 | |
|   Options options = CurrentOptions();
 | |
|   ReopenWithCompactionService(&options);
 | |
| 
 | |
|   for (int i = 0; i < 20; i++) {
 | |
|     for (int j = 0; j < 10; j++) {
 | |
|       int key_id = i * 10 + j;
 | |
|       ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
 | |
|     }
 | |
|     ASSERT_OK(Flush());
 | |
|   }
 | |
| 
 | |
|   for (int i = 0; i < 10; i++) {
 | |
|     for (int j = 0; j < 10; j++) {
 | |
|       int key_id = i * 20 + j * 2;
 | |
|       ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
 | |
|     }
 | |
|     ASSERT_OK(Flush());
 | |
|   }
 | |
|   ASSERT_OK(dbfull()->TEST_WaitForCompact());
 | |
| 
 | |
|   // verify result
 | |
|   for (int i = 0; i < 200; i++) {
 | |
|     auto result = Get(Key(i));
 | |
|     if (i % 2) {
 | |
|       ASSERT_EQ(result, "value" + ToString(i));
 | |
|     } else {
 | |
|       ASSERT_EQ(result, "value_new" + ToString(i));
 | |
|     }
 | |
|   }
 | |
|   auto my_cs = GetCompactionService();
 | |
|   Statistics* compactor_statistics = GetCompactorStatistics();
 | |
|   ASSERT_GE(my_cs->GetCompactionNum(), 1);
 | |
| 
 | |
|   // make sure the compaction statistics is only recorded on remote side
 | |
|   ASSERT_GE(
 | |
|       compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1);
 | |
|   Statistics* primary_statistics = GetPrimaryStatistics();
 | |
|   ASSERT_EQ(primary_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
 | |
|             0);
 | |
| 
 | |
|   // Test failed compaction
 | |
|   SyncPoint::GetInstance()->SetCallBack(
 | |
|       "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
 | |
|         // override job status
 | |
|         auto s = static_cast<Status*>(status);
 | |
|         *s = Status::Aborted("MyTestCompactionService failed to compact!");
 | |
|       });
 | |
|   SyncPoint::GetInstance()->EnableProcessing();
 | |
| 
 | |
|   Status s;
 | |
|   for (int i = 0; i < 10; i++) {
 | |
|     for (int j = 0; j < 10; j++) {
 | |
|       int key_id = i * 20 + j * 2;
 | |
|       s = Put(Key(key_id), "value_new" + ToString(key_id));
 | |
|       if (s.IsAborted()) {
 | |
|         break;
 | |
|       }
 | |
|     }
 | |
|     if (s.IsAborted()) {
 | |
|       break;
 | |
|     }
 | |
|     s = Flush();
 | |
|     if (s.IsAborted()) {
 | |
|       break;
 | |
|     }
 | |
|     s = dbfull()->TEST_WaitForCompact();
 | |
|     if (s.IsAborted()) {
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
|   ASSERT_TRUE(s.IsAborted());
 | |
| }
 | |
| 
 | |
| TEST_P(CompactionServiceTest, ManualCompaction) {
 | |
|   Options options = CurrentOptions();
 | |
|   options.disable_auto_compactions = true;
 | |
|   ReopenWithCompactionService(&options);
 | |
|   GenerateTestData();
 | |
| 
 | |
|   auto my_cs = GetCompactionService();
 | |
| 
 | |
|   std::string start_str = Key(15);
 | |
|   std::string end_str = Key(45);
 | |
|   Slice start(start_str);
 | |
|   Slice end(end_str);
 | |
|   uint64_t comp_num = my_cs->GetCompactionNum();
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
 | |
|   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
 | |
|   VerifyTestData();
 | |
| 
 | |
|   start_str = Key(120);
 | |
|   start = start_str;
 | |
|   comp_num = my_cs->GetCompactionNum();
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
 | |
|   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
 | |
|   VerifyTestData();
 | |
| 
 | |
|   end_str = Key(92);
 | |
|   end = end_str;
 | |
|   comp_num = my_cs->GetCompactionNum();
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
 | |
|   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
 | |
|   VerifyTestData();
 | |
| 
 | |
|   comp_num = my_cs->GetCompactionNum();
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 | |
|   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
 | |
|   VerifyTestData();
 | |
| }
 | |
| 
 | |
| TEST_P(CompactionServiceTest, FailedToStart) {
 | |
|   Options options = CurrentOptions();
 | |
|   options.disable_auto_compactions = true;
 | |
|   ReopenWithCompactionService(&options);
 | |
| 
 | |
|   GenerateTestData();
 | |
| 
 | |
|   auto my_cs = GetCompactionService();
 | |
|   my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure);
 | |
| 
 | |
|   std::string start_str = Key(15);
 | |
|   std::string end_str = Key(45);
 | |
|   Slice start(start_str);
 | |
|   Slice end(end_str);
 | |
|   Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
 | |
|   ASSERT_TRUE(s.IsIncomplete());
 | |
| }
 | |
| 
 | |
| TEST_P(CompactionServiceTest, InvalidResult) {
 | |
|   Options options = CurrentOptions();
 | |
|   options.disable_auto_compactions = true;
 | |
|   ReopenWithCompactionService(&options);
 | |
| 
 | |
|   GenerateTestData();
 | |
| 
 | |
|   auto my_cs = GetCompactionService();
 | |
|   my_cs->OverrideWaitResult("Invalid Str");
 | |
| 
 | |
|   std::string start_str = Key(15);
 | |
|   std::string end_str = Key(45);
 | |
|   Slice start(start_str);
 | |
|   Slice end(end_str);
 | |
|   Status s = db_->CompactRange(CompactRangeOptions(), &start, &end);
 | |
|   ASSERT_FALSE(s.ok());
 | |
| }
 | |
| 
 | |
| TEST_P(CompactionServiceTest, SubCompaction) {
 | |
|   Options options = CurrentOptions();
 | |
|   options.max_subcompactions = 10;
 | |
|   options.target_file_size_base = 1 << 10;  // 1KB
 | |
|   options.disable_auto_compactions = true;
 | |
|   ReopenWithCompactionService(&options);
 | |
| 
 | |
|   GenerateTestData();
 | |
|   VerifyTestData();
 | |
| 
 | |
|   auto my_cs = GetCompactionService();
 | |
|   int compaction_num_before = my_cs->GetCompactionNum();
 | |
| 
 | |
|   auto cro = CompactRangeOptions();
 | |
|   cro.max_subcompactions = 10;
 | |
|   Status s = db_->CompactRange(cro, nullptr, nullptr);
 | |
|   ASSERT_OK(s);
 | |
|   VerifyTestData();
 | |
|   int compaction_num = my_cs->GetCompactionNum() - compaction_num_before;
 | |
|   // make sure there's sub-compaction by checking the compaction number
 | |
|   ASSERT_GE(compaction_num, 2);
 | |
| }
 | |
| 
 | |
| class PartialDeleteCompactionFilter : public CompactionFilter {
 | |
|  public:
 | |
|   CompactionFilter::Decision FilterV2(
 | |
|       int /*level*/, const Slice& key, ValueType /*value_type*/,
 | |
|       const Slice& /*existing_value*/, std::string* /*new_value*/,
 | |
|       std::string* /*skip_until*/) const override {
 | |
|     int i = std::stoi(key.ToString().substr(3));
 | |
|     if (i > 5 && i <= 105) {
 | |
|       return CompactionFilter::Decision::kRemove;
 | |
|     }
 | |
|     return CompactionFilter::Decision::kKeep;
 | |
|   }
 | |
| 
 | |
|   const char* Name() const override { return "PartialDeleteCompactionFilter"; }
 | |
| };
 | |
| 
 | |
| TEST_P(CompactionServiceTest, CompactionFilter) {
 | |
|   Options options = CurrentOptions();
 | |
|   std::unique_ptr<CompactionFilter> delete_comp_filter(
 | |
|       new PartialDeleteCompactionFilter());
 | |
|   options.compaction_filter = delete_comp_filter.get();
 | |
|   ReopenWithCompactionService(&options);
 | |
| 
 | |
|   for (int i = 0; i < 20; i++) {
 | |
|     for (int j = 0; j < 10; j++) {
 | |
|       int key_id = i * 10 + j;
 | |
|       ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
 | |
|     }
 | |
|     ASSERT_OK(Flush());
 | |
|   }
 | |
| 
 | |
|   for (int i = 0; i < 10; i++) {
 | |
|     for (int j = 0; j < 10; j++) {
 | |
|       int key_id = i * 20 + j * 2;
 | |
|       ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id)));
 | |
|     }
 | |
|     ASSERT_OK(Flush());
 | |
|   }
 | |
|   ASSERT_OK(dbfull()->TEST_WaitForCompact());
 | |
| 
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 | |
| 
 | |
|   // verify result
 | |
|   for (int i = 0; i < 200; i++) {
 | |
|     auto result = Get(Key(i));
 | |
|     if (i > 5 && i <= 105) {
 | |
|       ASSERT_EQ(result, "NOT_FOUND");
 | |
|     } else if (i % 2) {
 | |
|       ASSERT_EQ(result, "value" + ToString(i));
 | |
|     } else {
 | |
|       ASSERT_EQ(result, "value_new" + ToString(i));
 | |
|     }
 | |
|   }
 | |
|   auto my_cs = GetCompactionService();
 | |
|   ASSERT_GE(my_cs->GetCompactionNum(), 1);
 | |
| }
 | |
| 
 | |
| TEST_P(CompactionServiceTest, Snapshot) {
 | |
|   Options options = CurrentOptions();
 | |
|   ReopenWithCompactionService(&options);
 | |
| 
 | |
|   ASSERT_OK(Put(Key(1), "value1"));
 | |
|   ASSERT_OK(Put(Key(2), "value1"));
 | |
|   const Snapshot* s1 = db_->GetSnapshot();
 | |
|   ASSERT_OK(Flush());
 | |
| 
 | |
|   ASSERT_OK(Put(Key(1), "value2"));
 | |
|   ASSERT_OK(Put(Key(3), "value2"));
 | |
|   ASSERT_OK(Flush());
 | |
| 
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 | |
|   auto my_cs = GetCompactionService();
 | |
|   ASSERT_GE(my_cs->GetCompactionNum(), 1);
 | |
|   ASSERT_EQ("value1", Get(Key(1), s1));
 | |
|   ASSERT_EQ("value2", Get(Key(1)));
 | |
|   db_->ReleaseSnapshot(s1);
 | |
| }
 | |
| 
 | |
| TEST_P(CompactionServiceTest, ConcurrentCompaction) {
 | |
|   Options options = CurrentOptions();
 | |
|   options.level0_file_num_compaction_trigger = 100;
 | |
|   options.max_background_jobs = 20;
 | |
|   ReopenWithCompactionService(&options);
 | |
|   GenerateTestData();
 | |
| 
 | |
|   ColumnFamilyMetaData meta;
 | |
|   db_->GetColumnFamilyMetaData(&meta);
 | |
| 
 | |
|   std::vector<std::thread> threads;
 | |
|   for (const auto& file : meta.levels[1].files) {
 | |
|     threads.push_back(std::thread([&]() {
 | |
|       std::string fname = file.db_path + "/" + file.name;
 | |
|       ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2));
 | |
|     }));
 | |
|   }
 | |
| 
 | |
|   for (auto& thread : threads) {
 | |
|     thread.join();
 | |
|   }
 | |
|   ASSERT_OK(dbfull()->TEST_WaitForCompact());
 | |
| 
 | |
|   // verify result
 | |
|   for (int i = 0; i < 200; i++) {
 | |
|     auto result = Get(Key(i));
 | |
|     if (i % 2) {
 | |
|       ASSERT_EQ(result, "value" + ToString(i));
 | |
|     } else {
 | |
|       ASSERT_EQ(result, "value_new" + ToString(i));
 | |
|     }
 | |
|   }
 | |
|   auto my_cs = GetCompactionService();
 | |
|   ASSERT_EQ(my_cs->GetCompactionNum(), 10);
 | |
|   ASSERT_EQ(FilesPerLevel(), "0,0,10");
 | |
| }
 | |
| 
 | |
| TEST_P(CompactionServiceTest, CompactionInfo) {
 | |
|   Options options = CurrentOptions();
 | |
|   options.disable_auto_compactions = true;
 | |
|   ReopenWithCompactionService(&options);
 | |
|   GenerateTestData();
 | |
| 
 | |
|   auto my_cs = GetCompactionService();
 | |
| 
 | |
|   std::string start_str = Key(15);
 | |
|   std::string end_str = Key(45);
 | |
|   Slice start(start_str);
 | |
|   Slice end(end_str);
 | |
|   uint64_t comp_num = my_cs->GetCompactionNum();
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
 | |
|   ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
 | |
|   VerifyTestData();
 | |
|   // only test compaction info for new compaction service interface
 | |
|   if (GetParam() == MyTestCompactionServiceType) {
 | |
|     auto cs = static_cast_with_check<MyTestCompactionService>(my_cs);
 | |
|     CompactionServiceJobInfo info = cs->GetCompactionInfoForStart();
 | |
|     ASSERT_EQ(dbname_, info.db_name);
 | |
|     std::string db_id, db_session_id;
 | |
|     ASSERT_OK(db_->GetDbIdentity(db_id));
 | |
|     ASSERT_EQ(db_id, info.db_id);
 | |
|     ASSERT_OK(db_->GetDbSessionId(db_session_id));
 | |
|     ASSERT_EQ(db_session_id, info.db_session_id);
 | |
|     info = cs->GetCompactionInfoForWait();
 | |
|     ASSERT_EQ(dbname_, info.db_name);
 | |
|     ASSERT_EQ(db_id, info.db_id);
 | |
|     ASSERT_EQ(db_session_id, info.db_session_id);
 | |
|   }
 | |
| }
 | |
| 
 | |
| INSTANTIATE_TEST_CASE_P(
 | |
|     CompactionServiceTest, CompactionServiceTest,
 | |
|     ::testing::Values(
 | |
|         TestCompactionServiceType::MyTestCompactionServiceType,
 | |
|         TestCompactionServiceType::MyTestCompactionServiceLegacyType));
 | |
| 
 | |
| }  // namespace ROCKSDB_NAMESPACE
 | |
| 
 | |
// RegisterCustomObjects() is declared with C linkage and expected from
// elsewhere when ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS is
// defined; otherwise an empty definition is provided here.
#ifndef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#else
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#endif  // ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
 | |
| 
 | |
| int main(int argc, char** argv) {
 | |
|   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
 | |
|   ::testing::InitGoogleTest(&argc, argv);
 | |
|   RegisterCustomObjects(argc, argv);
 | |
|   return RUN_ALL_TESTS();
 | |
| }
 | |
| 
 | |
| #else
 | |
| #include <stdio.h>
 | |
| 
 | |
// ROCKSDB_LITE entry point: prints a skip notice to stderr and exits with
// status 0.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as CompactionService is not supported in ROCKSDB_LITE\n",
        stderr);
  return 0;
}
 | |
| 
 | |
| #endif  // ROCKSDB_LITE
 | |
| 
 |