Log replay integration for secondary instance (#5305)

Summary:
RocksDB secondary can replay both MANIFEST and WAL now.
On the one hand, the memory usage by memtables will grow after replaying WAL for sometime. On the other hand, replaying the MANIFEST can bring the database persistent data to a more recent point in time, giving us the opportunity to discard some memtables containing out-dated data.
This PR coordinates the MANIFEST and WAL replay, using the updates from MANIFEST replay to update the active memtable and immutable memtable list of each column family.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5305

Differential Revision: D15386512

Pulled By: riversand963

fbshipit-source-id: a3ea6fc415f8382d8cf624f52a71ebdcffa3e355
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent f3a7847598
commit fb4c6a31ce
  1. 1
      HISTORY.md
  2. 2
      db/db_impl.h
  3. 115
      db/db_impl_secondary.cc
  4. 102
      db/db_impl_secondary.h
  5. 130
      db/db_secondary_test.cc
  6. 18
      db/memtable_list.cc
  7. 7
      db/memtable_list.h

@ -6,6 +6,7 @@
### New Features
* Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.
* Add an option `unordered_write` which trades snapshot guarantees with higher write throughput. When used with WRITE_PREPARED transactions, it offers higher throughput with however no compromise on guarantees.
* Allow DBImplSecondary to remove memtables with obsolete data after replaying MANIFEST and WAL.
### Performance Improvements
* Reduce binary search when iterator reseek into the same data block.

@ -1078,7 +1078,7 @@ class DBImpl : public DB {
JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
// REQUIRES: log_numbers are sorted in ascending order
virtual Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
// The following two methods are used to flush a memtable to

@ -18,7 +18,6 @@
namespace rocksdb {
#ifndef ROCKSDB_LITE
DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
const std::string& dbname)
: DBImpl(db_options, dbname) {
@ -35,6 +34,7 @@ Status DBImplSecondary::Recover(
bool /*error_if_data_exists_in_logs*/) {
mutex_.AssertHeld();
JobContext job_context(0);
Status s;
s = static_cast<ReactiveVersionSet*>(versions_.get())
->Recover(column_families, &manifest_reader_, &manifest_reporter_,
@ -59,11 +59,29 @@ Status DBImplSecondary::Recover(
single_column_family_mode_ =
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
s = FindAndRecoverLogFiles();
std::unordered_set<ColumnFamilyData*> cfds_changed;
s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
}
// TODO: update options_file_number_ needed?
job_context.Clean();
return s;
}
// find new WAL and apply them in order to the secondary instance
Status DBImplSecondary::FindAndRecoverLogFiles(
std::unordered_set<ColumnFamilyData*>* cfds_changed,
JobContext* job_context) {
assert(nullptr != cfds_changed);
assert(nullptr != job_context);
Status s;
std::vector<uint64_t> logs;
s = FindNewLogNumbers(&logs);
if (s.ok() && !logs.empty()) {
SequenceNumber next_sequence(kMaxSequenceNumber);
s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context);
}
return s;
}
@ -151,7 +169,10 @@ Status DBImplSecondary::MaybeInitLogReader(
// REQUIRES: log_numbers are sorted in ascending order
Status DBImplSecondary::RecoverLogFiles(
const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
bool /*read_only*/) {
std::unordered_set<ColumnFamilyData*>* cfds_changed,
JobContext* job_context) {
assert(nullptr != cfds_changed);
assert(nullptr != job_context);
mutex_.AssertHeld();
Status status;
for (auto log_number : log_numbers) {
@ -184,6 +205,39 @@ Status DBImplSecondary::RecoverLogFiles(
continue;
}
WriteBatchInternal::SetContents(&batch, record);
std::vector<uint32_t> column_family_ids;
status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
if (status.ok()) {
SequenceNumber seq = versions_->LastSequence();
for (const auto id : column_family_ids) {
ColumnFamilyData* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(id);
if (cfd == nullptr) {
continue;
}
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
}
auto curr_log_num = port::kMaxUint64;
if (cfd_to_current_log_.count(cfd) > 0) {
curr_log_num = cfd_to_current_log_[cfd];
}
// If the active memtable contains records added by replaying an
// earlier WAL, then we need to seal the memtable, add it to the
// immutable memtable list and create a new active memtable.
if (!cfd->mem()->IsEmpty() && (curr_log_num == port::kMaxUint64 ||
curr_log_num != log_number)) {
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
MemTable* new_mem =
cfd->ConstructNewMemtable(mutable_cf_options, seq);
cfd->mem()->SetNextLogNumber(log_number);
cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
new_mem->Ref();
cfd->SetMemtable(new_mem);
}
}
}
// do not check sequence number because user may toggle disableWAL
// between writes which breaks sequence number continuity guarantee
@ -194,12 +248,30 @@ Status DBImplSecondary::RecoverLogFiles(
// That's why we set ignore missing column families to true
// passing null flush_scheduler will disable memtable flushing which is
// needed for secondary instances
if (status.ok()) {
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), nullptr /* flush_scheduler */,
true, log_number, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
if (!status.ok()) {
&batch, column_family_memtables_.get(),
nullptr /* flush_scheduler */, true, log_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
}
if (status.ok()) {
for (const auto id : column_family_ids) {
ColumnFamilyData* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(id);
if (cfd == nullptr) {
continue;
}
std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
cfd_to_current_log_.find(cfd);
if (iter == cfd_to_current_log_.end()) {
cfd_to_current_log_.insert({cfd, log_number});
} else if (log_number > iter->second) {
iter->second = log_number;
}
}
} else {
// We are treating this as a failure while reading since we read valid
// blocks that do not form coherent data
reader->GetReporter()->Corruption(record.size(), status);
@ -296,18 +368,6 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
return s;
}
// find new WAL and apply them in order to the secondary instance
Status DBImplSecondary::FindAndRecoverLogFiles() {
Status s;
std::vector<uint64_t> logs;
s = FindNewLogNumbers(&logs);
if (s.ok() && !logs.empty()) {
SequenceNumber next_sequence(kMaxSequenceNumber);
s = RecoverLogFiles(logs, &next_sequence, true /*read_only*/);
}
return s;
}
Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
if (read_options.managed) {
@ -393,20 +453,25 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
Status s;
// read the manifest and apply new changes to the secondary instance
std::unordered_set<ColumnFamilyData*> cfds_changed;
JobContext job_context(0, true /*create_superversion*/);
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast<ReactiveVersionSet*>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
// list wal_dir to discover new WALs and apply new changes to the secondary
// instance
if (s.ok()) {
s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
}
if (s.ok()) {
SuperVersionContext sv_context(true /* create_superversion */);
for (auto cfd : cfds_changed) {
sv_context.NewSuperVersion();
cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
&job_context.memtables_to_free);
auto& sv_context = job_context.superversion_contexts.back();
cfd->InstallSuperVersion(&sv_context, &mutex_);
sv_context.NewSuperVersion();
}
sv_context.Clean();
job_context.Clean();
}
// list wal_dir to discover new WALs and apply new changes to the secondary
// instance
s = FindAndRecoverLogFiles();
return s;
}

@ -96,40 +96,40 @@ class DBImplSecondary : public DBImpl {
Status Put(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
const Slice& /*value*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::Merge;
Status Merge(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
const Slice& /*value*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::Delete;
Status Delete(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::SingleDelete;
Status SingleDelete(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
Status Write(const WriteOptions& /*options*/,
WriteBatch* /*updates*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::CompactRange;
Status CompactRange(const CompactRangeOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice* /*begin*/, const Slice* /*end*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::CompactFiles;
@ -140,32 +140,32 @@ class DBImplSecondary : public DBImpl {
const int /*output_level*/, const int /*output_path_id*/ = -1,
std::vector<std::string>* const /*output_file_names*/ = nullptr,
CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
Status DisableFileDeletions() override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
Status EnableFileDeletions(bool /*force*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
Status GetLiveFiles(std::vector<std::string>&,
uint64_t* /*manifest_file_size*/,
bool /*flush_memtable*/ = true) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::Flush;
Status Flush(const FlushOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::SyncWAL;
Status SyncWAL() override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DB::IngestExternalFile;
@ -173,7 +173,7 @@ class DBImplSecondary : public DBImpl {
ColumnFamilyHandle* /*column_family*/,
const std::vector<std::string>& /*external_files*/,
const IngestExternalFileOptions& /*ingestion_options*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
return Status::NotSupported("Not supported operation in secondary mode.");
}
// Try to catch up with the primary by reading as much as possible from the
@ -185,6 +185,70 @@ class DBImplSecondary : public DBImpl {
Status MaybeInitLogReader(uint64_t log_number,
log::FragmentBufferedReader** log_reader);
protected:
class ColumnFamilyCollector : public WriteBatch::Handler {
std::unordered_set<uint32_t> column_family_ids_;
Status AddColumnFamilyId(uint32_t column_family_id) {
if (column_family_ids_.find(column_family_id) ==
column_family_ids_.end()) {
column_family_ids_.insert(column_family_id);
}
return Status::OK();
}
public:
explicit ColumnFamilyCollector() {}
~ColumnFamilyCollector() override {}
Status PutCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status DeleteCF(uint32_t column_family_id, const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status DeleteRangeCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status MergeCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status PutBlobIndexCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
const std::unordered_set<uint32_t>& column_families() const {
return column_family_ids_;
}
};
Status CollectColumnFamilyIdsFromWriteBatch(
const WriteBatch& batch, std::vector<uint32_t>* column_family_ids) {
assert(column_family_ids != nullptr);
column_family_ids->clear();
ColumnFamilyCollector handler;
Status s = batch.Iterate(&handler);
if (s.ok()) {
for (const auto& cf : handler.column_families()) {
column_family_ids->push_back(cf);
}
}
return s;
}
private:
friend class DB;
@ -194,19 +258,25 @@ class DBImplSecondary : public DBImpl {
using DBImpl::Recover;
Status FindAndRecoverLogFiles();
Status FindAndRecoverLogFiles(
std::unordered_set<ColumnFamilyData*>* cfds_changed,
JobContext* job_context);
Status FindNewLogNumbers(std::vector<uint64_t>* logs);
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence,
bool read_only) override;
std::unordered_set<ColumnFamilyData*>* cfds_changed,
JobContext* job_context);
std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
std::unique_ptr<Status> manifest_reader_status_;
// cache log readers for each log number, used for continue WAL replay
// Cache log readers for each log number, used for continue WAL replay
// after recovery
std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;
// Current WAL number replayed for each column family.
std::unordered_map<ColumnFamilyData*, uint64_t> cfd_to_current_log_;
};
} // namespace rocksdb

@ -243,6 +243,11 @@ TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db_func("new_foo_value", "new_bar_value");
ASSERT_OK(Flush());
ASSERT_OK(Put("foo", "new_foo_value_1"));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db_func("new_foo_value_1", "new_bar_value");
}
TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
@ -519,6 +524,131 @@ TEST_F(DBSecondaryTest, SwitchManifest) {
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
range_scan_db();
}
TEST_F(DBSecondaryTest, SwitchWAL) {
const int kNumKeysPerMemtable = 1;
const std::string kCFName1 = "pikachu";
Options options;
options.env = env_;
options.max_write_buffer_number = 4;
options.min_write_buffer_number_to_merge = 2;
options.memtable_factory.reset(
new SpecialSkipListFactory(kNumKeysPerMemtable));
CreateAndReopenWithCF({kCFName1}, options);
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondaryWithColumnFamilies({kCFName1}, options1);
ASSERT_EQ(2, handles_secondary_.size());
const auto& verify_db = [](DB* db1,
const std::vector<ColumnFamilyHandle*>& handles1,
DB* db2,
const std::vector<ColumnFamilyHandle*>& handles2) {
ASSERT_NE(nullptr, db1);
ASSERT_NE(nullptr, db2);
ReadOptions read_opts;
read_opts.verify_checksums = true;
ASSERT_EQ(handles1.size(), handles2.size());
for (size_t i = 0; i != handles1.size(); ++i) {
std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts, handles1[i]));
std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts, handles2[i]));
it1->SeekToFirst();
it2->SeekToFirst();
for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) {
ASSERT_EQ(it1->key(), it2->key());
ASSERT_EQ(it1->value(), it2->value());
}
ASSERT_FALSE(it1->Valid());
ASSERT_FALSE(it2->Valid());
for (it1->SeekToFirst(); it1->Valid(); it1->Next()) {
std::string value;
ASSERT_OK(db2->Get(read_opts, handles2[i], it1->key(), &value));
ASSERT_EQ(it1->value(), value);
}
for (it2->SeekToFirst(); it2->Valid(); it2->Next()) {
std::string value;
ASSERT_OK(db1->Get(read_opts, handles1[i], it2->key(), &value));
ASSERT_EQ(it2->value(), value);
}
}
};
for (int k = 0; k != 8; ++k) {
ASSERT_OK(
Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
ASSERT_OK(
Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db(dbfull(), handles_, db_secondary_, handles_secondary_);
}
}
TEST_F(DBSecondaryTest, CatchUpAfterFlush) {
const int kNumKeysPerMemtable = 16;
Options options;
options.env = env_;
options.max_write_buffer_number = 4;
options.min_write_buffer_number_to_merge = 2;
options.memtable_factory.reset(
new SpecialSkipListFactory(kNumKeysPerMemtable));
Reopen(options);
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondary(options1);
WriteOptions write_opts;
WriteBatch wb;
wb.Put("key0", "value0");
wb.Put("key1", "value1");
ASSERT_OK(dbfull()->Write(write_opts, &wb));
ReadOptions read_opts;
std::unique_ptr<Iterator> iter1(db_secondary_->NewIterator(read_opts));
iter1->Seek("key0");
ASSERT_FALSE(iter1->Valid());
iter1->Seek("key1");
ASSERT_FALSE(iter1->Valid());
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
iter1->Seek("key0");
ASSERT_FALSE(iter1->Valid());
iter1->Seek("key1");
ASSERT_FALSE(iter1->Valid());
std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(read_opts));
iter2->Seek("key0");
ASSERT_TRUE(iter2->Valid());
ASSERT_EQ("value0", iter2->value());
iter2->Seek("key1");
ASSERT_TRUE(iter2->Valid());
ASSERT_EQ("value1", iter2->value());
{
WriteBatch wb1;
wb1.Put("key0", "value01");
wb1.Put("key1", "value11");
ASSERT_OK(dbfull()->Write(write_opts, &wb1));
}
{
WriteBatch wb2;
wb2.Put("key0", "new_value0");
wb2.Delete("key1");
ASSERT_OK(dbfull()->Write(write_opts, &wb2));
}
ASSERT_OK(Flush());
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
std::unique_ptr<Iterator> iter3(db_secondary_->NewIterator(read_opts));
// iter3 should not see value01 and value11 at all.
iter3->Seek("key0");
ASSERT_TRUE(iter3->Valid());
ASSERT_EQ("new_value0", iter3->value());
iter3->Seek("key1");
ASSERT_FALSE(iter3->Valid());
}
#endif //! ROCKSDB_LITE
} // namespace rocksdb

@ -638,4 +638,22 @@ Status InstallMemtableAtomicFlushResults(
return s;
}
void MemTableList::RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete) {
assert(to_delete != nullptr);
InstallNewVersion();
auto& memlist = current_->memlist_;
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* mem = *it;
if (mem->GetNextLogNumber() > log_number) {
break;
}
current_->Remove(mem, to_delete);
--num_flush_not_started_;
if (0 == num_flush_not_started_) {
imm_flush_needed.store(false, std::memory_order_release);
}
}
}
} // namespace rocksdb

@ -294,6 +294,13 @@ class MemTableList {
}
}
// Used only by DBImplSecondary during log replay.
// Remove memtables whose data were written before the WAL with log_number
// was created, i.e. mem->GetNextLogNumber() <= log_number. The memtables are
// not freed, but put into a vector for future deref and reclamation.
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);
private:
friend Status InstallMemtableAtomicFlushResults(
const autovector<MemTableList*>* imm_lists,

Loading…
Cancel
Save