From 1b224324b58b925efbd6e765dd345b4650d57fb1 Mon Sep 17 00:00:00 2001 From: Cheng Chang Date: Fri, 23 Oct 2020 22:48:00 -0700 Subject: [PATCH] Track WAL in MANIFEST: persist WALs to and recover WALs from MANIFEST (#7256) Summary: This PR makes it able to `LogAndApply` `VersionEdit`s related to WALs, and also be able to `Recover` from MANIFEST with WAL related `VersionEdit`s. The `VersionEdit`s related to WAL are treated similarly as those related to column family operations, they are not applied to versions, but can be in a commit group. Mixing WAL related `VersionEdit`s with other types of edits will make logic in `ProcessManifestWrite` more complicated, so `VersionEdit`s related to WAL can either be WAL additions or deletions, like column family add and drop. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7256 Test Plan: a set of unit tests are added in `version_set_test.cc` Reviewed By: riversand963 Differential Revision: D23123238 Pulled By: cheng-chang fbshipit-source-id: 246be2ed4744fd03fa2738aba408aaa611d0379c --- db/db_impl/db_impl_open.cc | 147 ++++++----- db/version_edit.h | 13 +- db/version_edit_handler.cc | 14 ++ db/version_edit_handler.h | 4 + db/version_edit_test.cc | 13 +- db/version_set.cc | 97 +++++-- db/version_set.h | 15 +- db/version_set_test.cc | 503 +++++++++++++++++++++++++++++++++++-- db/wal_edit.cc | 51 ++-- db/wal_edit.h | 18 +- db/wal_edit_test.cc | 57 +---- 11 files changed, 727 insertions(+), 205 deletions(-) diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index ff5c2c385..bd3770588 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -536,7 +536,7 @@ Status DBImpl::Recover( std::vector files_in_wal_dir; if (s.ok()) { - // Initial max_total_in_memory_state_ before recovery logs. Log recovery + // Initial max_total_in_memory_state_ before recovery wals. Log recovery // may check this value to decide whether to flush. 
max_total_in_memory_state_ = 0; for (auto cfd : *versions_->GetColumnFamilySet()) { @@ -571,7 +571,7 @@ Status DBImpl::Recover( return s; } - std::vector logs; + std::unordered_map wal_files; for (const auto& file : files_in_wal_dir) { uint64_t number; FileType type; @@ -582,21 +582,40 @@ Status DBImpl::Recover( "existing log file: ", file); } else { - logs.push_back(number); + wal_files[number] = + LogFileName(immutable_db_options_.wal_dir, number); } } } - if (logs.size() > 0) { + if (immutable_db_options_.track_and_verify_wals_in_manifest) { + // Verify WALs in MANIFEST. + s = versions_->GetWalSet().CheckWals(env_, wal_files); + } else if (!versions_->GetWalSet().GetWals().empty()) { + // Tracking is disabled, clear previously tracked WALs from MANIFEST, + // otherwise, in the future, if WAL tracking is enabled again, + // since the WALs deleted when WAL tracking is disabled are not persisted + // into MANIFEST, WAL check may fail. + VersionEdit edit; + for (const auto& wal : versions_->GetWalSet().GetWals()) { + WalNumber number = wal.first; + edit.DeleteWal(number); + } + s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_); + } + if (!s.ok()) { + return s; + } + + if (!wal_files.empty()) { if (error_if_wal_file_exists) { return Status::Corruption( "The db was opened in readonly mode with error_if_wal_file_exists" "flag but a WAL file already exists"); } else if (error_if_data_exists_in_wals) { - for (auto& log : logs) { - std::string fname = LogFileName(immutable_db_options_.wal_dir, log); + for (auto& wal_file : wal_files) { uint64_t bytes; - s = env_->GetFileSize(fname, &bytes); + s = env_->GetFileSize(wal_file.second, &bytes); if (s.ok()) { if (bytes > 0) { return Status::Corruption( @@ -608,13 +627,19 @@ Status DBImpl::Recover( } } - if (!logs.empty()) { - // Recover in the order in which the logs were generated - std::sort(logs.begin(), logs.end()); - bool corrupted_log_found = false; - s = RecoverLogFiles(logs, &next_sequence, read_only, - 
&corrupted_log_found); - if (corrupted_log_found && recovered_seq != nullptr) { + if (!wal_files.empty()) { + // Recover in the order in which the wals were generated + std::vector wals; + wals.reserve(wal_files.size()); + for (const auto& wal_file : wal_files) { + wals.push_back(wal_file.first); + } + std::sort(wals.begin(), wals.end()); + + bool corrupted_wal_found = false; + s = RecoverLogFiles(wals, &next_sequence, read_only, + &corrupted_wal_found); + if (corrupted_wal_found && recovered_seq != nullptr) { *recovered_seq = next_sequence; } if (!s.ok()) { @@ -767,10 +792,10 @@ Status DBImpl::InitPersistStatsColumnFamily() { return s; } -// REQUIRES: log_numbers are sorted in ascending order -Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, +// REQUIRES: wal_numbers are sorted in ascending order +Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, SequenceNumber* next_sequence, bool read_only, - bool* corrupted_log_found) { + bool* corrupted_wal_found) { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -800,10 +825,10 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, auto stream = event_logger_.Log(); stream << "job" << job_id << "event" << "recovery_started"; - stream << "log_files"; + stream << "wal_files"; stream.StartArray(); - for (auto log_number : log_numbers) { - stream << log_number; + for (auto wal_number : wal_numbers) { + stream << wal_number; } stream.EndArray(); } @@ -826,25 +851,25 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, bool stop_replay_by_wal_filter = false; bool stop_replay_for_corruption = false; bool flushed = false; - uint64_t corrupted_log_number = kMaxSequenceNumber; - uint64_t min_log_number = MinLogNumberToKeep(); - for (auto log_number : log_numbers) { - if (log_number < min_log_number) { + uint64_t corrupted_wal_number = kMaxSequenceNumber; + uint64_t min_wal_number = MinLogNumberToKeep(); + for (auto wal_number : wal_numbers) { + if 
(wal_number < min_wal_number) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Skipping log #%" PRIu64 " since it is older than min log to keep #%" PRIu64, - log_number, min_log_number); + wal_number, min_wal_number); continue; } // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. - versions_->MarkFileNumberUsed(log_number); + versions_->MarkFileNumberUsed(wal_number); // Open the log file - std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number); + std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number); ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Recovering log #%" PRIu64 " mode %d", log_number, + "Recovering log #%" PRIu64 " mode %d", wal_number, static_cast(immutable_db_options_.wal_recovery_mode)); auto logFileDropped = [this, &fname]() { uint64_t bytes; @@ -897,7 +922,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // to be skipped instead of propagating bad information (like overly // large sequence numbers). log::Reader reader(immutable_db_options_.info_log, std::move(file_reader), - &reporter, true /*checksum*/, log_number); + &reporter, true /*checksum*/, wal_number); // Determine if we should tolerate incomplete records at the tail end of the // Read all the records and add to a memtable @@ -945,7 +970,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, WalFilter::WalProcessingOption wal_processing_option = immutable_db_options_.wal_filter->LogRecordFound( - log_number, fname, batch, &new_batch, &batch_changed); + wal_number, fname, batch, &new_batch, &batch_changed); switch (wal_processing_option) { case WalFilter::WalProcessingOption::kContinueProcessing: @@ -997,7 +1022,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, " mode %d log filter %s returned " "more records (%d) than original (%d) which is not allowed. 
" "Aborting recovery.", - log_number, + wal_number, static_cast(immutable_db_options_.wal_recovery_mode), immutable_db_options_.wal_filter->Name(), new_count, original_count); @@ -1024,7 +1049,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, bool has_valid_writes = false; status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), &flush_scheduler_, - &trim_history_scheduler_, true, log_number, this, + &trim_history_scheduler_, true, wal_number, this, false /* concurrent_memtable_writes */, next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); MaybeIgnoreError(&status); @@ -1044,7 +1069,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, cfd->UnrefAndTryDelete(); // If this asserts, it means that InsertInto failed in // filtering updates to already-flushed column families - assert(cfd->GetLogNumber() <= log_number); + assert(cfd->GetLogNumber() <= wal_number); auto iter = version_edits.find(cfd->GetID()); assert(iter != version_edits.end()); VersionEdit* edit = &iter->second; @@ -1081,21 +1106,21 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, " seq #%" PRIu64 ". %s. 
This likely mean loss of synced WAL, " "thus recovery fails.", - log_number, *next_sequence, + wal_number, *next_sequence, status.ToString().c_str()); return status; } // We should ignore the error but not continue replaying status = Status::OK(); stop_replay_for_corruption = true; - corrupted_log_number = log_number; - if (corrupted_log_found != nullptr) { - *corrupted_log_found = true; + corrupted_wal_number = wal_number; + if (corrupted_wal_found != nullptr) { + *corrupted_wal_found = true; } ROCKS_LOG_INFO(immutable_db_options_.info_log, "Point in time recovered to log #%" PRIu64 " seq #%" PRIu64, - log_number, *next_sequence); + wal_number, *next_sequence); } else { assert(immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kTolerateCorruptedTailRecords || @@ -1121,7 +1146,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // corruption. This could during PIT recovery when the WAL is corrupted and // some (but not all) CFs are flushed // Exclude the PIT case where no log is dropped after the corruption point. - // This is to cover the case for empty logs after corrupted log, in which we + // This is to cover the case for empty wals after corrupted log, in which we // don't reset stop_replay_for_corruption. 
if (stop_replay_for_corruption == true && (immutable_db_options_.wal_recovery_mode == @@ -1129,7 +1154,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kTolerateCorruptedTailRecords)) { for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->GetLogNumber() > corrupted_log_number) { + if (cfd->GetLogNumber() > corrupted_wal_number) { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Column family inconsistency: SST file contains data" " beyond the point of corruption."); @@ -1144,16 +1169,16 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, if (!read_only) { // no need to refcount since client still doesn't have access // to the DB and can not drop column families while we iterate - auto max_log_number = log_numbers.back(); + auto max_wal_number = wal_numbers.back(); for (auto cfd : *versions_->GetColumnFamilySet()) { auto iter = version_edits.find(cfd->GetID()); assert(iter != version_edits.end()); VersionEdit* edit = &iter->second; - if (cfd->GetLogNumber() > max_log_number) { + if (cfd->GetLogNumber() > max_wal_number) { // Column family cfd has already flushed the data - // from all logs. Memtable has to be empty because - // we filter the updates based on log_number + // from all wals. Memtable has to be empty because + // we filter the updates based on wal_number // (in WriteBatch::InsertInto) assert(cfd->mem()->GetFirstSequenceNumber() == 0); assert(edit->NumEntries() == 0); @@ -1185,13 +1210,13 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // Update the log number info in the version edit corresponding to this // column family. Note that the version edits will be written to MANIFEST // together later. 
- // writing log_number in the manifest means that any log file - // with number strongly less than (log_number + 1) is already + // writing wal_number in the manifest means that any log file + // with number strongly less than (wal_number + 1) is already // recovered and should be ignored on next reincarnation. - // Since we already recovered max_log_number, we want all logs - // with numbers `<= max_log_number` (includes this one) to be ignored + // Since we already recovered max_wal_number, we want all wals + // with numbers `<= max_wal_number` (includes this one) to be ignored if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) { - edit->SetLogNumber(max_log_number + 1); + edit->SetLogNumber(max_wal_number + 1); } } if (status.ok()) { @@ -1199,7 +1224,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // not actually used. that is because VersionSet assumes // VersionSet::next_file_number_ always to be strictly greater than any // log number - versions_->MarkFileNumberUsed(max_log_number + 1); + versions_->MarkFileNumberUsed(max_wal_number + 1); autovector cfds; autovector cf_opts; @@ -1219,7 +1244,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, } if (status.ok() && data_seen && !flushed) { - status = RestoreAliveLogFiles(log_numbers); + status = RestoreAliveLogFiles(wal_numbers); } event_logger_.Log() << "job" << job_id << "event" @@ -1228,8 +1253,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, return status; } -Status DBImpl::RestoreAliveLogFiles(const std::vector& log_numbers) { - if (log_numbers.empty()) { +Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { + if (wal_numbers.empty()) { return Status::OK(); } Status s; @@ -1242,20 +1267,20 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& log_numbers) { // FindObsoleteFiles() total_log_size_ = 0; log_empty_ = false; - for (auto log_number : log_numbers) { - LogFileNumberSize log(log_number); - std::string fname = 
LogFileName(immutable_db_options_.wal_dir, log_number); - // This gets the appear size of the logs, not including preallocated space. + for (auto wal_number : wal_numbers) { + LogFileNumberSize log(wal_number); + std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number); + // This gets the appear size of the wals, not including preallocated space. s = env_->GetFileSize(fname, &log.size); if (!s.ok()) { break; } total_log_size_ += log.size; alive_log_files_.push_back(log); - // We preallocate space for logs, but then after a crash and restart, those + // We preallocate space for wals, but then after a crash and restart, those // preallocated space are not needed anymore. It is likely only the last // log has such preallocated space, so we only truncate for the last log. - if (log_number == log_numbers.back()) { + if (wal_number == wal_numbers.back()) { std::unique_ptr last_log; Status truncate_status = fs_->ReopenWritableFile( fname, @@ -1272,7 +1297,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& log_numbers) { // Not a critical error if fail to truncate. if (!truncate_status.ok()) { ROCKS_LOG_WARN(immutable_db_options_.info_log, - "Failed to truncate log #%" PRIu64 ": %s", log_number, + "Failed to truncate log #%" PRIu64 ": %s", wal_number, truncate_status.ToString().c_str()); } } @@ -1610,7 +1635,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, // In WritePrepared there could be gap in sequence numbers. This breaks // the trick we use in kPointInTimeRecovery which assumes the first seq in // the log right after the corrupted log is one larger than the last seq - // we read from the logs. To let this trick keep working, we add a dummy + // we read from the wals. To let this trick keep working, we add a dummy // entry with the expected sequence to the first log right after recovery. 
// In non-WritePrepared case also the new log after recovery could be // empty, and thus missing the consecutive seq hint to distinguish diff --git a/db/version_edit.h b/db/version_edit.h index d2d92c272..ff6a16c49 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -453,21 +453,26 @@ class VersionEdit { // Add a WAL (either just created or closed). void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) { + assert(NumEntries() == wal_additions_.size()); wal_additions_.emplace_back(number, std::move(metadata)); } // Retrieve all the added WALs. const WalAdditions& GetWalAdditions() const { return wal_additions_; } - bool HasWalAddition() const { return !wal_additions_.empty(); } + bool IsWalAddition() const { return !wal_additions_.empty(); } // Delete a WAL (either directly deleted or archived). - void DeleteWal(WalNumber number) { wal_deletions_.emplace_back(number); } + void DeleteWal(WalNumber number) { + assert(NumEntries() == wal_deletions_.size()); + wal_deletions_.emplace_back(number); + } - // Retrieve all the deleted WALs. 
const WalDeletions& GetWalDeletions() const { return wal_deletions_; } - bool HasWalDeletion() const { return !wal_deletions_.empty(); } + bool IsWalDeletion() const { return !wal_deletions_.empty(); } + + bool IsWalManipulation() const { return IsWalAddition() || IsWalDeletion(); } // Number of edits size_t NumEntries() const { diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 75fe107c5..a1ce12d1b 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -121,6 +121,10 @@ Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit, s = OnColumnFamilyAdd(edit, cfd); } else if (edit.is_column_family_drop_) { s = OnColumnFamilyDrop(edit, cfd); + } else if (edit.IsWalAddition()) { + s = OnWalAddition(edit); + } else if (edit.IsWalDeletion()) { + s = OnWalDeletion(edit); } else { s = OnNonCfOperation(edit, cfd); } @@ -190,6 +194,16 @@ Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit, return s; } +Status VersionEditHandler::OnWalAddition(VersionEdit& edit) { + assert(edit.IsWalAddition()); + return version_set_->wals_.AddWals(edit.GetWalAdditions()); +} + +Status VersionEditHandler::OnWalDeletion(VersionEdit& edit) { + assert(edit.IsWalDeletion()); + return version_set_->wals_.DeleteWals(edit.GetWalDeletions()); +} + Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit, ColumnFamilyData** cfd) { bool cf_in_not_found = false; diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index da222a8f3..4faf7d651 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -56,6 +56,10 @@ class VersionEditHandler { Status OnNonCfOperation(VersionEdit& edit, ColumnFamilyData** cfd); + Status OnWalAddition(VersionEdit& edit); + + Status OnWalDeletion(VersionEdit& edit); + Status Initialize(); void CheckColumnFamilyId(const VersionEdit& edit, bool* cf_in_not_found, diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index ea62d9a78..7236e0a2d 100644 --- 
a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -317,10 +317,6 @@ TEST_F(VersionEditTest, AddWalEncodeDecode) { if (has_size) { meta.SetSyncedSizeInBytes(rand() % 1000); } - bool is_closed = rand() % 2 == 0; - if (is_closed) { - meta.SetClosed(); - } edit.AddWal(log_number, meta); } TestEncodeDecode(edit); @@ -442,7 +438,7 @@ TEST_F(VersionEditTest, AddWalDebug) { const WalAdditions& wals = edit.GetWalAdditions(); - ASSERT_TRUE(edit.HasWalAddition()); + ASSERT_TRUE(edit.IsWalAddition()); ASSERT_EQ(wals.size(), n); for (int i = 0; i < n; i++) { const WalAddition& wal = wals[i]; @@ -454,7 +450,7 @@ TEST_F(VersionEditTest, AddWalDebug) { for (int i = 0; i < n; i++) { std::stringstream ss; ss << " WalAddition: log_number: " << kLogNumbers[i] - << " synced_size_in_bytes: " << kSizeInBytes[i] << " closed: 0\n"; + << " synced_size_in_bytes: " << kSizeInBytes[i] << "\n"; expected_str += ss.str(); } expected_str += " ColumnFamily: 0\n}\n"; @@ -464,8 +460,7 @@ TEST_F(VersionEditTest, AddWalDebug) { for (int i = 0; i < n; i++) { std::stringstream ss; ss << "{\"LogNumber\": " << kLogNumbers[i] << ", " - << "\"SyncedSizeInBytes\": " << kSizeInBytes[i] << ", " - << "\"Closed\": 0}"; + << "\"SyncedSizeInBytes\": " << kSizeInBytes[i] << "}"; if (i < n - 1) ss << ", "; expected_json += ss.str(); } @@ -492,7 +487,7 @@ TEST_F(VersionEditTest, DeleteWalDebug) { const WalDeletions& wals = edit.GetWalDeletions(); - ASSERT_TRUE(edit.HasWalDeletion()); + ASSERT_TRUE(edit.IsWalDeletion()); ASSERT_EQ(wals.size(), n); for (int i = 0; i < n; i++) { const WalDeletion& wal = wals[i]; diff --git a/db/version_set.cc b/db/version_set.cc index 6589b72e2..91fedab4d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3674,7 +3674,19 @@ struct VersionSet::ManifestWriter { cfd(_cfd), mutable_cf_options(cf_options), edit_list(e) {} + ~ManifestWriter() { status.PermitUncheckedError(); } + + bool IsAllWalEdits() const { + bool all_wal_edits = true; + for (const auto& e : edit_list) { + 
if (!e->IsWalManipulation()) { + all_wal_edits = false; + break; + } + } + return all_wal_edits; + } }; Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) { @@ -3826,6 +3838,7 @@ Status VersionSet::ProcessManifestWrites( std::deque& writers, InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options) { + mu->AssertHeld(); assert(!writers.empty()); ManifestWriter& first_writer = writers.front(); ManifestWriter* last_writer = &first_writer; @@ -3902,16 +3915,22 @@ Status VersionSet::ProcessManifestWrites( } } if (version == nullptr) { - version = new Version(last_writer->cfd, this, file_options_, - last_writer->mutable_cf_options, io_tracer_, - current_version_number_++); - versions.push_back(version); - mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options); - builder_guards.emplace_back( - new BaseReferencedVersionBuilder(last_writer->cfd)); - builder = builder_guards.back()->version_builder(); - } - assert(builder != nullptr); // make checker happy + // WAL manipulations do not need to be applied to versions. 
+ if (!last_writer->IsAllWalEdits()) { + version = new Version(last_writer->cfd, this, file_options_, + last_writer->mutable_cf_options, io_tracer_, + current_version_number_++); + versions.push_back(version); + mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options); + builder_guards.emplace_back( + new BaseReferencedVersionBuilder(last_writer->cfd)); + builder = builder_guards.back()->version_builder(); + } + assert(last_writer->IsAllWalEdits() || builder); + assert(last_writer->IsAllWalEdits() || version); + TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion", + version); + } for (const auto& e : last_writer->edit_list) { if (e->is_in_atomic_group_) { if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ || @@ -3997,6 +4016,7 @@ Status VersionSet::ProcessManifestWrites( // reads its content after releasing db mutex to avoid race with // SwitchMemtable(). std::unordered_map curr_state; + VersionEdit wal_additions; if (new_descriptor_log) { pending_manifest_file_number_ = NewFileNumber(); batch_edits.back()->SetNextFile(next_file_number_.load()); @@ -4011,6 +4031,10 @@ Status VersionSet::ProcessManifestWrites( assert(curr_state.find(cfd->GetID()) == curr_state.end()); curr_state[cfd->GetID()] = {cfd->GetLogNumber()}; } + + for (const auto& wal : wals_.GetWals()) { + wal_additions.AddWal(wal.first, wal.second); + } } uint64_t new_manifest_file_size = 0; @@ -4063,8 +4087,8 @@ Status VersionSet::ProcessManifestWrites( io_tracer_, nullptr, db_options_->listeners)); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); - s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(), - io_s); + s = WriteCurrentStateToManifest(curr_state, wal_additions, + descriptor_log_.get(), io_s); } else { s = io_s; } @@ -4145,6 +4169,20 @@ Status VersionSet::ProcessManifestWrites( mu->Lock(); } + if (s.ok()) { + // Apply WAL edits, DB mutex must be held. 
+ for (auto& e : batch_edits) { + if (e->IsWalAddition()) { + s = wals_.AddWals(e->GetWalAdditions()); + } else if (e->IsWalDeletion()) { + s = wals_.DeleteWals(e->GetWalDeletions()); + } + if (!s.ok()) { + break; + } + } + } + if (!io_s.ok()) { if (io_status_.ok()) { io_status_ = io_s; @@ -4392,9 +4430,11 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_ : last_sequence_); - Status s = builder->Apply(edit); - - return s; + // The builder can be nullptr only if edit is WAL manipulation, + // because WAL edits do not need to be applied to versions, + // we return Status::OK() in this case. + assert(builder || edit->IsWalManipulation()); + return builder ? builder->Apply(edit) : Status::OK(); } Status VersionSet::ApplyOneVersionEditToBuilder( @@ -4468,6 +4508,16 @@ Status VersionSet::ApplyOneVersionEditToBuilder( return Status::Corruption( "Manifest - dropping non-existing column family"); } + } else if (edit.IsWalAddition()) { + Status s = wals_.AddWals(edit.GetWalAdditions()); + if (!s.ok()) { + return s; + } + } else if (edit.IsWalDeletion()) { + Status s = wals_.DeleteWals(edit.GetWalDeletions()); + if (!s.ok()) { + return s; + } } else if (!cf_in_not_found) { if (!cf_in_builders) { return Status::Corruption( @@ -5412,7 +5462,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { Status VersionSet::WriteCurrentStateToManifest( const std::unordered_map& curr_state, - log::Writer* log, IOStatus& io_s) { + const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) { // TODO: Break up into multiple records to reduce memory usage on recovery? // WARNING: This method doesn't hold a mutex!! @@ -5437,6 +5487,21 @@ Status VersionSet::WriteCurrentStateToManifest( } } + // Save WALs. 
+ if (!wal_additions.GetWalAdditions().empty()) { + TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal", + const_cast(&wal_additions)); + std::string record; + if (!wal_additions.EncodeTo(&record)) { + return Status::Corruption("Unable to Encode VersionEdit: " + + wal_additions.DebugString(true)); + } + io_s = log->AddRecord(record); + if (!io_s.ok()) { + return io_s; + } + } + for (auto cfd : *column_family_set_) { assert(cfd); diff --git a/db/version_set.h b/db/version_set.h index eacfca44e..8bc9b3ef6 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -917,6 +917,17 @@ class VersionSet { virtual ~VersionSet(); + Status LogAndApplyToDefaultColumnFamily( + VersionEdit* edit, InstrumentedMutex* mu, + FSDirectory* db_directory = nullptr, bool new_descriptor_log = false, + const ColumnFamilyOptions* column_family_options = nullptr) { + ColumnFamilyData* default_cf = GetColumnFamilySet()->GetDefault(); + const MutableCFOptions* cf_options = + default_cf->GetLatestMutableCFOptions(); + return LogAndApply(default_cf, *cf_options, edit, mu, db_directory, + new_descriptor_log, column_family_options); + } + // Apply *edit to the current version to form a new descriptor that // is both saved to persistent state and installed as the new // current version. Will release *mu while actually writing to the file. @@ -1187,6 +1198,7 @@ class VersionSet { // Get the IO Status returned by written Manifest. const IOStatus& io_status() const { return io_status_; } + // The returned WalSet needs to be accessed with DB mutex held. 
const WalSet& GetWalSet() const { return wals_; } void TEST_CreateAndAppendVersion(ColumnFamilyData* cfd) { @@ -1242,7 +1254,7 @@ class VersionSet { // Save current contents to *log Status WriteCurrentStateToManifest( const std::unordered_map& curr_state, - log::Writer* log, IOStatus& io_s); + const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s); void AppendVersion(ColumnFamilyData* column_family_data, Version* v); @@ -1275,6 +1287,7 @@ class VersionSet { Status VerifyFileMetadata(const std::string& fpath, const FileMetaData& meta) const; + // Protected by DB mutex. WalSet wals_; std::unique_ptr column_family_set_; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 8f2134dce..d91216751 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -816,18 +816,17 @@ class VersionSetTestBase { // Create DB with 3 column families. void NewDB() { - std::vector column_families; SequenceNumber last_seqno; std::unique_ptr log_writer; SetIdentityFile(env_, dbname_); - PrepareManifest(&column_families, &last_seqno, &log_writer); + PrepareManifest(&column_families_, &last_seqno, &log_writer); log_writer.reset(); // Make "CURRENT" file point to the new manifest file. 
Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); ASSERT_OK(s); - EXPECT_OK(versions_->Recover(column_families, false)); - EXPECT_EQ(column_families.size(), + EXPECT_OK(versions_->Recover(column_families_, false)); + EXPECT_EQ(column_families_.size(), versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); } @@ -840,6 +839,40 @@ class VersionSetTestBase { ASSERT_EQ(1, manifest_file_number); } + Status LogAndApplyToDefaultCF(VersionEdit& edit) { + mutex_.Lock(); + Status s = + versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), + mutable_cf_options_, &edit, &mutex_); + mutex_.Unlock(); + return s; + } + + Status LogAndApplyToDefaultCF( + const autovector>& edits) { + autovector vedits; + for (auto& e : edits) { + vedits.push_back(e.get()); + } + mutex_.Lock(); + Status s = + versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), + mutable_cf_options_, vedits, &mutex_); + mutex_.Unlock(); + return s; + } + + void CreateNewManifest() { + constexpr FSDirectory* db_directory = nullptr; + constexpr bool new_descriptor_log = true; + mutex_.Lock(); + VersionEdit dummy; + ASSERT_OK(versions_->LogAndApply( + versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_, + &dummy, &mutex_, db_directory, new_descriptor_log)); + mutex_.Unlock(); + } + MockEnv* mem_env_; Env* env_; std::shared_ptr env_guard_; @@ -859,6 +892,7 @@ class VersionSetTestBase { InstrumentedMutex mutex_; std::atomic shutting_down_; std::shared_ptr mock_table_factory_; + std::vector column_families_; }; const std::string VersionSetTestBase::kColumnFamilyName1 = "alice"; @@ -979,17 +1013,8 @@ TEST_F(VersionSetTest, PersistBlobFileStateInNewManifest) { [&](void* /* arg */) { ++garbage_encoded; }); SyncPoint::GetInstance()->EnableProcessing(); - VersionEdit dummy; - - mutex_.Lock(); - constexpr FSDirectory* db_directory = nullptr; - constexpr bool new_descriptor_log = true; - Status s = versions_->LogAndApply( - 
versions_->GetColumnFamilySet()->GetDefault(), mutable_cf_options_, - &dummy, &mutex_, db_directory, new_descriptor_log); - mutex_.Unlock(); + CreateNewManifest(); - ASSERT_OK(s); ASSERT_EQ(addition_encoded, 2); ASSERT_EQ(garbage_encoded, 1); @@ -1158,6 +1183,456 @@ TEST_F(VersionSetTest, ObsoleteBlobFile) { } } +TEST_F(VersionSetTest, WalEditsNotAppliedToVersion) { + NewDB(); + + constexpr uint64_t kNumWals = 5; + + autovector> edits; + // Add some WALs. + for (uint64_t i = 1; i <= kNumWals; i++) { + edits.emplace_back(new VersionEdit); + // WAL's size equals its log number. + edits.back()->AddWal(i, WalMetadata(i)); + } + // Delete all of the WALs added above. + for (uint64_t i = 1; i <= kNumWals; i++) { + edits.emplace_back(new VersionEdit); + edits.back()->DeleteWal(i); + } + + autovector versions; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:NewVersion", + [&](void* arg) { versions.push_back(reinterpret_cast(arg)); }); + SyncPoint::GetInstance()->EnableProcessing(); + + LogAndApplyToDefaultCF(edits); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Since the edits are all WAL edits, no version should be created. + ASSERT_EQ(versions.size(), 1); + ASSERT_EQ(versions[0], nullptr); +} + +// Similar to WalEditsNotAppliedToVersion, but contains a non-WAL edit. +TEST_F(VersionSetTest, NonWalEditsAppliedToVersion) { + NewDB(); + + const std::string kDBId = "db_db"; + constexpr uint64_t kNumWals = 5; + + autovector> edits; + // Add some WALs. + for (uint64_t i = 1; i <= kNumWals; i++) { + edits.emplace_back(new VersionEdit); + // WAL's size equals its log number. + edits.back()->AddWal(i, WalMetadata(i)); + } + // Delete all of the WALs added above.
+ for (uint64_t i = 1; i <= kNumWals; i++) { + edits.emplace_back(new VersionEdit); + edits.back()->DeleteWal(i); + } + edits.emplace_back(new VersionEdit); + edits.back()->SetDBId(kDBId); + + autovector versions; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:NewVersion", + [&](void* arg) { versions.push_back(reinterpret_cast(arg)); }); + SyncPoint::GetInstance()->EnableProcessing(); + + LogAndApplyToDefaultCF(edits); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // Since the edits contain a non-WAL edit, a new version should be created. + ASSERT_EQ(versions.size(), 1); + ASSERT_NE(versions[0], nullptr); +} + +TEST_F(VersionSetTest, WalAddition) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + constexpr uint64_t kSizeInBytes = 111; + + // A WAL is just created. + { + VersionEdit edit; + edit.AddWal(kLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize()); + } + + // The WAL is synced several times before closing. + { + for (uint64_t size_delta = 100; size_delta > 0; size_delta /= 2) { + uint64_t size = kSizeInBytes - size_delta; + WalMetadata wal(size); + VersionEdit edit; + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), size); + } + } + + // The WAL is closed.
+ { + WalMetadata wal(kSizeInBytes); + VersionEdit edit; + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes); + } + + // Recover a new VersionSet. + { + std::unique_ptr new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); + ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSizeInBytes); + } +} + +TEST_F(VersionSetTest, WalCloseWithoutSync) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + constexpr uint64_t kSizeInBytes = 111; + constexpr uint64_t kSyncedSizeInBytes = kSizeInBytes / 2; + + // A WAL is just created. + { + VersionEdit edit; + edit.AddWal(kLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kLogNumber).HasSyncedSize()); + } + + // The WAL is synced before closing. 
+ { + WalMetadata wal(kSyncedSizeInBytes); + VersionEdit edit; + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes); + } + + // A new WAL with larger log number is created, + // implicitly marking the current WAL closed. + { + VersionEdit edit; + edit.AddWal(kLogNumber + 1); + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 2); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes); + ASSERT_TRUE(wals.find(kLogNumber + 1) != wals.end()); + ASSERT_FALSE(wals.at(kLogNumber + 1).HasSyncedSize()); + } + + // Recover a new VersionSet. + { + std::unique_ptr new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 2); + ASSERT_TRUE(wals.find(kLogNumber) != wals.end()); + ASSERT_TRUE(wals.at(kLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kLogNumber).GetSyncedSizeInBytes(), kSyncedSizeInBytes); + } +} + +TEST_F(VersionSetTest, WalDeletion) { + NewDB(); + + constexpr WalNumber kClosedLogNumber = 10; + constexpr WalNumber kNonClosedLogNumber = 20; + constexpr uint64_t kSizeInBytes = 111; + + // Add a non-closed and a closed WAL. 
+ { + VersionEdit edit; + edit.AddWal(kClosedLogNumber, WalMetadata(kSizeInBytes)); + edit.AddWal(kNonClosedLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 2); + ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end()); + ASSERT_TRUE(wals.find(kClosedLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize()); + ASSERT_TRUE(wals.at(kClosedLogNumber).HasSyncedSize()); + ASSERT_EQ(wals.at(kClosedLogNumber).GetSyncedSizeInBytes(), kSizeInBytes); + } + + // Delete the closed WAL. + { + VersionEdit edit; + edit.DeleteWal(kClosedLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + const auto& wals = versions_->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize()); + } + + // Recover a new VersionSet, only the non-closed WAL should show up. + { + std::unique_ptr new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize()); + } + + // Force the creation of a new MANIFEST file, + // only the non-closed WAL should be written to the new MANIFEST. 
+ { + std::vector wal_additions; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::WriteCurrentStateToManifest:SaveWal", [&](void* arg) { + VersionEdit* edit = reinterpret_cast(arg); + ASSERT_TRUE(edit->IsWalAddition()); + for (auto& addition : edit->GetWalAdditions()) { + wal_additions.push_back(addition); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + CreateNewManifest(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + ASSERT_EQ(wal_additions.size(), 1); + ASSERT_EQ(wal_additions[0].GetLogNumber(), kNonClosedLogNumber); + ASSERT_FALSE(wal_additions[0].GetMetadata().HasSyncedSize()); + } + + // Recover from the new MANIFEST, only the non-closed WAL should show up. + { + std::unique_ptr new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); + ASSERT_OK(new_versions->Recover(column_families_, false)); + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kNonClosedLogNumber) != wals.end()); + ASSERT_FALSE(wals.at(kNonClosedLogNumber).HasSyncedSize()); + } +} + +TEST_F(VersionSetTest, WalCreateTwice) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + + VersionEdit edit; + edit.AddWal(kLogNumber); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + + Status s = LogAndApplyToDefaultCF(edit); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") != + std::string::npos) + << s.ToString(); +} + +TEST_F(VersionSetTest, WalCreateAfterClose) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + constexpr uint64_t kSizeInBytes = 111; + + { + // Add a closed WAL. + VersionEdit edit; + edit.AddWal(kLogNumber); + WalMetadata wal(kSizeInBytes); + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + { + // Create the same WAL again. 
+ VersionEdit edit; + edit.AddWal(kLogNumber); + + Status s = LogAndApplyToDefaultCF(edit); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 10 is created more than once") != + std::string::npos) + << s.ToString(); + } +} + +TEST_F(VersionSetTest, AddWalWithSmallerSize) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + constexpr uint64_t kSizeInBytes = 111; + + { + // Add a closed WAL. + VersionEdit edit; + WalMetadata wal(kSizeInBytes); + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + { + // Add the same WAL with smaller synced size. + VersionEdit edit; + WalMetadata wal(kSizeInBytes / 2); + edit.AddWal(kLogNumber, wal); + + Status s = LogAndApplyToDefaultCF(edit); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE( + s.ToString().find( + "WAL 10 must not have smaller synced size than previous one") != + std::string::npos) + << s.ToString(); + } +} + +TEST_F(VersionSetTest, DeleteNonExistingWal) { + NewDB(); + + constexpr WalNumber kLogNumber = 10; + constexpr WalNumber kNonExistingNumber = 11; + constexpr uint64_t kSizeInBytes = 111; + + { + // Add a closed WAL. + VersionEdit edit; + WalMetadata wal(kSizeInBytes); + edit.AddWal(kLogNumber, wal); + + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + } + + { + // Delete a non-existing WAL. + VersionEdit edit; + edit.DeleteWal(kNonExistingNumber); + + Status s = LogAndApplyToDefaultCF(edit); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(s.ToString().find("WAL 11 must exist before deletion") != + std::string::npos) + << s.ToString(); + } +} + +TEST_F(VersionSetTest, AtomicGroupWithWalEdits) { + NewDB(); + + constexpr int kAtomicGroupSize = 10; + constexpr uint64_t kNumWals = 5; + const std::string kDBId = "db_db"; + + int remaining = kAtomicGroupSize; + autovector> edits; + // Add 5 WALs. + for (uint64_t i = 1; i <= kNumWals; i++) { + edits.emplace_back(new VersionEdit); + // WAL's size equals its log number. 
+ edits.back()->AddWal(i, WalMetadata(i)); + edits.back()->MarkAtomicGroup(--remaining); + } + // One edit with the DB ID set. + edits.emplace_back(new VersionEdit); + edits.back()->SetDBId(kDBId); + edits.back()->MarkAtomicGroup(--remaining); + // Delete the first added 4 WALs. + for (uint64_t i = 1; i < kNumWals; i++) { + edits.emplace_back(new VersionEdit); + edits.back()->DeleteWal(i); + edits.back()->MarkAtomicGroup(--remaining); + } + ASSERT_EQ(remaining, 0); + + Status s = LogAndApplyToDefaultCF(edits); + + // Recover a new VersionSet, the DB ID and the last WAL should be + // kept. + { + std::unique_ptr new_versions( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); + std::string db_id; + ASSERT_OK( + new_versions->Recover(column_families_, /*read_only=*/false, &db_id)); + + ASSERT_EQ(db_id, kDBId); + + const auto& wals = new_versions->GetWalSet().GetWals(); + ASSERT_EQ(wals.size(), 1); + ASSERT_TRUE(wals.find(kNumWals) != wals.end()); + ASSERT_TRUE(wals.at(kNumWals).HasSyncedSize()); + ASSERT_EQ(wals.at(kNumWals).GetSyncedSizeInBytes(), kNumWals); + } +} + class VersionSetAtomicGroupTest : public VersionSetTestBase, public testing::Test { public: diff --git a/db/wal_edit.cc b/db/wal_edit.cc index f19395344..70965a770 100644 --- a/db/wal_edit.cc +++ b/db/wal_edit.cc @@ -19,10 +19,6 @@ void WalAddition::EncodeTo(std::string* dst) const { PutVarint64(dst, metadata_.GetSyncedSizeInBytes()); } - if (metadata_.IsClosed()) { - PutVarint32(dst, static_cast(WalAdditionTag::kClosed)); - } - PutVarint32(dst, static_cast(WalAdditionTag::kTerminate)); } @@ -48,10 +44,6 @@ Status WalAddition::DecodeFrom(Slice* src) { metadata_.SetSyncedSizeInBytes(size); break; } - case WalAdditionTag::kClosed: { - metadata_.SetClosed(); - break; - } // TODO: process future tags such as checksum.
case WalAdditionTag::kTerminate: return Status::OK(); @@ -66,15 +58,13 @@ Status WalAddition::DecodeFrom(Slice* src) { JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) { jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes" - << wal.GetMetadata().GetSyncedSizeInBytes() << "Closed" - << wal.GetMetadata().IsClosed(); + << wal.GetMetadata().GetSyncedSizeInBytes(); return jw; } std::ostream& operator<<(std::ostream& os, const WalAddition& wal) { os << "log_number: " << wal.GetLogNumber() - << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes() - << " closed: " << wal.GetMetadata().IsClosed(); + << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes(); return os; } @@ -117,31 +107,23 @@ std::string WalDeletion::DebugString() const { Status WalSet::AddWal(const WalAddition& wal) { auto it = wals_.lower_bound(wal.GetLogNumber()); bool existing = it != wals_.end() && it->first == wal.GetLogNumber(); - if (wal.GetMetadata().IsClosed()) { - // The WAL must exist and not closed. - if (!existing) { - std::stringstream ss; - ss << "WAL " << wal.GetLogNumber() << " is not created before closing"; - return Status::Corruption("WalSet", ss.str()); - } - if (it->second.IsClosed()) { - std::stringstream ss; - ss << "WAL " << wal.GetLogNumber() << " is closed more than once"; - return Status::Corruption("WalSet", ss.str()); - } + if (existing && !wal.GetMetadata().HasSyncedSize()) { + std::stringstream ss; + ss << "WAL " << wal.GetLogNumber() << " is created more than once"; + return Status::Corruption("WalSet", ss.str()); } // If the WAL has synced size, it must >= the previous size. 
- if (existing && it->second.HasSyncedSize() && - (!wal.GetMetadata().HasSyncedSize() || - wal.GetMetadata().GetSyncedSizeInBytes() < - it->second.GetSyncedSizeInBytes())) { + if (wal.GetMetadata().HasSyncedSize() && existing && + it->second.HasSyncedSize() && + wal.GetMetadata().GetSyncedSizeInBytes() < + it->second.GetSyncedSizeInBytes()) { std::stringstream ss; ss << "WAL " << wal.GetLogNumber() << " must not have smaller synced size than previous one"; return Status::Corruption("WalSet", ss.str()); } if (existing) { - it->second = wal.GetMetadata(); + it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes()); } else { wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()}); } @@ -161,17 +143,12 @@ Status WalSet::AddWals(const WalAdditions& wals) { Status WalSet::DeleteWal(const WalDeletion& wal) { auto it = wals_.find(wal.GetLogNumber()); - // The WAL must exist and has been closed. + // The WAL must exist. if (it == wals_.end()) { std::stringstream ss; ss << "WAL " << wal.GetLogNumber() << " must exist before deletion"; return Status::Corruption("WalSet", ss.str()); } - if (!it->second.IsClosed()) { - std::stringstream ss; - ss << "WAL " << wal.GetLogNumber() << " must be closed before deletion"; - return Status::Corruption("WalSet", ss.str()); - } wals_.erase(it); return Status::OK(); } @@ -187,6 +164,10 @@ Status WalSet::DeleteWals(const WalDeletions& wals) { return s; } +void WalSet::DeleteWalsBefore(WalNumber number) { + wals_.erase(wals_.begin(), wals_.lower_bound(number)); +} + void WalSet::Reset() { wals_.clear(); } Status WalSet::CheckWals( diff --git a/db/wal_edit.h b/db/wal_edit.h index 9493c3648..78510a2e0 100644 --- a/db/wal_edit.h +++ b/db/wal_edit.h @@ -35,10 +35,6 @@ class WalMetadata { explicit WalMetadata(uint64_t synced_size_bytes) : synced_size_bytes_(synced_size_bytes) {} - bool IsClosed() const { return closed_; } - - void SetClosed() { closed_ = true; } - bool HasSyncedSize() const { return synced_size_bytes_ != 
kUnknownWalSize; } void SetSyncedSizeInBytes(uint64_t bytes) { synced_size_bytes_ = bytes; } @@ -52,9 +48,6 @@ class WalMetadata { // Size of the most recently synced WAL in bytes. uint64_t synced_size_bytes_ = kUnknownWalSize; - - // Whether the WAL is closed. - bool closed_ = false; }; // These tags are persisted to MANIFEST, so it's part of the user API. @@ -63,8 +56,6 @@ enum class WalAdditionTag : uint32_t { kTerminate = 1, // Synced Size in bytes. kSyncedSize = 2, - // Whether the WAL is closed. - kClosed = 3, // Add tags in the future, such as checksum? }; @@ -98,10 +89,10 @@ JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal); using WalAdditions = std::vector; -// Records the event of deleting/archiving a WAL in VersionEdit. +// Records the event of deleting a WAL. class WalDeletion { public: - WalDeletion() : number_(0) {} + WalDeletion() : number_(kEmpty) {} explicit WalDeletion(WalNumber number) : number_(number) {} @@ -114,6 +105,8 @@ class WalDeletion { std::string DebugString() const; private: + static constexpr WalNumber kEmpty = 0; + WalNumber number_; }; @@ -146,6 +139,9 @@ class WalSet { Status DeleteWal(const WalDeletion& wal); Status DeleteWals(const WalDeletions& wals); + // Delete WALs with log number < wal_number. + void DeleteWalsBefore(WalNumber wal_number); + // Resets the internal state. void Reset(); diff --git a/db/wal_edit_test.cc b/db/wal_edit_test.cc index b6eb347a3..cdfe1b9cf 100644 --- a/db/wal_edit_test.cc +++ b/db/wal_edit_test.cc @@ -24,14 +24,6 @@ TEST(WalSet, AddDeleteReset) { } ASSERT_EQ(wals.GetWals().size(), 10); - // Close WAL 1 - 5. - for (WalNumber log_number = 1; log_number <= 5; log_number++) { - WalMetadata wal(100); - wal.SetClosed(); - wals.AddWal(WalAddition(log_number, wal)); - } - ASSERT_EQ(wals.GetWals().size(), 10); - // Delete WAL 1 - 5. 
for (WalNumber log_number = 1; log_number <= 5; log_number++) { wals.DeleteWal(WalDeletion(log_number)); @@ -72,46 +64,14 @@ TEST(WalSet, SmallerSyncedSize) { std::string::npos); } -TEST(WalSet, CloseTwice) { - constexpr WalNumber kNumber = 100; - constexpr uint64_t kBytes = 200; - WalSet wals; - ASSERT_OK(wals.AddWal(WalAddition(kNumber))); - WalMetadata wal(kBytes); - wal.SetClosed(); - ASSERT_OK(wals.AddWal(WalAddition(kNumber, wal))); - Status s = wals.AddWal(WalAddition(kNumber, wal)); - ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE(s.ToString().find("WAL 100 is closed more than once") != - std::string::npos); -} - -TEST(WalSet, CloseBeforeCreate) { - constexpr WalNumber kNumber = 100; - constexpr uint64_t kBytes = 200; - WalSet wals; - WalMetadata wal(kBytes); - wal.SetClosed(); - Status s = wals.AddWal(WalAddition(kNumber, wal)); - ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE(s.ToString().find("WAL 100 is not created before closing") != - std::string::npos); -} - -TEST(WalSet, CreateAfterClose) { +TEST(WalSet, CreateTwice) { constexpr WalNumber kNumber = 100; - constexpr uint64_t kBytes = 200; WalSet wals; ASSERT_OK(wals.AddWal(WalAddition(kNumber))); - WalMetadata wal(kBytes); - wal.SetClosed(); - ASSERT_OK(wals.AddWal(WalAddition(kNumber, wal))); Status s = wals.AddWal(WalAddition(kNumber)); ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE( - s.ToString().find( - "WAL 100 must not have smaller synced size than previous one") != - std::string::npos); + ASSERT_TRUE(s.ToString().find("WAL 100 is created more than once") != + std::string::npos); } TEST(WalSet, DeleteNonExistingWal) { @@ -123,16 +83,6 @@ TEST(WalSet, DeleteNonExistingWal) { std::string::npos); } -TEST(WalSet, DeleteNonClosedWal) { - constexpr WalNumber kNonClosedWalNumber = 100; - WalSet wals; - ASSERT_OK(wals.AddWal(WalAddition(kNonClosedWalNumber))); - Status s = wals.DeleteWal(WalDeletion(kNonClosedWalNumber)); - ASSERT_TRUE(s.IsCorruption()); - ASSERT_TRUE(s.ToString().find("WAL 100 must be 
closed before deletion") != - std::string::npos); -} - class WalSetTest : public DBTestBase { public: WalSetTest() : DBTestBase("WalSetTest", /* env_do_fsync */ true) {} @@ -165,7 +115,6 @@ class WalSetTest : public DBTestBase { ASSERT_OK(wals_.AddWal(WalAddition(number))); // Close WAL. WalMetadata wal(size_bytes); - wal.SetClosed(); ASSERT_OK(wals_.AddWal(WalAddition(number, wal))); }