diff --git a/HISTORY.md b/HISTORY.md index 55366b006..f645d5cc2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -22,7 +22,7 @@ * Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress. ### Bug Fixes - +* Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number. ## 6.2.0 (4/30/2019) ### New Features diff --git a/db/db_impl_secondary.cc b/db/db_impl_secondary.cc index 586158ef7..a8ea921a2 100644 --- a/db/db_impl_secondary.cc +++ b/db/db_impl_secondary.cc @@ -102,7 +102,7 @@ Status DBImplSecondary::FindNewLogNumbers(std::vector<std::string>* logs) { // numbers smaller than the smallest log in log_readers_, so there is no // need to pass these logs to RecoverLogFiles uint64_t log_number_min = 0; - if (log_readers_.size() > 0) { + if (!log_readers_.empty()) { log_number_min = log_readers_.begin()->first; } for (size_t i = 0; i < filenames.size(); i++) { @@ -202,11 +202,19 @@ Status DBImplSecondary::RecoverLogFiles( record.size(), Status::Corruption("log record too small")); continue; } + SequenceNumber seq = versions_->LastSequence(); WriteBatchInternal::SetContents(&batch, record); + SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch); + // If the write batch's sequence number is smaller than the last sequence + // number of the db, then we should skip this write batch because its + // data must reside in an SST that has already been added in the prior + // MANIFEST replay. 
+ if (seq_of_batch < seq) { + continue; + } std::vector<uint32_t> column_family_ids; status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids); if (status.ok()) { - SequenceNumber seq = versions_->LastSequence(); for (const auto id : column_family_ids) { ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(id); @@ -235,10 +243,13 @@ Status DBImplSecondary::RecoverLogFiles( cfd->SetMemtable(new_mem); } } + bool has_valid_writes = false; + status = WriteBatchInternal::InsertInto( + &batch, column_family_memtables_.get(), + nullptr /* flush_scheduler */, true, log_number, this, + false /* concurrent_memtable_writes */, next_sequence, + &has_valid_writes, seq_per_batch_, batch_per_txn_); } - // do not check sequence number because user may toggle disableWAL - // between writes which breaks sequence number continuity guarantee - // If column family was not found, it might mean that the WAL write // batch references to the column family that was dropped after the // insert. 
We don't want to fail the whole write batch in that case -- @@ -246,14 +257,6 @@ Status DBImplSecondary::RecoverLogFiles( // That's why we set ignore missing column families to true // passing null flush_scheduler will disable memtable flushing which is // needed for secondary instances - if (status.ok()) { - bool has_valid_writes = false; - status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), - nullptr /* flush_scheduler */, true, log_number, this, - false /* concurrent_memtable_writes */, next_sequence, - &has_valid_writes, seq_per_batch_, batch_per_txn_); - } if (status.ok()) { for (const auto id : column_family_ids) { ColumnFamilyData* cfd = @@ -269,31 +272,28 @@ Status DBImplSecondary::RecoverLogFiles( iter->second = log_number; } } + auto last_sequence = *next_sequence - 1; + if ((*next_sequence != kMaxSequenceNumber) && + (versions_->LastSequence() <= last_sequence)) { + versions_->SetLastAllocatedSequence(last_sequence); + versions_->SetLastPublishedSequence(last_sequence); + versions_->SetLastSequence(last_sequence); + } } else { // We are treating this as a failure while reading since we read valid // blocks that do not form coherent data reader->GetReporter()->Corruption(record.size(), status); - continue; } } - if (!status.ok()) { return status; } - - auto last_sequence = *next_sequence - 1; - if ((*next_sequence != kMaxSequenceNumber) && - (versions_->LastSequence() <= last_sequence)) { - versions_->SetLastAllocatedSequence(last_sequence); - versions_->SetLastPublishedSequence(last_sequence); - versions_->SetLastSequence(last_sequence); - } } // remove logreaders from map after successfully recovering the WAL if (log_readers_.size() > 1) { - auto eraseIter = log_readers_.begin(); - std::advance(eraseIter, log_readers_.size() - 1); - log_readers_.erase(log_readers_.begin(), eraseIter); + auto erase_iter = log_readers_.begin(); + std::advance(erase_iter, log_readers_.size() - 1); + log_readers_.erase(log_readers_.begin(), 
erase_iter); } return status; } diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc index 50a0923b4..23132434f 100644 --- a/db/db_secondary_test.cc +++ b/db/db_secondary_test.cc @@ -526,6 +526,55 @@ TEST_F(DBSecondaryTest, SwitchManifest) { } TEST_F(DBSecondaryTest, SwitchWAL) { + const int kNumKeysPerMemtable = 1; + Options options; + options.env = env_; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 2; + options.memtable_factory.reset( + new SpecialSkipListFactory(kNumKeysPerMemtable)); + Reopen(options); + + Options options1; + options1.env = env_; + options1.max_open_files = -1; + OpenSecondary(options1); + + const auto& verify_db = [](DB* db1, DB* db2) { + ASSERT_NE(nullptr, db1); + ASSERT_NE(nullptr, db2); + ReadOptions read_opts; + read_opts.verify_checksums = true; + std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts)); + std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts)); + it1->SeekToFirst(); + it2->SeekToFirst(); + for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) { + ASSERT_EQ(it1->key(), it2->key()); + ASSERT_EQ(it1->value(), it2->value()); + } + ASSERT_FALSE(it1->Valid()); + ASSERT_FALSE(it2->Valid()); + + for (it1->SeekToFirst(); it1->Valid(); it1->Next()) { + std::string value; + ASSERT_OK(db2->Get(read_opts, it1->key(), &value)); + ASSERT_EQ(it1->value(), value); + } + for (it2->SeekToFirst(); it2->Valid(); it2->Next()) { + std::string value; + ASSERT_OK(db1->Get(read_opts, it2->key(), &value)); + ASSERT_EQ(it2->value(), value); + } + }; + for (int k = 0; k != 16; ++k) { + ASSERT_OK(Put("key" + std::to_string(k), "value" + std::to_string(k))); + ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); + verify_db(dbfull(), db_secondary_); + } +} + +TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { const int kNumKeysPerMemtable = 1; const std::string kCFName1 = "pikachu"; Options options; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 30aafb660..b98fb42c4 100644 --- 
a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -752,6 +752,19 @@ DEFINE_uint64(blob_db_bytes_per_sync, 0, "Bytes to sync blob file at."); DEFINE_uint64(blob_db_file_size, 256 * 1024 * 1024, "Target size of each blob file."); +// Secondary DB instance Options +DEFINE_bool(use_secondary_db, false, + "Open a RocksDB secondary instance. A primary instance can be " + "running in another db_bench process."); + +DEFINE_string(secondary_path, "", + "Path to a directory used by the secondary instance to store " + "private files, e.g. info log."); + +DEFINE_int32(secondary_update_interval, 5, + "Secondary instance attempts to catch up with the primary every " + "secondary_update_interval seconds."); + #endif // ROCKSDB_LITE DEFINE_bool(report_bg_io_stats, false, @@ -2571,36 +2584,38 @@ class Benchmark { return base_name + ToString(id); } -void VerifyDBFromDB(std::string& truth_db_name) { - DBWithColumnFamilies truth_db; - auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db); - if (!s.ok()) { - fprintf(stderr, "open error: %s\n", s.ToString().c_str()); - exit(1); - } - ReadOptions ro; - ro.total_order_seek = true; - std::unique_ptr<Iterator> truth_iter(truth_db.db->NewIterator(ro)); - std::unique_ptr<Iterator> db_iter(db_.db->NewIterator(ro)); - // Verify that all the key/values in truth_db are retrivable in db with ::Get - fprintf(stderr, "Verifying db >= truth_db with ::Get...\n"); - for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) { + void VerifyDBFromDB(std::string& truth_db_name) { + DBWithColumnFamilies truth_db; + auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db); + if (!s.ok()) { + fprintf(stderr, "open error: %s\n", s.ToString().c_str()); + exit(1); + } + ReadOptions ro; + ro.total_order_seek = true; + std::unique_ptr<Iterator> truth_iter(truth_db.db->NewIterator(ro)); + std::unique_ptr<Iterator> db_iter(db_.db->NewIterator(ro)); + // Verify that all the key/values in truth_db are retrivable in db with + // ::Get + fprintf(stderr, 
"Verifying db >= truth_db with ::Get...\n"); + for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) { std::string value; s = db_.db->Get(ro, truth_iter->key(), &value); assert(s.ok()); // TODO(myabandeh): provide debugging hints assert(Slice(value) == truth_iter->value()); + } + // Verify that the db iterator does not give any extra key/value + fprintf(stderr, "Verifying db == truth_db...\n"); + for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid(); + db_iter->Next(), truth_iter->Next()) { + assert(truth_iter->Valid()); + assert(truth_iter->value() == db_iter->value()); + } + // No more key should be left unchecked in truth_db + assert(!truth_iter->Valid()); + fprintf(stderr, "...Verified\n"); } - // Verify that the db iterator does not give any extra key/value - fprintf(stderr, "Verifying db == truth_db...\n"); - for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next(), truth_iter->Next()) { - assert(truth_iter->Valid()); - assert(truth_iter->value() == db_iter->value()); - } - // No more key should be left unchecked in truth_db - assert(!truth_iter->Valid()); - fprintf(stderr, "...Verified\n"); -} void Run() { if (!SanityCheck()) { @@ -2934,6 +2949,12 @@ void VerifyDBFromDB(std::string& truth_db_name) { } } + if (secondary_update_thread_) { + secondary_update_stopped_.store(1, std::memory_order_relaxed); + secondary_update_thread_->join(); + secondary_update_thread_.reset(); + } + #ifndef ROCKSDB_LITE if (name != "replay" && FLAGS_trace_file != "") { Status s = db_.db->EndTrace(); @@ -2953,10 +2974,17 @@ void VerifyDBFromDB(std::string& truth_db_name) { ->ToString() .c_str()); } + if (FLAGS_use_secondary_db) { + fprintf(stdout, "Secondary instance updated %" PRIu64 " times.\n", + secondary_db_updates_); + } } private: std::shared_ptr<TimestampEmulator> timestamp_emulator_; + std::unique_ptr<port::Thread> secondary_update_thread_; + std::atomic<int> secondary_update_stopped_{0}; + uint64_t secondary_db_updates_ = 0; struct 
ThreadArg { Benchmark* bm; @@ -3618,6 +3646,11 @@ void VerifyDBFromDB(std::string& truth_db_name) { fprintf(stderr, "Cannot use readonly flag with transaction_db\n"); exit(1); } + if (FLAGS_use_secondary_db && + (FLAGS_transaction_db || FLAGS_optimistic_transaction_db)) { + fprintf(stderr, "Cannot use use_secondary_db flag with transaction_db\n"); + exit(1); + } #endif // ROCKSDB_LITE } @@ -3845,6 +3878,32 @@ void VerifyDBFromDB(std::string& truth_db_name) { if (s.ok()) { db->db = ptr; } + } else if (FLAGS_use_secondary_db) { + if (FLAGS_secondary_path.empty()) { + std::string default_secondary_path; + FLAGS_env->GetTestDirectory(&default_secondary_path); + default_secondary_path += "/dbbench_secondary"; + FLAGS_secondary_path = default_secondary_path; + } + s = DB::OpenAsSecondary(options, db_name, FLAGS_secondary_path, &db->db); + if (s.ok() && FLAGS_secondary_update_interval > 0) { + secondary_update_thread_.reset(new port::Thread( + [this](int interval, DBWithColumnFamilies* _db) { + while (0 == secondary_update_stopped_.load( + std::memory_order_relaxed)) { + Status secondary_update_status = + _db->db->TryCatchUpWithPrimary(); + if (!secondary_update_status.ok()) { + fprintf(stderr, "Failed to catch up with primary: %s\n", + secondary_update_status.ToString().c_str()); + break; + } + ++secondary_db_updates_; + FLAGS_env->SleepForMicroseconds(interval * 1000000); + } + }, + FLAGS_secondary_update_interval, db)); + } #endif // ROCKSDB_LITE } else { s = DB::Open(options, db_name, &db->db);