diff --git a/db/db_impl_secondary.cc b/db/db_impl_secondary.cc index 90e979b4e..007910ea5 100644 --- a/db/db_impl_secondary.cc +++ b/db/db_impl_secondary.cc @@ -59,40 +59,7 @@ Status DBImplSecondary::Recover( single_column_family_mode_ = versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1; - // Recover from all newer log files than the ones named in the - // descriptor. - std::vector filenames; - s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames); - if (s.IsNotFound()) { - return Status::InvalidArgument("Failed to open wal_dir", - immutable_db_options_.wal_dir); - } else if (!s.ok()) { - return s; - } - - std::vector logs; - // if log_readers_ is non-empty, it means we have applied all logs with log - // numbers smaller than the smallest log in log_readers_, so there is no - // need to pass these logs to RecoverLogFiles - uint64_t log_number_min = 0; - if (log_readers_.size() > 0) { - log_number_min = log_readers_.begin()->first; - } - for (size_t i = 0; i < filenames.size(); i++) { - uint64_t number; - FileType type; - if (ParseFileName(filenames[i], &number, &type) && type == kLogFile && - number >= log_number_min) { - logs.push_back(number); - } - } - - if (!logs.empty()) { - // Recover in the order in which the logs were generated - std::sort(logs.begin(), logs.end()); - SequenceNumber next_sequence(kMaxSequenceNumber); - s = RecoverLogFiles(logs, &next_sequence, true /*read_only*/); - } + s = FindAndRecoverLogFiles(); } // TODO: update options_file_number_ needed? @@ -100,6 +67,41 @@ Status DBImplSecondary::Recover( return s; } +// List wal_dir and find all new WALs, return these log numbers +Status DBImplSecondary::FindNewLogNumbers(std::vector* logs) { + assert(logs != nullptr); + std::vector filenames; + Status s; + s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames); + if (s.IsNotFound()) { + return Status::InvalidArgument("Failed to open wal_dir", + immutable_db_options_.wal_dir); + } else if (!s.ok()) { + return s; + } + + // if log_readers_ is non-empty, it means we have applied all logs with log + // numbers smaller than the smallest log in log_readers_, so there is no + // need to pass these logs to RecoverLogFiles + uint64_t log_number_min = 0; + if (log_readers_.size() > 0) { + log_number_min = log_readers_.begin()->first; + } + for (size_t i = 0; i < filenames.size(); i++) { + uint64_t number; + FileType type; + if (ParseFileName(filenames[i], &number, &type) && type == kLogFile && + number >= log_number_min) { + logs->push_back(number); + } + } + // Recover logs in the order that they were generated + if (!logs->empty()) { + std::sort(logs->begin(), logs->end()); + } + return s; +} + // try to find log reader using log_number from log_readers_ map, initialize // if it doesn't exist Status DBImplSecondary::MaybeInitLogReader( @@ -294,6 +296,18 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options, return s; } +// find new WAL and apply them in order to the secondary instance +Status DBImplSecondary::FindAndRecoverLogFiles() { + Status s; + std::vector logs; + s = FindNewLogNumbers(&logs); + if (s.ok() && !logs.empty()) { + SequenceNumber next_sequence(kMaxSequenceNumber); + s = RecoverLogFiles(logs, &next_sequence, true /*read_only*/); + } + return s; +} + Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) { if (read_options.managed) { @@ -377,6 +391,7 @@ Status DBImplSecondary::TryCatchUpWithPrimary() { assert(versions_.get() != nullptr); assert(manifest_reader_.get() != nullptr); Status s; + // read the manifest and apply new changes to the secondary instance std::unordered_set cfds_changed; InstrumentedMutexLock lock_guard(&mutex_); s = static_cast(versions_.get()) @@ -389,6 +404,9 @@ Status DBImplSecondary::TryCatchUpWithPrimary() { } sv_context.Clean(); } + // list wal_dir to discover new WALs and apply new changes to the secondary + // instance + s = FindAndRecoverLogFiles(); return s; } diff --git a/db/db_impl_secondary.h b/db/db_impl_secondary.h index 64c814328..32dbae058 100644 --- a/db/db_impl_secondary.h +++ b/db/db_impl_secondary.h @@ -194,6 +194,8 @@ class DBImplSecondary : public DBImpl { using DBImpl::Recover; + Status FindAndRecoverLogFiles(); + Status FindNewLogNumbers(std::vector* logs); Status RecoverLogFiles(const std::vector& log_numbers, SequenceNumber* next_sequence, bool read_only) override; diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc index 47daf9fd8..60ea5ba8d 100644 --- a/db/db_secondary_test.cc +++ b/db/db_secondary_test.cc @@ -237,6 +237,12 @@ TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) { }; verify_db_func("foo_value2", "bar_value2"); + + ASSERT_OK(Put("foo", "new_foo_value")); + ASSERT_OK(Put("bar", "new_bar_value")); + + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db_func("new_foo_value", "new_bar_value"); } TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {