add WAL replay in TryCatchUpWithPrimary (#5282)

Summary:
Previously, in PR https://github.com/facebook/rocksdb/pull/5161, we added the capability to do WAL tailing in `OpenAsSecondary`. In this PR we extend that feature to `TryCatchUpWithPrimary`, which is useful for a secondary RocksDB instance to retrieve and apply the latest updates and refresh log readers if needed.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5282

Differential Revision: D15261011

Pulled By: miasantreble

fbshipit-source-id: a15c94471e8c3b3b1f7f47c3135db1126e936949
main
Zhongyi Xie 6 years ago committed by Facebook Github Bot
parent eea1cad850
commit bdba6c56dd
  1. 46
      db/db_impl_secondary.cc
  2. 2
      db/db_impl_secondary.h
  3. 6
      db/db_secondary_test.cc

@ -59,9 +59,19 @@ Status DBImplSecondary::Recover(
single_column_family_mode_ = single_column_family_mode_ =
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1; versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
// Recover from all newer log files than the ones named in the s = FindAndRecoverLogFiles();
// descriptor. }
// TODO: update options_file_number_ needed?
return s;
}
// List wal_dir and find all new WALs, return these log numbers
Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
assert(logs != nullptr);
std::vector<std::string> filenames; std::vector<std::string> filenames;
Status s;
s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames); s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
if (s.IsNotFound()) { if (s.IsNotFound()) {
return Status::InvalidArgument("Failed to open wal_dir", return Status::InvalidArgument("Failed to open wal_dir",
@ -70,7 +80,6 @@ Status DBImplSecondary::Recover(
return s; return s;
} }
std::vector<uint64_t> logs;
// if log_readers_ is non-empty, it means we have applied all logs with log // if log_readers_ is non-empty, it means we have applied all logs with log
// numbers smaller than the smallest log in log_readers_, so there is no // numbers smaller than the smallest log in log_readers_, so there is no
// need to pass these logs to RecoverLogFiles // need to pass these logs to RecoverLogFiles
@ -83,20 +92,13 @@ Status DBImplSecondary::Recover(
FileType type; FileType type;
if (ParseFileName(filenames[i], &number, &type) && type == kLogFile && if (ParseFileName(filenames[i], &number, &type) && type == kLogFile &&
number >= log_number_min) { number >= log_number_min) {
logs.push_back(number); logs->push_back(number);
} }
} }
// Recover logs in the order that they were generated
if (!logs.empty()) { if (!logs->empty()) {
// Recover in the order in which the logs were generated std::sort(logs->begin(), logs->end());
std::sort(logs.begin(), logs.end());
SequenceNumber next_sequence(kMaxSequenceNumber);
s = RecoverLogFiles(logs, &next_sequence, true /*read_only*/);
}
} }
// TODO: update options_file_number_ needed?
return s; return s;
} }
@ -294,6 +296,18 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
return s; return s;
} }
// Look up the new WAL numbers via FindNewLogNumbers() and, if there are any,
// replay them into this secondary instance through RecoverLogFiles().
// Returns the status of the lookup when it fails or finds nothing to replay;
// otherwise returns the status of the replay.
Status DBImplSecondary::FindAndRecoverLogFiles() {
  std::vector<uint64_t> new_log_numbers;
  Status status = FindNewLogNumbers(&new_log_numbers);
  if (!status.ok() || new_log_numbers.empty()) {
    // Either the lookup failed or there is nothing new to apply.
    return status;
  }
  SequenceNumber next_sequence(kMaxSequenceNumber);
  return RecoverLogFiles(new_log_numbers, &next_sequence, true /*read_only*/);
}
Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) { ColumnFamilyHandle* column_family) {
if (read_options.managed) { if (read_options.managed) {
@ -377,6 +391,7 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
assert(versions_.get() != nullptr); assert(versions_.get() != nullptr);
assert(manifest_reader_.get() != nullptr); assert(manifest_reader_.get() != nullptr);
Status s; Status s;
// read the manifest and apply new changes to the secondary instance
std::unordered_set<ColumnFamilyData*> cfds_changed; std::unordered_set<ColumnFamilyData*> cfds_changed;
InstrumentedMutexLock lock_guard(&mutex_); InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast<ReactiveVersionSet*>(versions_.get()) s = static_cast<ReactiveVersionSet*>(versions_.get())
@ -389,6 +404,9 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
} }
sv_context.Clean(); sv_context.Clean();
} }
// list wal_dir to discover new WALs and apply new changes to the secondary
// instance
s = FindAndRecoverLogFiles();
return s; return s;
} }

@ -194,6 +194,8 @@ class DBImplSecondary : public DBImpl {
using DBImpl::Recover; using DBImpl::Recover;
Status FindAndRecoverLogFiles();
Status FindNewLogNumbers(std::vector<uint64_t>* logs);
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers, Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, SequenceNumber* next_sequence,
bool read_only) override; bool read_only) override;

@ -237,6 +237,12 @@ TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
}; };
verify_db_func("foo_value2", "bar_value2"); verify_db_func("foo_value2", "bar_value2");
ASSERT_OK(Put("foo", "new_foo_value"));
ASSERT_OK(Put("bar", "new_bar_value"));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db_func("new_foo_value", "new_bar_value");
} }
TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) { TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {

Loading…
Cancel
Save