diff --git a/db/compaction.cc b/db/compaction.cc
index 2f202cf91..d1cf85f01 100644
--- a/db/compaction.cc
+++ b/db/compaction.cc
@@ -322,12 +322,11 @@ void Compaction::Summary(char* output, int len) {
   snprintf(output + write, len - write, "]");
 }
 
-uint64_t Compaction::OutputFilePreallocationSize(
-    const MutableCFOptions& mutable_options) {
+uint64_t Compaction::OutputFilePreallocationSize() {
   uint64_t preallocation_size = 0;
 
   if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
-    preallocation_size = mutable_options.MaxFileSizeForLevel(output_level());
+    preallocation_size = max_output_file_size_;
   } else {
     for (size_t level_iter = 0; level_iter < num_input_levels(); ++level_iter) {
       for (const auto& f : inputs_[level_iter].files) {
diff --git a/db/compaction.h b/db/compaction.h
index d31a76921..3bb87c21f 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -163,7 +163,7 @@ class Compaction {
   // Returns the size in bytes that the output file should be preallocated to.
   // In level compaction, that is max_file_size_. In universal compaction, that
   // is the sum of all input file sizes.
-  uint64_t OutputFilePreallocationSize(const MutableCFOptions& mutable_options);
+  uint64_t OutputFilePreallocationSize();
 
   void SetInputVersion(Version* input_version);
 
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 3066e8f9a..8abf2afa1 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -58,12 +58,6 @@ namespace rocksdb {
 struct CompactionJob::CompactionState {
   Compaction* const compaction;
 
-  // If there were two snapshots with seq numbers s1 and
-  // s2 and s1 < s2, and if we find two instances of a key k1 then lies
-  // entirely within s1 and s2, then the earlier version of k1 can be safely
-  // deleted because that version is not visible in any snapshot.
-  std::vector<SequenceNumber> existing_snapshots;
-
   // Files produced by compaction
   struct Output {
     uint64_t number;
@@ -204,17 +198,17 @@ struct CompactionJob::CompactionState {
 
 CompactionJob::CompactionJob(
     int job_id, Compaction* compaction, const DBOptions& db_options,
-    const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
-    VersionSet* versions, std::atomic<bool>* shutting_down,
-    LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory,
-    Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported,
+    const EnvOptions& env_options, VersionSet* versions,
+    std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
+    Directory* db_directory, Directory* output_directory, Statistics* stats,
+    std::vector<SequenceNumber> existing_snapshots,
     std::shared_ptr<Cache> table_cache,
-    std::function<uint64_t()> yield_callback, EventLogger* event_logger)
+    std::function<uint64_t()> yield_callback, EventLogger* event_logger,
+    bool paranoid_file_checks)
     : job_id_(job_id),
       compact_(new CompactionState(compaction)),
       compaction_stats_(1),
       db_options_(db_options),
-      mutable_cf_options_(mutable_cf_options),
       env_options_(env_options),
       env_(db_options.env),
       versions_(versions),
@@ -223,13 +217,12 @@ CompactionJob::CompactionJob(
       db_directory_(db_directory),
       output_directory_(output_directory),
       stats_(stats),
-      snapshots_(snapshots),
-      is_snapshot_supported_(is_snapshot_supported),
+      existing_snapshots_(std::move(existing_snapshots)),
       table_cache_(std::move(table_cache)),
       yield_callback_(std::move(yield_callback)),
-      event_logger_(event_logger) {
-  ThreadStatusUtil::SetColumnFamily(
-      compact_->compaction->column_family_data());
+      event_logger_(event_logger),
+      paranoid_file_checks_(paranoid_file_checks) {
+  ThreadStatusUtil::SetColumnFamily(compact_->compaction->column_family_data());
   ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
 }
 
@@ -256,18 +249,16 @@ void CompactionJob::Prepare() {
 
   visible_at_tip_ = 0;
   latest_snapshot_ = 0;
-  // TODO(icanadi) move snapshots_ out of CompactionJob
-  snapshots_->getAll(compact_->existing_snapshots);
-  if (compact_->existing_snapshots.size() == 0) {
+  if (existing_snapshots_.size() == 0) {
     // optimize for fast path if there are no snapshots
     visible_at_tip_ = versions_->LastSequence();
     earliest_snapshot_ = visible_at_tip_;
   } else {
-    latest_snapshot_ = compact_->existing_snapshots.back();
+    latest_snapshot_ = existing_snapshots_.back();
     // Add the current seqno as the 'latest' virtual
     // snapshot to the end of this list.
-    compact_->existing_snapshots.push_back(versions_->LastSequence());
-    earliest_snapshot_ = compact_->existing_snapshots[0];
+    existing_snapshots_.push_back(versions_->LastSequence());
+    earliest_snapshot_ = existing_snapshots_[0];
   }
 
   // Is this compaction producing files at the bottommost level?
@@ -509,7 +500,9 @@ Status CompactionJob::Run() {
   return status;
 }
 
-void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
+void CompactionJob::Install(Status* status,
+                            const MutableCFOptions& mutable_cf_options,
+                            InstrumentedMutex* db_mutex) {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_INSTALL);
   db_mutex->AssertHeld();
@@ -518,7 +511,7 @@ void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
       compact_->compaction->output_level(), compaction_stats_);
 
   if (status->ok()) {
-    *status = InstallCompactionResults(db_mutex);
+    *status = InstallCompactionResults(db_mutex, mutable_cf_options);
   }
   VersionStorageInfo::LevelSummaryStorage tmp;
   auto vstorage = cfd->current()->storage_info();
@@ -716,11 +709,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
       SequenceNumber visible =
           visible_at_tip_
               ? visible_at_tip_
-              : is_snapshot_supported_
-                    ? findEarliestVisibleSnapshot(ikey.sequence,
-                                                  compact_->existing_snapshots,
-                                                  &prev_snapshot)
-                    : 0;
+              : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
+                                            &prev_snapshot);
 
       if (visible_in_snapshot == visible) {
         // If the earliest snapshot is which this key is visible in
@@ -1018,7 +1008,7 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
         ReadOptions(), env_options_, cfd->internal_comparator(), fd);
     s = iter->status();
 
-    if (s.ok() && mutable_cf_options_.paranoid_file_checks) {
+    if (s.ok() && paranoid_file_checks_) {
       for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
       s = iter->status();
     }
@@ -1039,7 +1029,8 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
   return s;
 }
 
-Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) {
+Status CompactionJob::InstallCompactionResults(
+    InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) {
   db_mutex->AssertHeld();
 
   auto* compaction = compact_->compaction;
@@ -1074,7 +1065,7 @@ Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) {
         out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
   }
   return versions_->LogAndApply(compaction->column_family_data(),
-                                mutable_cf_options_, compaction->edit(),
+                                mutable_cf_options, compaction->edit(),
                                 db_mutex, db_directory_);
 }
 
@@ -1142,8 +1133,8 @@ Status CompactionJob::OpenCompactionOutputFile() {
 
   compact_->outputs.push_back(out);
   compact_->outfile->SetIOPriority(Env::IO_LOW);
-  compact_->outfile->SetPreallocationBlockSize(static_cast<size_t>(
-      compact_->compaction->OutputFilePreallocationSize(mutable_cf_options_)));
+  compact_->outfile->SetPreallocationBlockSize(
+      static_cast<size_t>(compact_->compaction->OutputFilePreallocationSize()));
 
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
   bool skip_filters = false;
diff --git a/db/compaction_job.h b/db/compaction_job.h
index b7422725d..1a64aeb60 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -19,7 +19,6 @@
 
 #include "db/dbformat.h"
 #include "db/log_writer.h"
-#include "db/snapshot.h"
 #include "db/column_family.h"
 #include "db/version_edit.h"
 #include "db/memtable_list.h"
@@ -51,18 +50,15 @@ class Arena;
 
 class CompactionJob {
  public:
-  // TODO(icanadi) make effort to reduce number of parameters here
-  // IMPORTANT: mutable_cf_options needs to be alive while CompactionJob is
-  // alive
   CompactionJob(int job_id, Compaction* compaction, const DBOptions& db_options,
-                const MutableCFOptions& mutable_cf_options,
                 const EnvOptions& env_options, VersionSet* versions,
                 std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
                 Directory* db_directory, Directory* output_directory,
-                Statistics* stats, SnapshotList* snapshot_list,
-                bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
+                Statistics* stats,
+                std::vector<SequenceNumber> existing_snapshots,
+                std::shared_ptr<Cache> table_cache,
                 std::function<uint64_t()> yield_callback,
-                EventLogger* event_logger);
+                EventLogger* event_logger, bool paranoid_file_checks);
 
   ~CompactionJob();
 
@@ -77,7 +73,8 @@ class CompactionJob {
   Status Run();
   // REQUIRED: mutex held
   // status is the return of Run()
-  void Install(Status* status, InstrumentedMutex* db_mutex);
+  void Install(Status* status, const MutableCFOptions& mutable_cf_options,
+               InstrumentedMutex* db_mutex);
 
  private:
   void AllocateCompactionOutputFileNumbers();
@@ -89,7 +86,8 @@ class CompactionJob {
   void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2,
                               uint64_t* time);
   Status FinishCompactionOutputFile(Iterator* input);
-  Status InstallCompactionResults(InstrumentedMutex* db_mutex);
+  Status InstallCompactionResults(InstrumentedMutex* db_mutex,
+                                  const MutableCFOptions& mutable_cf_options);
   SequenceNumber findEarliestVisibleSnapshot(
       SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
       SequenceNumber* prev_snapshot);
@@ -112,7 +110,6 @@ class CompactionJob {
 
   // DBImpl state
   const DBOptions& db_options_;
-  const MutableCFOptions& mutable_cf_options_;
   const EnvOptions& env_options_;
   Env* env_;
   VersionSet* versions_;
@@ -121,14 +118,19 @@ class CompactionJob {
   Directory* db_directory_;
   Directory* output_directory_;
   Statistics* stats_;
-  SnapshotList* snapshots_;
-  bool is_snapshot_supported_;
+  // If there were two snapshots with seq numbers s1 and
+  // s2 and s1 < s2, and if we find two instances of a key k1 then lies
+  // entirely within s1 and s2, then the earlier version of k1 can be safely
+  // deleted because that version is not visible in any snapshot.
+  std::vector<SequenceNumber> existing_snapshots_;
   std::shared_ptr<Cache> table_cache_;
 
   // yield callback
   std::function<uint64_t()> yield_callback_;
 
   EventLogger* event_logger_;
+
+  bool paranoid_file_checks_;
 };
 
 }  // namespace rocksdb
diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc
index a0b6a4306..e4c407a78 100644
--- a/db/compaction_job_test.cc
+++ b/db/compaction_job_test.cc
@@ -155,7 +155,6 @@ TEST_F(CompactionJobTest, Simple) {
       {compaction_input_files}, 1, 1024 * 1024, 10, 0, kNoCompression, {}));
   compaction->SetInputVersion(cfd->current());
 
-  SnapshotList snapshots;
   int yield_callback_called = 0;
   std::function<uint64_t()> yield_callback = [&]() {
     yield_callback_called++;
@@ -164,17 +163,17 @@ TEST_F(CompactionJobTest, Simple) {
   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
   mutex_.Lock();
   EventLogger event_logger(db_options_.info_log.get());
-  CompactionJob compaction_job(
-      0, compaction.get(), db_options_, *cfd->GetLatestMutableCFOptions(),
-      env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr,
-      nullptr, nullptr, &snapshots, true, table_cache_,
-      std::move(yield_callback), &event_logger);
+  CompactionJob compaction_job(0, compaction.get(), db_options_, env_options_,
+                               versions_.get(), &shutting_down_, &log_buffer,
+                               nullptr, nullptr, nullptr, {}, table_cache_,
+                               std::move(yield_callback), &event_logger, false);
+
   compaction_job.Prepare();
   mutex_.Unlock();
   ASSERT_OK(compaction_job.Run());
   mutex_.Lock();
   Status s;
-  compaction_job.Install(&s, &mutex_);
+  compaction_job.Install(&s, *cfd->GetLatestMutableCFOptions(), &mutex_);
   ASSERT_OK(s);
   mutex_.Unlock();
 
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 86f35eb15..1a8c3bc06 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1482,18 +1482,19 @@ Status DBImpl::CompactFilesImpl(
         c->column_family_data(), *c->mutable_cf_options(), job_context,
         log_buffer);
   };
+  assert(is_snapshot_supported_ || snapshots_.empty());
   CompactionJob compaction_job(
-      job_context->job_id, c.get(), db_options_, *c->mutable_cf_options(),
-      env_options_, versions_.get(), &shutting_down_, log_buffer,
-      directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
-      stats_, &snapshots_, is_snapshot_supported_, table_cache_,
-      std::move(yield_callback), &event_logger_);
+      job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
+      &shutting_down_, log_buffer, directories_.GetDbDir(),
+      directories_.GetDataDir(c->GetOutputPathId()), stats_,
+      snapshots_.GetAll(), table_cache_, std::move(yield_callback),
+      &event_logger_, c->mutable_cf_options()->paranoid_file_checks);
   compaction_job.Prepare();
 
   mutex_.Unlock();
   Status status = compaction_job.Run();
   mutex_.Lock();
-  compaction_job.Install(&status, &mutex_);
+  compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
   if (status.ok()) {
     InstallSuperVersionBackground(c->column_family_data(), job_context,
                                   *c->mutable_cf_options());
@@ -2357,17 +2358,18 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
                                        *c->mutable_cf_options(), job_context,
                                        log_buffer);
     };
+    assert(is_snapshot_supported_ || snapshots_.empty());
     CompactionJob compaction_job(
-        job_context->job_id, c.get(), db_options_, *c->mutable_cf_options(),
-        env_options_, versions_.get(), &shutting_down_, log_buffer,
-        directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
-        stats_, &snapshots_, is_snapshot_supported_, table_cache_,
-        std::move(yield_callback), &event_logger_);
+        job_context->job_id, c.get(), db_options_, env_options_,
+        versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
+        directories_.GetDataDir(c->GetOutputPathId()), stats_,
+        snapshots_.GetAll(), table_cache_, std::move(yield_callback),
+        &event_logger_, c->mutable_cf_options()->paranoid_file_checks);
     compaction_job.Prepare();
     mutex_.Unlock();
     status = compaction_job.Run();
     mutex_.Lock();
-    compaction_job.Install(&status, &mutex_);
+    compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
     if (status.ok()) {
       InstallSuperVersionBackground(c->column_family_data(), job_context,
                                     *c->mutable_cf_options());
diff --git a/db/snapshot.h b/db/snapshot.h
index af4ceba6d..c6852f5e5 100644
--- a/db/snapshot.h
+++ b/db/snapshot.h
@@ -8,6 +8,8 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #pragma once
+#include <vector>
+
 #include "rocksdb/db.h"
 
 namespace rocksdb {
@@ -69,13 +71,17 @@ class SnapshotList {
   }
 
   // retrieve all snapshot numbers. They are sorted in ascending order.
-  void getAll(std::vector<SequenceNumber>& ret) {
-    if (empty()) return;
+  std::vector<SequenceNumber> GetAll() {
+    std::vector<SequenceNumber> ret;
+    if (empty()) {
+      return ret;
+    }
     SnapshotImpl* s = &list_;
     while (s->next_ != &list_) {
       ret.push_back(s->next_->number_);
-      s = s ->next_;
+      s = s->next_;
     }
+    return ret;
   }
 
   // get the sequence number of the most recent snapshot