diff --git a/HISTORY.md b/HISTORY.md index 0f2abcc68..45f03f04e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ ### Bug Fixes * Fixed the logic of populating native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architecture. Without this fix, original code introduced in PR7659, when running on big-endian machine, can mistakenly store read_amp_bytes_per_bit (an uint32) in little endian format. Future access to `read_amp_bytes_per_bit` will give wrong values. Little endian architecture is not affected. * Fixed prefix extractor with timestamp issues. +* Fixed a bug in atomic flush: in two-phase commit mode, the minimum WAL log number to keep is incorrect. ### New Features * User defined timestamp feature supports `CompactRange` and `GetApproximateSizes`. diff --git a/db/column_family.cc b/db/column_family.cc index ecbb0e168..0ff01bf42 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -708,9 +708,7 @@ uint64_t ColumnFamilyData::OldestLogToKeep() { auto current_log = GetLogNumber(); if (allow_2pc_) { - autovector empty_list; - auto imm_prep_log = - imm()->PrecomputeMinLogContainingPrepSection(empty_list); + auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection(); auto mem_prep_log = mem()->GetMinLogContainingPrepSection(); if (imm_prep_log > 0 && imm_prep_log < current_log) { diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index c894023b9..496cedfa1 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -14,6 +14,7 @@ #include "file/filename.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/utilities/transaction_db.h" #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/mutexlock.h" @@ -620,6 +621,96 @@ TEST_P(DBFlushTestBlobError, FlushError) { #endif // ROCKSDB_LITE } +#ifndef ROCKSDB_LITE +TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.allow_2pc = true; + options.atomic_flush = GetParam(); + // 64MB so that memtable flush won't be trigger by the small writes. + options.write_buffer_size = (static_cast(64) << 20); + + // Destroy the DB to recreate as a TransactionDB. + Close(); + Destroy(options, true); + + // Create a TransactionDB. + TransactionDB* txn_db = nullptr; + TransactionDBOptions txn_db_opts; + txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED; + ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db)); + ASSERT_NE(txn_db, nullptr); + db_ = txn_db; + + // Create two more columns other than default CF. + std::vector cfs = {"puppy", "kitty"}; + CreateColumnFamilies(cfs, options); + ASSERT_EQ(handles_.size(), 2); + ASSERT_EQ(handles_[0]->GetName(), cfs[0]); + ASSERT_EQ(handles_[1]->GetName(), cfs[1]); + const size_t kNumCfToFlush = options.atomic_flush ? 2 : 1; + + WriteOptions wopts; + TransactionOptions txn_opts; + // txn1 only prepare, but does not commit. + // The WAL containing the prepared but uncommitted data must be kept. + Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr); + // txn2 not only prepare, but also commit. + Transaction* txn2 = txn_db->BeginTransaction(wopts, txn_opts, nullptr); + ASSERT_NE(txn1, nullptr); + ASSERT_NE(txn2, nullptr); + for (size_t i = 0; i < kNumCfToFlush; i++) { + ASSERT_OK(txn1->Put(handles_[i], "k1", "v1")); + ASSERT_OK(txn2->Put(handles_[i], "k2", "v2")); + } + // A txn must be named before prepare. + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn2->SetName("txn2")); + // Prepare writes to WAL, but not to memtable. (WriteCommitted) + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn2->Prepare()); + // Commit writes to memtable. + ASSERT_OK(txn2->Commit()); + delete txn1; + delete txn2; + + // There are still data in memtable not flushed. + // But since data is small enough to reside in the active memtable, + // there are no immutable memtable. + for (size_t i = 0; i < kNumCfToFlush; i++) { + auto cfh = static_cast(handles_[i]); + ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); + ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty()); + } + + // Atomic flush memtables, + // the min log with prepared data should be written to MANIFEST. + std::vector cfs_to_flush(kNumCfToFlush); + for (size_t i = 0; i < kNumCfToFlush; i++) { + cfs_to_flush[i] = handles_[i]; + } + ASSERT_OK(txn_db->Flush(FlushOptions(), cfs_to_flush)); + + // There are no remaining data in memtable after flush. + for (size_t i = 0; i < kNumCfToFlush; i++) { + auto cfh = static_cast(handles_[i]); + ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); + ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty()); + ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush); + } + + // The recovered min log number with prepared data should be non-zero. + // In 2pc mode, MinLogNumberToKeep returns the + // VersionSet::min_log_number_to_keep_2pc recovered from MANIFEST, if it's 0, + // it means atomic flush didn't write the min_log_number_to_keep to MANIFEST. + cfs.push_back(kDefaultColumnFamilyName); + ASSERT_OK(TryReopenWithColumnFamilies(cfs, options)); + DBImpl* db_impl = reinterpret_cast(db_); + ASSERT_TRUE(db_impl->allow_2pc()); + ASSERT_NE(db_impl->MinLogNumberToKeep(), 0); +} +#endif // ROCKSDB_LITE + TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { Options options = CurrentOptions(); options.create_if_missing = true; @@ -634,13 +725,22 @@ TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { for (size_t i = 0; i != num_cfs; ++i) { ASSERT_OK(Put(static_cast(i) /*cf*/, "key", "value", wopts)); } + + for (size_t i = 0; i != num_cfs; ++i) { + auto cfh = static_cast(handles_[i]); + ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); + ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty()); + } + std::vector cf_ids; for (size_t i = 0; i != num_cfs; ++i) { cf_ids.emplace_back(static_cast(i)); } ASSERT_OK(Flush(cf_ids)); + for (size_t i = 0; i != num_cfs; ++i) { auto cfh = static_cast(handles_[i]); + ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush); ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed()); ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty()); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 3ed6d257d..7c743235c 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2234,6 +2234,12 @@ extern uint64_t PrecomputeMinLogNumberToKeep2PC( const autovector& edit_list, const autovector& memtables_to_flush, LogsWithPrepTracker* prep_tracker); +// For atomic flush. +extern uint64_t PrecomputeMinLogNumberToKeep2PC( + VersionSet* vset, const autovector& cfds_to_flush, + const autovector>& edit_lists, + const autovector*>& memtables_to_flush, + LogsWithPrepTracker* prep_tracker); // In non-2PC mode, WALs with log number < the returned number can be // deleted after the cfd_to_flush column family is flushed successfully. @@ -2251,6 +2257,10 @@ extern uint64_t PrecomputeMinLogNumberToKeepNon2PC( extern uint64_t FindMinPrepLogReferencedByMemTable( VersionSet* vset, const ColumnFamilyData* cfd_to_flush, const autovector& memtables_to_flush); +// For atomic flush. +extern uint64_t FindMinPrepLogReferencedByMemTable( + VersionSet* vset, const autovector& cfds_to_flush, + const autovector*>& memtables_to_flush); // Fix user-supplied options to be reasonable template diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index be8018acf..50d59b17d 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -584,7 +584,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( s = InstallMemtableAtomicFlushResults( nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list, - versions_.get(), &mutex_, tmp_file_meta, + versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta, &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer); } diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 35eb0e182..f5e19dc18 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -659,13 +659,15 @@ uint64_t FindMinPrepLogReferencedByMemTable( // we must look through the memtables for two phase transactions // that have been committed but not yet flushed + std::unordered_set memtables_to_flush_set( + memtables_to_flush.begin(), memtables_to_flush.end()); for (auto loop_cfd : *vset->GetColumnFamilySet()) { if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) { continue; } auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection( - memtables_to_flush); + &memtables_to_flush_set); if (log > 0 && (min_log == 0 || log < min_log)) { min_log = log; @@ -681,6 +683,37 @@ uint64_t FindMinPrepLogReferencedByMemTable( return min_log; } +uint64_t FindMinPrepLogReferencedByMemTable( + VersionSet* vset, const autovector& cfds_to_flush, + const autovector*>& memtables_to_flush) { + uint64_t min_log = 0; + + std::unordered_set cfds_to_flush_set(cfds_to_flush.begin(), + cfds_to_flush.end()); + std::unordered_set memtables_to_flush_set; + for (const autovector* memtables : memtables_to_flush) { + memtables_to_flush_set.insert(memtables->begin(), memtables->end()); + } + for (auto loop_cfd : *vset->GetColumnFamilySet()) { + if (loop_cfd->IsDropped() || cfds_to_flush_set.count(loop_cfd)) { + continue; + } + + auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection( + &memtables_to_flush_set); + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + + log = loop_cfd->mem()->GetMinLogContainingPrepSection(); + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + } + + return min_log; +} + uint64_t PrecomputeMinLogNumberToKeepNon2PC( VersionSet* vset, const ColumnFamilyData& cfd_to_flush, const autovector& edit_list) { @@ -788,6 +821,38 @@ uint64_t PrecomputeMinLogNumberToKeep2PC( return min_log_number_to_keep; } +uint64_t PrecomputeMinLogNumberToKeep2PC( + VersionSet* vset, const autovector& cfds_to_flush, + const autovector>& edit_lists, + const autovector*>& memtables_to_flush, + LogsWithPrepTracker* prep_tracker) { + assert(vset != nullptr); + assert(prep_tracker != nullptr); + assert(cfds_to_flush.size() == edit_lists.size()); + assert(cfds_to_flush.size() == memtables_to_flush.size()); + + uint64_t min_log_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, cfds_to_flush, edit_lists); + + uint64_t min_log_in_prep_heap = + prep_tracker->FindMinLogContainingOutstandingPrep(); + + if (min_log_in_prep_heap != 0 && + min_log_in_prep_heap < min_log_number_to_keep) { + min_log_number_to_keep = min_log_in_prep_heap; + } + + uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( + vset, cfds_to_flush, memtables_to_flush); + + if (min_log_refed_by_mem != 0 && + min_log_refed_by_mem < min_log_number_to_keep) { + min_log_number_to_keep = min_log_refed_by_mem; + } + + return min_log_number_to_keep; +} + Status DBImpl::SetDBId() { Status s; // Happens when immutable_db_options_.write_dbid_to_manifest is set to true diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 2ac569f77..8b8cde7f8 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -412,8 +412,9 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { Status s = InstallMemtableAtomicFlushResults( nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list, - versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free, - nullptr /* db_directory */, nullptr /* log_buffer */); + versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs, + &job_context.memtables_to_free, nullptr /* db_directory */, + nullptr /* log_buffer */); ASSERT_OK(s); mutex_.Unlock(); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index ffb4d7502..53c373712 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -674,20 +674,11 @@ void MemTableList::RemoveMemTablesOrRestoreFlags( } uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( - const autovector& memtables_to_flush) { + const std::unordered_set* memtables_to_flush) { uint64_t min_log = 0; for (auto& m : current_->memlist_) { - // Assume the list is very short, we can live with O(m*n). We can optimize - // if the performance has some problem. - bool should_skip = false; - for (MemTable* m_to_flush : memtables_to_flush) { - if (m == m_to_flush) { - should_skip = true; - break; - } - } - if (should_skip) { + if (memtables_to_flush && memtables_to_flush->count(m)) { continue; } @@ -707,7 +698,8 @@ Status InstallMemtableAtomicFlushResults( const autovector& cfds, const autovector& mutable_cf_options_list, const autovector*>& mems_list, VersionSet* vset, - InstrumentedMutex* mu, const autovector& file_metas, + LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu, + const autovector& file_metas, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer) { AutoThreadOperationStageUpdater stage_updater( @@ -752,14 +744,21 @@ Status InstallMemtableAtomicFlushResults( edit_lists.emplace_back(edits); } - // TODO(cc): after https://github.com/facebook/rocksdb/pull/7570, handle 2pc - // here. + WalNumber min_wal_number_to_keep = 0; + if (vset->db_options()->allow_2pc) { + min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( + vset, cfds, edit_lists, mems_list, prep_tracker); + edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep); + } + std::unique_ptr wal_deletion; - if (vset->db_options()->track_and_verify_wals_in_manifest) { - uint64_t min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists); - const auto& wals = vset->GetWalSet().GetWals(); - if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) { + if (vset->db_options()->track_and_verify_wals_in_manifest && + !vset->GetWalSet().GetWals().empty()) { + if (!vset->db_options()->allow_2pc) { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists); + } + if (min_wal_number_to_keep > vset->GetWalSet().GetWals().begin()->first) { wal_deletion.reset(new VersionEdit); wal_deletion->DeleteWalsBefore(min_wal_number_to_keep); edit_lists.back().push_back(wal_deletion.get()); diff --git a/db/memtable_list.h b/db/memtable_list.h index 62e03cf53..814dbd9f9 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -138,8 +138,8 @@ class MemTableListVersion { const autovector& cfds, const autovector& mutable_cf_options_list, const autovector*>& mems_list, - VersionSet* vset, InstrumentedMutex* mu, - const autovector& file_meta, + VersionSet* vset, LogsWithPrepTracker* prep_tracker, + InstrumentedMutex* mu, const autovector& file_meta, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer); @@ -335,7 +335,7 @@ class MemTableList { // Returns the min log containing the prep section after memtables listsed in // `memtables_to_flush` are flushed and their status is persisted in manifest. uint64_t PrecomputeMinLogContainingPrepSection( - const autovector& memtables_to_flush); + const std::unordered_set* memtables_to_flush = nullptr); uint64_t GetEarliestMemTableID() const { auto& memlist = current_->memlist_; @@ -381,8 +381,8 @@ class MemTableList { const autovector& cfds, const autovector& mutable_cf_options_list, const autovector*>& mems_list, - VersionSet* vset, InstrumentedMutex* mu, - const autovector& file_meta, + VersionSet* vset, LogsWithPrepTracker* prep_tracker, + InstrumentedMutex* mu, const autovector& file_meta, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer); @@ -431,7 +431,8 @@ extern Status InstallMemtableAtomicFlushResults( const autovector& cfds, const autovector& mutable_cf_options_list, const autovector*>& mems_list, VersionSet* vset, - InstrumentedMutex* mu, const autovector& file_meta, + LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu, + const autovector& file_meta, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer); } // namespace ROCKSDB_NAMESPACE diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index e3b7eb621..375892e53 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -185,8 +185,9 @@ class MemTableListTest : public testing::Test { InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); return InstallMemtableAtomicFlushResults( - &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex, - file_meta_ptrs, to_delete, nullptr, &log_buffer); + &lists, cfds, mutable_cf_options_list, mems_list, &versions, + nullptr /* prep_tracker */, &mutex, file_meta_ptrs, to_delete, nullptr, + &log_buffer); } };