diff --git a/db/db_impl.cc b/db/db_impl.cc index cc17e743d..06e485d50 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -601,6 +601,8 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, // store the current filenum, lognum, etc deletion_state.manifest_file_number = versions_->ManifestFileNumber(); + deletion_state.pending_manifest_file_number = + versions_->PendingManifestFileNumber(); deletion_state.log_number = versions_->LogNumber(); deletion_state.prev_log_number = versions_->PrevLogNumber(); @@ -651,12 +653,10 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { return; } - // Now, convert live list to an unordered set, WITHOUT mutex held; // set is slow. - std::unordered_set sst_live( - state.sst_live.begin(), state.sst_live.end() - ); + std::unordered_set sst_live(state.sst_live.begin(), + state.sst_live.end()); auto& candidate_files = state.candidate_files; candidate_files.reserve( @@ -674,19 +674,15 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { for (auto file_num : state.log_delete_files) { if (file_num > 0) { - candidate_files.push_back( - LogFileName(kDumbDbName, file_num).substr(1) - ); + candidate_files.push_back(LogFileName(kDumbDbName, file_num).substr(1)); } } // dedup state.candidate_files so we don't try to delete the same // file twice sort(candidate_files.begin(), candidate_files.end()); - candidate_files.erase( - unique(candidate_files.begin(), candidate_files.end()), - candidate_files.end() - ); + candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()), + candidate_files.end()); std::vector old_info_log_files; @@ -706,7 +702,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { break; case kDescriptorFile: // Keep my manifest file, and any newer incarnations' - // (in case there is a race that allows other incarnations) + // (can happen during manifest roll) keep = (number >= state.manifest_file_number); break; case kTableFile: @@ -714,8 +710,12 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { break; case kTempFile: // Any temp files that are currently being written to must - // be recorded in pending_outputs_, which is inserted into "live" - keep = (sst_live.find(number) != sst_live.end()); + // be recorded in pending_outputs_, which is inserted into "live". + // Also, SetCurrentFile creates a temp file when writing out new + // manifest, which is equal to state.pending_manifest_file_number. We + // should not delete that file + keep = (sst_live.find(number) != sst_live.end()) || + (number == state.pending_manifest_file_number); break; case kInfoLogFile: keep = true; diff --git a/db/db_impl.h b/db/db_impl.h index e42848d11..6e6dc425a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -230,10 +230,12 @@ class DBImpl : public DB { // the current manifest_file_number, log_number and prev_log_number // that corresponds to the set of files in 'live'. - uint64_t manifest_file_number, log_number, prev_log_number; + uint64_t manifest_file_number, pending_manifest_file_number, log_number, + prev_log_number; explicit DeletionState(bool create_superversion = false) { manifest_file_number = 0; + pending_manifest_file_number = 0; log_number = 0; prev_log_number = 0; new_superversion = diff --git a/db/version_set.cc b/db/version_set.cc index 5c4043116..0ce8a7efe 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -7,8 +7,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#define __STDC_FORMAT_MACROS #include "db/version_set.h" +#include #include #include #include @@ -1435,6 +1437,7 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options, icmp_(*cmp), next_file_number_(2), manifest_file_number_(0), // Filled by Recover() + pending_manifest_file_number_(0), last_sequence_(0), log_number_(0), prev_log_number_(0), @@ -1527,22 +1530,17 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // Initialize new descriptor log file if necessary by creating // a temporary file that contains a snapshot of the current version. - std::string new_manifest_filename; uint64_t new_manifest_file_size = 0; Status s; - // we will need this if we are creating new manifest - uint64_t old_manifest_file_number = manifest_file_number_; - // No need to perform this check if a new Manifest is being created anyways. + assert(pending_manifest_file_number_ == 0); if (!descriptor_log_ || manifest_file_size_ > options_->max_manifest_file_size) { + pending_manifest_file_number_ = NewFileNumber(); + batch_edits.back()->SetNextFile(next_file_number_); new_descriptor_log = true; - manifest_file_number_ = NewFileNumber(); // Change manifest file no. - } - - if (new_descriptor_log) { - new_manifest_filename = DescriptorFileName(dbname_, manifest_file_number_); - edit->SetNextFile(next_file_number_); + } else { + pending_manifest_file_number_ = manifest_file_number_; } // Unlock during expensive operations. New writes cannot get here @@ -1562,10 +1560,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // This is fine because everything inside of this block is serialized -- // only one thread can be here at the same time - if (!new_manifest_filename.empty()) { + if (new_descriptor_log) { unique_ptr descriptor_file; - s = env_->NewWritableFile(new_manifest_filename, &descriptor_file, - storage_options_.AdaptForLogWrite()); + s = env_->NewWritableFile( + DescriptorFileName(dbname_, pending_manifest_file_number_), + &descriptor_file, storage_options_.AdaptForLogWrite()); if (s.ok()) { descriptor_log_.reset(new log::Writer(std::move(descriptor_file))); s = WriteSnapshot(descriptor_log_.get()); @@ -1604,7 +1603,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, for (auto& e : batch_edits) { std::string record; e->EncodeTo(&record); - if (!ManifestContains(record)) { + if (!ManifestContains(pending_manifest_file_number_, record)) { all_records_in = false; break; } @@ -1621,17 +1620,16 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // If we just created a new descriptor file, install it by writing a // new CURRENT file that points to it. - if (s.ok() && !new_manifest_filename.empty()) { - s = SetCurrentFile(env_, dbname_, manifest_file_number_); - if (s.ok() && old_manifest_file_number < manifest_file_number_) { + if (s.ok() && new_descriptor_log) { + s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_); + if (s.ok() && pending_manifest_file_number_ > manifest_file_number_) { // delete old manifest file Log(options_->info_log, - "Deleting manifest %lu current manifest %lu\n", - (unsigned long)old_manifest_file_number, - (unsigned long)manifest_file_number_); + "Deleting manifest %" PRIu64 " current manifest %" PRIu64 "\n", + manifest_file_number_, pending_manifest_file_number_); // we don't care about an error here, PurgeObsoleteFiles will take care // of it later - env_->DeleteFile(DescriptorFileName(dbname_, old_manifest_file_number)); + env_->DeleteFile(DescriptorFileName(dbname_, manifest_file_number_)); } if (!options_->disableDataSync && db_directory != nullptr) { db_directory->Fsync(); @@ -1649,6 +1647,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // Install the new version if (s.ok()) { + manifest_file_number_ = pending_manifest_file_number_; manifest_file_size_ = new_manifest_file_size; AppendVersion(v); if (max_log_number_in_batch != 0) { @@ -1656,16 +1655,17 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, log_number_ = max_log_number_in_batch; } prev_log_number_ = edit->prev_log_number_; - } else { Log(options_->info_log, "Error in committing version %lu", (unsigned long)v->GetVersionNumber()); delete v; - if (!new_manifest_filename.empty()) { + if (new_descriptor_log) { descriptor_log_.reset(); - env_->DeleteFile(new_manifest_filename); + env_->DeleteFile( + DescriptorFileName(dbname_, pending_manifest_file_number_)); } } + pending_manifest_file_number_ = 0; // wake up all the waiting writers while (true) { @@ -2103,8 +2103,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { // Opens the mainfest file and reads all records // till it finds the record we are looking for. -bool VersionSet::ManifestContains(const std::string& record) const { - std::string fname = DescriptorFileName(dbname_, manifest_file_number_); +bool VersionSet::ManifestContains(uint64_t manifest_file_number, + const std::string& record) const { + std::string fname = + DescriptorFileName(dbname_, manifest_file_number); Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str()); unique_ptr file; Status s = env_->NewSequentialFile(fname, &file, storage_options_); diff --git a/db/version_set.h b/db/version_set.h index 317cfe353..19489701f 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -327,6 +327,10 @@ class VersionSet { // Return the current manifest file number uint64_t ManifestFileNumber() const { return manifest_file_number_; } + uint64_t PendingManifestFileNumber() const { + return pending_manifest_file_number_; + } + // Allocate and return a new file number uint64_t NewFileNumber() { return next_file_number_++; } @@ -436,7 +440,8 @@ class VersionSet { void AppendVersion(Version* v); - bool ManifestContains(const std::string& record) const; + bool ManifestContains(uint64_t manifest_file_number, + const std::string& record) const; Env* const env_; const std::string dbname_; @@ -445,6 +450,7 @@ class VersionSet { const InternalKeyComparator icmp_; uint64_t next_file_number_; uint64_t manifest_file_number_; + uint64_t pending_manifest_file_number_; std::atomic last_sequence_; uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted