diff --git a/CMakeLists.txt b/CMakeLists.txt index c204baa21..91dce7b01 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -403,7 +403,7 @@ endif() option(WITH_TSAN "build with TSAN" OFF) if(WITH_TSAN) - set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread -pie") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread -Wl,-pie") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fPIC") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=thread -fPIC") if(WITH_JEMALLOC) diff --git a/HISTORY.md b/HISTORY.md index 5664786e5..39b1f73a6 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -68,6 +68,7 @@ * Add Transaction::SetReadTimestampForValidation() and Transaction::SetCommitTimestamp(). Default impl returns NotSupported(). * Add support for decimal patterns to ObjectLibrary::PatternEntry * Remove deprecated remote compaction APIs `CompactionService::Start()` and `CompactionService::WaitForComplete()`. Please use `CompactionService::StartV2()`, `CompactionService::WaitForCompleteV2()` instead, which provides the same information plus extra data like priority, db_id, etc. +* Add subcompaction callback APIs: `OnSubcompactionBegin()` and `OnSubcompactionCompleted()`. ### Behavior Changes * Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO. diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index f9bbb25fa..1d345f4d8 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -292,7 +292,7 @@ class Compaction { int GetInputBaseLevel() const; - CompactionReason compaction_reason() { return compaction_reason_; } + CompactionReason compaction_reason() const { return compaction_reason_; } const std::vector& grandparents() const { return grandparents_; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index e84ad734f..0ebd53b0b 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -195,6 +195,10 @@ struct CompactionJob::SubcompactionState { // within the same compaction job. const uint32_t sub_job_id; + // Notify on sub-compaction completion only if listener was notified on + // sub-compaction begin. + bool notify_on_subcompaction_completion = false; + SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size, uint32_t _sub_job_id) : compaction(c), @@ -1215,8 +1219,82 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_result.bytes_written); return CompactionServiceJobStatus::kSuccess; } + +void CompactionJob::BuildSubcompactionJobInfo( + SubcompactionState* sub_compact, + SubcompactionJobInfo* subcompaction_job_info) const { + Compaction* c = compact_->compaction; + ColumnFamilyData* cfd = c->column_family_data(); + + subcompaction_job_info->cf_id = cfd->GetID(); + subcompaction_job_info->cf_name = cfd->GetName(); + subcompaction_job_info->status = sub_compact->status; + subcompaction_job_info->thread_id = env_->GetThreadID(); + subcompaction_job_info->job_id = job_id_; + subcompaction_job_info->subcompaction_job_id = sub_compact->sub_job_id; + subcompaction_job_info->base_input_level = c->start_level(); + subcompaction_job_info->output_level = c->output_level(); + subcompaction_job_info->stats = sub_compact->compaction_job_stats; +} #endif // !ROCKSDB_LITE +void CompactionJob::NotifyOnSubcompactionBegin( + SubcompactionState* sub_compact) { +#ifndef ROCKSDB_LITE + Compaction* c = compact_->compaction; + + if (db_options_.listeners.empty()) { + return; + } + if (shutting_down_->load(std::memory_order_acquire)) { + return; + } + if (c->is_manual_compaction() && + manual_compaction_paused_->load(std::memory_order_acquire) > 0) { + return; + } + + sub_compact->notify_on_subcompaction_completion = true; + + SubcompactionJobInfo info{}; + BuildSubcompactionJobInfo(sub_compact, &info); + + for (auto listener : db_options_.listeners) { + listener->OnSubcompactionBegin(info); + } + info.status.PermitUncheckedError(); + +#else + (void)sub_compact; +#endif // ROCKSDB_LITE +} + +void CompactionJob::NotifyOnSubcompactionCompleted( + SubcompactionState* sub_compact) { +#ifndef ROCKSDB_LITE + + if (db_options_.listeners.empty()) { + return; + } + if (shutting_down_->load(std::memory_order_acquire)) { + return; + } + + if (sub_compact->notify_on_subcompaction_completion == false) { + return; + } + + SubcompactionJobInfo info{}; + BuildSubcompactionJobInfo(sub_compact, &info); + + for (auto listener : db_options_.listeners) { + listener->OnSubcompactionCompleted(info); + } +#else + (void)sub_compact; +#endif // ROCKSDB_LITE +} + void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { assert(sub_compact); assert(sub_compact->compaction); @@ -1255,6 +1333,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { return; } + NotifyOnSubcompactionBegin(sub_compact); + CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(), existing_snapshots_); @@ -1614,6 +1694,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { clip.reset(); raw_input.reset(); sub_compact->status = status; + NotifyOnSubcompactionCompleted(sub_compact); } uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) { diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 428c847c3..0f16bb40c 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -167,6 +167,16 @@ class CompactionJob { void UpdateCompactionInputStatsHelper( int* num_files, uint64_t* bytes_read, int input_level); +#ifndef ROCKSDB_LITE + void BuildSubcompactionJobInfo( + SubcompactionState* sub_compact, + SubcompactionJobInfo* subcompaction_job_info) const; +#endif // ROCKSDB_LITE + + void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact); + + void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact); + uint32_t job_id_; CompactionJobStats* compaction_job_stats_; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index dce922e97..6af4bcafd 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -4671,6 +4671,98 @@ TEST_F(DBCompactionTest, CompactionStatsTest) { VerifyCompactionStats(*cfd, *collector); } +TEST_F(DBCompactionTest, SubcompactionEvent) { + class SubCompactionEventListener : public EventListener { + public: + void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override { + InstrumentedMutexLock l(&mutex_); + ASSERT_EQ(running_compactions_.find(ci.job_id), + running_compactions_.end()); + running_compactions_.emplace(ci.job_id, std::unordered_set()); + } + + void OnCompactionCompleted(DB* /*db*/, + const CompactionJobInfo& ci) override { + InstrumentedMutexLock l(&mutex_); + auto it = running_compactions_.find(ci.job_id); + ASSERT_NE(it, running_compactions_.end()); + ASSERT_EQ(it->second.size(), 0); + running_compactions_.erase(it); + } + + void OnSubcompactionBegin(const SubcompactionJobInfo& si) override { + InstrumentedMutexLock l(&mutex_); + auto it = running_compactions_.find(si.job_id); + ASSERT_NE(it, running_compactions_.end()); + auto r = it->second.insert(si.subcompaction_job_id); + ASSERT_TRUE(r.second); // each subcompaction_job_id should be different + total_subcompaction_cnt_++; + } + + void OnSubcompactionCompleted(const SubcompactionJobInfo& si) override { + InstrumentedMutexLock l(&mutex_); + auto it = running_compactions_.find(si.job_id); + ASSERT_NE(it, running_compactions_.end()); + auto r = it->second.erase(si.subcompaction_job_id); + ASSERT_EQ(r, 1); + } + + size_t GetRunningCompactionCount() { + InstrumentedMutexLock l(&mutex_); + return running_compactions_.size(); + } + + size_t GetTotalSubcompactionCount() { + InstrumentedMutexLock l(&mutex_); + return total_subcompaction_cnt_; + } + + private: + InstrumentedMutex mutex_; + std::unordered_map> running_compactions_; + size_t total_subcompaction_cnt_ = 0; + }; + + Options options = CurrentOptions(); + options.target_file_size_base = 1024; + options.level0_file_num_compaction_trigger = 10; + auto* listener = new SubCompactionEventListener(); + options.listeners.emplace_back(listener); + + DestroyAndReopen(options); + + // generate 4 files @ L2 + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + + // generate 2 files @ L1 which overlaps with L2 files + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 20 + j * 2; + ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + MoveFilesToLevel(1); + ASSERT_EQ(FilesPerLevel(), "0,2,4"); + + CompactRangeOptions comp_opts; + comp_opts.max_subcompactions = 4; + Status s = dbfull()->CompactRange(comp_opts, nullptr, nullptr); + ASSERT_OK(s); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + // make sure there's no running compaction + ASSERT_EQ(listener->GetRunningCompactionCount(), 0); + // and sub compaction is triggered + ASSERT_GT(listener->GetTotalSubcompactionCount(), 0); +} + TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) { // LSM setup: // L1: [ba bz] diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index dec7ea799..e61e92581 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -363,6 +363,42 @@ struct CompactionFileInfo { uint64_t oldest_blob_file_number; }; +struct SubcompactionJobInfo { + ~SubcompactionJobInfo() { status.PermitUncheckedError(); } + // the id of the column family where the compaction happened. + uint32_t cf_id; + // the name of the column family where the compaction happened. + std::string cf_name; + // the status indicating whether the compaction was successful or not. + Status status; + // the id of the thread that completed this compaction job. + uint64_t thread_id; + // the job id, which is unique in the same thread. + int job_id; + + // sub-compaction job id, which is only unique within the same compaction, so + // use both 'job_id' and 'subcompaction_job_id' to identify a subcompaction + // within an instance. + // For non subcompaction job, it's set to -1. + int subcompaction_job_id; + // the smallest input level of the compaction. + int base_input_level; + // the output level of the compaction. + int output_level; + + // Reason to run the compaction + CompactionReason compaction_reason; + + // Compression algorithm used for output files + CompressionType compression; + + // Statistics and other additional details on the compaction + CompactionJobStats stats; + + // Compression algorithm used for blob output files. + CompressionType blob_compression_type; +}; + struct CompactionJobInfo { ~CompactionJobInfo() { status.PermitUncheckedError(); } // the id of the column family where the compaction happened. @@ -375,6 +411,7 @@ struct CompactionJobInfo { uint64_t thread_id; // the job id, which is unique in the same thread. int job_id; + // the smallest input level of the compaction. int base_input_level; // the output level of the compaction. @@ -579,6 +616,43 @@ class EventListener : public Customizable { virtual void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& /*ci*/) {} + // A callback function to RocksDB which will be called before a sub-compaction + // begins. If a compaction is split to 2 sub-compactions, it will trigger one + // `OnCompactionBegin()` first, then two `OnSubcompactionBegin()`. + // If compaction is not split, it will still trigger one + // `OnSubcompactionBegin()`, as internally, compaction is always handled by + // sub-compaction. The default implementation is a no-op. + // + // Note that this function must be implemented in a way such that + // it should not run for an extended period of time before the function + // returns. Otherwise, RocksDB may be blocked. + // + // @param ci a reference to a CompactionJobInfo struct, it contains a + // `sub_job_id` which is only unique within the specified compaction (which + // can be identified by `job_id`). 'ci' is released after this function is + // returned, and must be copied if it's needed outside this function. + // Note: `table_properties` is not set for sub-compaction, the information + // could be got from `OnCompactionBegin()`. + virtual void OnSubcompactionBegin(const SubcompactionJobInfo& /*si*/) {} + + // A callback function to RocksDB which will be called whenever a + // sub-compaction completed. The same as `OnSubcompactionBegin()`, if a + // compaction is split to 2 sub-compactions, it will be triggered twice. If + // a compaction is not split, it will still be triggered once. + // The default implementation is a no-op. + // + // Note that this function must be implemented in a way such that + // it should not run for an extended period of time before the function + // returns. Otherwise, RocksDB may be blocked. + // + // @param ci a reference to a CompactionJobInfo struct, it contains a + // `sub_job_id` which is only unique within the specified compaction (which + // can be identified by `job_id`). 'ci' is released after this function is + // returned, and must be copied if it's needed outside this function. + // Note: `table_properties` is not set for sub-compaction, the information + // could be got from `OnCompactionCompleted()`. + virtual void OnSubcompactionCompleted(const SubcompactionJobInfo& /*si*/) {} + // A callback function for RocksDB which will be called whenever // a SST file is created. Different from OnCompactionCompleted and // OnFlushCompleted, this callback is designed for external logging