From b9f590065872db9b818874ba4bf4402ddd476cc3 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 30 May 2019 19:29:34 -0700 Subject: [PATCH] Fix WAL replay by skipping old write batches (#5170) Summary: 1. Fix a bug in WAL replay in which write batches with old sequence numbers are mistakenly inserted into memtables. 2. Add support for benchmarking secondary instance to db_bench_tool. With changes made in this PR, we can start benchmarking secondary instance using two processes. It is also possible to vary the frequency at which the secondary instance tries to catch up with the primary. The info log of the secondary can be found in a directory whose path can be specified with '-secondary_path'. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5170 Differential Revision: D15564608 Pulled By: riversand963 fbshipit-source-id: ce97688ed3d33f69d3a0b9266ebbbbf887aa0ec8 --- HISTORY.md | 2 +- db/db_impl_secondary.cc | 52 +++++++++---------- db/db_secondary_test.cc | 49 ++++++++++++++++++ tools/db_bench_tool.cc | 107 +++++++++++++++++++++++++++++++--------- 4 files changed, 159 insertions(+), 51 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 55366b006..f645d5cc2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -22,7 +22,7 @@ * Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress. ### Bug Fixes - +* Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number. ## 6.2.0 (4/30/2019) ### New Features diff --git a/db/db_impl_secondary.cc b/db/db_impl_secondary.cc index 586158ef7..a8ea921a2 100644 --- a/db/db_impl_secondary.cc +++ b/db/db_impl_secondary.cc @@ -102,7 +102,7 @@ Status DBImplSecondary::FindNewLogNumbers(std::vector* logs) { // numbers smaller than the smallest log in log_readers_, so there is no // need to pass these logs to RecoverLogFiles uint64_t log_number_min = 0; - if (log_readers_.size() > 0) { + if (!log_readers_.empty()) { log_number_min = log_readers_.begin()->first; } for (size_t i = 0; i < filenames.size(); i++) { @@ -202,11 +202,19 @@ Status DBImplSecondary::RecoverLogFiles( record.size(), Status::Corruption("log record too small")); continue; } + SequenceNumber seq = versions_->LastSequence(); WriteBatchInternal::SetContents(&batch, record); + SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch); + // If the write batch's sequence number is smaller than the last sequence + // number of the db, then we should skip this write batch because its + // data must reside in an SST that has already been added in the prior + // MANIFEST replay. + if (seq_of_batch < seq) { + continue; + } std::vector column_family_ids; status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids); if (status.ok()) { - SequenceNumber seq = versions_->LastSequence(); for (const auto id : column_family_ids) { ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(id); @@ -235,10 +243,13 @@ Status DBImplSecondary::RecoverLogFiles( cfd->SetMemtable(new_mem); } } + bool has_valid_writes = false; + status = WriteBatchInternal::InsertInto( + &batch, column_family_memtables_.get(), + nullptr /* flush_scheduler */, true, log_number, this, + false /* concurrent_memtable_writes */, next_sequence, + &has_valid_writes, seq_per_batch_, batch_per_txn_); } - // do not check sequence number because user may toggle disableWAL - // between writes which breaks sequence number continuity guarantee - // If column family was not found, it might mean that the WAL write // batch references to the column family that was dropped after the // insert. We don't want to fail the whole write batch in that case -- @@ -246,14 +257,6 @@ Status DBImplSecondary::RecoverLogFiles( // That's why we set ignore missing column families to true // passing null flush_scheduler will disable memtable flushing which is // needed for secondary instances - if (status.ok()) { - bool has_valid_writes = false; - status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), - nullptr /* flush_scheduler */, true, log_number, this, - false /* concurrent_memtable_writes */, next_sequence, - &has_valid_writes, seq_per_batch_, batch_per_txn_); - } if (status.ok()) { for (const auto id : column_family_ids) { ColumnFamilyData* cfd = @@ -269,31 +272,28 @@ Status DBImplSecondary::RecoverLogFiles( iter->second = log_number; } } + auto last_sequence = *next_sequence - 1; + if ((*next_sequence != kMaxSequenceNumber) && + (versions_->LastSequence() <= last_sequence)) { + versions_->SetLastAllocatedSequence(last_sequence); + versions_->SetLastPublishedSequence(last_sequence); + versions_->SetLastSequence(last_sequence); + } } else { // We are treating this as a failure while reading since we read valid // blocks that do not form coherent data reader->GetReporter()->Corruption(record.size(), status); - continue; } } - if (!status.ok()) { return status; } - - auto last_sequence = *next_sequence - 1; - if ((*next_sequence != kMaxSequenceNumber) && - (versions_->LastSequence() <= last_sequence)) { - versions_->SetLastAllocatedSequence(last_sequence); - versions_->SetLastPublishedSequence(last_sequence); - versions_->SetLastSequence(last_sequence); - } } // remove logreaders from map after successfully recovering the WAL if (log_readers_.size() > 1) { - auto eraseIter = log_readers_.begin(); - std::advance(eraseIter, log_readers_.size() - 1); - log_readers_.erase(log_readers_.begin(), eraseIter); + auto erase_iter = log_readers_.begin(); + std::advance(erase_iter, log_readers_.size() - 1); + log_readers_.erase(log_readers_.begin(), erase_iter); } return status; } diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc index 50a0923b4..23132434f 100644 --- a/db/db_secondary_test.cc +++ b/db/db_secondary_test.cc @@ -526,6 +526,55 @@ TEST_F(DBSecondaryTest, SwitchManifest) { } TEST_F(DBSecondaryTest, SwitchWAL) { + const int kNumKeysPerMemtable = 1; + Options options; + options.env = env_; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 2; + options.memtable_factory.reset( + new SpecialSkipListFactory(kNumKeysPerMemtable)); + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + const auto& verify_db = [](DB* db1, DB* db2) { + ASSERT_NE(nullptr, db1); + ASSERT_NE(nullptr, db2); + ReadOptions read_opts; + read_opts.verify_checksums = true; + std::unique_ptr it1(db1->NewIterator(read_opts)); + std::unique_ptr it2(db2->NewIterator(read_opts)); + it1->SeekToFirst(); + it2->SeekToFirst(); + for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) { + ASSERT_EQ(it1->key(), it2->key()); + ASSERT_EQ(it1->value(), it2->value()); + } + ASSERT_FALSE(it1->Valid()); + ASSERT_FALSE(it2->Valid()); + + for (it1->SeekToFirst(); it1->Valid(); it1->Next()) { + std::string value; + ASSERT_OK(db2->Get(read_opts, it1->key(), &value)); + ASSERT_EQ(it1->value(), value); + } + for (it2->SeekToFirst(); it2->Valid(); it2->Next()) { + std::string value; + ASSERT_OK(db1->Get(read_opts, it2->key(), &value)); + ASSERT_EQ(it2->value(), value); + } + }; + for (int k = 0; k != 16; ++k) { + ASSERT_OK(Put("key" + std::to_string(k), "value" + std::to_string(k))); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(dbfull(), db_secondary_); + } +} + +TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { const int kNumKeysPerMemtable = 1; const std::string kCFName1 = "pikachu"; Options options; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 30aafb660..b98fb42c4 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -752,6 +752,19 @@ DEFINE_uint64(blob_db_bytes_per_sync, 0, "Bytes to sync blob file at."); DEFINE_uint64(blob_db_file_size, 256 * 1024 * 1024, "Target size of each blob file."); +// Secondary DB instance Options +DEFINE_bool(use_secondary_db, false, + "Open a RocksDB secondary instance. A primary instance can be " + "running in another db_bench process."); + +DEFINE_string(secondary_path, "", + "Path to a directory used by the secondary instance to store " + "private files, e.g. info log."); + +DEFINE_int32(secondary_update_interval, 5, + "Secondary instance attempts to catch up with the primary every " + "secondary_update_interval seconds."); + #endif // ROCKSDB_LITE DEFINE_bool(report_bg_io_stats, false, @@ -2571,36 +2584,38 @@ class Benchmark { return base_name + ToString(id); } -void VerifyDBFromDB(std::string& truth_db_name) { - DBWithColumnFamilies truth_db; - auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db); - if (!s.ok()) { - fprintf(stderr, "open error: %s\n", s.ToString().c_str()); - exit(1); - } - ReadOptions ro; - ro.total_order_seek = true; - std::unique_ptr truth_iter(truth_db.db->NewIterator(ro)); - std::unique_ptr db_iter(db_.db->NewIterator(ro)); - // Verify that all the key/values in truth_db are retrivable in db with ::Get - fprintf(stderr, "Verifying db >= truth_db with ::Get...\n"); - for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) { + void VerifyDBFromDB(std::string& truth_db_name) { + DBWithColumnFamilies truth_db; + auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db); + if (!s.ok()) { + fprintf(stderr, "open error: %s\n", s.ToString().c_str()); + exit(1); + } + ReadOptions ro; + ro.total_order_seek = true; + std::unique_ptr truth_iter(truth_db.db->NewIterator(ro)); + std::unique_ptr db_iter(db_.db->NewIterator(ro)); + // Verify that all the key/values in truth_db are retrivable in db with + // ::Get + fprintf(stderr, "Verifying db >= truth_db with ::Get...\n"); + for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) { std::string value; s = db_.db->Get(ro, truth_iter->key(), &value); assert(s.ok()); // TODO(myabandeh): provide debugging hints assert(Slice(value) == truth_iter->value()); + } + // Verify that the db iterator does not give any extra key/value + fprintf(stderr, "Verifying db == truth_db...\n"); + for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid(); + db_iter->Next(), truth_iter->Next()) { + assert(truth_iter->Valid()); + assert(truth_iter->value() == db_iter->value()); + } + // No more key should be left unchecked in truth_db + assert(!truth_iter->Valid()); + fprintf(stderr, "...Verified\n"); } - // Verify that the db iterator does not give any extra key/value - fprintf(stderr, "Verifying db == truth_db...\n"); - for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next(), truth_iter->Next()) { - assert(truth_iter->Valid()); - assert(truth_iter->value() == db_iter->value()); - } - // No more key should be left unchecked in truth_db - assert(!truth_iter->Valid()); - fprintf(stderr, "...Verified\n"); -} void Run() { if (!SanityCheck()) { @@ -2934,6 +2949,12 @@ void VerifyDBFromDB(std::string& truth_db_name) { } } + if (secondary_update_thread_) { + secondary_update_stopped_.store(1, std::memory_order_relaxed); + secondary_update_thread_->join(); + secondary_update_thread_.reset(); + } + #ifndef ROCKSDB_LITE if (name != "replay" && FLAGS_trace_file != "") { Status s = db_.db->EndTrace(); @@ -2953,10 +2974,17 @@ void VerifyDBFromDB(std::string& truth_db_name) { ->ToString() .c_str()); } + if (FLAGS_use_secondary_db) { + fprintf(stdout, "Secondary instance updated %" PRIu64 " times.\n", + secondary_db_updates_); + } } private: std::shared_ptr timestamp_emulator_; + std::unique_ptr secondary_update_thread_; + std::atomic secondary_update_stopped_{0}; + uint64_t secondary_db_updates_ = 0; struct ThreadArg { Benchmark* bm; @@ -3618,6 +3646,11 @@ void VerifyDBFromDB(std::string& truth_db_name) { fprintf(stderr, "Cannot use readonly flag with transaction_db\n"); exit(1); } + if (FLAGS_use_secondary_db && + (FLAGS_transaction_db || FLAGS_optimistic_transaction_db)) { + fprintf(stderr, "Cannot use use_secondary_db flag with transaction_db\n"); + exit(1); + } #endif // ROCKSDB_LITE } @@ -3845,6 +3878,32 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (s.ok()) { db->db = ptr; } + } else if (FLAGS_use_secondary_db) { + if (FLAGS_secondary_path.empty()) { + std::string default_secondary_path; + FLAGS_env->GetTestDirectory(&default_secondary_path); + default_secondary_path += "/dbbench_secondary"; + FLAGS_secondary_path = default_secondary_path; + } + s = DB::OpenAsSecondary(options, db_name, FLAGS_secondary_path, &db->db); + if (s.ok() && FLAGS_secondary_update_interval > 0) { + secondary_update_thread_.reset(new port::Thread( + [this](int interval, DBWithColumnFamilies* _db) { + while (0 == secondary_update_stopped_.load( + std::memory_order_relaxed)) { + Status secondary_update_status = + _db->db->TryCatchUpWithPrimary(); + if (!secondary_update_status.ok()) { + fprintf(stderr, "Failed to catch up with primary: %s\n", + secondary_update_status.ToString().c_str()); + break; + } + ++secondary_db_updates_; + FLAGS_env->SleepForMicroseconds(interval * 1000000); + } + }, + FLAGS_secondary_update_interval, db)); + } #endif // ROCKSDB_LITE } else { s = DB::Open(options, db_name, &db->db);