secondary instance: add support for WAL tailing on `OpenAsSecondary`

Summary: PR https://github.com/facebook/rocksdb/pull/4899 implemented the general framework for RocksDB secondary instances. This PR adds the support for WAL tailing in `OpenAsSecondary`, which means after the `OpenAsSecondary` call, the secondary is now able to see primary's writes that are yet to be flushed. The secondary can see primary's writes in the WAL up to the moment of `OpenAsSecondary` call starts.

Differential Revision: D15059905

Pulled By: miasantreble

fbshipit-source-id: 44f71f548a30b38179a7940165e138f622de1f10
main
Zhongyi Xie 6 years ago committed by Facebook Github Bot
parent 1c8cbf315f
commit aa56b7e74a
  1. 4
      db/db_impl.cc
  2. 37
      db/db_impl.h
  3. 170
      db/db_impl_secondary.cc
  4. 61
      db/db_impl_secondary.h
  5. 44
      db/db_secondary_test.cc
  6. 2
      db/log_reader.h

@ -157,6 +157,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_, immutable_db_options_)),
seq_per_batch_(seq_per_batch),
batch_per_txn_(batch_per_txn),
db_lock_(nullptr),
shutting_down_(false),
bg_cv_(&mutex_),
@ -202,8 +204,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
opened_successfully_(false),
two_write_queues_(options.two_write_queues),
manual_wal_flush_(options.manual_wal_flush),
seq_per_batch_(seq_per_batch),
batch_per_txn_(batch_per_txn),
// last_sequencee_ is always maintained by the main queue that also writes
// to the memtable. When two_write_queues_ is disabled last seq in
// memtable is the same as last seq published to the readers. When it is

@ -803,6 +803,23 @@ class DBImpl : public DB {
// Additonal options for compaction and flush
EnvOptions env_options_for_compaction_;
std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
// Increase the sequence number after writing each batch, whether memtable is
// disabled for that or not. Otherwise the sequence number is increased after
// writing each key into memtable. This implies that when disable_memtable is
// set, the seq is not increased at all.
//
// Default: false
const bool seq_per_batch_;
// This determines during recovery whether we expect one writebatch per
// recovered transaction, or potentially multiple writebatches per
// transaction. For WriteUnprepared, this is set to false, since multiple
// batches can exist per transaction.
//
// Default: true
const bool batch_per_txn_;
// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
@ -1036,8 +1053,8 @@ class DBImpl : public DB {
JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
virtual Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the
@ -1294,7 +1311,6 @@ class DBImpl : public DB {
// expesnive mutex_ lock during WAL write, which update log_empty_.
bool log_empty_;
std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
void AddSize(uint64_t new_size) { size += new_size; }
@ -1689,20 +1705,7 @@ class DBImpl : public DB {
// In 2PC these are the writes at Prepare phase.
const bool two_write_queues_;
const bool manual_wal_flush_;
// Increase the sequence number after writing each batch, whether memtable is
// disabled for that or not. Otherwise the sequence number is increased after
// writing each key into memtable. This implies that when disable_memtable is
// set, the seq is not increased at all.
//
// Default: false
const bool seq_per_batch_;
// This determines during recovery whether we expect one writebatch per
// recovered transaction, or potentially multiple writebatches per
// transaction. For WriteUnprepared, this is set to false, since multiple
// batches can exist per transaction.
//
// Default: true
const bool batch_per_txn_;
// LastSequence also indicates last published sequence visibile to the
// readers. Otherwise LastPublishedSequence should be used.
const bool last_seq_same_as_publish_seq_;

@ -4,6 +4,12 @@
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_impl_secondary.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "db/db_iter.h"
#include "db/merge_context.h"
#include "monitoring/perf_context_imp.h"
@ -52,12 +58,174 @@ Status DBImplSecondary::Recover(
default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
single_column_family_mode_ =
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
// Recover from all newer log files than the ones named in the
// descriptor.
std::vector<std::string> filenames;
s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
if (s.IsNotFound()) {
return Status::InvalidArgument("Failed to open wal_dir",
immutable_db_options_.wal_dir);
} else if (!s.ok()) {
return s;
}
std::vector<uint64_t> logs;
// if log_readers_ is non-empty, it means we have applied all logs with log
// numbers smaller than the smallest log in log_readers_, so there is no
// need to pass these logs to RecoverLogFiles
uint64_t log_number_min = 0;
if (log_readers_.size() > 0) {
log_number_min = log_readers_.begin()->first;
}
for (size_t i = 0; i < filenames.size(); i++) {
uint64_t number;
FileType type;
if (ParseFileName(filenames[i], &number, &type) && type == kLogFile &&
number >= log_number_min) {
logs.push_back(number);
}
}
if (!logs.empty()) {
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
SequenceNumber next_sequence(kMaxSequenceNumber);
s = RecoverLogFiles(logs, &next_sequence, true /*read_only*/);
}
}
// TODO: attempt to recover from WAL files.
// TODO: update options_file_number_ needed?
return s;
}
// try to find log reader using log_number from log_readers_ map, initialize
// if it doesn't exist
Status DBImplSecondary::MaybeInitLogReader(
uint64_t log_number, log::FragmentBufferedReader** log_reader) {
auto iter = log_readers_.find(log_number);
// make sure the log file is still present
if (iter == log_readers_.end() ||
iter->second->reader_->GetLogNumber() != log_number) {
// delete the obsolete log reader if log number mismatch
if (iter != log_readers_.end()) {
log_readers_.erase(iter);
}
// initialize log reader from log_number
// TODO: min_log_number_to_keep_2pc check needed?
// Open the log file
std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", log_number,
static_cast<int>(immutable_db_options_.wal_recovery_mode));
std::unique_ptr<SequentialFileReader> file_reader;
{
std::unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(
fname, &file, env_->OptimizeForLogRead(env_options_));
if (!status.ok()) {
*log_reader = nullptr;
return status;
}
file_reader.reset(new SequentialFileReader(std::move(file), fname));
}
// Create the log reader.
LogReaderContainer* log_reader_container = new LogReaderContainer(
env_, immutable_db_options_.info_log, std::move(fname),
std::move(file_reader), log_number);
log_readers_.insert(std::make_pair(
log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
}
iter = log_readers_.find(log_number);
assert(iter != log_readers_.end());
*log_reader = iter->second->reader_;
return Status::OK();
}
// After manifest recovery, replay WALs and refresh log_readers_ if necessary
// REQUIRES: log_numbers are sorted in ascending order
Status DBImplSecondary::RecoverLogFiles(
const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
bool /*read_only*/) {
mutex_.AssertHeld();
Status status;
for (auto log_number : log_numbers) {
log::FragmentBufferedReader* reader = nullptr;
status = MaybeInitLogReader(log_number, &reader);
if (!status.ok()) {
return status;
}
assert(reader != nullptr);
}
for (auto log_number : log_numbers) {
auto it = log_readers_.find(log_number);
assert(it != log_readers_.end());
log::FragmentBufferedReader* reader = it->second->reader_;
// Manually update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(log_number);
// Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
while (reader->ReadRecord(&record, &scratch,
immutable_db_options_.wal_recovery_mode) &&
status.ok()) {
if (record.size() < WriteBatchInternal::kHeader) {
reader->GetReporter()->Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
// do not check sequence number because user may toggle disableWAL
// between writes which breaks sequence number continuity guarantee
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case --
// we just ignore the update.
// That's why we set ignore missing column families to true
// passing null flush_scheduler will disable memtable flushing which is
// needed for secondary instances
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), nullptr /* flush_scheduler */,
true, log_number, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
if (!status.ok()) {
// We are treating this as a failure while reading since we read valid
// blocks that do not form coherent data
reader->GetReporter()->Corruption(record.size(), status);
continue;
}
}
if (!status.ok()) {
return status;
}
auto last_sequence = *next_sequence - 1;
if ((*next_sequence != kMaxSequenceNumber) &&
(versions_->LastSequence() <= last_sequence)) {
versions_->SetLastAllocatedSequence(last_sequence);
versions_->SetLastPublishedSequence(last_sequence);
versions_->SetLastSequence(last_sequence);
}
}
// remove logreaders from map after successfully recovering the WAL
if (log_readers_.size() > 1) {
auto eraseIter = log_readers_.begin();
std::advance(eraseIter, log_readers_.size() - 1);
log_readers_.erase(log_readers_.begin(), eraseIter);
}
return status;
}
// Implementation of the DB interface
Status DBImplSecondary::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,

@ -13,6 +13,55 @@
namespace rocksdb {
class LogReaderContainer {
public:
LogReaderContainer()
: reader_(nullptr), reporter_(nullptr), status_(nullptr) {}
LogReaderContainer(Env* env, std::shared_ptr<Logger> info_log,
std::string fname,
std::unique_ptr<SequentialFileReader>&& file_reader,
uint64_t log_number) {
LogReporter* reporter = new LogReporter();
status_ = new Status();
reporter->env = env;
reporter->info_log = info_log.get();
reporter->fname = std::move(fname);
reporter->status = status_;
reporter_ = reporter;
// We intentially make log::Reader do checksumming even if
// paranoid_checks==false so that corruptions cause entire commits
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader),
reporter, true /*checksum*/,
log_number);
}
log::FragmentBufferedReader* reader_;
log::Reader::Reporter* reporter_;
Status* status_;
~LogReaderContainer() {
delete reader_;
delete reporter_;
delete status_;
}
private:
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
std::string fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
void Corruption(size_t bytes, const Status& s) override {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""),
fname.c_str(), static_cast<int>(bytes),
s.ToString().c_str());
if (this->status != nullptr && this->status->ok()) {
*this->status = s;
}
}
};
};
class DBImplSecondary : public DBImpl {
public:
DBImplSecondary(const DBOptions& options, const std::string& dbname);
@ -133,6 +182,9 @@ class DBImplSecondary : public DBImpl {
// method can take long time due to all the I/O and CPU costs.
Status TryCatchUpWithPrimary() override;
Status MaybeInitLogReader(uint64_t log_number,
log::FragmentBufferedReader** log_reader);
private:
friend class DB;
@ -142,10 +194,19 @@ class DBImplSecondary : public DBImpl {
using DBImpl::Recover;
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence,
bool read_only) override;
std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
std::unique_ptr<Status> manifest_reader_status_;
// cache log readers for each log number, used for continue WAL replay
// after recovery
std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;
};
} // namespace rocksdb
#endif // !ROCKSDB_LITE

@ -195,6 +195,50 @@ TEST_F(DBSecondaryTest, OpenAsSecondary) {
verify_db_func("new_foo_value", "new_bar_value");
}
TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
Options options;
options.env = env_;
options.level0_file_num_compaction_trigger = 4;
Reopen(options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
}
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondary(options1);
ReadOptions ropts;
ropts.verify_checksums = true;
const auto verify_db_func = [&](const std::string& foo_val,
const std::string& bar_val) {
std::string value;
ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
ASSERT_EQ(foo_val, value);
ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
ASSERT_EQ(bar_val, value);
Iterator* iter = db_secondary_->NewIterator(ropts);
ASSERT_NE(nullptr, iter);
iter->Seek("foo");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("foo", iter->key().ToString());
ASSERT_EQ(foo_val, iter->value().ToString());
iter->Seek("bar");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bar", iter->key().ToString());
ASSERT_EQ(bar_val, iter->value().ToString());
size_t count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
++count;
}
ASSERT_EQ(2, count);
delete iter;
};
verify_db_func("foo_value2", "bar_value2");
}
TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
Options options;
options.env = env_;

@ -89,6 +89,8 @@ class Reader {
Reporter* GetReporter() const { return reporter_; }
uint64_t GetLogNumber() const { return log_number_; }
protected:
std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_;

Loading…
Cancel
Save