diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 04c351d77..3395085a9 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -71,7 +71,6 @@ struct CompactionJob::CompactionState { SequenceNumber smallest_seqno, largest_seqno; }; std::vector outputs; - std::list allocated_file_numbers; // State kept for output being generated std::unique_ptr outfile; @@ -204,10 +203,10 @@ struct CompactionJob::CompactionState { CompactionJob::CompactionJob( Compaction* compaction, const DBOptions& db_options, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, - VersionSet* versions, port::Mutex* db_mutex, - std::atomic* shutting_down, LogBuffer* log_buffer, - Directory* db_directory, Statistics* stats, SnapshotList* snapshots, - bool is_snapshot_supported, std::shared_ptr table_cache, + VersionSet* versions, std::atomic* shutting_down, + LogBuffer* log_buffer, Directory* db_directory, Statistics* stats, + SnapshotList* snapshots, bool is_snapshot_supported, + std::shared_ptr table_cache, std::function yield_callback) : compact_(new CompactionState(compaction)), compaction_stats_(1), @@ -216,7 +215,6 @@ CompactionJob::CompactionJob( env_options_(env_options), env_(db_options.env), versions_(versions), - db_mutex_(db_mutex), shutting_down_(shutting_down), log_buffer_(log_buffer), db_directory_(db_directory), @@ -227,7 +225,6 @@ CompactionJob::CompactionJob( yield_callback_(std::move(yield_callback)) {} void CompactionJob::Prepare() { - db_mutex_->AssertHeld(); compact_->CleanupBatchBuffer(); compact_->CleanupMergedBuffer(); @@ -267,9 +264,6 @@ void CompactionJob::Prepare() { // Is this compaction producing files at the bottommost level? 
bottommost_level_ = compact_->compaction->BottomMostLevel(); - - // Allocate the output file numbers before we release the lock - AllocateCompactionOutputFileNumbers(); } Status CompactionJob::Run() { @@ -461,14 +455,14 @@ Status CompactionJob::Run() { return status; } -Status CompactionJob::Install(Status status) { - db_mutex_->AssertHeld(); +Status CompactionJob::Install(Status status, port::Mutex* db_mutex) { + db_mutex->AssertHeld(); ColumnFamilyData* cfd = compact_->compaction->column_family_data(); cfd->internal_stats()->AddCompactionStats( compact_->compaction->output_level(), compaction_stats_); if (status.ok()) { - status = InstallCompactionResults(); + status = InstallCompactionResults(db_mutex); } VersionStorageInfo::LevelSummaryStorage tmp; const auto& stats = compaction_stats_; @@ -496,19 +490,6 @@ Status CompactionJob::Install(Status status) { return status; } -// Allocate the file numbers for the output file. We allocate as -// many output file numbers as there are files in level+1 (at least one) -// Insert them into pending_outputs so that they do not get deleted. -void CompactionJob::AllocateCompactionOutputFileNumbers() { - db_mutex_->AssertHeld(); - assert(compact_->builder == nullptr); - int filesNeeded = compact_->compaction->num_input_files(1); - for (int i = 0; i < std::max(filesNeeded, 1); i++) { - uint64_t file_number = versions_->NewFileNumber(); - compact_->allocated_file_numbers.push_back(file_number); - } -} - Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input, bool is_compaction_v2) { @@ -958,8 +939,8 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { return s; } -Status CompactionJob::InstallCompactionResults() { - db_mutex_->AssertHeld(); +Status CompactionJob::InstallCompactionResults(port::Mutex* db_mutex) { + db_mutex->AssertHeld(); // paranoia: verify that the files that we started with // still exist in the current version and in the same original level. 
@@ -995,7 +976,7 @@ Status CompactionJob::InstallCompactionResults() { } return versions_->LogAndApply( compact_->compaction->column_family_data(), mutable_cf_options_, - compact_->compaction->edit(), db_mutex_, db_directory_); + compact_->compaction->edit(), db_mutex, db_directory_); } // Given a sequence number, return the sequence number of the @@ -1036,21 +1017,8 @@ void CompactionJob::RecordCompactionIOStats() { Status CompactionJob::OpenCompactionOutputFile() { assert(compact_ != nullptr); assert(compact_->builder == nullptr); - uint64_t file_number; - // If we have not yet exhausted the pre-allocated file numbers, - // then use the one from the front. Otherwise, we have to acquire - // the heavyweight lock and allocate a new file number. - if (!compact_->allocated_file_numbers.empty()) { - file_number = compact_->allocated_file_numbers.front(); - compact_->allocated_file_numbers.pop_front(); - } else { - db_mutex_->Lock(); - // TODO(icanadi) make Versions::next_file_number_ atomic and remove db_lock - // around here. Once we do that, AllocateCompactionOutputFileNumbers() will - // not be needed. 
- file_number = versions_->NewFileNumber(); - db_mutex_->Unlock(); - } + // no need to lock because VersionSet::next_file_number_ is atomic + uint64_t file_number = versions_->NewFileNumber(); // Make the output file std::string fname = TableFileName(db_options_.db_paths, file_number, compact_->compaction->GetOutputPathId()); @@ -1087,7 +1055,6 @@ Status CompactionJob::OpenCompactionOutputFile() { } void CompactionJob::CleanupCompaction(Status status) { - db_mutex_->AssertHeld(); if (compact_->builder != nullptr) { // May happen if we get a shutdown call in the middle of compaction compact_->builder->Abandon(); diff --git a/db/compaction_job.h b/db/compaction_job.h index 45d438156..e993ea675 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -56,10 +56,10 @@ class CompactionJob { CompactionJob(Compaction* compaction, const DBOptions& db_options, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, VersionSet* versions, - port::Mutex* db_mutex, std::atomic* shutting_down, - LogBuffer* log_buffer, Directory* db_directory, - Statistics* stats, SnapshotList* snapshot_list, - bool is_snapshot_supported, std::shared_ptr table_cache, + std::atomic* shutting_down, LogBuffer* log_buffer, + Directory* db_directory, Statistics* stats, + SnapshotList* snapshot_list, bool is_snapshot_supported, + std::shared_ptr table_cache, std::function yield_callback); ~CompactionJob() { assert(compact_ == nullptr); } @@ -75,7 +75,7 @@ class CompactionJob { Status Run(); // REQUIRED: mutex held // status is the return of Run() - Status Install(Status status); + Status Install(Status status, port::Mutex* db_mutex); private: void AllocateCompactionOutputFileNumbers(); @@ -86,7 +86,7 @@ class CompactionJob { // Call compaction_filter_v2->Filter() on kv-pairs in compact void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2); Status FinishCompactionOutputFile(Iterator* input); - Status InstallCompactionResults(); + Status 
InstallCompactionResults(port::Mutex* db_mutex); SequenceNumber findEarliestVisibleSnapshot( SequenceNumber in, const std::vector& snapshots, SequenceNumber* prev_snapshot); @@ -111,7 +111,6 @@ class CompactionJob { const EnvOptions& env_options_; Env* env_; VersionSet* versions_; - port::Mutex* db_mutex_; std::atomic* shutting_down_; LogBuffer* log_buffer_; Directory* db_directory_; diff --git a/db/db_impl.cc b/db/db_impl.cc index 9db1da3b6..8ac509249 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -836,7 +836,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. - versions_->MarkFileNumberUsed(log_number); + versions_->MarkFileNumberUsedDuringRecovery(log_number); // Open the log file std::string fname = LogFileName(db_options_.wal_dir, log_number); unique_ptr file; @@ -970,7 +970,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // not actually used. 
that is because VersionSet assumes // VersionSet::next_file_number_ always to be strictly greater than any // log number - versions_->MarkFileNumberUsed(max_log_number + 1); + versions_->MarkFileNumberUsedDuringRecovery(max_log_number + 1); status = versions_->LogAndApply( cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_); if (!status.ok()) { @@ -1285,18 +1285,18 @@ Status DBImpl::CompactFilesImpl( *c->mutable_cf_options(), &job_context, &log_buffer); }; - CompactionJob compaction_job( - c.get(), db_options_, *c->mutable_cf_options(), env_options_, - versions_.get(), &mutex_, &shutting_down_, - &log_buffer, db_directory_.get(), stats_, &snapshots_, - IsSnapshotSupported(), table_cache_, std::move(yield_callback)); + CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(), + env_options_, versions_.get(), &shutting_down_, + &log_buffer, db_directory_.get(), stats_, + &snapshots_, IsSnapshotSupported(), table_cache_, + std::move(yield_callback)); compaction_job.Prepare(); mutex_.Unlock(); Status status = compaction_job.Run(); mutex_.Lock(); if (status.ok()) { - status = compaction_job.Install(status); + status = compaction_job.Install(status, &mutex_); if (status.ok()) { InstallSuperVersionBackground(c->column_family_data(), &job_context, *c->mutable_cf_options()); @@ -2061,16 +2061,16 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, *c->mutable_cf_options(), job_context, log_buffer); }; - CompactionJob compaction_job( - c.get(), db_options_, *c->mutable_cf_options(), env_options_, - versions_.get(), &mutex_, &shutting_down_, log_buffer, - db_directory_.get(), stats_, &snapshots_, IsSnapshotSupported(), - table_cache_, std::move(yield_callback)); + CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(), + env_options_, versions_.get(), &shutting_down_, + log_buffer, db_directory_.get(), stats_, + &snapshots_, IsSnapshotSupported(), + table_cache_, std::move(yield_callback)); 
compaction_job.Prepare(); mutex_.Unlock(); status = compaction_job.Run(); mutex_.Lock(); - status = compaction_job.Install(status); + status = compaction_job.Install(status, &mutex_); if (status.ok()) { InstallSuperVersionBackground(c->column_family_data(), job_context, *c->mutable_cf_options()); diff --git a/db/version_set.cc b/db/version_set.cc index 1c34b56a5..b2b63eb33 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1613,7 +1613,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (!descriptor_log_ || manifest_file_size_ > db_options_->max_manifest_file_size) { pending_manifest_file_number_ = NewFileNumber(); - batch_edits.back()->SetNextFile(next_file_number_); + batch_edits.back()->SetNextFile(next_file_number_.load()); new_descriptor_log = true; } else { pending_manifest_file_number_ = manifest_file_number_; @@ -1814,7 +1814,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { assert(edit->IsColumnFamilyManipulation()); - edit->SetNextFile(next_file_number_); + edit->SetNextFile(next_file_number_.load()); edit->SetLastSequence(last_sequence_); if (edit->is_column_family_drop_) { // if we drop column family, we have to make sure to save max column family, @@ -1831,13 +1831,13 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, if (edit->has_log_number_) { assert(edit->log_number_ >= cfd->GetLogNumber()); - assert(edit->log_number_ < next_file_number_); + assert(edit->log_number_ < next_file_number_.load()); } if (!edit->has_prev_log_number_) { edit->SetPrevLogNumber(prev_log_number_); } - edit->SetNextFile(next_file_number_); + edit->SetNextFile(next_file_number_.load()); edit->SetLastSequence(last_sequence_); builder->Apply(edit); @@ -2064,8 +2064,8 @@ Status VersionSet::Recover( column_family_set_->UpdateMaxColumnFamily(max_column_family); - MarkFileNumberUsed(previous_log_number); - MarkFileNumberUsed(log_number); + 
MarkFileNumberUsedDuringRecovery(previous_log_number); + MarkFileNumberUsedDuringRecovery(log_number); } // there were some column families in the MANIFEST that weren't specified @@ -2105,7 +2105,7 @@ Status VersionSet::Recover( } manifest_file_size_ = current_manifest_file_size; - next_file_number_ = next_file + 1; + next_file_number_.store(next_file + 1); last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; @@ -2116,7 +2116,7 @@ Status VersionSet::Recover( "prev_log_number is %lu," "max_column_family is %u\n", manifest_filename.c_str(), (unsigned long)manifest_file_number_, - (unsigned long)next_file_number_, (unsigned long)last_sequence_, + (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)log_number, (unsigned long)prev_log_number_, column_family_set_->GetMaxColumnFamily()); @@ -2452,14 +2452,14 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, delete v; } - next_file_number_ = next_file + 1; + next_file_number_.store(next_file + 1); last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; printf( "next_file_number %lu last_sequence " "%lu prev_log_number %lu max_column_family %u\n", - (unsigned long)next_file_number_, (unsigned long)last_sequence, + (unsigned long)next_file_number_.load(), (unsigned long)last_sequence, (unsigned long)previous_log_number, column_family_set_->GetMaxColumnFamily()); } @@ -2468,9 +2468,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, } #endif // ROCKSDB_LITE -void VersionSet::MarkFileNumberUsed(uint64_t number) { - if (next_file_number_ <= number) { - next_file_number_ = number + 1; +void VersionSet::MarkFileNumberUsedDuringRecovery(uint64_t number) { + // only called during recovery which is single threaded, so this works because + // there can't be concurrent calls + if (next_file_number_.load(std::memory_order_relaxed) <= number) { + next_file_number_.store(number + 1, std::memory_order_relaxed); } } diff 
--git a/db/version_set.h b/db/version_set.h index e0d166818..0be8c4e1b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -532,19 +532,18 @@ class VersionSet { return pending_manifest_file_number_; } - // REQUIRED: mutex locked - uint64_t current_next_file_number() const { return next_file_number_; } + uint64_t current_next_file_number() const { return next_file_number_.load(); } // Allocate and return a new file number - uint64_t NewFileNumber() { return next_file_number_++; } + uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } // Arrange to reuse "file_number" unless a newer file number has // already been allocated. // REQUIRES: "file_number" was returned by a call to NewFileNumber(). void ReuseLogFileNumber(uint64_t file_number) { - if (next_file_number_ == file_number + 1) { - next_file_number_ = file_number; - } + auto expected = file_number + 1; + std::atomic_compare_exchange_strong(&next_file_number_, &expected, + file_number); } // Return the last sequence number. @@ -559,7 +558,8 @@ class VersionSet { } // Mark the specified file number as used. - void MarkFileNumberUsed(uint64_t number); + // REQUIRED: this is only called during single-threaded recovery + void MarkFileNumberUsedDuringRecovery(uint64_t number); // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. @@ -636,7 +636,7 @@ class VersionSet { Env* const env_; const std::string dbname_; const DBOptions* const db_options_; - uint64_t next_file_number_; + std::atomic<uint64_t> next_file_number_; uint64_t manifest_file_number_; uint64_t pending_manifest_file_number_; std::atomic last_sequence_;