From 64517d184a33463ea319d61d976c344ffd05756a Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 10 Mar 2021 10:58:07 -0800 Subject: [PATCH] Make secondary instance use ManifestTailer (#7998) Summary: This PR - adds a class `ManifestTailer` that inherits from `VersionEditHandlerPointInTime`. `ManifestTailer::Iterate()` can be called multiple times to tail the primary instance's MANIFEST and apply the changes to the secondary, - updates the implementation of `ReactiveVersionSet::ReadAndApply` to use this class, - removes unused code in version_set.cc, - updates existing tests, e.g. removing deleted sync points from unit tests, - adds a new test to address the bug in https://github.com/facebook/rocksdb/issues/7815. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7998 Test Plan: make check Existing and newly-added tests in version_set_test.cc and db_secondary_test.cc Reviewed By: jay-zhuang Differential Revision: D26926641 Pulled By: riversand963 fbshipit-source-id: 8d4dd15db0ba863c213f743e33b5a207e948c980 --- db/column_family.cc | 2 +- db/db_impl/db_impl_secondary.cc | 3 +- db/db_impl/db_secondary_test.cc | 29 +- db/version_edit_handler.cc | 124 ++++++- db/version_edit_handler.h | 73 +++- db/version_set.cc | 575 ++------------------------------ db/version_set.h | 46 +-- db/version_set_test.cc | 39 +-- 8 files changed, 235 insertions(+), 656 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 09543f2ba..2f3613144 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -633,7 +633,7 @@ ColumnFamilyData::~ColumnFamilyData() { if (dummy_versions_ != nullptr) { // List must be empty - assert(dummy_versions_->TEST_Next() == dummy_versions_); + assert(dummy_versions_->Next() == dummy_versions_); bool deleted __attribute__((__unused__)); deleted = dummy_versions_->Unref(); assert(deleted); diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index e5669f9c4..e9bd19004 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -519,7 +519,8 @@ Status DBImplSecondary::TryCatchUpWithPrimary() { { InstrumentedMutexLock lock_guard(&mutex_); s = static_cast_with_check(versions_.get()) - ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); + ->ReadAndApply(&mutex_, &manifest_reader_, + manifest_reader_status_.get(), &cfds_changed); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, static_cast(versions_->LastSequence())); diff --git a/db/db_impl/db_secondary_test.cc b/db/db_impl/db_secondary_test.cc index 281cf76be..49917d3c2 100644 --- a/db/db_impl/db_secondary_test.cc +++ b/db/db_impl/db_secondary_test.cc @@ -459,20 +459,6 @@ TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) { } TEST_F(DBSecondaryTest, MissingTableFile) { - int table_files_not_exist = 0; - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); - SyncPoint::GetInstance()->SetCallBack( - "ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers", - [&](void* arg) { - Status s = *reinterpret_cast(arg); - if (s.IsPathNotFound()) { - ++table_files_not_exist; - } else if (!s.ok()) { - assert(false); // Should not reach here - } - }); - SyncPoint::GetInstance()->EnableProcessing(); Options options; options.env = env_; options.level0_file_num_compaction_trigger = 4; @@ -499,7 +485,6 @@ TEST_F(DBSecondaryTest, MissingTableFile) { ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value)); ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); - ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist); ASSERT_OK(db_secondary_->Get(ropts, "foo", &value)); ASSERT_EQ("foo_value" + std::to_string(options.level0_file_num_compaction_trigger - 1), @@ -615,10 +600,7 @@ TEST_F(DBSecondaryTest, SwitchManifest) { range_scan_db(); } -// Here, "Snapshot" refers to the version edits written by -// VersionSet::WriteSnapshot() at the beginning of the new MANIFEST after -// switching from the old one. -TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) { +TEST_F(DBSecondaryTest, SwitchManifestTwice) { Options options; options.env = env_; options.disable_auto_compactions = true; @@ -640,10 +622,15 @@ TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) { Reopen(options); ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + Reopen(options); + ASSERT_OK(Put("0", "value1")); ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + + ASSERT_OK(db_secondary_->Get(ropts, "0", &value)); + ASSERT_EQ("value1", value); } -TEST_F(DBSecondaryTest, SwitchWAL) { +TEST_F(DBSecondaryTest, DISABLED_SwitchWAL) { const int kNumKeysPerMemtable = 1; Options options; options.env = env_; @@ -692,7 +679,7 @@ TEST_F(DBSecondaryTest, SwitchWAL) { } } -TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { +TEST_F(DBSecondaryTest, DISABLED_SwitchWALMultiColumnFamilies) { const int kNumKeysPerMemtable = 1; SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency( diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index 0aa726658..7e7921209 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -62,8 +62,6 @@ void VersionEditHandlerBase::Iterate(log::Reader& reader, s = *log_read_status; } - read_buffer_.Clear(); - CheckIterationResult(reader, &s); if (!s.ok()) { @@ -129,13 +127,13 @@ Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit, } VersionEditHandler::VersionEditHandler( - bool read_only, const std::vector& column_families, + bool read_only, std::vector column_families, VersionSet* version_set, bool track_missing_files, bool no_error_if_table_files_missing, const std::shared_ptr& io_tracer, bool skip_load_table_files) : VersionEditHandlerBase(), read_only_(read_only), - column_families_(column_families), + column_families_(std::move(column_families)), version_set_(version_set), track_missing_files_(track_missing_files), no_error_if_table_files_missing_(no_error_if_table_files_missing), @@ -351,7 +349,8 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, } // There were some column families in the MANIFEST that weren't specified // in the argument. This is OK in read_only mode - if (s->ok() && !read_only_ && !column_families_not_found_.empty()) { + if (s->ok() && MustOpenAllColumnFamilies() && + !column_families_not_found_.empty()) { std::string msg; for (const auto& cf : column_families_not_found_) { msg.append(", "); @@ -368,6 +367,9 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader, version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_); version_set_->MarkFileNumberUsed(version_edit_params_.log_number_); for (auto* cfd : *(version_set_->GetColumnFamilySet())) { + if (cfd->IsDropped()) { + continue; + } auto builder_iter = builders_.find(cfd->GetID()); assert(builder_iter != builders_.end()); auto* builder = builder_iter->second->version_builder(); @@ -452,11 +454,9 @@ ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup( ColumnFamilyData* ret = version_set_->GetColumnFamilySet()->GetColumnFamily(edit.column_family_); assert(ret != nullptr); - if (ret->UnrefAndTryDelete()) { - ret = nullptr; - } else { - assert(false); - } + ret->SetDropped(); + ret->UnrefAndTryDelete(); + ret = nullptr; return ret; } @@ -572,7 +572,7 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, } VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( - bool read_only, const std::vector& column_families, + bool read_only, std::vector column_families, VersionSet* version_set, const std::shared_ptr& io_tracer) : VersionEditHandler(read_only, column_families, version_set, /*track_missing_files=*/true, @@ -641,7 +641,7 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( uint64_t file_num = fd.GetNumber(); const std::string fpath = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num); - s = version_set_->VerifyFileMetadata(fpath, meta); + s = VerifyFile(fpath, meta); if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) { missing_files.insert(file_num); s = Status::OK(); @@ -682,6 +682,106 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion( return s; } +Status VersionEditHandlerPointInTime::VerifyFile(const std::string& fpath, + const FileMetaData& fmeta) { + return version_set_->VerifyFileMetadata(fpath, fmeta); +} + +Status ManifestTailer::Initialize() { + if (Mode::kRecovery == mode_) { + return VersionEditHandler::Initialize(); + } + assert(Mode::kCatchUp == mode_); + Status s; + if (!initialized_) { + ColumnFamilySet* cfd_set = version_set_->GetColumnFamilySet(); + assert(cfd_set); + ColumnFamilyData* default_cfd = cfd_set->GetDefault(); + assert(default_cfd); + auto builder_iter = builders_.find(default_cfd->GetID()); + assert(builder_iter != builders_.end()); + + Version* dummy_version = default_cfd->dummy_versions(); + assert(dummy_version); + Version* base_version = dummy_version->Next(); + assert(base_version); + base_version->Ref(); + VersionBuilderUPtr new_builder( + new BaseReferencedVersionBuilder(default_cfd, base_version)); + builder_iter->second = std::move(new_builder); + + initialized_ = true; + } + return s; +} + +Status ManifestTailer::ApplyVersionEdit(VersionEdit& edit, + ColumnFamilyData** cfd) { + Status s = VersionEditHandler::ApplyVersionEdit(edit, cfd); + if (s.ok()) { + assert(cfd); + if (*cfd) { + cfds_changed_.insert(*cfd); + } + } + return s; +} + +Status ManifestTailer::OnColumnFamilyAdd(VersionEdit& edit, + ColumnFamilyData** cfd) { + if (Mode::kRecovery == mode_) { + return VersionEditHandler::OnColumnFamilyAdd(edit, cfd); + } + assert(Mode::kCatchUp == mode_); + ColumnFamilySet* cfd_set = version_set_->GetColumnFamilySet(); + assert(cfd_set); + ColumnFamilyData* tmp_cfd = cfd_set->GetColumnFamily(edit.GetColumnFamily()); + assert(cfd); + *cfd = tmp_cfd; + if (!tmp_cfd) { + // For now, ignore new column families created after Recover() succeeds. + return Status::OK(); + } + auto builder_iter = builders_.find(edit.GetColumnFamily()); + assert(builder_iter != builders_.end()); + + Version* dummy_version = tmp_cfd->dummy_versions(); + assert(dummy_version); + Version* base_version = dummy_version->Next(); + assert(base_version); + base_version->Ref(); + VersionBuilderUPtr new_builder( + new BaseReferencedVersionBuilder(tmp_cfd, base_version)); + builder_iter->second = std::move(new_builder); + +#ifndef NDEBUG + auto version_iter = versions_.find(edit.GetColumnFamily()); + assert(version_iter != versions_.end()); +#endif // !NDEBUG + return Status::OK(); +} + +void ManifestTailer::CheckIterationResult(const log::Reader& reader, + Status* s) { + VersionEditHandlerPointInTime::CheckIterationResult(reader, s); + assert(s); + if (s->ok()) { + if (Mode::kRecovery == mode_) { + mode_ = Mode::kCatchUp; + } else { + assert(Mode::kCatchUp == mode_); + } + } +} + +Status ManifestTailer::VerifyFile(const std::string& fpath, + const FileMetaData& fmeta) { + Status s = VersionEditHandlerPointInTime::VerifyFile(fpath, fmeta); + // TODO: Open file or create hard link to prevent the file from being + // deleted. + return s; +} + void DumpManifestHandler::CheckIterationResult(const log::Reader& reader, Status* s) { VersionEditHandler::CheckIterationResult(reader, s); diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 113869c3c..1cf768df7 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -15,6 +15,8 @@ namespace ROCKSDB_NAMESPACE { +struct FileMetaData; + class VersionEditHandlerBase { public: explicit VersionEditHandlerBase() @@ -26,6 +28,8 @@ class VersionEditHandlerBase { const Status& status() const { return status_; } + AtomicGroupReadBuffer& GetReadBuffer() { return read_buffer_; } + protected: explicit VersionEditHandlerBase(uint64_t max_read_size) : max_manifest_read_size_(max_read_size) {} @@ -37,6 +41,8 @@ class VersionEditHandlerBase { virtual void CheckIterationResult(const log::Reader& /*reader*/, Status* /*s*/) {} + void ClearReadBuffer() { read_buffer_.Clear(); } + Status status_; private: @@ -125,15 +131,14 @@ class VersionEditHandler : public VersionEditHandlerBase { protected: explicit VersionEditHandler( - bool read_only, - const std::vector& column_families, + bool read_only, std::vector column_families, VersionSet* version_set, bool track_missing_files, bool no_error_if_table_files_missing, const std::shared_ptr& io_tracer, bool skip_load_table_files); Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override; - Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd); + virtual Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd); Status OnColumnFamilyDrop(VersionEdit& edit, ColumnFamilyData** cfd); @@ -163,8 +168,10 @@ class VersionEditHandler : public VersionEditHandlerBase { bool prefetch_index_and_filter_in_cache, bool is_initial_load); + virtual bool MustOpenAllColumnFamilies() const { return !read_only_; } + const bool read_only_; - const std::vector& column_families_; + std::vector column_families_; VersionSet* version_set_; std::unordered_map builders_; std::unordered_map name_to_options_; @@ -179,12 +186,11 @@ class VersionEditHandler : public VersionEditHandlerBase { bool no_error_if_table_files_missing_; std::shared_ptr io_tracer_; bool skip_load_table_files_; + bool initialized_; private: Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, const VersionEdit& edit); - - bool initialized_; }; // A class similar to its base class, i.e. VersionEditHandler. @@ -196,8 +202,7 @@ class VersionEditHandler : public VersionEditHandlerBase { class VersionEditHandlerPointInTime : public VersionEditHandler { public: VersionEditHandlerPointInTime( - bool read_only, - const std::vector& column_families, + bool read_only, std::vector column_families, VersionSet* version_set, const std::shared_ptr& io_tracer); ~VersionEditHandlerPointInTime() override; @@ -206,17 +211,59 @@ class VersionEditHandlerPointInTime : public VersionEditHandler { ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit) override; Status MaybeCreateVersion(const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) override; + virtual Status VerifyFile(const std::string& fpath, + const FileMetaData& fmeta); - private: std::unordered_map versions_; }; +class ManifestTailer : public VersionEditHandlerPointInTime { + public: + explicit ManifestTailer(std::vector column_families, + VersionSet* version_set, + const std::shared_ptr& io_tracer) + : VersionEditHandlerPointInTime(/*read_only=*/false, column_families, + version_set, io_tracer), + mode_(Mode::kRecovery) {} + + void PrepareToReadNewManifest() { + initialized_ = false; + ClearReadBuffer(); + } + + std::unordered_set& GetUpdatedColumnFamilies() { + return cfds_changed_; + } + + protected: + Status Initialize() override; + + bool MustOpenAllColumnFamilies() const override { return false; } + + Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override; + + Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd) override; + + void CheckIterationResult(const log::Reader& reader, Status* s) override; + + Status VerifyFile(const std::string& fpath, + const FileMetaData& fmeta) override; + + enum Mode : uint8_t { + kRecovery = 0, + kCatchUp = 1, + }; + + Mode mode_; + std::unordered_set cfds_changed_; +}; + class DumpManifestHandler : public VersionEditHandler { public: - DumpManifestHandler( - const std::vector& column_families, - VersionSet* version_set, const std::shared_ptr& io_tracer, - bool verbose, bool hex, bool json) + DumpManifestHandler(std::vector column_families, + VersionSet* version_set, + const std::shared_ptr& io_tracer, bool verbose, + bool hex, bool json) : VersionEditHandler( /*read_only=*/true, column_families, version_set, /*track_missing_files=*/false, diff --git a/db/version_set.cc b/db/version_set.cc index c4ff1a6aa..12430a8a6 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4491,110 +4491,6 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, return builder ? builder->Apply(edit) : Status::OK(); } -Status VersionSet::ApplyOneVersionEditToBuilder( - VersionEdit& edit, - const std::unordered_map& name_to_options, - std::unordered_map& column_families_not_found, - std::unordered_map>& - builders, - VersionEditParams* version_edit_params) { - // Not found means that user didn't supply that column - // family option AND we encountered column family add - // record. Once we encounter column family drop record, - // we will delete the column family from - // column_families_not_found. - bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) != - column_families_not_found.end()); - // in builders means that user supplied that column family - // option AND that we encountered column family add record - bool cf_in_builders = builders.find(edit.column_family_) != builders.end(); - - // they can't both be true - assert(!(cf_in_not_found && cf_in_builders)); - - ColumnFamilyData* cfd = nullptr; - - if (edit.is_column_family_add_) { - if (cf_in_builders || cf_in_not_found) { - return Status::Corruption( - "Manifest adding the same column family twice: " + - edit.column_family_name_); - } - auto cf_options = name_to_options.find(edit.column_family_name_); - // implicitly add persistent_stats column family without requiring user - // to specify - bool is_persistent_stats_column_family = - edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0; - if (cf_options == name_to_options.end() && - !is_persistent_stats_column_family) { - column_families_not_found.insert( - {edit.column_family_, edit.column_family_name_}); - } else { - // recover persistent_stats CF from a DB that already contains it - if (is_persistent_stats_column_family) { - ColumnFamilyOptions cfo; - OptimizeForPersistentStats(&cfo); - cfd = CreateColumnFamily(cfo, &edit); - } else { - cfd = CreateColumnFamily(cf_options->second, &edit); - } - cfd->set_initialized(); - builders.insert(std::make_pair( - edit.column_family_, std::unique_ptr( - new BaseReferencedVersionBuilder(cfd)))); - } - } else if (edit.is_column_family_drop_) { - if (cf_in_builders) { - auto builder = builders.find(edit.column_family_); - assert(builder != builders.end()); - builders.erase(builder); - cfd = column_family_set_->GetColumnFamily(edit.column_family_); - assert(cfd != nullptr); - if (cfd->UnrefAndTryDelete()) { - cfd = nullptr; - } else { - // who else can have reference to cfd!? - assert(false); - } - } else if (cf_in_not_found) { - column_families_not_found.erase(edit.column_family_); - } else { - return Status::Corruption( - "Manifest - dropping non-existing column family"); - } - } else if (edit.IsWalAddition()) { - Status s = wals_.AddWals(edit.GetWalAdditions()); - if (!s.ok()) { - return s; - } - } else if (edit.IsWalDeletion()) { - Status s = wals_.DeleteWalsBefore(edit.GetWalDeletion().GetLogNumber()); - if (!s.ok()) { - return s; - } - } else if (!cf_in_not_found) { - if (!cf_in_builders) { - return Status::Corruption( - "Manifest record referencing unknown column family"); - } - - cfd = column_family_set_->GetColumnFamily(edit.column_family_); - // this should never happen since cf_in_builders is true - assert(cfd != nullptr); - - // if it is not column family add or column family drop, - // then it's a file add/delete, which should be forwarded - // to builder - auto builder = builders.find(edit.column_family_); - assert(builder != builders.end()); - Status s = builder->second->version_builder()->Apply(&edit); - if (!s.ok()) { - return s; - } - } - return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params); -} - Status VersionSet::ExtractInfoFromVersionEdit( ColumnFamilyData* cfd, const VersionEdit& from_edit, VersionEditParams* version_edit_params) { @@ -4680,77 +4576,6 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname, return Status::OK(); } -Status VersionSet::ReadAndRecover( - log::Reader& reader, AtomicGroupReadBuffer* read_buffer, - const std::unordered_map& name_to_options, - std::unordered_map& column_families_not_found, - std::unordered_map>& - builders, - Status* log_read_status, VersionEditParams* version_edit_params, - std::string* db_id) { - assert(read_buffer != nullptr); - assert(log_read_status != nullptr); - Status s; - Slice record; - std::string scratch; - size_t recovered_edits = 0; - while (s.ok() && reader.ReadRecord(&record, &scratch) && - log_read_status->ok()) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - if (edit.has_db_id_) { - db_id_ = edit.GetDbId(); - if (db_id != nullptr) { - db_id->assign(edit.GetDbId()); - } - } - s = read_buffer->AddEdit(&edit); - if (!s.ok()) { - break; - } - if (edit.is_in_atomic_group_) { - if (read_buffer->IsFull()) { - // Apply edits in an atomic group when we have read all edits in the - // group. - for (auto& e : read_buffer->replay_buffer()) { - s = ApplyOneVersionEditToBuilder(e, name_to_options, - column_families_not_found, builders, - version_edit_params); - if (!s.ok()) { - break; - } - recovered_edits++; - } - if (!s.ok()) { - break; - } - read_buffer->Clear(); - } - } else { - // Apply a normal edit immediately. - s = ApplyOneVersionEditToBuilder(edit, name_to_options, - column_families_not_found, builders, - version_edit_params); - if (s.ok()) { - recovered_edits++; - } - } - } - if (!log_read_status->ok()) { - s = *log_read_status; - } - if (!s.ok()) { - // Clear the buffer if we fail to decode/apply an edit. - read_buffer->Clear(); - } - TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits", - &recovered_edits); - return s; -} - Status VersionSet::Recover( const std::vector& column_families, bool read_only, std::string* db_id) { @@ -5886,8 +5711,7 @@ ReactiveVersionSet::ReactiveVersionSet( const std::shared_ptr& io_tracer) : VersionSet(dbname, _db_options, _file_options, table_cache, write_buffer_manager, write_controller, - /*block_cache_tracer=*/nullptr, io_tracer), - number_of_edits_to_skip_(0) {} + /*block_cache_tracer=*/nullptr, io_tracer) {} ReactiveVersionSet::~ReactiveVersionSet() {} @@ -5900,394 +5724,44 @@ Status ReactiveVersionSet::Recover( assert(manifest_reporter != nullptr); assert(manifest_reader_status != nullptr); - std::unordered_map cf_name_to_options; - for (const auto& cf : column_families) { - cf_name_to_options.insert({cf.name, cf.options}); - } - - // add default column family - auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); - if (default_cf_iter == cf_name_to_options.end()) { - return Status::InvalidArgument("Default column family not specified"); - } - VersionEdit default_cf_edit; - default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); - default_cf_edit.SetColumnFamily(0); - ColumnFamilyData* default_cfd = - CreateColumnFamily(default_cf_iter->second, &default_cf_edit); - // In recovery, nobody else can access it, so it's fine to set it to be - // initialized earlier. - default_cfd->set_initialized(); - VersionBuilderMap builders; - std::unordered_map column_families_not_found; - builders.insert( - std::make_pair(0, std::unique_ptr( - new BaseReferencedVersionBuilder(default_cfd)))); - manifest_reader_status->reset(new Status()); manifest_reporter->reset(new LogReporter()); static_cast_with_check(manifest_reporter->get())->status = manifest_reader_status->get(); Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader); log::Reader* reader = manifest_reader->get(); + assert(reader); - int retry = 0; - VersionEdit version_edit; - while (s.ok() && retry < 1) { - assert(reader != nullptr); - s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options, - column_families_not_found, builders, - manifest_reader_status->get(), &version_edit); - if (s.ok()) { - bool enough = version_edit.has_next_file_number_ && - version_edit.has_log_number_ && - version_edit.has_last_sequence_; - if (enough) { - for (const auto& cf : column_families) { - auto cfd = column_family_set_->GetColumnFamily(cf.name); - if (cfd == nullptr) { - enough = false; - break; - } - } - } - if (enough) { - for (const auto& cf : column_families) { - auto cfd = column_family_set_->GetColumnFamily(cf.name); - assert(cfd != nullptr); - if (!cfd->IsDropped()) { - auto builder_iter = builders.find(cfd->GetID()); - assert(builder_iter != builders.end()); - auto builder = builder_iter->second->version_builder(); - assert(builder != nullptr); - s = builder->LoadTableHandlers( - cfd->internal_stats(), db_options_->max_file_opening_threads, - false /* prefetch_index_and_filter_in_cache */, - true /* is_initial_load */, - cfd->GetLatestMutableCFOptions()->prefix_extractor.get(), - MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions())); - if (!s.ok()) { - enough = false; - if (s.IsPathNotFound()) { - s = Status::OK(); - } - break; - } - } - } - } - if (enough) { - break; - } - } - ++retry; - } - - if (s.ok()) { - if (!version_edit.has_prev_log_number_) { - version_edit.prev_log_number_ = 0; - } - column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_); - - MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_); - MarkFileNumberUsed(version_edit.prev_log_number_); - MarkFileNumberUsed(version_edit.log_number_); - - for (auto cfd : *column_family_set_) { - assert(builders.count(cfd->GetID()) > 0); - auto builder = builders[cfd->GetID()]->version_builder(); - if (!builder->CheckConsistencyForNumLevels()) { - s = Status::InvalidArgument( - "db has more levels than options.num_levels"); - break; - } - } - } - - if (s.ok()) { - for (auto cfd : *column_family_set_) { - if (cfd->IsDropped()) { - continue; - } - assert(cfd->initialized()); - auto builders_iter = builders.find(cfd->GetID()); - assert(builders_iter != builders.end()); - auto* builder = builders_iter->second->version_builder(); + manifest_tailer_.reset(new ManifestTailer( + column_families, const_cast(this), io_tracer_)); - Version* v = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), io_tracer_, - current_version_number_++); - s = builder->SaveTo(v->storage_info()); + manifest_tailer_->Iterate(*reader, manifest_reader_status->get()); - if (s.ok()) { - // Install recovered version - v->PrepareApply(*cfd->GetLatestMutableCFOptions(), - !(db_options_->skip_stats_update_on_db_open)); - AppendVersion(cfd, v); - } else { - ROCKS_LOG_ERROR(db_options_->info_log, - "[%s]: inconsistent version: %s\n", - cfd->GetName().c_str(), s.ToString().c_str()); - delete v; - break; - } - } - } - if (s.ok()) { - next_file_number_.store(version_edit.next_file_number_ + 1); - last_allocated_sequence_ = version_edit.last_sequence_; - last_published_sequence_ = version_edit.last_sequence_; - last_sequence_ = version_edit.last_sequence_; - prev_log_number_ = version_edit.prev_log_number_; - for (auto cfd : *column_family_set_) { - if (cfd->IsDropped()) { - continue; - } - ROCKS_LOG_INFO(db_options_->info_log, - "Column family [%s] (ID %u), log number is %" PRIu64 "\n", - cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber()); - } - } - return s; + return manifest_tailer_->status(); } Status ReactiveVersionSet::ReadAndApply( InstrumentedMutex* mu, std::unique_ptr* manifest_reader, + Status* manifest_read_status, std::unordered_set* cfds_changed) { assert(manifest_reader != nullptr); assert(cfds_changed != nullptr); mu->AssertHeld(); Status s; - uint64_t applied_edits = 0; - while (s.ok()) { - Slice record; - std::string scratch; - log::Reader* reader = manifest_reader->get(); - std::string old_manifest_path = reader->file()->file_name(); - while (reader->ReadRecord(&record, &scratch)) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - - // Skip the first VersionEdits of each MANIFEST generated by - // VersionSet::WriteCurrentStatetoManifest. - if (number_of_edits_to_skip_ > 0) { - ColumnFamilyData* cfd = - column_family_set_->GetColumnFamily(edit.column_family_); - if (cfd != nullptr && !cfd->IsDropped()) { - --number_of_edits_to_skip_; - } - continue; - } - - s = read_buffer_.AddEdit(&edit); - if (!s.ok()) { - break; - } - VersionEdit temp_edit; - if (edit.is_in_atomic_group_) { - if (read_buffer_.IsFull()) { - // Apply edits in an atomic group when we have read all edits in the - // group. - for (auto& e : read_buffer_.replay_buffer()) { - s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit); - if (!s.ok()) { - break; - } - applied_edits++; - } - if (!s.ok()) { - break; - } - read_buffer_.Clear(); - } - } else { - // Apply a normal edit immediately. - s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit); - if (s.ok()) { - applied_edits++; - } else { - break; - } - } - } - if (!s.ok()) { - // Clear the buffer if we fail to decode/apply an edit. - read_buffer_.Clear(); - } - // It's possible that: - // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted. - // Or the version(s) rebuilt from tailing the MANIFEST is inconsistent. - // 2) we have finished reading the current MANIFEST. - // 3) we have encountered an IOError reading the current MANIFEST. - // We need to look for the next MANIFEST and start from there. If we cannot - // find the next MANIFEST, we should exit the loop. - Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader); - reader = manifest_reader->get(); - if (tmp_s.ok()) { - if (reader->file()->file_name() == old_manifest_path) { - // Still processing the same MANIFEST, thus no need to continue this - // loop since no record is available if we have reached here. - break; - } else { - // We have switched to a new MANIFEST whose first records have been - // generated by VersionSet::WriteCurrentStatetoManifest. Since the - // secondary instance has already finished recovering upon start, there - // is no need for the secondary to process these records. Actually, if - // the secondary were to replay these records, the secondary may end up - // adding the same SST files AGAIN to each column family, causing - // consistency checks done by VersionBuilder to fail. Therefore, we - // record the number of records to skip at the beginning of the new - // MANIFEST and ignore them. - number_of_edits_to_skip_ = 0; - for (auto* cfd : *column_family_set_) { - if (cfd->IsDropped()) { - continue; - } - // Increase number_of_edits_to_skip by 2 because - // WriteCurrentStatetoManifest() writes 2 version edits for each - // column family at the beginning of the newly-generated MANIFEST. - // TODO(yanqin) remove hard-coded value. - if (db_options_->write_dbid_to_manifest) { - number_of_edits_to_skip_ += 3; - } else { - number_of_edits_to_skip_ += 2; - } - } - s = tmp_s; - } - } - } - - if (s.ok()) { - for (auto cfd : *column_family_set_) { - auto builder_iter = active_version_builders_.find(cfd->GetID()); - if (builder_iter == active_version_builders_.end()) { - continue; - } - auto builder = builder_iter->second->version_builder(); - if (!builder->CheckConsistencyForNumLevels()) { - s = Status::InvalidArgument( - "db has more levels than options.num_levels"); - break; - } - } - } - TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits", - &applied_edits); - return s; -} - -Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( - VersionEdit& edit, std::unordered_set* cfds_changed, - VersionEdit* version_edit) { - ColumnFamilyData* cfd = - column_family_set_->GetColumnFamily(edit.column_family_); - - // If we cannot find this column family in our column family set, then it - // may be a new column family created by the primary after the secondary - // starts. It is also possible that the secondary instance opens only a subset - // of column families. Ignore it for now. - if (nullptr == cfd) { - return Status::OK(); - } - if (active_version_builders_.find(edit.column_family_) == - active_version_builders_.end() && - !cfd->IsDropped()) { - std::unique_ptr builder_guard( - new BaseReferencedVersionBuilder(cfd)); - active_version_builders_.insert( - std::make_pair(edit.column_family_, std::move(builder_guard))); - } - - auto builder_iter = active_version_builders_.find(edit.column_family_); - assert(builder_iter != active_version_builders_.end()); - auto builder = builder_iter->second->version_builder(); - assert(builder != nullptr); - - if (edit.is_column_family_add_) { - // TODO (yanqin) for now the secondary ignores column families created - // after Open. This also simplifies handling of switching to a new MANIFEST - // and processing the snapshot of the system at the beginning of the - // MANIFEST. - } else if (edit.is_column_family_drop_) { - // Drop the column family by setting it to be 'dropped' without destroying - // the column family handle. - // TODO (haoyu) figure out how to handle column faimly drop for - // secondary instance. (Is it possible that the ref count for cfd is 0 but - // the ref count for its versions is higher than 0?) - cfd->SetDropped(); - if (cfd->UnrefAndTryDelete()) { - cfd = nullptr; - } - active_version_builders_.erase(builder_iter); - } else { - Status s = builder->Apply(&edit); - if (!s.ok()) { - return s; - } - } - Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit); + log::Reader* reader = manifest_reader->get(); + assert(reader); + s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader); if (!s.ok()) { return s; } - - if (cfd != nullptr && !cfd->IsDropped()) { - s = builder->LoadTableHandlers( - cfd->internal_stats(), db_options_->max_file_opening_threads, - false /* prefetch_index_and_filter_in_cache */, - false /* is_initial_load */, - cfd->GetLatestMutableCFOptions()->prefix_extractor.get(), - MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions())); - TEST_SYNC_POINT_CALLBACK( - "ReactiveVersionSet::ApplyOneVersionEditToBuilder:" - "AfterLoadTableHandlers", - &s); - - if (s.ok()) { - auto version = new Version(cfd, this, file_options_, - *cfd->GetLatestMutableCFOptions(), io_tracer_, - current_version_number_++); - s = builder->SaveTo(version->storage_info()); - if (s.ok()) { - version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); - AppendVersion(cfd, version); - active_version_builders_.erase(builder_iter); - if (cfds_changed->count(cfd) == 0) { - cfds_changed->insert(cfd); - } - } else { - delete version; - } - } else if (s.IsPathNotFound()) { - s = Status::OK(); - } - // Some other error has occurred during LoadTableHandlers. - } - + manifest_tailer_->Iterate(*(manifest_reader->get()), manifest_read_status); + s = manifest_tailer_->status(); if (s.ok()) { - if (version_edit->HasNextFile()) { - next_file_number_.store(version_edit->next_file_number_ + 1); - } - if (version_edit->has_last_sequence_) { - last_allocated_sequence_ = version_edit->last_sequence_; - last_published_sequence_ = version_edit->last_sequence_; - last_sequence_ = version_edit->last_sequence_; - } - if (version_edit->has_prev_log_number_) { - prev_log_number_ = version_edit->prev_log_number_; - MarkFileNumberUsed(version_edit->prev_log_number_); - } - if (version_edit->has_log_number_) { - MarkFileNumberUsed(version_edit->log_number_); - } - column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_); - MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_); + *cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies()); } + return s; } @@ -6328,15 +5802,24 @@ Status ReactiveVersionSet::MaybeSwitchManifest( true /* checksum */, 0 /* log_number */)); ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n", manifest_path.c_str()); - // TODO (yanqin) every time we switch to a new MANIFEST, we clear the - // active_version_builders_ map because we choose to construct the - // versions from scratch, thanks to the first part of each MANIFEST - // written by VersionSet::WriteCurrentStatetoManifest. This is not - // necessary, but we choose this at present for the sake of simplicity. - active_version_builders_.clear(); + if (manifest_tailer_) { + manifest_tailer_->PrepareToReadNewManifest(); + } } } while (s.IsPathNotFound()); return s; } +#ifndef NDEBUG +uint64_t ReactiveVersionSet::TEST_read_edits_in_atomic_group() const { + assert(manifest_tailer_); + return manifest_tailer_->GetReadBuffer().TEST_read_edits_in_atomic_group(); +} +#endif // !NDEBUG + +std::vector& ReactiveVersionSet::replay_buffer() { + assert(manifest_tailer_); + return manifest_tailer_->GetReadBuffer().replay_buffer(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.h b/db/version_set.h index 68263eb85..99fcbf4d1 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -72,6 +72,7 @@ class MergeContext; class ColumnFamilySet; class MergeIteratorBuilder; class SystemClock; +class ManifestTailer; // VersionEdit is always supposed to be valid and it is used to point at // entries in Manifest. Ideally it should not be used as a container to @@ -772,10 +773,8 @@ class Version { ColumnFamilyData* cfd() const { return cfd_; } - // Return the next Version in the linked list. Used for debug only - Version* TEST_Next() const { - return next_; - } + // Return the next Version in the linked list. + Version* Next() const { return next_; } int TEST_refs() const { return refs_; } @@ -910,6 +909,7 @@ class BaseReferencedVersionBuilder; class AtomicGroupReadBuffer { public: + AtomicGroupReadBuffer() = default; Status AddEdit(VersionEdit* edit); void Clear(); bool IsFull() const; @@ -1331,25 +1331,6 @@ class VersionSet { ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, const VersionEdit* edit); - Status ReadAndRecover( - log::Reader& reader, AtomicGroupReadBuffer* read_buffer, - const std::unordered_map& - name_to_options, - std::unordered_map& column_families_not_found, - std::unordered_map< - uint32_t, std::unique_ptr>& builders, - Status* log_read_status, VersionEditParams* version_edit, - std::string* db_id = nullptr); - - // REQUIRES db mutex - Status ApplyOneVersionEditToBuilder( - VersionEdit& edit, - const std::unordered_map& name_to_opts, - std::unordered_map& column_families_not_found, - std::unordered_map< - uint32_t, std::unique_ptr>& builders, - VersionEditParams* version_edit); - Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, const VersionEdit& from_edit, VersionEditParams* version_edit_params); @@ -1449,23 +1430,20 @@ class ReactiveVersionSet : public VersionSet { Status ReadAndApply( InstrumentedMutex* mu, std::unique_ptr* manifest_reader, + Status* manifest_read_status, std::unordered_set* cfds_changed); Status Recover(const std::vector& column_families, std::unique_ptr* manifest_reader, std::unique_ptr* manifest_reporter, std::unique_ptr* manifest_reader_status); +#ifndef NDEBUG + uint64_t TEST_read_edits_in_atomic_group() const; +#endif //! NDEBUG - uint64_t TEST_read_edits_in_atomic_group() const { - return read_buffer_.TEST_read_edits_in_atomic_group(); - } - std::vector& replay_buffer() { - return read_buffer_.replay_buffer(); - } + std::vector& replay_buffer(); protected: - using VersionSet::ApplyOneVersionEditToBuilder; - // REQUIRES db mutex Status ApplyOneVersionEditToBuilder( VersionEdit& edit, std::unordered_set* cfds_changed, @@ -1476,11 +1454,7 @@ class ReactiveVersionSet : public VersionSet { std::unique_ptr* manifest_reader); private: - VersionBuilderMap active_version_builders_; - AtomicGroupReadBuffer read_buffer_; - // Number of version edits to skip by ReadAndApply at the beginning of a new - // MANIFEST created by primary. - int number_of_edits_to_skip_; + std::unique_ptr manifest_tailer_; using VersionSet::LogAndApply; using VersionSet::Recover; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 92232b87c..2bfbf03d0 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1883,13 +1883,6 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase, "VersionEditHandlerBase::Iterate:Finish", [&](void* arg) { num_recovered_edits_ = *reinterpret_cast(arg); }); - SyncPoint::GetInstance()->SetCallBack( - "VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) { - num_recovered_edits_ = *reinterpret_cast(arg); - }); - SyncPoint::GetInstance()->SetCallBack( - "ReactiveVersionSet::ReadAndApply:AppliedEdits", - [&](void* arg) { num_applied_edits_ = *reinterpret_cast(arg); }); SyncPoint::GetInstance()->SetCallBack( "AtomicGroupReadBuffer::AddEdit:AtomicGroup", [&](void* /* arg */) { ++num_edits_in_atomic_group_; }); @@ -1929,7 +1922,6 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase, bool last_in_atomic_group_ = false; int num_edits_in_atomic_group_ = 0; int num_recovered_edits_ = 0; - int num_applied_edits_ = 0; VersionEdit corrupted_edit_; VersionEdit edit_with_incorrect_group_size_; std::unique_ptr log_writer_; @@ -1945,7 +1937,6 @@ TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) { EXPECT_TRUE(first_in_atomic_group_); EXPECT_TRUE(last_in_atomic_group_); EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_); - EXPECT_EQ(0, num_applied_edits_); } TEST_F(VersionSetAtomicGroupTest, @@ -1967,7 +1958,6 @@ TEST_F(VersionSetAtomicGroupTest, EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_); - EXPECT_EQ(0, num_applied_edits_); } TEST_F(VersionSetAtomicGroupTest, @@ -1980,20 +1970,20 @@ TEST_F(VersionSetAtomicGroupTest, EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader, &manifest_reporter, &manifest_reader_status)); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); AddNewEditsToLog(kAtomicGroupSize); InstrumentedMutex mu; std::unordered_set cfds_changed; mu.Lock(); - EXPECT_OK( - reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + EXPECT_OK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); mu.Unlock(); EXPECT_TRUE(first_in_atomic_group_); EXPECT_TRUE(last_in_atomic_group_); // The recover should clean up the replay buffer. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); - EXPECT_EQ(num_initial_edits_, num_recovered_edits_); - EXPECT_EQ(kAtomicGroupSize, num_applied_edits_); + EXPECT_EQ(kAtomicGroupSize, num_recovered_edits_); } TEST_F(VersionSetAtomicGroupTest, @@ -2009,7 +1999,6 @@ TEST_F(VersionSetAtomicGroupTest, EXPECT_FALSE(last_in_atomic_group_); EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_); EXPECT_EQ(num_initial_edits_, num_recovered_edits_); - EXPECT_EQ(0, num_applied_edits_); } TEST_F(VersionSetAtomicGroupTest, @@ -2041,14 +2030,13 @@ TEST_F(VersionSetAtomicGroupTest, InstrumentedMutex mu; std::unordered_set cfds_changed; mu.Lock(); - EXPECT_OK( - reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + EXPECT_OK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); mu.Unlock(); // Reactive version set should be empty now. EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0); EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0); EXPECT_EQ(num_initial_edits_, num_recovered_edits_); - EXPECT_EQ(kAtomicGroupSize, num_applied_edits_); } TEST_F(VersionSetAtomicGroupTest, @@ -2065,13 +2053,14 @@ TEST_F(VersionSetAtomicGroupTest, &manifest_reader_status)); EXPECT_EQ(column_families_.size(), reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + EXPECT_EQ(num_initial_edits_, num_recovered_edits_); // Write a few edits in an atomic group. AddNewEditsToLog(kNumberOfPersistedVersionEdits); InstrumentedMutex mu; std::unordered_set cfds_changed; mu.Lock(); - EXPECT_OK( - reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + EXPECT_OK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); mu.Unlock(); EXPECT_TRUE(first_in_atomic_group_); EXPECT_FALSE(last_in_atomic_group_); @@ -2080,8 +2069,6 @@ TEST_F(VersionSetAtomicGroupTest, EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == kNumberOfPersistedVersionEdits); EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize); - EXPECT_EQ(num_initial_edits_, num_recovered_edits_); - EXPECT_EQ(0, num_applied_edits_); } TEST_F(VersionSetAtomicGroupTest, @@ -2128,8 +2115,8 @@ TEST_F(VersionSetAtomicGroupTest, // Write the corrupted edits. AddNewEditsToLog(kAtomicGroupSize); mu.Lock(); - EXPECT_NOK( - reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + EXPECT_NOK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); mu.Unlock(); EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(), corrupted_edit_.DebugString()); @@ -2178,8 +2165,8 @@ TEST_F(VersionSetAtomicGroupTest, &manifest_reader_status)); AddNewEditsToLog(kAtomicGroupSize); mu.Lock(); - EXPECT_NOK( - reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed)); + EXPECT_NOK(reactive_versions_->ReadAndApply( + &mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed)); mu.Unlock(); EXPECT_EQ(edits_[1].DebugString(), edit_with_incorrect_group_size_.DebugString());