From f0fca2b1d51a529ed72d3877357c59277f32d1d2 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Thu, 22 Apr 2021 13:01:00 -0700 Subject: [PATCH] Add internal compaction API for Secondary instance (#8171) Summary: Add compaction API for secondary instance, which compact the files to a secondary DB path without installing to the LSM tree. The API will be used to remote compaction. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8171 Test Plan: `make check` Reviewed By: ajkr Differential Revision: D27694545 Pulled By: jay-zhuang fbshipit-source-id: 8ff3ec1bffdb2e1becee994918850c8902caf731 --- db/compaction/compaction_job.cc | 154 +++++++++++++++++++++--- db/compaction/compaction_job.h | 161 +++++++++++++++++++++---- db/db_impl/db_impl.cc | 8 +- db/db_impl/db_impl.h | 28 ++--- db/db_impl/db_impl_secondary.cc | 88 +++++++++++++- db/db_impl/db_impl_secondary.h | 20 +++- db/db_secondary_test.cc | 200 ++++++++++++++++++++++++++++++++ db/output_validator.h | 4 +- 8 files changed, 603 insertions(+), 60 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 4813fd313..3f750289e 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -312,14 +312,19 @@ CompactionJob::CompactionJob( const std::atomic* manual_compaction_paused, const std::string& db_id, const std::string& db_session_id, std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback) - : job_id_(job_id), - compact_(new CompactionState(compaction)), - compaction_job_stats_(compaction_job_stats), + : compact_(new CompactionState(compaction)), compaction_stats_(compaction->compaction_reason(), 1), + db_options_(db_options), + log_buffer_(log_buffer), + output_directory_(output_directory), + stats_(stats), + bottommost_level_(false), + write_hint_(Env::WLTH_NOT_SET), + job_id_(job_id), + compaction_job_stats_(compaction_job_stats), dbname_(dbname), db_id_(db_id), db_session_id_(db_session_id), - db_options_(db_options), file_options_(file_options), env_(db_options.env), io_tracer_(io_tracer), @@ -330,11 +335,8 @@ CompactionJob::CompactionJob( shutting_down_(shutting_down), manual_compaction_paused_(manual_compaction_paused), preserve_deletes_seqnum_(preserve_deletes_seqnum), - log_buffer_(log_buffer), db_directory_(db_directory), - output_directory_(output_directory), blob_output_directory_(blob_output_directory), - stats_(stats), db_mutex_(db_mutex), db_error_handler_(db_error_handler), existing_snapshots_(std::move(existing_snapshots)), @@ -342,10 +344,8 @@ CompactionJob::CompactionJob( snapshot_checker_(snapshot_checker), table_cache_(std::move(table_cache)), event_logger_(event_logger), - bottommost_level_(false), paranoid_file_checks_(paranoid_file_checks), measure_io_stats_(measure_io_stats), - write_hint_(Env::WLTH_NOT_SET), thread_pri_(thread_pri), full_history_ts_low_(std::move(full_history_ts_low)), blob_callback_(blob_callback) { @@ -1550,9 +1550,7 @@ Status CompactionJob::FinishCompactionOutputFile( FileDescriptor output_fd; uint64_t oldest_blob_file_number = kInvalidBlobFileNumber; if (meta != nullptr) { - fname = - TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths, - meta->fd.GetNumber(), meta->fd.GetPathId()); + fname = GetTableFileName(meta->fd.GetNumber()); output_fd = meta->fd; oldest_blob_file_number = meta->oldest_blob_file_number; } else { @@ -1672,9 +1670,7 @@ Status CompactionJob::OpenCompactionOutputFile( assert(sub_compact->builder == nullptr); // no need to lock because VersionSet::next_file_number_ is atomic uint64_t file_number = versions_->NewFileNumber(); - std::string fname = - TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths, - file_number, sub_compact->compaction->output_path_id()); + std::string fname = GetTableFileName(file_number); // Fire events. ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); #ifndef ROCKSDB_LITE @@ -1937,4 +1933,132 @@ void CompactionJob::LogCompaction() { } } +std::string CompactionJob::GetTableFileName(uint64_t file_number) { + return TableFileName(compact_->compaction->immutable_cf_options()->cf_paths, + file_number, compact_->compaction->output_path_id()); +} + +std::string CompactionServiceCompactionJob::GetTableFileName( + uint64_t file_number) { + return MakeTableFileName(output_path_, file_number); +} + +CompactionServiceCompactionJob::CompactionServiceCompactionJob( + int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, + const FileOptions& file_options, VersionSet* versions, + const std::atomic* shutting_down, LogBuffer* log_buffer, + FSDirectory* output_directory, Statistics* stats, + InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, + std::vector existing_snapshots, + std::shared_ptr table_cache, EventLogger* event_logger, + const std::string& dbname, const std::shared_ptr& io_tracer, + const std::string& db_id, const std::string& db_session_id, + const std::string& output_path, + const CompactionServiceInput& compaction_service_input, + CompactionServiceResult* compaction_service_result) + : CompactionJob( + job_id, compaction, db_options, file_options, versions, shutting_down, + 0, log_buffer, nullptr, output_directory, nullptr, stats, db_mutex, + db_error_handler, existing_snapshots, kMaxSequenceNumber, nullptr, + table_cache, event_logger, + compaction->mutable_cf_options()->paranoid_file_checks, + compaction->mutable_cf_options()->report_bg_io_stats, dbname, + &(compaction_service_result->stats), Env::Priority::USER, io_tracer, + nullptr, db_id, db_session_id, + compaction->column_family_data()->GetFullHistoryTsLow()), + output_path_(output_path), + compaction_input_(compaction_service_input), + compaction_result_(compaction_service_result) {} + +Status CompactionServiceCompactionJob::Run() { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_COMPACTION_RUN); + + auto* c = compact_->compaction; + assert(c->column_family_data() != nullptr); + assert(c->column_family_data()->current()->storage_info()->NumLevelFiles( + compact_->compaction->level()) > 0); + + write_hint_ = + c->column_family_data()->CalculateSSTWriteHint(c->output_level()); + bottommost_level_ = c->bottommost_level(); + + compact_->sub_compact_states.emplace_back(c, compaction_input_.begin, + compaction_input_.end, + compaction_input_.approx_size); + + log_buffer_->FlushBufferToLog(); + LogCompaction(); + const uint64_t start_micros = db_options_.clock->NowMicros(); + // Pick the only sub-compaction we should have + assert(compact_->sub_compact_states.size() == 1); + SubcompactionState* sub_compact = compact_->sub_compact_states.data(); + + ProcessKeyValueCompaction(sub_compact); + + compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros; + compaction_stats_.cpu_micros = sub_compact->compaction_job_stats.cpu_micros; + + RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros); + RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME, + compaction_stats_.cpu_micros); + + Status status = sub_compact->status; + IOStatus io_s = sub_compact->io_status; + + if (io_status_.ok()) { + io_status_ = io_s; + } + + if (status.ok()) { + constexpr IODebugContext* dbg = nullptr; + + if (output_directory_) { + io_s = output_directory_->Fsync(IOOptions(), dbg); + } + } + if (io_status_.ok()) { + io_status_ = io_s; + } + if (status.ok()) { + status = io_s; + } + if (status.ok()) { + // TODO: Add verify_table() and VerifyCompactionFileConsistency() + } + + // Finish up all book-keeping to unify the subcompaction results + AggregateStatistics(); + UpdateCompactionStats(); + + compaction_result_->bytes_written = IOSTATS(bytes_written); + compaction_result_->bytes_read = IOSTATS(bytes_read); + RecordCompactionIOStats(); + + LogFlush(db_options_.info_log); + compact_->status = status; + compact_->status.PermitUncheckedError(); + + // Build compaction result + compaction_result_->output_level = compact_->compaction->output_level(); + compaction_result_->output_path = output_path_; + for (const auto& output_file : sub_compact->outputs) { + auto& meta = output_file.meta; + compaction_result_->output_files.emplace_back( + MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno, + meta.fd.largest_seqno, meta.smallest.Encode().ToString(), + meta.largest.Encode().ToString(), meta.oldest_ancester_time, + meta.file_creation_time, output_file.validator.GetHash(), + meta.marked_for_compaction); + } + compaction_result_->num_output_records = sub_compact->num_output_records; + compaction_result_->total_bytes = sub_compact->total_bytes; + + return status; +} + +void CompactionServiceCompactionJob::CleanupCompaction() { + CompactionJob::CleanupCompaction(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 1b3da1511..2a0e15e17 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -84,7 +84,7 @@ class CompactionJob { std::string full_history_ts_low = "", BlobFileCompletionCallback* blob_callback = nullptr); - ~CompactionJob(); + virtual ~CompactionJob(); // no copy/move CompactionJob(CompactionJob&& job) = delete; @@ -107,11 +107,35 @@ class CompactionJob { // Return the IO status IOStatus io_status() const { return io_status_; } - private: + protected: struct SubcompactionState; + // CompactionJob state + struct CompactionState; void AggregateStatistics(); + void UpdateCompactionStats(); + void LogCompaction(); + void RecordCompactionIOStats(); + void CleanupCompaction(); + + // Call compaction filter. Then iterate through input and compact the + // kv-pairs + void ProcessKeyValueCompaction(SubcompactionState* sub_compact); + + CompactionState* compact_; + InternalStats::CompactionStats compaction_stats_; + const ImmutableDBOptions& db_options_; + LogBuffer* log_buffer_; + FSDirectory* output_directory_; + Statistics* stats_; + // Is this compaction creating a file in the bottom most level? + bool bottommost_level_; + + Env::WriteLifeTimeHint write_hint_; + + IOStatus io_status_; + private: // Generates a histogram representing potential divisions of key ranges from // the input. It adds the starting and/or ending keys of certain input files // to the working set and then finds the approximate size of data in between @@ -122,9 +146,6 @@ class CompactionJob { // update the thread status for starting a compaction. void ReportStartedCompaction(Compaction* compaction); void AllocateCompactionOutputFileNumbers(); - // Call compaction filter. Then iterate through input and compact the - // kv-pairs - void ProcessKeyValueCompaction(SubcompactionState* sub_compact); Status FinishCompactionOutputFile( const Status& input_status, SubcompactionState* sub_compact, @@ -132,33 +153,23 @@ class CompactionJob { CompactionIterationStats* range_del_out_stats, const Slice* next_table_min_key = nullptr); Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options); - void RecordCompactionIOStats(); Status OpenCompactionOutputFile(SubcompactionState* sub_compact); - void CleanupCompaction(); void UpdateCompactionJobStats( const InternalStats::CompactionStats& stats) const; void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats, CompactionJobStats* compaction_job_stats = nullptr); - void UpdateCompactionStats(); void UpdateCompactionInputStatsHelper( int* num_files, uint64_t* bytes_read, int input_level); - void LogCompaction(); - int job_id_; - // CompactionJob state - struct CompactionState; - CompactionState* compact_; CompactionJobStats* compaction_job_stats_; - InternalStats::CompactionStats compaction_stats_; // DBImpl state const std::string& dbname_; const std::string db_id_; const std::string db_session_id_; - const ImmutableDBOptions& db_options_; const FileOptions file_options_; Env* env_; @@ -170,11 +181,8 @@ class CompactionJob { const std::atomic* shutting_down_; const std::atomic* manual_compaction_paused_; const SequenceNumber preserve_deletes_seqnum_; - LogBuffer* log_buffer_; FSDirectory* db_directory_; - FSDirectory* output_directory_; FSDirectory* blob_output_directory_; - Statistics* stats_; InstrumentedMutex* db_mutex_; ErrorHandler* db_error_handler_; // If there were two snapshots with seq numbers s1 and @@ -194,19 +202,128 @@ class CompactionJob { EventLogger* event_logger_; - // Is this compaction creating a file in the bottom most level? - bool bottommost_level_; bool paranoid_file_checks_; bool measure_io_stats_; // Stores the Slices that designate the boundaries for each subcompaction std::vector boundaries_; // Stores the approx size of keys covered in the range of each subcompaction std::vector sizes_; - Env::WriteLifeTimeHint write_hint_; Env::Priority thread_pri_; - IOStatus io_status_; std::string full_history_ts_low_; BlobFileCompletionCallback* blob_callback_; + + // Get table file name in where it's outputting to, which should also be in + // `output_directory_`. + virtual std::string GetTableFileName(uint64_t file_number); +}; + +// CompactionServiceInput is used the pass compaction information between two +// db instances. It contains the information needed to do a compaction. It +// doesn't contain the LSM tree information, which is passed though MANIFEST +// file. +struct CompactionServiceInput { + ColumnFamilyDescriptor column_family; + + DBOptions db_options; + + std::vector snapshots; + + // SST files for compaction, it should already be expended to include all the + // files needed for this compaction, for both input level files and output + // level files. + std::vector input_files; + int output_level; + + // information for subcompaction + Slice* begin = nullptr; + Slice* end = nullptr; + uint64_t approx_size = 0; +}; + +// CompactionServiceOutputFile is the metadata for the output SST file +struct CompactionServiceOutputFile { + std::string file_name; + SequenceNumber smallest_seqno; + SequenceNumber largest_seqno; + std::string smallest_internal_key; + std::string largest_internal_key; + uint64_t oldest_ancester_time; + uint64_t file_creation_time; + uint64_t paranoid_hash; + bool marked_for_compaction; + + CompactionServiceOutputFile() = default; + CompactionServiceOutputFile( + const std::string& name, SequenceNumber smallest, SequenceNumber largest, + std::string _smallest_internal_key, std::string _largest_internal_key, + uint64_t _oldest_ancester_time, uint64_t _file_creation_time, + uint64_t _paranoid_hash, bool _marked_for_compaction) + : file_name(name), + smallest_seqno(smallest), + largest_seqno(largest), + smallest_internal_key(std::move(_smallest_internal_key)), + largest_internal_key(std::move(_largest_internal_key)), + oldest_ancester_time(_oldest_ancester_time), + file_creation_time(_file_creation_time), + paranoid_hash(_paranoid_hash), + marked_for_compaction(_marked_for_compaction) {} +}; + +// CompactionServiceResult contains the compaction result from a different db +// instance, with these information, the primary db instance with write +// permission is able to install the result to the DB. +struct CompactionServiceResult { + std::vector output_files; + int output_level; + + // location of the output files + std::string output_path; + + // some statistics about the compaction + uint64_t num_output_records; + uint64_t total_bytes; + uint64_t bytes_read; + uint64_t bytes_written; + CompactionJobStats stats; +}; + +// CompactionServiceCompactionJob is an read-only compaction job, it takes +// input information from `compaction_service_input` and put result information +// in `compaction_service_result`, the SST files are generated to `output_path`. +class CompactionServiceCompactionJob : private CompactionJob { + public: + CompactionServiceCompactionJob( + int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, + const FileOptions& file_options, VersionSet* versions, + const std::atomic* shutting_down, LogBuffer* log_buffer, + FSDirectory* output_directory, Statistics* stats, + InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, + std::vector existing_snapshots, + std::shared_ptr table_cache, EventLogger* event_logger, + const std::string& dbname, const std::shared_ptr& io_tracer, + const std::string& db_id, const std::string& db_session_id, + const std::string& output_path, + const CompactionServiceInput& compaction_service_input, + CompactionServiceResult* compaction_service_result); + + // Run the compaction in current thread and return the result + Status Run(); + + void CleanupCompaction(); + + IOStatus io_status() const { return CompactionJob::io_status(); } + + private: + // Get table file name in output_path + std::string GetTableFileName(uint64_t file_number) override; + // Specific the compaction output path, otherwise it uses default DB path + const std::string output_path_; + + // Compaction job input + const CompactionServiceInput& compaction_input_; + + // Compaction job result + CompactionServiceResult* compaction_result_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index dd5a3335d..a3e631b68 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -160,14 +160,17 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS, immutable_db_options_.use_adaptive_mutex), default_cf_handle_(nullptr), + error_handler_(this, immutable_db_options_, &mutex_), + event_logger_(immutable_db_options_.info_log.get()), max_total_in_memory_state_(0), file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite( file_options_, immutable_db_options_)), seq_per_batch_(seq_per_batch), batch_per_txn_(batch_per_txn), - db_lock_(nullptr), + next_job_id_(1), shutting_down_(false), + db_lock_(nullptr), manual_compaction_paused_(false), bg_cv_(&mutex_), logfile_number_(0), @@ -194,7 +197,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, pending_purge_obsolete_files_(0), delete_obsolete_files_last_run_(immutable_db_options_.clock->NowMicros()), last_stats_dump_time_microsec_(0), - next_job_id_(1), has_unpersisted_data_(false), unable_to_release_oldest_log_(false), num_running_ingest_file_(0), @@ -202,7 +204,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, wal_manager_(immutable_db_options_, file_options_, io_tracer_, seq_per_batch), #endif // ROCKSDB_LITE - event_logger_(immutable_db_options_.info_log.get()), bg_work_paused_(0), bg_compaction_paused_(0), refitting_level_(false), @@ -231,7 +232,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, own_sfm_(options.sst_file_manager == nullptr), preserve_deletes_(options.preserve_deletes), closed_(false), - error_handler_(this, immutable_db_options_, &mutex_), atomic_flush_install_cv_(&mutex_), blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_, &error_handler_) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 0841f485b..00210d6bb 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1130,6 +1130,14 @@ class DBImpl : public DB { ColumnFamilyHandleImpl* default_cf_handle_; InternalStats* default_cf_internal_stats_; + // table_cache_ provides its own synchronization + std::shared_ptr table_cache_; + + ErrorHandler error_handler_; + + // Unified interface for logging events + EventLogger event_logger_; + // only used for dynamically adjusting max_total_wal_size. it is a sum of // [write_buffer_size * max_write_buffer_number] over all column families uint64_t max_total_in_memory_state_; @@ -1160,6 +1168,12 @@ class DBImpl : public DB { // Default: true const bool batch_per_txn_; + // Each flush or compaction gets its own job id. this counter makes sure + // they're unique + std::atomic next_job_id_; + + std::atomic shutting_down_; + // Except in DB::Open(), WriteOptionsFile can only be called when: // Persist options to options file. // If need_mutex_lock = false, the method will lock DB mutex. @@ -1940,9 +1954,6 @@ class DBImpl : public DB { Status IncreaseFullHistoryTsLow(ColumnFamilyData* cfd, std::string ts_low); - // table_cache_ provides its own synchronization - std::shared_ptr table_cache_; - // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; @@ -1956,8 +1967,6 @@ class DBImpl : public DB { // mutex_, the order should be first mutex_ and then log_write_mutex_. InstrumentedMutex log_write_mutex_; - std::atomic shutting_down_; - // If zero, manual compactions are allowed to proceed. If non-zero, manual // compactions may still be running, but will quickly fail with // `Status::Incomplete`. The value indicates how many threads have paused @@ -2166,10 +2175,6 @@ class DBImpl : public DB { // Number of threads intending to write to memtable std::atomic pending_memtable_writes_ = {}; - // Each flush or compaction gets its own job id. this counter makes sure - // they're unique - std::atomic next_job_id_; - // A flag indicating whether the current rocksdb database has any // data that is not yet persisted into either WAL or SST file. // Used when disableWAL is true. @@ -2198,9 +2203,6 @@ class DBImpl : public DB { WalManager wal_manager_; #endif // ROCKSDB_LITE - // Unified interface for logging events - EventLogger event_logger_; - // A value of > 0 temporarily disables scheduling of background work int bg_work_paused_; @@ -2268,8 +2270,6 @@ class DBImpl : public DB { // Flag to check whether Close() has been called on this DB bool closed_; - ErrorHandler error_handler_; - // Conditional variable to coordinate installation of atomic flush results. // With atomic flush, each bg thread installs the result of flushing multiple // column families, and different threads can flush different column diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 161187692..8a845fca9 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -17,8 +17,10 @@ namespace ROCKSDB_NAMESPACE { #ifndef ROCKSDB_LITE DBImplSecondary::DBImplSecondary(const DBOptions& db_options, - const std::string& dbname) - : DBImpl(db_options, dbname) { + const std::string& dbname, + std::string secondary_path) + : DBImpl(db_options, dbname, false, true, true), + secondary_path_(std::move(secondary_path)) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Opening the db in secondary mode"); LogFlush(immutable_db_options_.info_log); @@ -617,7 +619,7 @@ Status DB::OpenAsSecondary( } handles->clear(); - DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname); + DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname, secondary_path); impl->versions_.reset(new ReactiveVersionSet( dbname, &impl->immutable_db_options_, impl->file_options_, impl->table_cache_.get(), impl->write_buffer_manager_, @@ -663,6 +665,86 @@ Status DB::OpenAsSecondary( } return s; } + +Status DBImplSecondary::CompactWithoutInstallation( + ColumnFamilyHandle* cfh, const CompactionServiceInput& input, + CompactionServiceResult* result) { + InstrumentedMutexLock l(&mutex_); + auto cfd = static_cast_with_check(cfh)->cfd(); + if (!cfd) { + return Status::InvalidArgument("Cannot find column family" + + cfh->GetName()); + } + + std::unordered_set input_set; + for (const auto& file_name : input.input_files) { + input_set.insert(TableFileNameToNumber(file_name)); + } + + auto* version = cfd->current(); + + ColumnFamilyMetaData cf_meta; + version->GetColumnFamilyMetaData(&cf_meta); + + const MutableCFOptions* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + ColumnFamilyOptions cf_options = cfd->GetLatestCFOptions(); + VersionStorageInfo* vstorage = version->storage_info(); + + // Use comp_options to reuse some CompactFiles functions + CompactionOptions comp_options; + comp_options.compression = kDisableCompressionOption; + comp_options.output_file_size_limit = MaxFileSizeForLevel( + *mutable_cf_options, input.output_level, cf_options.compaction_style, + vstorage->base_level(), cf_options.level_compaction_dynamic_level_bytes); + + std::vector input_files; + Status s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers( + &input_files, &input_set, vstorage, comp_options); + if (!s.ok()) { + return s; + } + + std::unique_ptr c; + assert(cfd->compaction_picker()); + c.reset(cfd->compaction_picker()->CompactFiles( + comp_options, input_files, input.output_level, vstorage, + *mutable_cf_options, mutable_db_options_, 0)); + assert(c != nullptr); + + c->SetInputVersion(version); + + // Create output directory if it's not existed yet + std::unique_ptr output_dir; + s = CreateAndNewDirectory(fs_.get(), secondary_path_, &output_dir); + if (!s.ok()) { + return s; + } + + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, + immutable_db_options_.info_log.get()); + + const int job_id = next_job_id_.fetch_add(1); + + CompactionServiceCompactionJob compaction_job( + job_id, c.get(), immutable_db_options_, file_options_for_compaction_, + versions_.get(), &shutting_down_, &log_buffer, output_dir.get(), stats_, + &mutex_, &error_handler_, input.snapshots, table_cache_, &event_logger_, + dbname_, io_tracer_, db_id_, db_session_id_, secondary_path_, input, + result); + + mutex_.Unlock(); + s = compaction_job.Run(); + mutex_.Lock(); + + // clean up + compaction_job.io_status().PermitUncheckedError(); + compaction_job.CleanupCompaction(); + c->ReleaseCompactionFiles(s); + c.reset(); + + return s; +} + #else // !ROCKSDB_LITE Status DB::OpenAsSecondary(const Options& /*options*/, diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 8fc58616f..e278b79cd 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -71,7 +71,8 @@ class LogReaderContainer { // effort attempts to catch up with the primary. class DBImplSecondary : public DBImpl { public: - DBImplSecondary(const DBOptions& options, const std::string& dbname); + DBImplSecondary(const DBOptions& options, const std::string& dbname, + std::string secondary_path); ~DBImplSecondary() override; // Recover by replaying MANIFEST and WAL. Also initialize manifest_reader_ @@ -222,6 +223,14 @@ class DBImplSecondary : public DBImpl { // not flag the missing file as inconsistency. Status CheckConsistency() override; +#ifndef NDEBUG + Status TEST_CompactWithoutInstallation(ColumnFamilyHandle* cfh, + const CompactionServiceInput& input, + CompactionServiceResult* result) { + return CompactWithoutInstallation(cfh, input, result); + } +#endif // NDEBUG + protected: // ColumnFamilyCollector is a write batch handler which does nothing // except recording unique column family IDs @@ -316,6 +325,13 @@ class DBImplSecondary : public DBImpl { std::unordered_set* cfds_changed, JobContext* job_context); + // Run compaction without installation, the output files will be placed in the + // secondary DB path. The LSM tree won't be changed, the secondary DB is still + // in read-only mode. + Status CompactWithoutInstallation(ColumnFamilyHandle* cfh, + const CompactionServiceInput& input, + CompactionServiceResult* result); + std::unique_ptr manifest_reader_; std::unique_ptr manifest_reporter_; std::unique_ptr manifest_reader_status_; @@ -326,6 +342,8 @@ class DBImplSecondary : public DBImpl { // Current WAL number replayed for each column family. std::unordered_map cfd_to_current_log_; + + const std::string secondary_path_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc index 49917d3c2..9de935204 100644 --- a/db/db_secondary_test.cc +++ b/db/db_secondary_test.cc @@ -147,6 +147,206 @@ TEST_F(DBSecondaryTest, ReopenAsSecondary) { ASSERT_EQ(2, count); } +TEST_F(DBSecondaryTest, SimpleInternalCompaction) { + Options options; + options.env = env_; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + CompactionServiceInput input; + + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + for (auto& file : meta.levels[0].files) { + ASSERT_EQ(0, meta.levels[0].level); + input.input_files.push_back(file.name); + } + ASSERT_EQ(input.input_files.size(), 3); + + input.output_level = 1; + Close(); + + options.max_open_files = -1; + OpenSecondary(options); + auto cfh = db_secondary_->DefaultColumnFamily(); + + CompactionServiceResult result; + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, + &result)); + + ASSERT_EQ(result.output_files.size(), 1); + InternalKey smallest, largest; + smallest.DecodeFrom(result.output_files[0].smallest_internal_key); + largest.DecodeFrom(result.output_files[0].largest_internal_key); + ASSERT_EQ(smallest.user_key().ToString(), "bar"); + ASSERT_EQ(largest.user_key().ToString(), "foo"); + ASSERT_EQ(result.output_level, 1); + ASSERT_EQ(result.output_path, this->secondary_path_); + ASSERT_EQ(result.num_output_records, 2); + ASSERT_GT(result.bytes_written, 0); +} + +TEST_F(DBSecondaryTest, InternalCompactionMultiLevels) { + Options options; + options.env = env_; + options.disable_auto_compactions = true; + Reopen(options); + const int kRangeL2 = 10; + const int kRangeL1 = 30; + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i * kRangeL2), "value" + ToString(i))); + ASSERT_OK(Put(Key((i + 1) * kRangeL2 - 1), "value" + ToString(i))); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(2); + for (int i = 0; i < 5; i++) { + ASSERT_OK(Put(Key(i * kRangeL1), "value" + ToString(i))); + ASSERT_OK(Put(Key((i + 1) * kRangeL1 - 1), "value" + ToString(i))); + ASSERT_OK(Flush()); + } + MoveFilesToLevel(1); + for (int i = 0; i < 4; i++) { + ASSERT_OK(Put(Key(i * 30), "value" + ToString(i))); + ASSERT_OK(Put(Key(i * 30 + 50), "value" + ToString(i))); + ASSERT_OK(Flush()); + } + + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + + // pick 2 files on level 0 for compaction, which has 3 overlap files on L1 + CompactionServiceInput input1; + input1.input_files.push_back(meta.levels[0].files[2].name); + input1.input_files.push_back(meta.levels[0].files[3].name); + input1.input_files.push_back(meta.levels[1].files[0].name); + input1.input_files.push_back(meta.levels[1].files[1].name); + input1.input_files.push_back(meta.levels[1].files[2].name); + + input1.output_level = 1; + + options.max_open_files = -1; + Close(); + + OpenSecondary(options); + auto cfh = db_secondary_->DefaultColumnFamily(); + CompactionServiceResult result; + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input1, + &result)); + + // pick 2 files on level 1 for compaction, which has 6 overlap files on L2 + CompactionServiceInput input2; + input2.input_files.push_back(meta.levels[1].files[1].name); + input2.input_files.push_back(meta.levels[1].files[2].name); + for (int i = 3; i < 9; i++) { + input2.input_files.push_back(meta.levels[2].files[i].name); + } + + input2.output_level = 2; + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2, + &result)); + + CloseSecondary(); + + // delete all l2 files, without update manifest + for (auto& file : meta.levels[2].files) { + ASSERT_OK(env_->DeleteFile(dbname_ + file.name)); + } + OpenSecondary(options); + cfh = db_secondary_->DefaultColumnFamily(); + Status s = db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input2, + &result); + ASSERT_TRUE(s.IsInvalidArgument()); + + // TODO: L0 -> L1 compaction should success, currently version is not built + // if files is missing. + // ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, + // input1, &result)); +} + +TEST_F(DBSecondaryTest, InternalCompactionCompactedFiles) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + CompactionServiceInput input; + + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + for (auto& file : meta.levels[0].files) { + ASSERT_EQ(0, meta.levels[0].level); + input.input_files.push_back(file.name); + } + ASSERT_EQ(input.input_files.size(), 3); + + input.output_level = 1; + + // trigger compaction to delete the files for secondary instance compaction + ASSERT_OK(Put("foo", "foo_value" + std::to_string(3))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(3))); + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + Close(); + + options.max_open_files = -1; + OpenSecondary(options); + auto cfh = db_secondary_->DefaultColumnFamily(); + + CompactionServiceResult result; + Status s = + db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result); + ASSERT_TRUE(s.IsInvalidArgument()); +} + +TEST_F(DBSecondaryTest, InternalCompactionMissingFiles) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + CompactionServiceInput input; + + ColumnFamilyMetaData meta; + db_->GetColumnFamilyMetaData(&meta); + for (auto& file : meta.levels[0].files) { + ASSERT_EQ(0, meta.levels[0].level); + input.input_files.push_back(file.name); + } + ASSERT_EQ(input.input_files.size(), 3); + + input.output_level = 1; + + Close(); + + ASSERT_OK(env_->DeleteFile(dbname_ + input.input_files[0])); + + options.max_open_files = -1; + OpenSecondary(options); + auto cfh = db_secondary_->DefaultColumnFamily(); + + CompactionServiceResult result; + Status s = + db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, &result); + ASSERT_TRUE(s.IsInvalidArgument()); + + input.input_files.erase(input.input_files.begin()); + + ASSERT_OK(db_secondary_full()->TEST_CompactWithoutInstallation(cfh, input, + &result)); +} + TEST_F(DBSecondaryTest, OpenAsSecondary) { Options options; options.env = env_; diff --git a/db/output_validator.h b/db/output_validator.h index c2c5f9906..644c3f8c0 100644 --- a/db/output_validator.h +++ b/db/output_validator.h @@ -33,11 +33,13 @@ class OutputValidator { return GetHash() == other_validator.GetHash(); } - private: // Not (yet) intended to be persisted, so subject to change // without notice between releases. uint64_t GetHash() const { return paranoid_hash_; } + void SetHash(uint64_t hash) { paranoid_hash_ = hash; } + + private: const InternalKeyComparator& icmp_; std::string prev_key_; uint64_t paranoid_hash_ = 0;