diff --git a/HISTORY.md b/HISTORY.md
index d45e94bb6..f67a8210d 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -6,6 +6,7 @@
 ### New Features
 * Add an option `snap_refresh_nanos` (defaults to 0.1s) to periodically refresh the snapshot list in compaction jobs. Set it to 0 to disable the feature.
 * Add an option `unordered_write` which trades snapshot guarantees for higher write throughput. When used with WRITE_PREPARED transactions, it offers higher throughput without compromising the guarantees.
+* Allow DBImplSecondary to remove memtables with obsolete data after replaying MANIFEST and WAL.
 
 ### Performance Improvements
 * Reduce binary search when an iterator reseeks into the same data block.
diff --git a/db/db_impl.h b/db/db_impl.h
index c4fae9a6a..08cb19491 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -1078,8 +1078,8 @@ class DBImpl : public DB {
                              JobContext* job_context, LogBuffer* log_buffer,
                              Env::Priority thread_pri);
 
   // REQUIRES: log_numbers are sorted in ascending order
-  virtual Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
-                                 SequenceNumber* next_sequence, bool read_only);
+  Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
+                         SequenceNumber* next_sequence, bool read_only);
 
   // The following two methods are used to flush a memtable to
   // storage. The first one is used at database RecoveryTime (when the
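For orientation, the workflow this change improves looks as follows: a process opens the primary's directory read-only with `DB::OpenAsSecondary()` and periodically calls `TryCatchUpWithPrimary()`, which replays new MANIFEST entries and tails new WAL records. A minimal sketch of that loop; the paths are hypothetical and error handling is reduced to asserts:

```cpp
#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.max_open_files = -1;  // secondary mode keeps all table files open

  // Hypothetical paths: the primary's db directory, plus a private
  // directory where the secondary keeps its own info log and OPTIONS file.
  const std::string kPrimaryPath = "/tmp/rocksdb_primary";
  const std::string kSecondaryPath = "/tmp/rocksdb_secondary";

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::OpenAsSecondary(options, kPrimaryPath, kSecondaryPath, &db);
  assert(s.ok());

  // Replay new MANIFEST entries and tail new WAL records written by the
  // primary since the last call.
  s = db->TryCatchUpWithPrimary();
  assert(s.ok());

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "foo", &value);
  assert(s.ok() || s.IsNotFound());

  delete db;
  return 0;
}
```

With this patch, the catch-up call also drops memtables whose contents the primary has already flushed, instead of letting them accumulate for the lifetime of the secondary.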
diff --git a/db/db_impl_secondary.cc b/db/db_impl_secondary.cc
index 007910ea5..5dfa2d0c9 100644
--- a/db/db_impl_secondary.cc
+++ b/db/db_impl_secondary.cc
@@ -18,7 +18,6 @@
 namespace rocksdb {
 
 #ifndef ROCKSDB_LITE
-
 DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
                                  const std::string& dbname)
     : DBImpl(db_options, dbname) {
@@ -35,6 +34,7 @@ Status DBImplSecondary::Recover(
     bool /*error_if_data_exists_in_logs*/) {
   mutex_.AssertHeld();
 
+  JobContext job_context(0);
   Status s;
   s = static_cast<ReactiveVersionSet*>(versions_.get())
           ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
@@ -59,11 +59,29 @@ Status DBImplSecondary::Recover(
     single_column_family_mode_ =
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
 
-    s = FindAndRecoverLogFiles();
+    std::unordered_set<ColumnFamilyData*> cfds_changed;
+    s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
   }
 
   // TODO: update options_file_number_ needed?
 
+  job_context.Clean();
+  return s;
+}
+
+// Find new WALs and apply them in order to the secondary instance.
+Status DBImplSecondary::FindAndRecoverLogFiles(
+    std::unordered_set<ColumnFamilyData*>* cfds_changed,
+    JobContext* job_context) {
+  assert(nullptr != cfds_changed);
+  assert(nullptr != job_context);
+  Status s;
+  std::vector<uint64_t> logs;
+  s = FindNewLogNumbers(&logs);
+  if (s.ok() && !logs.empty()) {
+    SequenceNumber next_sequence(kMaxSequenceNumber);
+    s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context);
+  }
   return s;
 }
 
@@ -151,7 +169,10 @@ Status DBImplSecondary::MaybeInitLogReader(
 // REQUIRES: log_numbers are sorted in ascending order
 Status DBImplSecondary::RecoverLogFiles(
     const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
-    bool /*read_only*/) {
+    std::unordered_set<ColumnFamilyData*>* cfds_changed,
+    JobContext* job_context) {
+  assert(nullptr != cfds_changed);
+  assert(nullptr != job_context);
   mutex_.AssertHeld();
   Status status;
   for (auto log_number : log_numbers) {
@@ -184,6 +205,39 @@ Status DBImplSecondary::RecoverLogFiles(
         continue;
       }
       WriteBatchInternal::SetContents(&batch, record);
+      std::vector<uint32_t> column_family_ids;
+      status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
+      if (status.ok()) {
+        SequenceNumber seq = versions_->LastSequence();
+        for (const auto id : column_family_ids) {
+          ColumnFamilyData* cfd =
+              versions_->GetColumnFamilySet()->GetColumnFamily(id);
+          if (cfd == nullptr) {
+            continue;
+          }
+          if (cfds_changed->count(cfd) == 0) {
+            cfds_changed->insert(cfd);
+          }
+          auto curr_log_num = port::kMaxUint64;
+          if (cfd_to_current_log_.count(cfd) > 0) {
+            curr_log_num = cfd_to_current_log_[cfd];
+          }
+          // If the active memtable contains records added by replaying an
+          // earlier WAL, then we need to seal the memtable, add it to the
+          // immutable memtable list and create a new active memtable.
+          if (!cfd->mem()->IsEmpty() && (curr_log_num == port::kMaxUint64 ||
+                                         curr_log_num != log_number)) {
+            const MutableCFOptions mutable_cf_options =
+                *cfd->GetLatestMutableCFOptions();
+            MemTable* new_mem =
+                cfd->ConstructNewMemtable(mutable_cf_options, seq);
+            cfd->mem()->SetNextLogNumber(log_number);
+            cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
+            new_mem->Ref();
+            cfd->SetMemtable(new_mem);
+          }
+        }
+      }
       // do not check sequence number because user may toggle disableWAL
       // between writes which breaks sequence number continuity guarantee
@@ -194,12 +248,30 @@ Status DBImplSecondary::RecoverLogFiles(
       // That's why we set ignore missing column families to true
       // passing null flush_scheduler will disable memtable flushing which is
      // needed for secondary instances
-      bool has_valid_writes = false;
-      status = WriteBatchInternal::InsertInto(
-          &batch, column_family_memtables_.get(), nullptr /* flush_scheduler */,
-          true, log_number, this, false /* concurrent_memtable_writes */,
-          next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
-      if (!status.ok()) {
+      if (status.ok()) {
+        bool has_valid_writes = false;
+        status = WriteBatchInternal::InsertInto(
+            &batch, column_family_memtables_.get(),
+            nullptr /* flush_scheduler */, true, log_number, this,
+            false /* concurrent_memtable_writes */, next_sequence,
+            &has_valid_writes, seq_per_batch_, batch_per_txn_);
+      }
+      if (status.ok()) {
+        for (const auto id : column_family_ids) {
+          ColumnFamilyData* cfd =
+              versions_->GetColumnFamilySet()->GetColumnFamily(id);
+          if (cfd == nullptr) {
+            continue;
+          }
+          std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
+              cfd_to_current_log_.find(cfd);
+          if (iter == cfd_to_current_log_.end()) {
+            cfd_to_current_log_.insert({cfd, log_number});
+          } else if (log_number > iter->second) {
+            iter->second = log_number;
+          }
+        }
+      } else {
         // We are treating this as a failure while reading since we read valid
         // blocks that do not form coherent data
         reader->GetReporter()->Corruption(record.size(), status);
@@ -296,18 +368,6 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
   return s;
 }
 
-// find new WAL and apply them in order to the secondary instance
-Status DBImplSecondary::FindAndRecoverLogFiles() {
-  Status s;
-  std::vector<uint64_t> logs;
-  s = FindNewLogNumbers(&logs);
-  if (s.ok() && !logs.empty()) {
-    SequenceNumber next_sequence(kMaxSequenceNumber);
-    s = RecoverLogFiles(logs, &next_sequence, true /*read_only*/);
-  }
-  return s;
-}
-
 Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
                                        ColumnFamilyHandle* column_family) {
   if (read_options.managed) {
@@ -393,20 +453,25 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
   Status s;
   // read the manifest and apply new changes to the secondary instance
   std::unordered_set<ColumnFamilyData*> cfds_changed;
+  JobContext job_context(0, true /*create_superversion*/);
   InstrumentedMutexLock lock_guard(&mutex_);
   s = static_cast<ReactiveVersionSet*>(versions_.get())
          ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
+  // list wal_dir to discover new WALs and apply new changes to the secondary
+  // instance
+  if (s.ok()) {
+    s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
+  }
   if (s.ok()) {
-    SuperVersionContext sv_context(true /* create_superversion */);
     for (auto cfd : cfds_changed) {
-      sv_context.NewSuperVersion();
+      cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
+                                     &job_context.memtables_to_free);
+      auto& sv_context = job_context.superversion_contexts.back();
       cfd->InstallSuperVersion(&sv_context, &mutex_);
+      sv_context.NewSuperVersion();
     }
-    sv_context.Clean();
+    job_context.Clean();
   }
-  // list wal_dir to discover new WALs and apply new changes to the secondary
-  // instance
-  s = FindAndRecoverLogFiles();
   return s;
 }
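The heart of the .cc change is the per-column-family WAL bookkeeping in `cfd_to_current_log_`: when a record for a column family arrives from a WAL other than the one that filled its active memtable, the active memtable is sealed first, mirroring the memtable switch the primary performed at flush time. The following standalone toy model, with made-up types that are not part of RocksDB, isolates just that decision:

```cpp
#include <cstdint>
#include <iostream>
#include <unordered_map>

// Toy model: track, per column family, the WAL currently being replayed.
// A record arriving from a different WAL than the one that filled the
// active memtable forces a seal before the record is applied.
struct ReplayModel {
  std::unordered_map<uint32_t, uint64_t> cf_to_current_log;
  int sealed = 0;

  void Apply(uint32_t cf_id, uint64_t log_number) {
    auto it = cf_to_current_log.find(cf_id);
    if (it != cf_to_current_log.end() && it->second != log_number) {
      ++sealed;  // active memtable held records from an earlier WAL
    }
    cf_to_current_log[cf_id] = log_number;
  }
};

int main() {
  ReplayModel m;
  m.Apply(0, 5);  // CF 0, WAL 5: nothing to seal yet
  m.Apply(0, 5);  // same WAL, no seal
  m.Apply(0, 7);  // primary rotated to WAL 7: seal once
  m.Apply(1, 7);  // CF 1 first seen in WAL 7: no seal
  std::cout << "sealed " << m.sealed << " memtable(s)\n";  // prints 1
  return 0;
}
```

In the actual patch the seal is `cfd->imm()->Add()` followed by `SetMemtable()` while holding `mutex_`; `RecoverLogFiles` also differs from the toy in that it skips the seal when the active memtable is empty.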
diff --git a/db/db_impl_secondary.h b/db/db_impl_secondary.h
index 32dbae058..912708b1e 100644
--- a/db/db_impl_secondary.h
+++ b/db/db_impl_secondary.h
@@ -96,40 +96,40 @@ class DBImplSecondary : public DBImpl {
   Status Put(const WriteOptions& /*options*/,
              ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
              const Slice& /*value*/) override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
   using DBImpl::Merge;
   Status Merge(const WriteOptions& /*options*/,
                ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
                const Slice& /*value*/) override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
   using DBImpl::Delete;
   Status Delete(const WriteOptions& /*options*/,
                 ColumnFamilyHandle* /*column_family*/,
                 const Slice& /*key*/) override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
   using DBImpl::SingleDelete;
   Status SingleDelete(const WriteOptions& /*options*/,
                       ColumnFamilyHandle* /*column_family*/,
                       const Slice& /*key*/) override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
   Status Write(const WriteOptions& /*options*/,
                WriteBatch* /*updates*/) override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
   using DBImpl::CompactRange;
   Status CompactRange(const CompactRangeOptions& /*options*/,
                       ColumnFamilyHandle* /*column_family*/,
                       const Slice* /*begin*/, const Slice* /*end*/) override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
   using DBImpl::CompactFiles;
@@ -140,32 +140,32 @@ class DBImplSecondary : public DBImpl {
       const int /*output_level*/, const int /*output_path_id*/ = -1,
       std::vector<std::string>* const /*output_file_names*/ = nullptr,
       CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
   Status DisableFileDeletions() override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
   Status EnableFileDeletions(bool /*force*/) override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
   Status GetLiveFiles(std::vector<std::string>&,
                       uint64_t* /*manifest_file_size*/,
                       bool /*flush_memtable*/ = true) override {
-    return Status::NotSupported("Not supported operation in read only mode.");
+    return Status::NotSupported("Not supported operation in secondary mode.");
   }
mode."); } using DBImpl::SyncWAL; Status SyncWAL() override { - return Status::NotSupported("Not supported operation in read only mode."); + return Status::NotSupported("Not supported operation in secondary mode."); } using DB::IngestExternalFile; @@ -173,7 +173,7 @@ class DBImplSecondary : public DBImpl { ColumnFamilyHandle* /*column_family*/, const std::vector& /*external_files*/, const IngestExternalFileOptions& /*ingestion_options*/) override { - return Status::NotSupported("Not supported operation in read only mode."); + return Status::NotSupported("Not supported operation in secondary mode."); } // Try to catch up with the primary by reading as much as possible from the @@ -185,6 +185,70 @@ class DBImplSecondary : public DBImpl { Status MaybeInitLogReader(uint64_t log_number, log::FragmentBufferedReader** log_reader); + protected: + class ColumnFamilyCollector : public WriteBatch::Handler { + std::unordered_set column_family_ids_; + + Status AddColumnFamilyId(uint32_t column_family_id) { + if (column_family_ids_.find(column_family_id) == + column_family_ids_.end()) { + column_family_ids_.insert(column_family_id); + } + return Status::OK(); + } + + public: + explicit ColumnFamilyCollector() {} + + ~ColumnFamilyCollector() override {} + + Status PutCF(uint32_t column_family_id, const Slice&, + const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status DeleteCF(uint32_t column_family_id, const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status DeleteRangeCF(uint32_t column_family_id, const Slice&, + const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status MergeCF(uint32_t column_family_id, const Slice&, + const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status PutBlobIndexCF(uint32_t column_family_id, const Slice&, + const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + const std::unordered_set& column_families() const { + return column_family_ids_; + } + }; + + Status CollectColumnFamilyIdsFromWriteBatch( + const WriteBatch& batch, std::vector* column_family_ids) { + assert(column_family_ids != nullptr); + column_family_ids->clear(); + ColumnFamilyCollector handler; + Status s = batch.Iterate(&handler); + if (s.ok()) { + for (const auto& cf : handler.column_families()) { + column_family_ids->push_back(cf); + } + } + return s; + } + private: friend class DB; @@ -194,19 +258,25 @@ class DBImplSecondary : public DBImpl { using DBImpl::Recover; - Status FindAndRecoverLogFiles(); + Status FindAndRecoverLogFiles( + std::unordered_set* cfds_changed, + JobContext* job_context); Status FindNewLogNumbers(std::vector* logs); Status RecoverLogFiles(const std::vector& log_numbers, SequenceNumber* next_sequence, - bool read_only) override; + std::unordered_set* cfds_changed, + JobContext* job_context); std::unique_ptr manifest_reader_; std::unique_ptr manifest_reporter_; std::unique_ptr manifest_reader_status_; - // cache log readers for each log number, used for continue WAL replay + // Cache log readers for each log number, used for continue WAL replay // after recovery std::map> log_readers_; + + // Current WAL number replayed for each column family. 
diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc
index 60ea5ba8d..a4267c7d5 100644
--- a/db/db_secondary_test.cc
+++ b/db/db_secondary_test.cc
@@ -243,6 +243,11 @@ TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
 
   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
   verify_db_func("new_foo_value", "new_bar_value");
+
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo", "new_foo_value_1"));
+  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+  verify_db_func("new_foo_value_1", "new_bar_value");
 }
 
 TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
@@ -519,6 +524,131 @@ TEST_F(DBSecondaryTest, SwitchManifest) {
   ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
   range_scan_db();
 }
+
+TEST_F(DBSecondaryTest, SwitchWAL) {
+  const int kNumKeysPerMemtable = 1;
+  const std::string kCFName1 = "pikachu";
+  Options options;
+  options.env = env_;
+  options.max_write_buffer_number = 4;
+  options.min_write_buffer_number_to_merge = 2;
+  options.memtable_factory.reset(
+      new SpecialSkipListFactory(kNumKeysPerMemtable));
+  CreateAndReopenWithCF({kCFName1}, options);
+
+  Options options1;
+  options1.env = env_;
+  options1.max_open_files = -1;
+  OpenSecondaryWithColumnFamilies({kCFName1}, options1);
+  ASSERT_EQ(2, handles_secondary_.size());
+
+  const auto& verify_db = [](DB* db1,
+                             const std::vector<ColumnFamilyHandle*>& handles1,
+                             DB* db2,
+                             const std::vector<ColumnFamilyHandle*>& handles2) {
+    ASSERT_NE(nullptr, db1);
+    ASSERT_NE(nullptr, db2);
+    ReadOptions read_opts;
+    read_opts.verify_checksums = true;
+    ASSERT_EQ(handles1.size(), handles2.size());
+    for (size_t i = 0; i != handles1.size(); ++i) {
+      std::unique_ptr<Iterator> it1(db1->NewIterator(read_opts, handles1[i]));
+      std::unique_ptr<Iterator> it2(db2->NewIterator(read_opts, handles2[i]));
+      it1->SeekToFirst();
+      it2->SeekToFirst();
+      for (; it1->Valid() && it2->Valid(); it1->Next(), it2->Next()) {
+        ASSERT_EQ(it1->key(), it2->key());
+        ASSERT_EQ(it1->value(), it2->value());
+      }
+      ASSERT_FALSE(it1->Valid());
+      ASSERT_FALSE(it2->Valid());
+
+      for (it1->SeekToFirst(); it1->Valid(); it1->Next()) {
+        std::string value;
+        ASSERT_OK(db2->Get(read_opts, handles2[i], it1->key(), &value));
+        ASSERT_EQ(it1->value(), value);
+      }
+      for (it2->SeekToFirst(); it2->Valid(); it2->Next()) {
+        std::string value;
+        ASSERT_OK(db1->Get(read_opts, handles1[i], it2->key(), &value));
+        ASSERT_EQ(it2->value(), value);
+      }
+    }
+  };
+  for (int k = 0; k != 8; ++k) {
+    ASSERT_OK(
+        Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
+    ASSERT_OK(
+        Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
+    ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+    verify_db(dbfull(), handles_, db_secondary_, handles_secondary_);
+  }
+}
+
+TEST_F(DBSecondaryTest, CatchUpAfterFlush) {
+  const int kNumKeysPerMemtable = 16;
+  Options options;
+  options.env = env_;
+  options.max_write_buffer_number = 4;
+  options.min_write_buffer_number_to_merge = 2;
+  options.memtable_factory.reset(
+      new SpecialSkipListFactory(kNumKeysPerMemtable));
+  Reopen(options);
+
+  Options options1;
+  options1.env = env_;
+  options1.max_open_files = -1;
+  OpenSecondary(options1);
+
+  WriteOptions write_opts;
+  WriteBatch wb;
+  wb.Put("key0", "value0");
+  wb.Put("key1", "value1");
+  ASSERT_OK(dbfull()->Write(write_opts, &wb));
+  ReadOptions read_opts;
+  std::unique_ptr<Iterator> iter1(db_secondary_->NewIterator(read_opts));
+  iter1->Seek("key0");
+  ASSERT_FALSE(iter1->Valid());
+  iter1->Seek("key1");
+  ASSERT_FALSE(iter1->Valid());
+
+  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+  iter1->Seek("key0");
+  ASSERT_FALSE(iter1->Valid());
+  iter1->Seek("key1");
+  ASSERT_FALSE(iter1->Valid());
+  std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(read_opts));
+  iter2->Seek("key0");
+  ASSERT_TRUE(iter2->Valid());
+  ASSERT_EQ("value0", iter2->value());
+  iter2->Seek("key1");
+  ASSERT_TRUE(iter2->Valid());
+  ASSERT_EQ("value1", iter2->value());
+
+  {
+    WriteBatch wb1;
+    wb1.Put("key0", "value01");
+    wb1.Put("key1", "value11");
+    ASSERT_OK(dbfull()->Write(write_opts, &wb1));
+  }
+
+  {
+    WriteBatch wb2;
+    wb2.Put("key0", "new_value0");
+    wb2.Delete("key1");
+    ASSERT_OK(dbfull()->Write(write_opts, &wb2));
+  }
+
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+  std::unique_ptr<Iterator> iter3(db_secondary_->NewIterator(read_opts));
+  // iter3 should not see value01 and value11 at all.
+  iter3->Seek("key0");
+  ASSERT_TRUE(iter3->Valid());
+  ASSERT_EQ("new_value0", iter3->value());
+  iter3->Seek("key1");
+  ASSERT_FALSE(iter3->Valid());
+}
 #endif  //! ROCKSDB_LITE
 
 }  // namespace rocksdb
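Note what `CatchUpAfterFlush` pins down about read semantics: an iterator keeps reading from the superversion that was current when it was created, so records replayed by a later `TryCatchUpWithPrimary()` are visible only to iterators created after that call. A small helper capturing the intended usage pattern (the function is illustrative, not part of the patch):

```cpp
#include <memory>
#include <string>

#include "rocksdb/db.h"

// Assumes `secondary` was opened via DB::OpenAsSecondary. Catch up first,
// then create the iterator: an iterator created before the catch-up would
// keep reading from the superversion it pinned at creation time.
std::string ReadFreshValue(rocksdb::DB* secondary, const std::string& key) {
  rocksdb::Status s = secondary->TryCatchUpWithPrimary();
  if (!s.ok()) {
    return std::string();
  }
  std::unique_ptr<rocksdb::Iterator> iter(
      secondary->NewIterator(rocksdb::ReadOptions()));
  iter->Seek(key);
  if (iter->Valid() && iter->key() == rocksdb::Slice(key)) {
    return iter->value().ToString();
  }
  return std::string();
}
```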
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 21b44b179..d81b1d4d2 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -638,4 +638,28 @@ Status InstallMemtableAtomicFlushResults(
   return s;
 }
 
+void MemTableList::RemoveOldMemTables(uint64_t log_number,
+                                      autovector<MemTable*>* to_delete) {
+  assert(to_delete != nullptr);
+  InstallNewVersion();
+  auto& memlist = current_->memlist_;
+  // Collect the obsolete memtables first: removing while iterating would
+  // skip entries, because Remove() erases from the list being traversed.
+  autovector<MemTable*> old_memtables;
+  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
+    MemTable* mem = *it;
+    if (mem->GetNextLogNumber() > log_number) {
+      break;
+    }
+    old_memtables.push_back(mem);
+  }
+  for (MemTable* mem : old_memtables) {
+    current_->Remove(mem, to_delete);
+    --num_flush_not_started_;
+    if (0 == num_flush_not_started_) {
+      imm_flush_needed.store(false, std::memory_order_release);
+    }
+  }
+}
+
 }  // namespace rocksdb
diff --git a/db/memtable_list.h b/db/memtable_list.h
index b56ad4932..5df35660a 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -294,6 +294,13 @@ class MemTableList {
     }
   }
 
+  // Used only by DBImplSecondary during log replay.
+  // Remove memtables whose data were written before the WAL with log_number
+  // was created, i.e. mem->GetNextLogNumber() <= log_number. The memtables
+  // are not freed here, only appended to *to_delete for the caller to unref.
+  void RemoveOldMemTables(uint64_t log_number,
+                          autovector<MemTable*>* to_delete);
+
  private:
   friend Status InstallMemtableAtomicFlushResults(
       const autovector<MemTableList*>* imm_lists,
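To make the cutoff rule concrete: an immutable memtable on the secondary is obsolete once `mem->GetNextLogNumber() <= log_number`, i.e. every WAL that may still contain its data is at or below the column family's log number recorded in the MANIFEST. A standalone toy model of that rule, using made-up types rather than the RocksDB API:

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <list>

// Toy stand-in for an immutable memtable: next_log_number is the first
// WAL whose records the memtable does NOT contain.
struct ImmMemTable {
  uint64_t next_log_number;
};

// Mirror of the RemoveOldMemTables rule: drop memtables from the oldest
// end while next_log_number <= log_number; stop at the first survivor.
size_t RemoveOldMemTables(std::list<ImmMemTable>* imm, uint64_t log_number) {
  size_t removed = 0;
  while (!imm->empty() && imm->back().next_log_number <= log_number) {
    imm->pop_back();  // oldest memtables sit at the back
    ++removed;
  }
  return removed;
}

int main() {
  // Newest first, matching the ordering of the memtable list.
  std::list<ImmMemTable> imm = {{9}, {7}, {5}};
  size_t n = RemoveOldMemTables(&imm, 7);
  std::cout << n << " removed, " << imm.size() << " left\n";  // 2 removed, 1 left
  return 0;
}
```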