diff --git a/CMakeLists.txt b/CMakeLists.txt index 35d427dfe..5477bca41 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -456,6 +456,7 @@ set(SOURCES db/flush_scheduler.cc db/forward_iterator.cc db/internal_stats.cc + db/logs_with_prep_tracker.cc db/log_reader.cc db/log_writer.cc db/malloc_stats.cc diff --git a/TARGETS b/TARGETS index 356dc5cae..20390f073 100644 --- a/TARGETS +++ b/TARGETS @@ -103,6 +103,7 @@ cpp_library( "db/internal_stats.cc", "db/log_reader.cc", "db/log_writer.cc", + "db/logs_with_prep_tracker.cc", "db/malloc_stats.cc", "db/managed_iterator.cc", "db/memtable.cc", @@ -659,11 +660,6 @@ ROCKS_TESTS = [ "utilities/document/document_db_test.cc", "serial", ], - [ - "obsolete_files_test", - "db/obsolete_files_test.cc", - "serial", - ], [ "dynamic_bloom_test", "util/dynamic_bloom_test.cc", @@ -834,6 +830,11 @@ ROCKS_TESTS = [ "utilities/object_registry_test.cc", "serial", ], + [ + "obsolete_files_test", + "db/obsolete_files_test.cc", + "serial", + ], [ "optimistic_transaction_test", "utilities/transactions/optimistic_transaction_test.cc", diff --git a/db/column_family.cc b/db/column_family.cc index 9b5c3ed39..8ea3d1216 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -558,7 +558,9 @@ uint64_t ColumnFamilyData::OldestLogToKeep() { auto current_log = GetLogNumber(); if (allow_2pc_) { - auto imm_prep_log = imm()->GetMinLogContainingPrepSection(); + autovector empty_list; + auto imm_prep_log = + imm()->PrecomputeMinLogContainingPrepSection(empty_list); auto mem_prep_log = mem()->GetMinLogContainingPrepSection(); if (imm_prep_log > 0 && imm_prep_log < current_log) { diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 1efd58862..e6758c2ec 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -72,19 +72,23 @@ TEST_F(DBFlushTest, SyncFail) { auto* cfd = reinterpret_cast(db_->DefaultColumnFamily()) ->cfd(); - int refs_before = cfd->current()->TEST_refs(); FlushOptions flush_options; flush_options.wait = false; ASSERT_OK(dbfull()->Flush(flush_options)); + // Flush installs a new super-version. Get the ref count after that. + auto current_before = cfd->current(); + int refs_before = cfd->current()->TEST_refs(); fault_injection_env->SetFilesystemActive(false); TEST_SYNC_POINT("DBFlushTest::SyncFail:1"); TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); fault_injection_env->SetFilesystemActive(true); + // Now the background job will do the flush; wait for it. dbfull()->TEST_WaitForFlushMemTable(); #ifndef ROCKSDB_LITE ASSERT_EQ("", FilesPerLevel()); // flush failed. #endif // ROCKSDB_LITE - // Flush job should release ref count to current version. + // Backgroun flush job should release ref count to current version. + ASSERT_EQ(current_before, cfd->current()); ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); Destroy(options); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 53c9a9ca7..d190b3490 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -183,7 +183,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, last_stats_dump_time_microsec_(0), next_job_id_(1), has_unpersisted_data_(false), - unable_to_flush_oldest_log_(false), + unable_to_release_oldest_log_(false), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), env_options_for_compaction_(env_->OptimizeForCompactionTableWrite( env_options_, immutable_db_options_)), @@ -3020,5 +3020,4 @@ void DBImpl::WaitForIngestFile() { } #endif // ROCKSDB_LITE - } // namespace rocksdb diff --git a/db/db_impl.h b/db/db_impl.h index 076c4e17a..9620a53de 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -27,6 +27,7 @@ #include "db/flush_scheduler.h" #include "db/internal_stats.h" #include "db/log_writer.h" +#include "db/logs_with_prep_tracker.h" #include "db/pre_release_callback.h" #include "db/read_callback.h" #include "db/snapshot_checker.h" @@ -354,6 +355,10 @@ class DBImpl : public DB { Arena* arena, RangeDelAggregator* range_del_agg, ColumnFamilyHandle* column_family = nullptr); + LogsWithPrepTracker* logs_with_prep_tracker() { + return &logs_with_prep_tracker_; + } + #ifndef NDEBUG // Extra methods (for testing) that are not in the public DB interface // Implemented in db_impl_debug.cc @@ -365,9 +370,7 @@ class DBImpl : public DB { void TEST_SwitchWAL(); - bool TEST_UnableToFlushOldestLog() { - return unable_to_flush_oldest_log_; - } + bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; } bool TEST_IsLogGettingFlushed() { return alive_log_files_.begin()->getting_flushed; @@ -594,7 +597,7 @@ class DBImpl : public DB { size_t batch_cnt) { recovered_transactions_[name] = new RecoveredTransaction(log, name, batch, seq, batch_cnt); - MarkLogAsContainingPrepSection(log); + logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log); } void DeleteRecoveredTransaction(const std::string& name) { @@ -602,7 +605,7 @@ class DBImpl : public DB { assert(it != recovered_transactions_.end()); auto* trx = it->second; recovered_transactions_.erase(it); - MarkLogAsHavingPrepSectionFlushed(trx->log_number_); + logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(trx->log_number_); delete trx; } @@ -614,8 +617,6 @@ class DBImpl : public DB { recovered_transactions_.clear(); } - void MarkLogAsHavingPrepSectionFlushed(uint64_t log); - void MarkLogAsContainingPrepSection(uint64_t log); void AddToLogsToFreeQueue(log::Writer* log_writer) { logs_to_free_queue_.push_back(log_writer); } @@ -728,8 +729,6 @@ class DBImpl : public DB { uint64_t* seq_used = nullptr, size_t batch_cnt = 0, PreReleaseCallback* pre_release_callback = nullptr); - uint64_t FindMinLogContainingOutstandingPrep(); - uint64_t FindMinPrepLogReferencedByMemTable(); // write cached_recoverable_state_ to memtable if it is not empty // The writer must be the leader in write_thread_ and holding mutex_ Status WriteRecoverableState(); @@ -1302,7 +1301,7 @@ class DBImpl : public DB { // We must attempt to free the dependent memtables again // at a later time after the transaction in the oldest // log is fully commited. - bool unable_to_flush_oldest_log_; + bool unable_to_release_oldest_log_; static const int KEEP_LOG_FILE_NUM = 1000; // MSVC version 1800 still does not have constexpr for ::max() @@ -1339,33 +1338,7 @@ class DBImpl : public DB { // Indicate DB was opened successfully bool opened_successfully_; - // REQUIRES: logs_with_prep_mutex_ held - // - // sorted list of log numbers still containing prepared data. - // this is used by FindObsoleteFiles to determine which - // flushed logs we must keep around because they still - // contain prepared data which has not been committed or rolled back - struct LogCnt { - uint64_t log; // the log number - uint64_t cnt; // number of prepared sections in the log - }; - std::vector logs_with_prep_; - std::mutex logs_with_prep_mutex_; - - // REQUIRES: prepared_section_completed_mutex_ held - // - // to be used in conjunction with logs_with_prep_. - // once a transaction with data in log L is committed or rolled back - // rather than updating logs_with_prep_ directly we keep track of that - // in prepared_section_completed_ which maps LOG -> instance_count. This helps - // avoiding contention between a commit thread and the prepare threads. - // - // when trying to determine the minimum log still active we first - // consult logs_with_prep_. while that root value maps to - // an equal value in prepared_section_completed_ we erase the log from - // both logs_with_prep_ and prepared_section_completed_. - std::unordered_map prepared_section_completed_; - std::mutex prepared_section_completed_mutex_; + LogsWithPrepTracker logs_with_prep_tracker_; // Callback for compaction to check if a key is visible to a snapshot. // REQUIRES: mutex held @@ -1461,6 +1434,25 @@ extern CompressionType GetCompressionFlush( const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options); +// Return the earliest log file to keep after the memtable flush is +// finalized. +// `cfd_to_flush` is the column family whose memtable (specified in +// `memtables_to_flush`) will be flushed and thus will not depend on any WAL +// file. +// The function is only applicable to 2pc mode. +extern uint64_t PrecomputeMinLogNumberToKeep( + VersionSet* vset, const ColumnFamilyData& cfd_to_flush, + autovector edit_list, + const autovector& memtables_to_flush, + LogsWithPrepTracker* prep_tracker); + +// `cfd_to_flush` is the column family whose memtable will be flushed and thus +// will not depend on any WAL file. nullptr means no memtable is being flushed. +// The function is only applicable to 2pc mode. +extern uint64_t FindMinPrepLogReferencedByMemTable( + VersionSet* vset, const ColumnFamilyData* cfd_to_flush, + const autovector& memtables_to_flush); + // Fix user-supplied options to be reasonable template static void ClipToRange(T* ptr, V minvalue, V maxvalue) { diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index a4095de6a..9c43b6ab9 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -160,7 +160,7 @@ Status DBImpl::FlushMemTableToOutputFile( // and EventListener callback will be called when the db_mutex // is unlocked by the current thread. if (s.ok()) { - s = flush_job.Run(&file_meta); + s = flush_job.Run(&logs_with_prep_tracker_, &file_meta); } else { flush_job.Cancel(); } diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 88b752b4c..793abd35f 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -184,17 +184,21 @@ Status DBImpl::TEST_GetAllImmutableCFOptions( } uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() { - return FindMinLogContainingOutstandingPrep(); + return logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep(); } size_t DBImpl::TEST_PreparedSectionCompletedSize() { - return prepared_section_completed_.size(); + return logs_with_prep_tracker_.TEST_PreparedSectionCompletedSize(); } -size_t DBImpl::TEST_LogsWithPrepSize() { return logs_with_prep_.size(); } +size_t DBImpl::TEST_LogsWithPrepSize() { + return logs_with_prep_tracker_.TEST_LogsWithPrepSize(); +} uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { - return FindMinPrepLogReferencedByMemTable(); + autovector empty_list; + return FindMinPrepLogReferencedByMemTable(versions_.get(), nullptr, + empty_list); } Status DBImpl::TEST_GetLatestMutableCFOptions( diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 9892c5b9d..6d3b4b996 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -14,124 +14,17 @@ #include #include #include "db/event_helpers.h" +#include "db/memtable_list.h" #include "util/file_util.h" #include "util/sst_file_manager_impl.h" - namespace rocksdb { -uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() { - if (!allow_2pc()) { - return 0; - } - - uint64_t min_log = 0; - - // we must look through the memtables for two phase transactions - // that have been committed but not yet flushed - for (auto loop_cfd : *versions_->GetColumnFamilySet()) { - if (loop_cfd->IsDropped()) { - continue; - } - - auto log = loop_cfd->imm()->GetMinLogContainingPrepSection(); - - if (log > 0 && (min_log == 0 || log < min_log)) { - min_log = log; - } - - log = loop_cfd->mem()->GetMinLogContainingPrepSection(); - - if (log > 0 && (min_log == 0 || log < min_log)) { - min_log = log; - } - } - - return min_log; -} - -void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { - assert(log != 0); - std::lock_guard lock(prepared_section_completed_mutex_); - auto it = prepared_section_completed_.find(log); - if (UNLIKELY(it == prepared_section_completed_.end())) { - prepared_section_completed_[log] = 1; - } else { - it->second += 1; - } -} - -void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) { - assert(log != 0); - std::lock_guard lock(logs_with_prep_mutex_); - - auto rit = logs_with_prep_.rbegin(); - bool updated = false; - // Most probably the last log is the one that is being marked for - // having a prepare section; so search from the end. - for (; rit != logs_with_prep_.rend() && rit->log >= log; ++rit) { - if (rit->log == log) { - rit->cnt++; - updated = true; - break; - } - } - if (!updated) { - // We are either at the start, or at a position with rit->log < log - logs_with_prep_.insert(rit.base(), {log, 1}); - } -} - -uint64_t DBImpl::FindMinLogContainingOutstandingPrep() { - std::lock_guard lock(logs_with_prep_mutex_); - auto it = logs_with_prep_.begin(); - // start with the smallest log - for (; it != logs_with_prep_.end();) { - auto min_log = it->log; - { - std::lock_guard lock2(prepared_section_completed_mutex_); - auto completed_it = prepared_section_completed_.find(min_log); - if (completed_it == prepared_section_completed_.end() || - completed_it->second < it->cnt) { - return min_log; - } - assert(completed_it != prepared_section_completed_.end() && - completed_it->second == it->cnt); - prepared_section_completed_.erase(completed_it); - } - // erase from beginning in vector is not efficient but this function is not - // on the fast path. - it = logs_with_prep_.erase(it); - } - // no such log found - return 0; -} - uint64_t DBImpl::MinLogNumberToKeep() { - uint64_t log_number = versions_->MinLogNumber(); - if (allow_2pc()) { - // if are 2pc we must consider logs containing prepared - // sections of outstanding transactions. - // - // We must check min logs with outstanding prep before we check - // logs references by memtables because a log referenced by the - // first data structure could transition to the second under us. - // - // TODO(horuff): iterating over all column families under db mutex. - // should find more optimal solution - auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep(); - - if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) { - log_number = min_log_in_prep_heap; - } - - auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(); - - if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) { - log_number = min_log_refed_by_mem; - } + return versions_->min_log_number_to_keep_2pc(); + } else { + return versions_->MinLogNumberWithUnflushedData(); } - return log_number; } // * Returns the list of live files in 'sst_live' @@ -200,7 +93,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, job_context->pending_manifest_file_number = versions_->pending_manifest_file_number(); job_context->log_number = MinLogNumberToKeep(); - job_context->prev_log_number = versions_->prev_log_number(); versions_->AddLiveFiles(&job_context->sst_live); @@ -621,4 +513,94 @@ void DBImpl::DeleteObsoleteFiles() { mutex_.Lock(); } +uint64_t FindMinPrepLogReferencedByMemTable( + VersionSet* vset, const ColumnFamilyData* cfd_to_flush, + const autovector& memtables_to_flush) { + uint64_t min_log = 0; + + // we must look through the memtables for two phase transactions + // that have been committed but not yet flushed + for (auto loop_cfd : *vset->GetColumnFamilySet()) { + if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) { + continue; + } + + auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection( + memtables_to_flush); + + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + + log = loop_cfd->mem()->GetMinLogContainingPrepSection(); + + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + } + + return min_log; +} + +uint64_t PrecomputeMinLogNumberToKeep( + VersionSet* vset, const ColumnFamilyData& cfd_to_flush, + autovector edit_list, + const autovector& memtables_to_flush, + LogsWithPrepTracker* prep_tracker) { + assert(vset != nullptr); + assert(prep_tracker != nullptr); + // Calculate updated min_log_number_to_keep + // Since the function should only be called in 2pc mode, log number in + // the version edit should be sufficient. + + // Precompute the min log number containing unflushed data for the column + // family being flushed (`cfd_to_flush`). + uint64_t cf_min_log_number_to_keep = 0; + for (auto& e : edit_list) { + if (e->has_log_number()) { + cf_min_log_number_to_keep = + std::max(cf_min_log_number_to_keep, e->log_number()); + } + } + if (cf_min_log_number_to_keep == 0) { + // No version edit contains information on log number. The log number + // for this column family should stay the same as it is. + cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber(); + } + + // Get min log number containing unflushed data for other column families. + uint64_t min_log_number_to_keep = + vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush); + if (cf_min_log_number_to_keep != 0) { + min_log_number_to_keep = + std::min(cf_min_log_number_to_keep, min_log_number_to_keep); + } + + // if are 2pc we must consider logs containing prepared + // sections of outstanding transactions. + // + // We must check min logs with outstanding prep before we check + // logs references by memtables because a log referenced by the + // first data structure could transition to the second under us. + // + // TODO: iterating over all column families under db mutex. + // should find more optimal solution + auto min_log_in_prep_heap = + prep_tracker->FindMinLogContainingOutstandingPrep(); + + if (min_log_in_prep_heap != 0 && + min_log_in_prep_heap < min_log_number_to_keep) { + min_log_number_to_keep = min_log_in_prep_heap; + } + + uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( + vset, &cfd_to_flush, memtables_to_flush); + + if (min_log_refed_by_mem != 0 && + min_log_refed_by_mem < min_log_number_to_keep) { + min_log_number_to_keep = min_log_refed_by_mem; + } + return min_log_number_to_keep; +} + } // namespace rocksdb diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 616ab3f4f..37d222dd6 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -532,6 +532,13 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, bool flushed = false; uint64_t corrupted_log_number = kMaxSequenceNumber; for (auto log_number : log_numbers) { + if (log_number < versions_->min_log_number_to_keep_2pc()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Skipping log #%" PRIu64 + " since it is older than min log to keep #%" PRIu64, + log_number, versions_->min_log_number_to_keep_2pc()); + continue; + } // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index c0416fdf9..9b43bd5fc 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1033,28 +1033,34 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { } auto oldest_alive_log = alive_log_files_.begin()->number; - auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep(); - - if (allow_2pc() && - oldest_log_with_uncommited_prep > 0 && - oldest_log_with_uncommited_prep <= oldest_alive_log) { - if (unable_to_flush_oldest_log_) { + bool flush_wont_release_oldest_log = false; + if (allow_2pc()) { + auto oldest_log_with_uncommited_prep = + logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep(); + + assert(oldest_log_with_uncommited_prep == 0 || + oldest_log_with_uncommited_prep >= oldest_alive_log); + if (oldest_log_with_uncommited_prep > 0 && + oldest_log_with_uncommited_prep == oldest_alive_log) { + if (unable_to_release_oldest_log_) { // we already attempted to flush all column families dependent on - // the oldest alive log but the log still contained uncommited transactions. - // the oldest alive log STILL contains uncommited transaction so there - // is still nothing that we can do. + // the oldest alive log but the log still contained uncommited + // transactions so there is still nothing that we can do. return status; - } else { - ROCKS_LOG_WARN( - immutable_db_options_.info_log, - "Unable to release oldest log due to uncommited transaction"); - unable_to_flush_oldest_log_ = true; + } else { + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Unable to release oldest log due to uncommited transaction"); + unable_to_release_oldest_log_ = true; + flush_wont_release_oldest_log = true; + } } - } else { + } + if (!flush_wont_release_oldest_log) { // we only mark this log as getting flushed if we have successfully // flushed all data in this log. If this log contains outstanding prepared // transactions then we cannot flush this log until those transactions are commited. - unable_to_flush_oldest_log_ = false; + unable_to_release_oldest_log_ = false; alive_log_files_.begin()->getting_flushed = true; } diff --git a/db/db_test_util.h b/db/db_test_util.h index b4d027646..f9d1eca3f 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -451,8 +451,9 @@ class SpecialEnv : public EnvWrapper { return s; } - Status NewSequentialFile(const std::string& f, unique_ptr* r, - const EnvOptions& soptions) override { + virtual Status NewSequentialFile(const std::string& f, + unique_ptr* r, + const EnvOptions& soptions) override { class CountingFile : public SequentialFile { public: CountingFile(unique_ptr&& target, diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 796ef251c..0349bdc8d 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -20,6 +20,106 @@ class DBWALTest : public DBTestBase { DBWALTest() : DBTestBase("/db_wal_test") {} }; +// A SpecialEnv enriched to give more insight about deleted files +class EnrichedSpecialEnv : public SpecialEnv { + public: + explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {} + Status NewSequentialFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) override { + InstrumentedMutexLock l(&env_mutex_); + if (f == skipped_wal) { + deleted_wal_reopened = true; + if (IsWAL(f) && largetest_deleted_wal.size() != 0 && + f.compare(largetest_deleted_wal) <= 0) { + gap_in_wals = true; + } + } + return SpecialEnv::NewSequentialFile(f, r, soptions); + } + Status DeleteFile(const std::string& fname) override { + if (IsWAL(fname)) { + deleted_wal_cnt++; + InstrumentedMutexLock l(&env_mutex_); + // If this is the first WAL, remember its name and skip deleting it. We + // remember its name partly because the application might attempt to + // delete the file again. + if (skipped_wal.size() != 0 && skipped_wal != fname) { + if (largetest_deleted_wal.size() == 0 || + largetest_deleted_wal.compare(fname) < 0) { + largetest_deleted_wal = fname; + } + } else { + skipped_wal = fname; + return Status::OK(); + } + } + return SpecialEnv::DeleteFile(fname); + } + bool IsWAL(const std::string& fname) { + // printf("iswal %s\n", fname.c_str()); + return fname.compare(fname.size() - 3, 3, "log") == 0; + } + + InstrumentedMutex env_mutex_; + // the wal whose actual delete was skipped by the env + std::string skipped_wal = ""; + // the largest WAL that was requested to be deleted + std::string largetest_deleted_wal = ""; + // number of WALs that were successfully deleted + std::atomic deleted_wal_cnt = {0}; + // the WAL whose delete from fs was skipped is reopened during recovery + std::atomic deleted_wal_reopened = {false}; + // whether a gap in the WALs was detected during recovery + std::atomic gap_in_wals = {false}; +}; + +class DBWALTestWithEnrichedEnv : public DBTestBase { + public: + DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") { + enriched_env_ = new EnrichedSpecialEnv(env_->target()); + auto options = CurrentOptions(); + options.env = enriched_env_; + Reopen(options); + delete env_; + // to be deleted by the parent class + env_ = enriched_env_; + } + + protected: + EnrichedSpecialEnv* enriched_env_; +}; + +// Test that the recovery would successfully avoid the gaps between the logs. +// One known scenario that could cause this is that the application issue the +// WAL deletion out of order. For the sake of simplicity in the test, here we +// create the gap by manipulating the env to skip deletion of the first WAL but +// not the ones after it. +TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) { + auto options = last_options_; + // To cause frequent WAL deletion + options.write_buffer_size = 128; + Reopen(options); + + WriteOptions writeOpt = WriteOptions(); + for (int i = 0; i < 128 * 5; i++) { + ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); + } + FlushOptions fo; + fo.wait = true; + ASSERT_OK(db_->Flush(fo)); + + // some wals are deleted + ASSERT_NE(0, enriched_env_->deleted_wal_cnt); + // but not the first one + ASSERT_NE(0, enriched_env_->skipped_wal.size()); + + // Test that the WAL that was not deleted will be skipped during recovery + options = last_options_; + Reopen(options); + ASSERT_FALSE(enriched_env_->deleted_wal_reopened); + ASSERT_FALSE(enriched_env_->gap_in_wals); +} + TEST_F(DBWALTest, WAL) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); @@ -891,7 +991,7 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) { // Record the offset at this point Env* env = options.env; - int wal_file_id = RecoveryTestHelper::kWALFileOffset + 1; + uint64_t wal_file_id = dbfull()->TEST_LogfileNumber(); std::string fname = LogFileName(dbname_, wal_file_id); uint64_t offset_to_corrupt; ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt)); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index f72937792..c70b5c053 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1413,8 +1413,12 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) { // fit in L3 but will overlap with compaction so will be added // to L2 but a compaction will trivially move it to L3 // and break LSM consistency - ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); - ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); + static std::atomic called = {false}; + if (!called) { + called = true; + ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); + ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); + } }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/flush_job.cc b/db/flush_job.cc index b53b229e4..0e4657984 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -185,7 +185,8 @@ void FlushJob::PickMemTable() { base_->Ref(); // it is likely that we do not need this reference } -Status FlushJob::Run(FileMetaData* file_meta) { +Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, + FileMetaData* file_meta) { db_mutex_->AssertHeld(); assert(pick_memtable_called); AutoThreadOperationStageUpdater stage_run( @@ -226,7 +227,7 @@ Status FlushJob::Run(FileMetaData* file_meta) { TEST_SYNC_POINT("FlushJob::InstallResults"); // Replace immutable memtable with the generated Table s = cfd_->imm()->InstallMemtableFlushResults( - cfd_, mutable_cf_options_, mems_, versions_, db_mutex_, + cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, log_buffer_); } diff --git a/db/flush_job.h b/db/flush_job.h index 81a8de306..c3115c4a6 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -22,6 +22,7 @@ #include "db/internal_stats.h" #include "db/job_context.h" #include "db/log_writer.h" +#include "db/logs_with_prep_tracker.h" #include "db/memtable_list.h" #include "db/snapshot_impl.h" #include "db/version_edit.h" @@ -42,6 +43,7 @@ namespace rocksdb { +class DBImpl; class MemTable; class SnapshotChecker; class TableCache; @@ -71,7 +73,8 @@ class FlushJob { // Require db_mutex held. // Once PickMemTable() is called, either Run() or Cancel() has to be called. void PickMemTable(); - Status Run(FileMetaData* file_meta = nullptr); + Status Run(LogsWithPrepTracker* prep_tracker = nullptr, + FileMetaData* file_meta = nullptr); void Cancel(); TableProperties GetTableProperties() const { return table_properties_; } diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index a2309afe2..4e94001ec 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -150,7 +150,7 @@ TEST_F(FlushJobTest, NonEmpty) { FileMetaData fd; mutex_.Lock(); flush_job.PickMemTable(); - ASSERT_OK(flush_job.Run(&fd)); + ASSERT_OK(flush_job.Run(nullptr, &fd)); mutex_.Unlock(); db_options_.statistics->histogramData(FLUSH_TIME, &hist); ASSERT_GT(hist.average, 0.0); diff --git a/db/internal_stats.h b/db/internal_stats.h index 1cd2b6764..4655c8951 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -19,8 +19,8 @@ class ColumnFamilyData; namespace rocksdb { -class MemTableList; class DBImpl; +class MemTableList; // Config for retrieving a property's value. struct DBPropertyInfo { diff --git a/db/logs_with_prep_tracker.cc b/db/logs_with_prep_tracker.cc new file mode 100644 index 000000000..1082dc102 --- /dev/null +++ b/db/logs_with_prep_tracker.cc @@ -0,0 +1,67 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "db/logs_with_prep_tracker.h" + +#include "port/likely.h" + +namespace rocksdb { +void LogsWithPrepTracker::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { + assert(log != 0); + std::lock_guard lock(prepared_section_completed_mutex_); + auto it = prepared_section_completed_.find(log); + if (UNLIKELY(it == prepared_section_completed_.end())) { + prepared_section_completed_[log] = 1; + } else { + it->second += 1; + } +} + +void LogsWithPrepTracker::MarkLogAsContainingPrepSection(uint64_t log) { + assert(log != 0); + std::lock_guard lock(logs_with_prep_mutex_); + + auto rit = logs_with_prep_.rbegin(); + bool updated = false; + // Most probably the last log is the one that is being marked for + // having a prepare section; so search from the end. + for (; rit != logs_with_prep_.rend() && rit->log >= log; ++rit) { + if (rit->log == log) { + rit->cnt++; + updated = true; + break; + } + } + if (!updated) { + // We are either at the start, or at a position with rit->log < log + logs_with_prep_.insert(rit.base(), {log, 1}); + } +} + +uint64_t LogsWithPrepTracker::FindMinLogContainingOutstandingPrep() { + std::lock_guard lock(logs_with_prep_mutex_); + auto it = logs_with_prep_.begin(); + // start with the smallest log + for (; it != logs_with_prep_.end();) { + auto min_log = it->log; + { + std::lock_guard lock2(prepared_section_completed_mutex_); + auto completed_it = prepared_section_completed_.find(min_log); + if (completed_it == prepared_section_completed_.end() || + completed_it->second < it->cnt) { + return min_log; + } + assert(completed_it != prepared_section_completed_.end() && + completed_it->second == it->cnt); + prepared_section_completed_.erase(completed_it); + } + // erase from beginning in vector is not efficient but this function is not + // on the fast path. + it = logs_with_prep_.erase(it); + } + // no such log found + return 0; +} +} // namespace rocksdb diff --git a/db/logs_with_prep_tracker.h b/db/logs_with_prep_tracker.h new file mode 100644 index 000000000..639d8f806 --- /dev/null +++ b/db/logs_with_prep_tracker.h @@ -0,0 +1,61 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace rocksdb { + +// This class is used to track the log files with outstanding prepare entries. +class LogsWithPrepTracker { + public: + // Called when a transaction prepared in `log` has been committed or aborted. + void MarkLogAsHavingPrepSectionFlushed(uint64_t log); + // Called when a transaction is prepared in `log`. + void MarkLogAsContainingPrepSection(uint64_t log); + // Return the earliest log file with outstanding prepare entries. + uint64_t FindMinLogContainingOutstandingPrep(); + size_t TEST_PreparedSectionCompletedSize() { + return prepared_section_completed_.size(); + } + size_t TEST_LogsWithPrepSize() { return logs_with_prep_.size(); } + + private: + // REQUIRES: logs_with_prep_mutex_ held + // + // sorted list of log numbers still containing prepared data. + // this is used by FindObsoleteFiles to determine which + // flushed logs we must keep around because they still + // contain prepared data which has not been committed or rolled back + struct LogCnt { + uint64_t log; // the log number + uint64_t cnt; // number of prepared sections in the log + }; + std::vector logs_with_prep_; + std::mutex logs_with_prep_mutex_; + + // REQUIRES: prepared_section_completed_mutex_ held + // + // to be used in conjunction with logs_with_prep_. + // once a transaction with data in log L is committed or rolled back + // rather than updating logs_with_prep_ directly we keep track of that + // in prepared_section_completed_ which maps LOG -> instance_count. This helps + // avoiding contention between a commit thread and the prepare threads. + // + // when trying to determine the minimum log still active we first + // consult logs_with_prep_. while that root value maps to + // an equal value in prepared_section_completed_ we erase the log from + // both logs_with_prep_ and prepared_section_completed_. + std::unordered_map prepared_section_completed_; + std::mutex prepared_section_completed_mutex_; + +}; +} // namespace rocksdb diff --git a/db/memtable_list.cc b/db/memtable_list.cc index e3cd64cfe..7cb208beb 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -12,6 +12,7 @@ #include #include #include +#include "db/db_impl.h" #include "db/memtable.h" #include "db/version_set.h" #include "monitoring/thread_status_util.h" @@ -322,9 +323,10 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - const autovector& mems, VersionSet* vset, InstrumentedMutex* mu, - uint64_t file_number, autovector* to_delete, - Directory* db_directory, LogBuffer* log_buffer) { + const autovector& mems, LogsWithPrepTracker* prep_tracker, + VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); @@ -361,6 +363,7 @@ Status MemTableList::InstallMemtableFlushResults( uint64_t batch_file_number = 0; size_t batch_count = 0; autovector edit_list; + autovector memtables_to_flush; // enumerate from the last (earliest) element to see how many batch finished for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { MemTable* m = *it; @@ -373,11 +376,20 @@ Status MemTableList::InstallMemtableFlushResults( "[%s] Level-0 commit table #%" PRIu64 " started", cfd->GetName().c_str(), m->file_number_); edit_list.push_back(&m->edit_); + memtables_to_flush.push_back(m); } batch_count++; } if (batch_count > 0) { + if (vset->db_options()->allow_2pc) { + assert(edit_list.size() > 0); + // We piggyback the information of earliest log file to keep in the + // manifest entry for the last file flushed. + edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep( + vset, *cfd, edit_list, memtables_to_flush, prep_tracker)); + } + // this can release and reacquire the mutex. s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory); @@ -468,13 +480,21 @@ void MemTableList::InstallNewVersion() { } } -uint64_t MemTableList::GetMinLogContainingPrepSection() { +uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( + const autovector& memtables_to_flush) { uint64_t min_log = 0; for (auto& m : current_->memlist_) { - // this mem has been flushed it no longer - // needs to hold on the its prep section - if (m->flush_completed_) { + // Assume the list is very short, we can live with O(m*n). We can optimize + // if the performance has some problem. + bool should_skip = false; + for (MemTable* m_to_flush : memtables_to_flush) { + if (m == m_to_flush) { + should_skip = true; + break; + } + } + if (should_skip) { continue; } diff --git a/db/memtable_list.h b/db/memtable_list.h index c2ac65a2f..7fd9de7ad 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -13,6 +13,7 @@ #include #include "db/dbformat.h" +#include "db/logs_with_prep_tracker.h" #include "db/memtable.h" #include "db/range_del_aggregator.h" #include "monitoring/instrumented_mutex.h" @@ -210,9 +211,10 @@ class MemTableList { // Commit a successful flush in the manifest file Status InstallMemtableFlushResults( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - const autovector& m, VersionSet* vset, InstrumentedMutex* mu, - uint64_t file_number, autovector* to_delete, - Directory* db_directory, LogBuffer* log_buffer); + const autovector& m, LogsWithPrepTracker* prep_tracker, + VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). @@ -243,7 +245,10 @@ class MemTableList { size_t* current_memory_usage() { return ¤t_memory_usage_; } - uint64_t GetMinLogContainingPrepSection(); + // Returns the min log containing the prep section after memtables listsed in + // `memtables_to_flush` are flushed and their status is persisted in manifest. + uint64_t PrecomputeMinLogContainingPrepSection( + const autovector& memtables_to_flush); uint64_t GetEarliestMemTableID() const { auto& memlist = current_->memlist_; diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 30e516663..a565eba3c 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -82,10 +82,10 @@ class MemTableListTest : public testing::Test { // Create dummy mutex. InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); - - return list->InstallMemtableFlushResults(cfd, mutable_cf_options, m, - &versions, &mutex, 1, to_delete, - nullptr, &log_buffer); + LogsWithPrepTracker dummy_prep_tracker; + return list->InstallMemtableFlushResults( + cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, 1, + to_delete, nullptr, &log_buffer); } }; diff --git a/db/version_edit.cc b/db/version_edit.cc index ebfc10584..dd84d0986 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -30,6 +30,7 @@ enum Tag { kNewFile = 7, // 8 was used for large value refs kPrevLogNumber = 9, + kMinLogNumberToKeep = 10, // these are new formats divergent from open source leveldb kNewFile2 = 100, @@ -44,6 +45,11 @@ enum Tag { enum CustomTag { kTerminate = 1, // The end of customized fields kNeedCompaction = 2, + // Since Manifest is not entirely currently forward-compatible, and the only + // forward-compatbile part is the CutsomtTag of kNewFile, we currently encode + // kMinLogNumberToKeep as part of a CustomTag as a hack. This should be + // removed when manifest becomes forward-comptabile. + kMinLogNumberToKeepHack = 3, kPathId = 65, }; // If this bit for the custom tag is set, opening DB should fail if @@ -63,12 +69,14 @@ void VersionEdit::Clear() { last_sequence_ = 0; next_file_number_ = 0; max_column_family_ = 0; + min_log_number_to_keep_ = 0; has_comparator_ = false; has_log_number_ = false; has_prev_log_number_ = false; has_next_file_number_ = false; has_last_sequence_ = false; has_max_column_family_ = false; + has_min_log_number_to_keep_ = false; deleted_files_.clear(); new_files_.clear(); column_family_ = 0; @@ -97,19 +105,19 @@ bool VersionEdit::EncodeTo(std::string* dst) const { if (has_max_column_family_) { PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_); } - for (const auto& deleted : deleted_files_) { PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */, deleted.second /* file number */); } + bool min_log_num_written = false; for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; if (!f.smallest.Valid() || !f.largest.Valid()) { return false; } bool has_customized_fields = false; - if (f.marked_for_compaction) { + if (f.marked_for_compaction || has_min_log_number_to_keep_) { PutVarint32(dst, kNewFile4); has_customized_fields = true; } else if (f.fd.GetPathId() == 0) { @@ -165,6 +173,13 @@ bool VersionEdit::EncodeTo(std::string* dst) const { char p = static_cast(1); PutLengthPrefixedSlice(dst, Slice(&p, 1)); } + if (has_min_log_number_to_keep_ && !min_log_num_written) { + PutVarint32(dst, CustomTag::kMinLogNumberToKeepHack); + std::string varint_log_number; + PutFixed64(&varint_log_number, min_log_number_to_keep_); + PutLengthPrefixedSlice(dst, Slice(varint_log_number)); + min_log_num_written = true; + } TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields", dst); @@ -218,6 +233,9 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { uint64_t number; uint32_t path_id = 0; uint64_t file_size; + // Since this is the only forward-compatible part of the code, we hack new + // extension into this record. When we do, we set this boolean to distinguish + // the record from the normal NewFile records. if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) && GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) && GetInternalKey(input, &f.largest) && @@ -252,6 +270,14 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { } f.marked_for_compaction = (field[0] == 1); break; + case kMinLogNumberToKeepHack: + // This is a hack to encode kMinLogNumberToKeep in a + // forward-compatbile fashion. + if (!GetFixed64(&field, &min_log_number_to_keep_)) { + return "deleted log number malformatted"; + } + has_min_log_number_to_keep_ = true; + break; default: if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) { // Should not proceed if cannot understand it @@ -331,6 +357,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kMinLogNumberToKeep: + if (GetVarint64(&input, &min_log_number_to_keep_)) { + has_min_log_number_to_keep_ = true; + } else { + msg = "min log number to kee"; + } + break; + case kCompactPointer: if (GetLevel(&input, &level, &msg) && GetInternalKey(&input, &key)) { @@ -475,6 +509,10 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append("\n NextFileNumber: "); AppendNumberTo(&r, next_file_number_); } + if (has_min_log_number_to_keep_) { + r.append("\n MinLogNumberToKeep: "); + AppendNumberTo(&r, min_log_number_to_keep_); + } if (has_last_sequence_) { r.append("\n LastSeq: "); AppendNumberTo(&r, last_sequence_); @@ -582,6 +620,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { if (has_max_column_family_) { jw << "MaxColumnFamily" << max_column_family_; } + if (has_min_log_number_to_keep_) { + jw << "MinLogNumberToKeep" << min_log_number_to_keep_; + } jw.EndObject(); diff --git a/db/version_edit.h b/db/version_edit.h index 391e61434..5b858391e 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -199,6 +199,14 @@ class VersionEdit { has_max_column_family_ = true; max_column_family_ = max_column_family; } + void SetMinLogNumberToKeep(uint64_t num) { + has_min_log_number_to_keep_ = true; + min_log_number_to_keep_ = num; + } + + bool has_log_number() { return has_log_number_; } + + uint64_t log_number() { return log_number_; } // Add the specified file at the specified number. // REQUIRES: This version has not been saved (see VersionSet::SaveTo) @@ -285,6 +293,8 @@ class VersionEdit { uint64_t prev_log_number_; uint64_t next_file_number_; uint32_t max_column_family_; + // The most recent WAL log number that is deleted + uint64_t min_log_number_to_keep_; SequenceNumber last_sequence_; bool has_comparator_; bool has_log_number_; @@ -292,6 +302,7 @@ class VersionEdit { bool has_next_file_number_; bool has_last_sequence_; bool has_max_column_family_; + bool has_min_log_number_to_keep_; DeletedFileSet deleted_files_; std::vector> new_files_; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 338bb36f6..0dd6a76ca 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -181,6 +181,16 @@ TEST_F(VersionEditTest, ColumnFamilyTest) { TestEncodeDecode(edit); } +TEST_F(VersionEditTest, MinLogNumberToKeep) { + VersionEdit edit; + edit.SetMinLogNumberToKeep(13); + TestEncodeDecode(edit); + + edit.Clear(); + edit.SetMinLogNumberToKeep(23); + TestEncodeDecode(edit); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index b5f8e3d29..3a753604a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2663,16 +2663,16 @@ struct VersionSet::ManifestWriter { }; VersionSet::VersionSet(const std::string& dbname, - const ImmutableDBOptions* db_options, + const ImmutableDBOptions* _db_options, const EnvOptions& storage_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller) : column_family_set_( - new ColumnFamilySet(dbname, db_options, storage_options, table_cache, + new ColumnFamilySet(dbname, _db_options, storage_options, table_cache, write_buffer_manager, write_controller)), - env_(db_options->env), + env_(_db_options->env), dbname_(dbname), - db_options_(db_options), + db_options_(_db_options), next_file_number_(2), manifest_file_number_(0), // Filled by Recover() options_file_number_(0), @@ -2957,16 +2957,26 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } } else { uint64_t max_log_number_in_batch = 0; + uint64_t min_log_number_to_keep = 0; for (auto& e : batch_edits) { if (e->has_log_number_) { max_log_number_in_batch = std::max(max_log_number_in_batch, e->log_number_); } + if (e->has_min_log_number_to_keep_) { + min_log_number_to_keep = + std::max(min_log_number_to_keep, e->min_log_number_to_keep_); + } } if (max_log_number_in_batch != 0) { assert(column_family_data->GetLogNumber() <= max_log_number_in_batch); column_family_data->SetLogNumber(max_log_number_in_batch); } + if (min_log_number_to_keep != 0) { + // Should only be set in 2PC mode. + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); + } + AppendVersion(column_family_data, v); } @@ -3122,6 +3132,7 @@ Status VersionSet::Recover( uint64_t log_number = 0; uint64_t previous_log_number = 0; uint32_t max_column_family = 0; + uint64_t min_log_number_to_keep = 0; std::unordered_map builders; // add default column family @@ -3262,6 +3273,11 @@ Status VersionSet::Recover( max_column_family = edit.max_column_family_; } + if (edit.has_min_log_number_to_keep_) { + min_log_number_to_keep = + std::max(min_log_number_to_keep, edit.min_log_number_to_keep_); + } + if (edit.has_last_sequence_) { last_sequence = edit.last_sequence_; have_last_sequence = true; @@ -3284,6 +3300,9 @@ Status VersionSet::Recover( column_family_set_->UpdateMaxColumnFamily(max_column_family); + // When reading DB generated using old release, min_log_number_to_keep=0. + // All log files will be scanned for potential prepare entries. + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); MarkFileNumberUsed(previous_log_number); MarkFileNumberUsed(log_number); } @@ -3355,11 +3374,12 @@ Status VersionSet::Recover( "manifest_file_number is %lu, next_file_number is %lu, " "last_sequence is %lu, log_number is %lu," "prev_log_number is %lu," - "max_column_family is %u\n", + "max_column_family is %u," + "min_log_number_to_keep is %lu\n", manifest_filename.c_str(), (unsigned long)manifest_file_number_, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)log_number, (unsigned long)prev_log_number_, - column_family_set_->GetMaxColumnFamily()); + column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); for (auto cfd : *column_family_set_) { if (cfd->IsDropped()) { @@ -3647,6 +3667,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, cfd->SetLogNumber(edit.log_number_); } + if (edit.has_prev_log_number_) { previous_log_number = edit.prev_log_number_; have_prev_log_number = true; @@ -3665,6 +3686,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, if (edit.has_max_column_family_) { column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_); } + + if (edit.has_min_log_number_to_keep_) { + MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_); + } } } file_reader.reset(); @@ -3723,10 +3748,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, printf( "next_file_number %lu last_sequence " - "%lu prev_log_number %lu max_column_family %u\n", + "%lu prev_log_number %lu max_column_family %u min_log_number_to_keep " + "%" PRIu64 "\n", (unsigned long)next_file_number_.load(), (unsigned long)last_sequence, (unsigned long)previous_log_number, - column_family_set_->GetMaxColumnFamily()); + column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); } return s; @@ -3741,6 +3767,14 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { } } +// Called only either from ::LogAndApply which is protected by mutex or during +// recovery which is single-threaded. +void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { + if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) { + min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed); + } +} + Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? diff --git a/db/version_set.h b/db/version_set.h index 3a6830489..563054d17 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -802,6 +802,10 @@ class VersionSet { uint64_t current_next_file_number() const { return next_file_number_.load(); } + uint64_t min_log_number_to_keep_2pc() const { + return min_log_number_to_keep_2pc_.load(); + } + // Allocate and return a new file number uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } @@ -849,15 +853,31 @@ class VersionSet { // REQUIRED: this is only called during single-threaded recovery or repair. void MarkFileNumberUsed(uint64_t number); + // Mark the specified log number as deleted + // REQUIRED: this is only called during single-threaded recovery or repair, or + // from ::LogAndApply where the global mutex is held. + void MarkMinLogNumberToKeep2PC(uint64_t number); + // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. uint64_t prev_log_number() const { return prev_log_number_; } - // Returns the minimum log number such that all - // log numbers less than or equal to it can be deleted - uint64_t MinLogNumber() const { + // Returns the minimum log number which still has data not flushed to any SST + // file. + // In non-2PC mode, all the log numbers smaller than this number can be safely + // deleted. + uint64_t MinLogNumberWithUnflushedData() const { + return PreComputeMinLogNumberWithUnflushedData(nullptr); + } + // Returns the minimum log number which still has data not flushed to any SST + // file, except data from `cfd_to_skip`. + uint64_t PreComputeMinLogNumberWithUnflushedData( + const ColumnFamilyData* cfd_to_skip) const { uint64_t min_log_num = std::numeric_limits::max(); for (auto cfd : *column_family_set_) { + if (cfd == cfd_to_skip) { + continue; + } // It's safe to ignore dropped column families here: // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST. if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) { @@ -908,6 +928,8 @@ class VersionSet { new_options.writable_file_max_buffer_size; } + const ImmutableDBOptions* db_options() const { return db_options_; } + static uint64_t GetNumLiveVersions(Version* dummy_versions); static uint64_t GetTotalSstFilesSize(Version* dummy_versions); @@ -946,6 +968,10 @@ class VersionSet { const std::string dbname_; const ImmutableDBOptions* const db_options_; std::atomic next_file_number_; + // Any log number equal or lower than this should be ignored during recovery, + // and is qualified for being deleted in 2PC mode. In non-2PC mode, this + // number is ignored. + std::atomic min_log_number_to_keep_2pc_ = {0}; uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t pending_manifest_file_number_; diff --git a/src.mk b/src.mk index 0369658a6..1a1a7b58c 100644 --- a/src.mk +++ b/src.mk @@ -33,6 +33,7 @@ LIB_SOURCES = \ db/flush_scheduler.cc \ db/forward_iterator.cc \ db/internal_stats.cc \ + db/logs_with_prep_tracker.cc \ db/log_reader.cc \ db/log_writer.cc \ db/malloc_stats.cc \ diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 3a7d801aa..befa19f04 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -197,7 +197,8 @@ Status PessimisticTransaction::Prepare() { s = PrepareInternal(); if (s.ok()) { assert(log_number_ != 0); - dbimpl_->MarkLogAsContainingPrepSection(log_number_); + dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( + log_number_); txn_state_.store(PREPARED); } } else if (txn_state_ == LOCKS_STOLEN) { @@ -284,7 +285,8 @@ Status PessimisticTransaction::Commit() { // to determine what prep logs must be kept around, // not the prep section heap. assert(log_number_ > 0); - dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); + dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( + log_number_); txn_db_impl_->UnregisterTransaction(this); Clear(); @@ -341,7 +343,8 @@ Status PessimisticTransaction::Rollback() { if (s.ok()) { // we do not need to keep our prepared section around assert(log_number_ > 0); - dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); + dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( + log_number_); Clear(); txn_state_.store(ROLLEDBACK); } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index d6e19ec0d..54c386d62 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -900,6 +900,7 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { switch (txn_db_options.write_policy) { case WRITE_COMMITTED: // but now our memtable should be referencing the prep section + ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep()); ASSERT_EQ(log_containing_prep, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); break; @@ -925,6 +926,7 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { } // after memtable flush we can now relese the log + ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep); ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); delete txn; @@ -1279,6 +1281,8 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { // but now our memtable should be referencing the prep section ASSERT_EQ(log_containing_prep, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep()); + break; case WRITE_PREPARED: case WRITE_UNPREPARED: @@ -1289,9 +1293,15 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { assert(false); } + // Add a dummy record to memtable before a flush. Otherwise, the + // memtable will be empty and flush will be skipped. + s = db->Put(write_options, Slice("foo3"), Slice("bar3")); + ASSERT_OK(s); + db_impl->TEST_FlushMemTable(true); - // after memtable flush we can now relese the log + // after memtable flush we can now release the log + ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep); ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); delete txn; @@ -1805,14 +1815,14 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { assert(false); } - ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); + ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog()); // request a flush for all column families such that the earliest // alive log file can be killed db_impl->TEST_SwitchWAL(); // log cannot be flushed because txn2 has not been commited ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed()); - ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog()); + ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog()); // assert that cfa has a flush requested ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested()); @@ -1836,7 +1846,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { ASSERT_OK(s); db_impl->TEST_SwitchWAL(); - ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); + ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog()); // we should see that cfb now has a flush requested ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());