From 53af5d877db5b126d043f29996f9dfc9352c6f81 Mon Sep 17 00:00:00 2001
From: Igor Canadi
Date: Fri, 7 Nov 2014 11:50:34 -0800
Subject: [PATCH] Redesign pending_outputs_

Summary:
Here's a prototype of redesigning pending_outputs_. This way, we don't have to
expose pending_outputs_ to other classes (CompactionJob, FlushJob,
MemtableList). DBImpl takes care of it.

Still have to write some comments, but should be good enough to start the
discussion.

Test Plan: make check, will also run stress test

Reviewers: ljin, sdong, rven, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D28353
---
 db/compaction_job.cc | 28 +++++-----------------
 db/compaction_job.h  |  9 +++----
 db/db_impl.cc        | 56 +++++++++++++++++++++++++++++++++++---------
 db/db_impl.h         | 33 ++++++++++++++++++++++----
 db/db_test.cc        | 39 ++++++++++++++++++++++++++++++
 db/filename.h        |  3 ---
 db/flush_job.cc      | 17 ++------------
 db/flush_job.h       |  8 +++----
 db/flush_job_test.cc |  9 ++++---
 db/job_context.h     |  8 +++++--
 db/memtable_list.cc  | 15 +++---------
 db/memtable_list.h   |  8 +++----
 db/version_set.h     |  3 +++
 13 files changed, 146 insertions(+), 90 deletions(-)
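To make the redesign concrete before the diff: the whole scheme boils down to a handful of operations on a sorted list of file numbers. The sketch below is illustrative only — PendingOutputs, Capture, Release, MinPendingOutput and CanDelete are made-up names; in the patch the state is the std::list<uint64_t> pending_outputs_ member of DBImpl and the checks live in FindObsoleteFiles()/PurgeObsoleteFiles().

    // Minimal sketch of the new scheme, assuming the DB mutex guards all calls.
    #include <cstdint>
    #include <iterator>
    #include <limits>
    #include <list>

    class PendingOutputs {
     public:
      // A background job calls this before it creates any file; the captured
      // value is the smallest file number the job may use from now on.
      std::list<uint64_t>::iterator Capture(uint64_t next_file_number) {
        pending_.push_back(next_file_number);  // numbers only grow, so the list stays sorted
        return std::prev(pending_.end());
      }

      // Called when the job is done, whether it succeeded or failed.
      void Release(std::list<uint64_t>::iterator elem) { pending_.erase(elem); }

      // Smallest protected file number; FindObsoleteFiles() computes this.
      uint64_t MinPendingOutput() const {
        return pending_.empty() ? std::numeric_limits<uint64_t>::max()
                                : pending_.front();
      }

      // PurgeObsoleteFiles() keeps a table file if it is live or if its number
      // is >= MinPendingOutput().
      bool CanDelete(uint64_t file_number, bool is_live) const {
        return !is_live && file_number < MinPendingOutput();
      }

     private:
      std::list<uint64_t> pending_;  // DBImpl::pending_outputs_ in the real code
    };

The invariant that makes this safe is that a job captures the counter value before it allocates any file number of its own, so every file it creates is protected until the job releases its entry.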
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index dc472233b..04c351d77 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -205,10 +205,9 @@ CompactionJob::CompactionJob(
     Compaction* compaction, const DBOptions& db_options,
     const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
     VersionSet* versions, port::Mutex* db_mutex,
-    std::atomic<bool>* shutting_down, FileNumToPathIdMap* pending_outputs,
-    LogBuffer* log_buffer, Directory* db_directory, Statistics* stats,
-    SnapshotList* snapshots, bool is_snapshot_supported,
-    std::shared_ptr<Cache> table_cache,
+    std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
+    Directory* db_directory, Statistics* stats, SnapshotList* snapshots,
+    bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
     std::function<uint64_t()> yield_callback)
     : compact_(new CompactionState(compaction)),
       compaction_stats_(1),
@@ -219,7 +218,6 @@ CompactionJob::CompactionJob(
       versions_(versions),
       db_mutex_(db_mutex),
       shutting_down_(shutting_down),
-      pending_outputs_(pending_outputs),
       log_buffer_(log_buffer),
       db_directory_(db_directory),
       stats_(stats),
@@ -469,10 +467,6 @@ Status CompactionJob::Install(Status status) {
   cfd->internal_stats()->AddCompactionStats(
       compact_->compaction->output_level(), compaction_stats_);
 
-  // if there were any unused file number (mostly in case of
-  // compaction error), free up the entry from pending_putputs
-  ReleaseCompactionUnusedFileNumbers();
-
   if (status.ok()) {
     status = InstallCompactionResults();
   }
@@ -511,8 +505,6 @@ void CompactionJob::AllocateCompactionOutputFileNumbers() {
   int filesNeeded = compact_->compaction->num_input_files(1);
   for (int i = 0; i < std::max(filesNeeded, 1); i++) {
     uint64_t file_number = versions_->NewFileNumber();
-    pending_outputs_->insert(
-        {file_number, compact_->compaction->GetOutputPathId()});
     compact_->allocated_file_numbers.push_back(file_number);
   }
 }
@@ -1041,14 +1033,6 @@ void CompactionJob::RecordCompactionIOStats() {
   IOSTATS_RESET(bytes_written);
 }
 
-// Frees up unused file number.
-void CompactionJob::ReleaseCompactionUnusedFileNumbers() {
-  db_mutex_->AssertHeld();
-  for (const auto file_number : compact_->allocated_file_numbers) {
-    pending_outputs_->erase(file_number);
-  }
-}
-
 Status CompactionJob::OpenCompactionOutputFile() {
   assert(compact_ != nullptr);
   assert(compact_->builder == nullptr);
@@ -1061,9 +1045,10 @@ Status CompactionJob::OpenCompactionOutputFile() {
     compact_->allocated_file_numbers.pop_front();
   } else {
     db_mutex_->Lock();
+    // TODO(icanadi) make Versions::next_file_number_ atomic and remove db_lock
+    // around here. Once we do that, AllocateCompactionOutputFileNumbers() will
+    // not be needed.
     file_number = versions_->NewFileNumber();
-    pending_outputs_->insert(
-        {file_number, compact_->compaction->GetOutputPathId()});
     db_mutex_->Unlock();
   }
   // Make the output file
@@ -1112,7 +1097,6 @@ void CompactionJob::CleanupCompaction(Status status) {
   }
   for (size_t i = 0; i < compact_->outputs.size(); i++) {
     const CompactionState::Output& out = compact_->outputs[i];
-    pending_outputs_->erase(out.number);
 
     // If this file was inserted into the table cache then remove
     // them here because this compaction was not committed.
diff --git a/db/compaction_job.h b/db/compaction_job.h
index f090c351d..45d438156 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -57,10 +57,9 @@ class CompactionJob {
                 const MutableCFOptions& mutable_cf_options,
                 const EnvOptions& env_options, VersionSet* versions,
                 port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
-                FileNumToPathIdMap* pending_outputs, LogBuffer* log_buffer,
-                Directory* db_directory, Statistics* stats,
-                SnapshotList* snapshot_list, bool is_snapshot_supported,
-                std::shared_ptr<Cache> table_cache,
+                LogBuffer* log_buffer, Directory* db_directory,
+                Statistics* stats, SnapshotList* snapshot_list,
+                bool is_snapshot_supported, std::shared_ptr<Cache> table_cache,
                 std::function<uint64_t()> yield_callback);
 
   ~CompactionJob() { assert(compact_ == nullptr); }
@@ -92,7 +91,6 @@ class CompactionJob {
       SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
       SequenceNumber* prev_snapshot);
   void RecordCompactionIOStats();
-  void ReleaseCompactionUnusedFileNumbers();
   Status OpenCompactionOutputFile();
   void CleanupCompaction(Status status);
 
@@ -115,7 +113,6 @@ class CompactionJob {
   VersionSet* versions_;
   port::Mutex* db_mutex_;
   std::atomic<bool>* shutting_down_;
-  FileNumToPathIdMap* pending_outputs_;
   LogBuffer* log_buffer_;
   Directory* db_directory_;
   Statistics* stats_;
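The TODO added in OpenCompactionOutputFile() above points out that if VersionSet::next_file_number_ were atomic, neither the DB mutex round-trip nor AllocateCompactionOutputFileNumbers() would be needed just to hand out numbers. That change is not part of this patch; the following is only a sketch of the idea, with FileNumberAllocator standing in for the real VersionSet counter.

    #include <atomic>
    #include <cstdint>

    // Hypothetical stand-in for VersionSet's next_file_number_ counter.
    class FileNumberAllocator {
     public:
      explicit FileNumberAllocator(uint64_t start) : next_(start) {}

      // Analogue of NewFileNumber(): allocate without holding the DB mutex.
      uint64_t New() { return next_.fetch_add(1, std::memory_order_relaxed); }

      // Analogue of current_next_file_number(): peek without allocating.
      uint64_t Peek() const { return next_.load(std::memory_order_relaxed); }

     private:
      std::atomic<uint64_t> next_;
    };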
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 2bbb3345f..da0603303 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -443,8 +443,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   }
 
   // don't delete live files
-  for (auto pair : pending_outputs_) {
-    job_context->sst_live.emplace_back(pair.first, pair.second, 0);
+  if (pending_outputs_.size()) {
+    job_context->min_pending_output = *pending_outputs_.begin();
+  } else {
+    // delete all of them
+    job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
   }
   versions_->AddLiveFiles(&job_context->sst_live);
 
@@ -567,7 +570,10 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
         keep = (number >= state.manifest_file_number);
         break;
       case kTableFile:
-        keep = (sst_live_map.find(number) != sst_live_map.end());
+        // Without the second condition, files that a background job is still
+        // writing could be deleted, and DontDeletePendingOutputs would fail
+        keep = (sst_live_map.find(number) != sst_live_map.end()) ||
+               number >= state.min_pending_output;
         break;
       case kTempFile:
         // Any temp files that are currently being written to must
@@ -981,7 +987,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
   meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
-  pending_outputs_[meta.fd.GetNumber()] = 0;  // path 0 for level 0 file.
+  auto pending_outputs_inserted_elem =
+      CaptureCurrentFileNumberInPendingOutputs();
   ReadOptions ro;
   ro.total_order_seek = true;
   Arena arena;
@@ -1013,7 +1020,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
         cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
         s.ToString().c_str());
   }
-  pending_outputs_.erase(meta.fd.GetNumber());
+  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
 
   // Note that if file_size is zero, the file has been deleted and
   // should not be added to the manifest.
@@ -1044,9 +1051,9 @@ Status DBImpl::FlushMemTableToOutputFile(
 
   FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     &pending_outputs_, snapshots_.GetNewest(), job_context,
-                     log_buffer, db_directory_.get(),
-                     GetCompressionFlush(*cfd->ioptions()), stats_);
+                     snapshots_.GetNewest(), job_context, log_buffer,
+                     db_directory_.get(), GetCompressionFlush(*cfd->ioptions()),
+                     stats_);
 
   Status s = flush_job.Run();
 
@@ -1550,6 +1557,9 @@ void DBImpl::BackgroundCallFlush() {
   {
     MutexLock l(&mutex_);
 
+    auto pending_outputs_inserted_elem =
+        CaptureCurrentFileNumberInPendingOutputs();
+
     Status s;
     if (!shutting_down_.load(std::memory_order_acquire)) {
       s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
@@ -1573,6 +1583,8 @@ void DBImpl::BackgroundCallFlush() {
       }
     }
 
+    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
+
     // If !s.ok(), this means that Flush failed. In that case, we want
     // to delete all obsolete files and we force FindObsoleteFiles()
     FindObsoleteFiles(&job_context, !s.ok());
@@ -1616,6 +1628,10 @@ void DBImpl::BackgroundCallCompaction() {
   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
   {
     MutexLock l(&mutex_);
+
+    auto pending_outputs_inserted_elem =
+        CaptureCurrentFileNumberInPendingOutputs();
+
     assert(bg_compaction_scheduled_);
     Status s;
     if (!shutting_down_.load(std::memory_order_acquire)) {
@@ -1640,6 +1656,8 @@ void DBImpl::BackgroundCallCompaction() {
       }
     }
 
+    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
+
     // If !s.ok(), this means that Compaction failed. In that case, we want
     // to delete all obsolete files we might have created and we force
     // FindObsoleteFiles(). This is because job_context does not
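Both background entry points follow the same bracket: capture under the mutex before doing any work, release under the mutex when the work is done, and only then run FindObsoleteFiles()/PurgeObsoleteFiles(). The patch pairs the two calls by hand; a scope guard along the lines below (entirely hypothetical, not part of the change) is one way to express the pairing. It would have to live in a narrower scope than the whole function, because the code above deliberately releases before FindObsoleteFiles() so that a failed job's own outputs become collectable.

    #include <cstdint>
    #include <iterator>
    #include <list>

    // Hypothetical RAII helper; assumes the DB mutex is held at construction
    // and at destruction, exactly like the explicit calls in the patch.
    class PendingOutputsGuard {
     public:
      PendingOutputsGuard(std::list<uint64_t>* pending,
                          uint64_t current_next_file_number)
          : pending_(pending) {
        pending_->push_back(current_next_file_number);
        elem_ = std::prev(pending_->end());
      }
      ~PendingOutputsGuard() { pending_->erase(elem_); }

      PendingOutputsGuard(const PendingOutputsGuard&) = delete;
      PendingOutputsGuard& operator=(const PendingOutputsGuard&) = delete;

     private:
      std::list<uint64_t>* pending_;
      std::list<uint64_t>::iterator elem_;
    };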
@@ -1848,9 +1866,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     };
     CompactionJob compaction_job(
         c.get(), db_options_, *c->mutable_cf_options(), env_options_,
-        versions_.get(), &mutex_, &shutting_down_, &pending_outputs_,
-        log_buffer, db_directory_.get(), stats_, &snapshots_,
-        IsSnapshotSupported(), table_cache_, std::move(yield_callback));
+        versions_.get(), &mutex_, &shutting_down_, log_buffer,
+        db_directory_.get(), stats_, &snapshots_, IsSnapshotSupported(),
+        table_cache_, std::move(yield_callback));
     compaction_job.Prepare();
     mutex_.Unlock();
     status = compaction_job.Run();
@@ -2968,6 +2986,22 @@ void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
   }
 }
 
+std::list<uint64_t>::iterator
+DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
+  // We need to remember the iterator of our insert, because after the
+  // background job is done, we need to remove that element from
+  // pending_outputs_.
+  pending_outputs_.push_back(versions_->current_next_file_number());
+  auto pending_outputs_inserted_elem = pending_outputs_.end();
+  --pending_outputs_inserted_elem;
+  return pending_outputs_inserted_elem;
+}
+
+void DBImpl::ReleaseFileNumberFromPendingOutputs(
+    std::list<uint64_t>::iterator v) {
+  pending_outputs_.erase(v);
+}
+
 #ifndef ROCKSDB_LITE
 Status DBImpl::GetUpdatesSince(
     SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
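std::list is a deliberate fit for this bookkeeping: captured numbers never decrease, so the list is sorted by construction, front() is always the minimum, and erasing through the iterator returned at capture time is O(1) and leaves every other job's iterator valid. A small standalone demonstration of those properties (plain C++, not RocksDB code):

    #include <cassert>
    #include <cstdint>
    #include <list>

    int main() {
      // pending_outputs_ analogue: jobs capture non-decreasing file numbers.
      std::list<uint64_t> pending;
      auto flush_elem = pending.insert(pending.end(), 12);       // flush starts
      auto compaction_elem = pending.insert(pending.end(), 17);  // compaction starts

      assert(pending.front() == 12);  // min_pending_output while both jobs run

      // Jobs may finish in any order; erasing via the stored iterator is O(1)
      // and does not invalidate the other job's iterator.
      pending.erase(flush_elem);
      assert(pending.front() == 17);  // the protection window advances

      pending.erase(compaction_elem);
      assert(pending.empty());  // nothing pending: any non-live file may go
      return 0;
    }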
diff --git a/db/db_impl.h b/db/db_impl.h
index 8717dee90..a25a82a9a 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -12,6 +12,7 @@
 #include
 #include
 #include
+#include <list>
 #include
 #include
 #include
@@ -265,6 +266,24 @@ class DBImpl : public DB {
   // Delete any unneeded files and stale in-memory entries.
   void DeleteObsoleteFiles();
 
+  // Background process needs to call
+  //     auto x = CaptureCurrentFileNumberInPendingOutputs()
+  //     <do something>
+  //     ReleaseFileNumberFromPendingOutputs(x)
+  // This will protect any temporary files created while <do something> is
+  // executing from being deleted.
+  // -----------
+  // This function will capture the current file number and append it to
+  // pending_outputs_. This will prevent any background process from deleting
+  // any file created after this point.
+  std::list<uint64_t>::iterator CaptureCurrentFileNumberInPendingOutputs();
+  // This function should be called with the result of
+  // CaptureCurrentFileNumberInPendingOutputs(). It then marks that any file
+  // created between the calls CaptureCurrentFileNumberInPendingOutputs() and
+  // ReleaseFileNumberFromPendingOutputs() can now be deleted (if it is not
+  // live and not blocked by any other pending_outputs_ entry)
+  void ReleaseFileNumberFromPendingOutputs(std::list<uint64_t>::iterator v);
+
   // Flush the in-memory write buffer to storage. Switches to a new
   // log-file/memtable and writes a new descriptor iff successful.
   Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
@@ -390,10 +409,16 @@ class DBImpl : public DB {
 
   SnapshotList snapshots_;
 
-  // Set of table files to protect from deletion because they are
-  // part of ongoing compactions.
-  // map from pending file number ID to their path IDs.
-  FileNumToPathIdMap pending_outputs_;
+  // For each background job, pending_outputs_ keeps the current file number at
+  // the time that background job started.
+  // FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file with a
+  // number greater than or equal to the smallest file number in
+  // pending_outputs_. Since file numbers grow monotonically, this also means
+  // that pending_outputs_ is always sorted. After a background job is done
+  // executing, its file number is deleted from pending_outputs_, which allows
+  // PurgeObsoleteFiles() to clean it up.
+  // State is protected by the db mutex.
+  std::list<uint64_t> pending_outputs_;
 
   // At least one compaction or flush job is pending but not yet scheduled
   // because of the max background thread limit.
diff --git a/db/db_test.cc b/db/db_test.cc
index 091e373fa..ee8844c6c 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -165,6 +165,8 @@ class SpecialEnv : public EnvWrapper {
 
   std::atomic<int> non_writable_count_;
 
+  std::function<void()>* table_write_callback_;
+
   explicit SpecialEnv(Env* base) : EnvWrapper(base), rnd_(301) {
     delay_sstable_sync_.store(false, std::memory_order_release);
     drop_writes_.store(false, std::memory_order_release);
@@ -181,6 +183,8 @@ class SpecialEnv : public EnvWrapper {
     non_writeable_rate_ = 0;
     new_writable_count_ = 0;
     non_writable_count_ = 0;
+    periodic_non_writable_ = 0;
+    table_write_callback_ = nullptr;
   }
 
   Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
@@ -196,6 +200,9 @@ class SpecialEnv : public EnvWrapper {
             base_(std::move(base)) {
       }
       Status Append(const Slice& data) {
+        if (env_->table_write_callback_) {
+          (*env_->table_write_callback_)();
+        }
         if (env_->drop_writes_.load(std::memory_order_acquire)) {
           // Drop writes on the floor
           return Status::OK();
@@ -9042,6 +9049,38 @@ TEST(DBTest, DynamicMiscOptions) {
   assert_reseek_count(300, 1);
 }
 
+TEST(DBTest, DontDeletePendingOutputs) {
+  Options options;
+  options.env = env_;
+  options.create_if_missing = true;
+  DestroyAndReopen(options);
+
+  // Every time we write to a table file, call FOF/POF with a full DB scan.
+  // This will make sure our pending_outputs_ protection works correctly
+  std::function<void()> purge_obsolete_files_function = [&]() {
+    JobContext job_context;
+    dbfull()->TEST_LockMutex();
+    dbfull()->FindObsoleteFiles(&job_context, true /*force*/);
+    dbfull()->TEST_UnlockMutex();
+    dbfull()->PurgeObsoleteFiles(job_context);
+  };
+
+  env_->table_write_callback_ = &purge_obsolete_files_function;
+
+  for (int i = 0; i < 2; ++i) {
+    ASSERT_OK(Put("a", "begin"));
+    ASSERT_OK(Put("z", "end"));
+    ASSERT_OK(Flush());
+  }
+
+  // If the pending output guard does not work correctly, PurgeObsoleteFiles()
+  // will delete the file that the compaction is trying to create, causing this
+  // error: db/db_test.cc:975: IO error:
+  // /tmp/rocksdbtest-1552237650/db_test/000009.sst: No such file or directory
+  Compact("a", "b");
+}
+
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/filename.h b/db/filename.h
index a80703074..87963ea21 100644
--- a/db/filename.h
+++ b/db/filename.h
@@ -36,9 +36,6 @@ enum FileType {
   kIdentityFile
 };
 
-// map from file number to path ID.
-typedef std::unordered_map<uint64_t, uint32_t> FileNumToPathIdMap;
-
 // Return the name of the log file with the specified number
 // in the db named by "dbname". The result will be prefixed with
 // "dbname".
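The new test leans on one small hook: SpecialEnv invokes an optional std::function before every table-file write, which lets DontDeletePendingOutputs run a full FindObsoleteFiles()/PurgeObsoleteFiles() pass at the most hostile possible moment. Reduced to its essentials, the hook looks roughly like this (HookedWriter is a made-up stand-in, not the SpecialEnv code):

    #include <functional>
    #include <string>
    #include <vector>

    // Minimal stand-in for a writable-file wrapper with a per-write test hook.
    class HookedWriter {
     public:
      std::function<void()>* write_callback = nullptr;  // set by the test, may stay null

      void Append(const std::string& data) {
        if (write_callback != nullptr) {
          (*write_callback)();  // e.g. force an obsolete-file scan mid-write
        }
        buffer_.push_back(data);
      }

     private:
      std::vector<std::string> buffer_;
    };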
diff --git a/db/flush_job.cc b/db/flush_job.cc
index c477a5e8d..973d86033 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -55,7 +55,6 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                    const MutableCFOptions& mutable_cf_options,
                    const EnvOptions& env_options, VersionSet* versions,
                    port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
-                   FileNumToPathIdMap* pending_outputs,
                    SequenceNumber newest_snapshot, JobContext* job_context,
                    LogBuffer* log_buffer, Directory* db_directory,
                    CompressionType output_compression, Statistics* stats)
@@ -67,7 +66,6 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       versions_(versions),
       db_mutex_(db_mutex),
       shutting_down_(shutting_down),
-      pending_outputs_(pending_outputs),
       newest_snapshot_(newest_snapshot),
       job_context_(job_context),
       log_buffer_(log_buffer),
@@ -107,13 +105,12 @@ Status FlushJob::Run() {
   }
 
   if (!s.ok()) {
-    cfd_->imm()->RollbackMemtableFlush(mems, file_number, pending_outputs_);
+    cfd_->imm()->RollbackMemtableFlush(mems, file_number);
   } else {
     // Replace immutable memtable with the generated Table
     s = cfd_->imm()->InstallMemtableFlushResults(
         cfd_, mutable_cf_options_, mems, versions_, db_mutex_, file_number,
-        pending_outputs_, &job_context_->memtables_to_free, db_directory_,
-        log_buffer_);
+        &job_context_->memtables_to_free, db_directory_, log_buffer_);
   }
 
   return s;
@@ -128,7 +125,6 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
   meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
   *filenumber = meta.fd.GetNumber();
   // path 0 for level 0 file.
-  pending_outputs_->insert({meta.fd.GetNumber(), 0});
 
   const SequenceNumber earliest_seqno_in_memtable =
       mems[0]->GetFirstSequenceNumber();
@@ -180,15 +176,6 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
   // re-acquire the most current version
   base = cfd_->current();
 
-  // There could be multiple threads writing to its own level-0 file.
-  // The pending_outputs cannot be cleared here, otherwise this newly
-  // created file might not be considered as a live-file by another
-  // compaction thread that is concurrently deleting obselete files.
-  // The pending_outputs can be cleared only after the new version is
-  // committed so that other threads can recognize this file as a
-  // valid one.
-  // pending_outputs_.erase(meta.number);
-
   // Note that if file_size is zero, the file has been deleted and
   // should not be added to the manifest.
   int level = 0;
diff --git a/db/flush_job.h b/db/flush_job.h
index a5a40ce41..86d4aa073 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -55,10 +55,9 @@ class FlushJob {
            const MutableCFOptions& mutable_cf_options,
            const EnvOptions& env_options, VersionSet* versions,
            port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
-           FileNumToPathIdMap* pending_outputs, SequenceNumber newest_snapshot,
-           JobContext* job_context, LogBuffer* log_buffer,
-           Directory* db_directory, CompressionType output_compression,
-           Statistics* stats);
+           SequenceNumber newest_snapshot, JobContext* job_context,
+           LogBuffer* log_buffer, Directory* db_directory,
+           CompressionType output_compression, Statistics* stats);
 
   ~FlushJob() {}
   Status Run();
@@ -74,7 +73,6 @@ class FlushJob {
   VersionSet* versions_;
   port::Mutex* db_mutex_;
   std::atomic<bool>* shutting_down_;
-  FileNumToPathIdMap* pending_outputs_;
   SequenceNumber newest_snapshot_;
   JobContext* job_context_;
   LogBuffer* log_buffer_;
diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
index 9cfe015e1..e39916bd6 100644
--- a/db/flush_job_test.cc
+++ b/db/flush_job_test.cc
@@ -73,7 +73,6 @@ class FlushJobTest {
   std::unique_ptr<VersionSet> versions_;
   port::Mutex mutex_;
   std::atomic<bool> shutting_down_;
-  FileNumToPathIdMap pending_outputs_;
   std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
 };
 
@@ -83,8 +82,8 @@ TEST(FlushJobTest, Empty) {
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_, *cfd->GetLatestMutableCFOptions(),
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     &pending_outputs_, SequenceNumber(), &job_context, nullptr,
-                     nullptr, kNoCompression, nullptr);
+                     SequenceNumber(), &job_context, nullptr, nullptr,
+                     kNoCompression, nullptr);
   ASSERT_OK(flush_job.Run());
 }
 
@@ -108,8 +107,8 @@ TEST(FlushJobTest, NonEmpty) {
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_, *cfd->GetLatestMutableCFOptions(),
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     &pending_outputs_, SequenceNumber(), &job_context, nullptr,
-                     nullptr, kNoCompression, nullptr);
+                     SequenceNumber(), &job_context, nullptr, nullptr,
+                     kNoCompression, nullptr);
   mutex_.Lock();
   ASSERT_OK(flush_job.Run());
   mutex_.Unlock();
diff --git a/db/job_context.h b/db/job_context.h
index caf28f7d9..d73e817a6 100644
--- a/db/job_context.h
+++ b/db/job_context.h
@@ -58,8 +58,12 @@ struct JobContext {
 
   // the current manifest_file_number, log_number and prev_log_number
   // that corresponds to the set of files in 'live'.
-  uint64_t manifest_file_number, pending_manifest_file_number, log_number,
-      prev_log_number;
+  uint64_t manifest_file_number;
+  uint64_t pending_manifest_file_number;
+  uint64_t log_number;
+  uint64_t prev_log_number;
+
+  uint64_t min_pending_output = 0;
 
   explicit JobContext(bool create_superversion = false) {
     manifest_file_number = 0;
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 0066a68ba..8d568e895 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -144,8 +144,7 @@ void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
 }
 
 void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
-                                         uint64_t file_number,
-                                         FileNumToPathIdMap* pending_outputs) {
+                                         uint64_t file_number) {
   assert(!mems.empty());
 
   // If the flush was not successful, then just reset state.
@@ -159,7 +158,6 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
     m->edit_.Clear();
     num_flush_not_started_++;
   }
-  pending_outputs->erase(file_number);
   imm_flush_needed.store(true, std::memory_order_release);
 }
 
@@ -167,9 +165,8 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
 Status MemTableList::InstallMemtableFlushResults(
     ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
     const autovector<MemTable*>& mems, VersionSet* vset, port::Mutex* mu,
-    uint64_t file_number, FileNumToPathIdMap* pending_outputs,
-    autovector<MemTable*>* to_delete, Directory* db_directory,
-    LogBuffer* log_buffer) {
+    uint64_t file_number, autovector<MemTable*>* to_delete,
+    Directory* db_directory, LogBuffer* log_buffer) {
   mu->AssertHeld();
 
   // flush was sucessful
@@ -220,11 +217,6 @@ Status MemTableList::InstallMemtableFlushResults(
       current_->Remove(m);
       assert(m->file_number_ > 0);
 
-      // pending_outputs can be cleared only after the newly created file
-      // has been written to a committed version so that other concurrently
-      // executing compaction threads do not mistakenly assume that this
-      // file is not live.
-      pending_outputs->erase(m->file_number_);
       if (m->Unref() != nullptr) {
         to_delete->push_back(m);
       }
@@ -237,7 +229,6 @@ Status MemTableList::InstallMemtableFlushResults(
       m->flush_in_progress_ = false;
      m->edit_.Clear();
       num_flush_not_started_++;
-      pending_outputs->erase(m->file_number_);
       m->file_number_ = 0;
       imm_flush_needed.store(true, std::memory_order_release);
     }
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 9f499b834..6cf1737c1 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -108,16 +108,14 @@ class MemTableList {
   // Reset status of the given memtable list back to pending state so that
   // they can get picked up again on the next round of flush.
   void RollbackMemtableFlush(const autovector<MemTable*>& mems,
-                             uint64_t file_number,
-                             FileNumToPathIdMap* pending_outputs);
+                             uint64_t file_number);
 
   // Commit a successful flush in the manifest file
   Status InstallMemtableFlushResults(
       ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
      const autovector<MemTable*>& m, VersionSet* vset, port::Mutex* mu,
-      uint64_t file_number, FileNumToPathIdMap* pending_outputs,
-      autovector<MemTable*>* to_delete, Directory* db_directory,
-      LogBuffer* log_buffer);
+      uint64_t file_number, autovector<MemTable*>* to_delete,
+      Directory* db_directory, LogBuffer* log_buffer);
 
   // New memtables are inserted at the front of the list.
   // Takes ownership of the referenced held on *m by the caller of Add().
diff --git a/db/version_set.h b/db/version_set.h
index 0ae6f1cfd..f9801c7c7 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -530,6 +530,9 @@ class VersionSet {
     return pending_manifest_file_number_;
   }
 
+  // REQUIRED: mutex locked
+  uint64_t current_next_file_number() const { return next_file_number_; }
+
   // Allocate and return a new file number
   uint64_t NewFileNumber() { return next_file_number_++; }
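current_next_file_number() only peeks at the counter, while NewFileNumber() returns it and advances it. Capturing the peeked value before any allocation is what makes the protection hold: every file the job creates afterwards has a number no smaller than the captured one, and the purge pass only deletes numbers below the smallest capture. A minimal check of that ordering (illustrative arithmetic, not RocksDB code):

    #include <cassert>
    #include <cstdint>

    int main() {
      uint64_t next_file_number = 10;         // VersionSet counter
      uint64_t captured = next_file_number;   // CaptureCurrentFileNumberInPendingOutputs()
      uint64_t output1 = next_file_number++;  // NewFileNumber() -> 10
      uint64_t output2 = next_file_number++;  // NewFileNumber() -> 11

      uint64_t min_pending_output = captured;  // only entry in pending_outputs_
      assert(output1 >= min_pending_output);   // purge keeps this output
      assert(output2 >= min_pending_output);   // and this one
      return 0;
    }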