From d59549298fcd50e4aaa59af3dfc039d9a4db5623 Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Thu, 3 May 2018 15:35:11 -0700 Subject: [PATCH] Skip deleted WALs during recovery Summary: This patch record min log number to keep to the manifest while flushing SST files to ignore them and any WAL older than them during recovery. This is to avoid scenarios when we have a gap between the WAL files are fed to the recovery procedure. The gap could happen by for example out-of-order WAL deletion. Such gap could cause problems in 2PC recovery where the prepared and commit entry are placed into two separate WAL and gap in the WALs could result into not processing the WAL with the commit entry and hence breaking the 2PC recovery logic. Before the commit, for 2PC case, we determined which log number to keep in FindObsoleteFiles(). We looked at the earliest logs with outstanding prepare entries, or prepare entries whose respective commit or abort are in memtable. With the commit, the same calculation is done while we apply the SST flush. Just before installing the flush file, we precompute the earliest log file to keep after the flush finishes using the same logic (but skipping the memtables just flushed), record this information to the manifest entry for this new flushed SST file. This pre-computed value is also remembered in memory, and will later be used to determine whether a log file can be deleted. This value is unlikely to change until next flush because the commit entry will stay in memtable. (In WritePrepared, we could have removed the older log files as soon as all prepared entries are committed. It's not yet done anyway. Even if we do it, the only thing we loss with this new approach is earlier log deletion between two flushes, which does not guarantee to happen anyway because the obsolete file clean-up function is only executed after flush or compaction) This min log number to keep is stored in the manifest using the safely-ignore customized field of AddFile entry, in order to guarantee that the DB generated using newer release can be opened by previous releases no older than 4.2. Closes https://github.com/facebook/rocksdb/pull/3765 Differential Revision: D7747618 Pulled By: siying fbshipit-source-id: d00c92105b4f83852e9754a1b70d6b64cb590729 --- CMakeLists.txt | 1 + TARGETS | 11 +- db/column_family.cc | 4 +- db/db_flush_test.cc | 8 +- db/db_impl.cc | 3 +- db/db_impl.h | 66 +++--- db/db_impl_compaction_flush.cc | 2 +- db/db_impl_debug.cc | 12 +- db/db_impl_files.cc | 206 ++++++++---------- db/db_impl_open.cc | 7 + db/db_impl_write.cc | 38 ++-- db/db_test_util.h | 5 +- db/db_wal_test.cc | 102 ++++++++- db/external_sst_file_test.cc | 8 +- db/flush_job.cc | 5 +- db/flush_job.h | 5 +- db/flush_job_test.cc | 2 +- db/internal_stats.h | 2 +- db/logs_with_prep_tracker.cc | 67 ++++++ db/logs_with_prep_tracker.h | 61 ++++++ db/memtable_list.cc | 34 ++- db/memtable_list.h | 13 +- db/memtable_list_test.cc | 8 +- db/version_edit.cc | 45 +++- db/version_edit.h | 11 + db/version_edit_test.cc | 10 + db/version_set.cc | 50 ++++- db/version_set.h | 32 ++- src.mk | 1 + .../transactions/pessimistic_transaction.cc | 9 +- utilities/transactions/transaction_test.cc | 18 +- 31 files changed, 621 insertions(+), 225 deletions(-) create mode 100644 db/logs_with_prep_tracker.cc create mode 100644 db/logs_with_prep_tracker.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 35d427dfe..5477bca41 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -456,6 +456,7 @@ set(SOURCES db/flush_scheduler.cc db/forward_iterator.cc db/internal_stats.cc + db/logs_with_prep_tracker.cc db/log_reader.cc db/log_writer.cc db/malloc_stats.cc diff --git a/TARGETS b/TARGETS index 356dc5cae..20390f073 100644 --- a/TARGETS +++ b/TARGETS @@ -103,6 +103,7 @@ cpp_library( "db/internal_stats.cc", "db/log_reader.cc", "db/log_writer.cc", + "db/logs_with_prep_tracker.cc", "db/malloc_stats.cc", "db/managed_iterator.cc", "db/memtable.cc", @@ -659,11 +660,6 @@ ROCKS_TESTS = [ "utilities/document/document_db_test.cc", "serial", ], - [ - "obsolete_files_test", - "db/obsolete_files_test.cc", - "serial", - ], [ "dynamic_bloom_test", "util/dynamic_bloom_test.cc", @@ -834,6 +830,11 @@ ROCKS_TESTS = [ "utilities/object_registry_test.cc", "serial", ], + [ + "obsolete_files_test", + "db/obsolete_files_test.cc", + "serial", + ], [ "optimistic_transaction_test", "utilities/transactions/optimistic_transaction_test.cc", diff --git a/db/column_family.cc b/db/column_family.cc index 9b5c3ed39..8ea3d1216 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -558,7 +558,9 @@ uint64_t ColumnFamilyData::OldestLogToKeep() { auto current_log = GetLogNumber(); if (allow_2pc_) { - auto imm_prep_log = imm()->GetMinLogContainingPrepSection(); + autovector empty_list; + auto imm_prep_log = + imm()->PrecomputeMinLogContainingPrepSection(empty_list); auto mem_prep_log = mem()->GetMinLogContainingPrepSection(); if (imm_prep_log > 0 && imm_prep_log < current_log) { diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 1efd58862..e6758c2ec 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -72,19 +72,23 @@ TEST_F(DBFlushTest, SyncFail) { auto* cfd = reinterpret_cast(db_->DefaultColumnFamily()) ->cfd(); - int refs_before = cfd->current()->TEST_refs(); FlushOptions flush_options; flush_options.wait = false; ASSERT_OK(dbfull()->Flush(flush_options)); + // Flush installs a new super-version. Get the ref count after that. + auto current_before = cfd->current(); + int refs_before = cfd->current()->TEST_refs(); fault_injection_env->SetFilesystemActive(false); TEST_SYNC_POINT("DBFlushTest::SyncFail:1"); TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); fault_injection_env->SetFilesystemActive(true); + // Now the background job will do the flush; wait for it. dbfull()->TEST_WaitForFlushMemTable(); #ifndef ROCKSDB_LITE ASSERT_EQ("", FilesPerLevel()); // flush failed. #endif // ROCKSDB_LITE - // Flush job should release ref count to current version. + // Backgroun flush job should release ref count to current version. + ASSERT_EQ(current_before, cfd->current()); ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); Destroy(options); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 53c9a9ca7..d190b3490 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -183,7 +183,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, last_stats_dump_time_microsec_(0), next_job_id_(1), has_unpersisted_data_(false), - unable_to_flush_oldest_log_(false), + unable_to_release_oldest_log_(false), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), env_options_for_compaction_(env_->OptimizeForCompactionTableWrite( env_options_, immutable_db_options_)), @@ -3020,5 +3020,4 @@ void DBImpl::WaitForIngestFile() { } #endif // ROCKSDB_LITE - } // namespace rocksdb diff --git a/db/db_impl.h b/db/db_impl.h index 076c4e17a..9620a53de 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -27,6 +27,7 @@ #include "db/flush_scheduler.h" #include "db/internal_stats.h" #include "db/log_writer.h" +#include "db/logs_with_prep_tracker.h" #include "db/pre_release_callback.h" #include "db/read_callback.h" #include "db/snapshot_checker.h" @@ -354,6 +355,10 @@ class DBImpl : public DB { Arena* arena, RangeDelAggregator* range_del_agg, ColumnFamilyHandle* column_family = nullptr); + LogsWithPrepTracker* logs_with_prep_tracker() { + return &logs_with_prep_tracker_; + } + #ifndef NDEBUG // Extra methods (for testing) that are not in the public DB interface // Implemented in db_impl_debug.cc @@ -365,9 +370,7 @@ class DBImpl : public DB { void TEST_SwitchWAL(); - bool TEST_UnableToFlushOldestLog() { - return unable_to_flush_oldest_log_; - } + bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; } bool TEST_IsLogGettingFlushed() { return alive_log_files_.begin()->getting_flushed; @@ -594,7 +597,7 @@ class DBImpl : public DB { size_t batch_cnt) { recovered_transactions_[name] = new RecoveredTransaction(log, name, batch, seq, batch_cnt); - MarkLogAsContainingPrepSection(log); + logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log); } void DeleteRecoveredTransaction(const std::string& name) { @@ -602,7 +605,7 @@ class DBImpl : public DB { assert(it != recovered_transactions_.end()); auto* trx = it->second; recovered_transactions_.erase(it); - MarkLogAsHavingPrepSectionFlushed(trx->log_number_); + logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(trx->log_number_); delete trx; } @@ -614,8 +617,6 @@ class DBImpl : public DB { recovered_transactions_.clear(); } - void MarkLogAsHavingPrepSectionFlushed(uint64_t log); - void MarkLogAsContainingPrepSection(uint64_t log); void AddToLogsToFreeQueue(log::Writer* log_writer) { logs_to_free_queue_.push_back(log_writer); } @@ -728,8 +729,6 @@ class DBImpl : public DB { uint64_t* seq_used = nullptr, size_t batch_cnt = 0, PreReleaseCallback* pre_release_callback = nullptr); - uint64_t FindMinLogContainingOutstandingPrep(); - uint64_t FindMinPrepLogReferencedByMemTable(); // write cached_recoverable_state_ to memtable if it is not empty // The writer must be the leader in write_thread_ and holding mutex_ Status WriteRecoverableState(); @@ -1302,7 +1301,7 @@ class DBImpl : public DB { // We must attempt to free the dependent memtables again // at a later time after the transaction in the oldest // log is fully commited. - bool unable_to_flush_oldest_log_; + bool unable_to_release_oldest_log_; static const int KEEP_LOG_FILE_NUM = 1000; // MSVC version 1800 still does not have constexpr for ::max() @@ -1339,33 +1338,7 @@ class DBImpl : public DB { // Indicate DB was opened successfully bool opened_successfully_; - // REQUIRES: logs_with_prep_mutex_ held - // - // sorted list of log numbers still containing prepared data. - // this is used by FindObsoleteFiles to determine which - // flushed logs we must keep around because they still - // contain prepared data which has not been committed or rolled back - struct LogCnt { - uint64_t log; // the log number - uint64_t cnt; // number of prepared sections in the log - }; - std::vector logs_with_prep_; - std::mutex logs_with_prep_mutex_; - - // REQUIRES: prepared_section_completed_mutex_ held - // - // to be used in conjunction with logs_with_prep_. - // once a transaction with data in log L is committed or rolled back - // rather than updating logs_with_prep_ directly we keep track of that - // in prepared_section_completed_ which maps LOG -> instance_count. This helps - // avoiding contention between a commit thread and the prepare threads. - // - // when trying to determine the minimum log still active we first - // consult logs_with_prep_. while that root value maps to - // an equal value in prepared_section_completed_ we erase the log from - // both logs_with_prep_ and prepared_section_completed_. - std::unordered_map prepared_section_completed_; - std::mutex prepared_section_completed_mutex_; + LogsWithPrepTracker logs_with_prep_tracker_; // Callback for compaction to check if a key is visible to a snapshot. // REQUIRES: mutex held @@ -1461,6 +1434,25 @@ extern CompressionType GetCompressionFlush( const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options); +// Return the earliest log file to keep after the memtable flush is +// finalized. +// `cfd_to_flush` is the column family whose memtable (specified in +// `memtables_to_flush`) will be flushed and thus will not depend on any WAL +// file. +// The function is only applicable to 2pc mode. +extern uint64_t PrecomputeMinLogNumberToKeep( + VersionSet* vset, const ColumnFamilyData& cfd_to_flush, + autovector edit_list, + const autovector& memtables_to_flush, + LogsWithPrepTracker* prep_tracker); + +// `cfd_to_flush` is the column family whose memtable will be flushed and thus +// will not depend on any WAL file. nullptr means no memtable is being flushed. +// The function is only applicable to 2pc mode. +extern uint64_t FindMinPrepLogReferencedByMemTable( + VersionSet* vset, const ColumnFamilyData* cfd_to_flush, + const autovector& memtables_to_flush); + // Fix user-supplied options to be reasonable template static void ClipToRange(T* ptr, V minvalue, V maxvalue) { diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index a4095de6a..9c43b6ab9 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -160,7 +160,7 @@ Status DBImpl::FlushMemTableToOutputFile( // and EventListener callback will be called when the db_mutex // is unlocked by the current thread. if (s.ok()) { - s = flush_job.Run(&file_meta); + s = flush_job.Run(&logs_with_prep_tracker_, &file_meta); } else { flush_job.Cancel(); } diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 88b752b4c..793abd35f 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -184,17 +184,21 @@ Status DBImpl::TEST_GetAllImmutableCFOptions( } uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() { - return FindMinLogContainingOutstandingPrep(); + return logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep(); } size_t DBImpl::TEST_PreparedSectionCompletedSize() { - return prepared_section_completed_.size(); + return logs_with_prep_tracker_.TEST_PreparedSectionCompletedSize(); } -size_t DBImpl::TEST_LogsWithPrepSize() { return logs_with_prep_.size(); } +size_t DBImpl::TEST_LogsWithPrepSize() { + return logs_with_prep_tracker_.TEST_LogsWithPrepSize(); +} uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { - return FindMinPrepLogReferencedByMemTable(); + autovector empty_list; + return FindMinPrepLogReferencedByMemTable(versions_.get(), nullptr, + empty_list); } Status DBImpl::TEST_GetLatestMutableCFOptions( diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 9892c5b9d..6d3b4b996 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -14,124 +14,17 @@ #include #include #include "db/event_helpers.h" +#include "db/memtable_list.h" #include "util/file_util.h" #include "util/sst_file_manager_impl.h" - namespace rocksdb { -uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() { - if (!allow_2pc()) { - return 0; - } - - uint64_t min_log = 0; - - // we must look through the memtables for two phase transactions - // that have been committed but not yet flushed - for (auto loop_cfd : *versions_->GetColumnFamilySet()) { - if (loop_cfd->IsDropped()) { - continue; - } - - auto log = loop_cfd->imm()->GetMinLogContainingPrepSection(); - - if (log > 0 && (min_log == 0 || log < min_log)) { - min_log = log; - } - - log = loop_cfd->mem()->GetMinLogContainingPrepSection(); - - if (log > 0 && (min_log == 0 || log < min_log)) { - min_log = log; - } - } - - return min_log; -} - -void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { - assert(log != 0); - std::lock_guard lock(prepared_section_completed_mutex_); - auto it = prepared_section_completed_.find(log); - if (UNLIKELY(it == prepared_section_completed_.end())) { - prepared_section_completed_[log] = 1; - } else { - it->second += 1; - } -} - -void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) { - assert(log != 0); - std::lock_guard lock(logs_with_prep_mutex_); - - auto rit = logs_with_prep_.rbegin(); - bool updated = false; - // Most probably the last log is the one that is being marked for - // having a prepare section; so search from the end. - for (; rit != logs_with_prep_.rend() && rit->log >= log; ++rit) { - if (rit->log == log) { - rit->cnt++; - updated = true; - break; - } - } - if (!updated) { - // We are either at the start, or at a position with rit->log < log - logs_with_prep_.insert(rit.base(), {log, 1}); - } -} - -uint64_t DBImpl::FindMinLogContainingOutstandingPrep() { - std::lock_guard lock(logs_with_prep_mutex_); - auto it = logs_with_prep_.begin(); - // start with the smallest log - for (; it != logs_with_prep_.end();) { - auto min_log = it->log; - { - std::lock_guard lock2(prepared_section_completed_mutex_); - auto completed_it = prepared_section_completed_.find(min_log); - if (completed_it == prepared_section_completed_.end() || - completed_it->second < it->cnt) { - return min_log; - } - assert(completed_it != prepared_section_completed_.end() && - completed_it->second == it->cnt); - prepared_section_completed_.erase(completed_it); - } - // erase from beginning in vector is not efficient but this function is not - // on the fast path. - it = logs_with_prep_.erase(it); - } - // no such log found - return 0; -} - uint64_t DBImpl::MinLogNumberToKeep() { - uint64_t log_number = versions_->MinLogNumber(); - if (allow_2pc()) { - // if are 2pc we must consider logs containing prepared - // sections of outstanding transactions. - // - // We must check min logs with outstanding prep before we check - // logs references by memtables because a log referenced by the - // first data structure could transition to the second under us. - // - // TODO(horuff): iterating over all column families under db mutex. - // should find more optimal solution - auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep(); - - if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) { - log_number = min_log_in_prep_heap; - } - - auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(); - - if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) { - log_number = min_log_refed_by_mem; - } + return versions_->min_log_number_to_keep_2pc(); + } else { + return versions_->MinLogNumberWithUnflushedData(); } - return log_number; } // * Returns the list of live files in 'sst_live' @@ -200,7 +93,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, job_context->pending_manifest_file_number = versions_->pending_manifest_file_number(); job_context->log_number = MinLogNumberToKeep(); - job_context->prev_log_number = versions_->prev_log_number(); versions_->AddLiveFiles(&job_context->sst_live); @@ -621,4 +513,94 @@ void DBImpl::DeleteObsoleteFiles() { mutex_.Lock(); } +uint64_t FindMinPrepLogReferencedByMemTable( + VersionSet* vset, const ColumnFamilyData* cfd_to_flush, + const autovector& memtables_to_flush) { + uint64_t min_log = 0; + + // we must look through the memtables for two phase transactions + // that have been committed but not yet flushed + for (auto loop_cfd : *vset->GetColumnFamilySet()) { + if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) { + continue; + } + + auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection( + memtables_to_flush); + + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + + log = loop_cfd->mem()->GetMinLogContainingPrepSection(); + + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + } + + return min_log; +} + +uint64_t PrecomputeMinLogNumberToKeep( + VersionSet* vset, const ColumnFamilyData& cfd_to_flush, + autovector edit_list, + const autovector& memtables_to_flush, + LogsWithPrepTracker* prep_tracker) { + assert(vset != nullptr); + assert(prep_tracker != nullptr); + // Calculate updated min_log_number_to_keep + // Since the function should only be called in 2pc mode, log number in + // the version edit should be sufficient. + + // Precompute the min log number containing unflushed data for the column + // family being flushed (`cfd_to_flush`). + uint64_t cf_min_log_number_to_keep = 0; + for (auto& e : edit_list) { + if (e->has_log_number()) { + cf_min_log_number_to_keep = + std::max(cf_min_log_number_to_keep, e->log_number()); + } + } + if (cf_min_log_number_to_keep == 0) { + // No version edit contains information on log number. The log number + // for this column family should stay the same as it is. + cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber(); + } + + // Get min log number containing unflushed data for other column families. + uint64_t min_log_number_to_keep = + vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush); + if (cf_min_log_number_to_keep != 0) { + min_log_number_to_keep = + std::min(cf_min_log_number_to_keep, min_log_number_to_keep); + } + + // if are 2pc we must consider logs containing prepared + // sections of outstanding transactions. + // + // We must check min logs with outstanding prep before we check + // logs references by memtables because a log referenced by the + // first data structure could transition to the second under us. + // + // TODO: iterating over all column families under db mutex. + // should find more optimal solution + auto min_log_in_prep_heap = + prep_tracker->FindMinLogContainingOutstandingPrep(); + + if (min_log_in_prep_heap != 0 && + min_log_in_prep_heap < min_log_number_to_keep) { + min_log_number_to_keep = min_log_in_prep_heap; + } + + uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( + vset, &cfd_to_flush, memtables_to_flush); + + if (min_log_refed_by_mem != 0 && + min_log_refed_by_mem < min_log_number_to_keep) { + min_log_number_to_keep = min_log_refed_by_mem; + } + return min_log_number_to_keep; +} + } // namespace rocksdb diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 616ab3f4f..37d222dd6 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -532,6 +532,13 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, bool flushed = false; uint64_t corrupted_log_number = kMaxSequenceNumber; for (auto log_number : log_numbers) { + if (log_number < versions_->min_log_number_to_keep_2pc()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Skipping log #%" PRIu64 + " since it is older than min log to keep #%" PRIu64, + log_number, versions_->min_log_number_to_keep_2pc()); + continue; + } // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index c0416fdf9..9b43bd5fc 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1033,28 +1033,34 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { } auto oldest_alive_log = alive_log_files_.begin()->number; - auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep(); - - if (allow_2pc() && - oldest_log_with_uncommited_prep > 0 && - oldest_log_with_uncommited_prep <= oldest_alive_log) { - if (unable_to_flush_oldest_log_) { + bool flush_wont_release_oldest_log = false; + if (allow_2pc()) { + auto oldest_log_with_uncommited_prep = + logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep(); + + assert(oldest_log_with_uncommited_prep == 0 || + oldest_log_with_uncommited_prep >= oldest_alive_log); + if (oldest_log_with_uncommited_prep > 0 && + oldest_log_with_uncommited_prep == oldest_alive_log) { + if (unable_to_release_oldest_log_) { // we already attempted to flush all column families dependent on - // the oldest alive log but the log still contained uncommited transactions. - // the oldest alive log STILL contains uncommited transaction so there - // is still nothing that we can do. + // the oldest alive log but the log still contained uncommited + // transactions so there is still nothing that we can do. return status; - } else { - ROCKS_LOG_WARN( - immutable_db_options_.info_log, - "Unable to release oldest log due to uncommited transaction"); - unable_to_flush_oldest_log_ = true; + } else { + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Unable to release oldest log due to uncommited transaction"); + unable_to_release_oldest_log_ = true; + flush_wont_release_oldest_log = true; + } } - } else { + } + if (!flush_wont_release_oldest_log) { // we only mark this log as getting flushed if we have successfully // flushed all data in this log. If this log contains outstanding prepared // transactions then we cannot flush this log until those transactions are commited. - unable_to_flush_oldest_log_ = false; + unable_to_release_oldest_log_ = false; alive_log_files_.begin()->getting_flushed = true; } diff --git a/db/db_test_util.h b/db/db_test_util.h index b4d027646..f9d1eca3f 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -451,8 +451,9 @@ class SpecialEnv : public EnvWrapper { return s; } - Status NewSequentialFile(const std::string& f, unique_ptr* r, - const EnvOptions& soptions) override { + virtual Status NewSequentialFile(const std::string& f, + unique_ptr* r, + const EnvOptions& soptions) override { class CountingFile : public SequentialFile { public: CountingFile(unique_ptr&& target, diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 796ef251c..0349bdc8d 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -20,6 +20,106 @@ class DBWALTest : public DBTestBase { DBWALTest() : DBTestBase("/db_wal_test") {} }; +// A SpecialEnv enriched to give more insight about deleted files +class EnrichedSpecialEnv : public SpecialEnv { + public: + explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {} + Status NewSequentialFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) override { + InstrumentedMutexLock l(&env_mutex_); + if (f == skipped_wal) { + deleted_wal_reopened = true; + if (IsWAL(f) && largetest_deleted_wal.size() != 0 && + f.compare(largetest_deleted_wal) <= 0) { + gap_in_wals = true; + } + } + return SpecialEnv::NewSequentialFile(f, r, soptions); + } + Status DeleteFile(const std::string& fname) override { + if (IsWAL(fname)) { + deleted_wal_cnt++; + InstrumentedMutexLock l(&env_mutex_); + // If this is the first WAL, remember its name and skip deleting it. We + // remember its name partly because the application might attempt to + // delete the file again. + if (skipped_wal.size() != 0 && skipped_wal != fname) { + if (largetest_deleted_wal.size() == 0 || + largetest_deleted_wal.compare(fname) < 0) { + largetest_deleted_wal = fname; + } + } else { + skipped_wal = fname; + return Status::OK(); + } + } + return SpecialEnv::DeleteFile(fname); + } + bool IsWAL(const std::string& fname) { + // printf("iswal %s\n", fname.c_str()); + return fname.compare(fname.size() - 3, 3, "log") == 0; + } + + InstrumentedMutex env_mutex_; + // the wal whose actual delete was skipped by the env + std::string skipped_wal = ""; + // the largest WAL that was requested to be deleted + std::string largetest_deleted_wal = ""; + // number of WALs that were successfully deleted + std::atomic deleted_wal_cnt = {0}; + // the WAL whose delete from fs was skipped is reopened during recovery + std::atomic deleted_wal_reopened = {false}; + // whether a gap in the WALs was detected during recovery + std::atomic gap_in_wals = {false}; +}; + +class DBWALTestWithEnrichedEnv : public DBTestBase { + public: + DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") { + enriched_env_ = new EnrichedSpecialEnv(env_->target()); + auto options = CurrentOptions(); + options.env = enriched_env_; + Reopen(options); + delete env_; + // to be deleted by the parent class + env_ = enriched_env_; + } + + protected: + EnrichedSpecialEnv* enriched_env_; +}; + +// Test that the recovery would successfully avoid the gaps between the logs. +// One known scenario that could cause this is that the application issue the +// WAL deletion out of order. For the sake of simplicity in the test, here we +// create the gap by manipulating the env to skip deletion of the first WAL but +// not the ones after it. +TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) { + auto options = last_options_; + // To cause frequent WAL deletion + options.write_buffer_size = 128; + Reopen(options); + + WriteOptions writeOpt = WriteOptions(); + for (int i = 0; i < 128 * 5; i++) { + ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); + } + FlushOptions fo; + fo.wait = true; + ASSERT_OK(db_->Flush(fo)); + + // some wals are deleted + ASSERT_NE(0, enriched_env_->deleted_wal_cnt); + // but not the first one + ASSERT_NE(0, enriched_env_->skipped_wal.size()); + + // Test that the WAL that was not deleted will be skipped during recovery + options = last_options_; + Reopen(options); + ASSERT_FALSE(enriched_env_->deleted_wal_reopened); + ASSERT_FALSE(enriched_env_->gap_in_wals); +} + TEST_F(DBWALTest, WAL) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); @@ -891,7 +991,7 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) { // Record the offset at this point Env* env = options.env; - int wal_file_id = RecoveryTestHelper::kWALFileOffset + 1; + uint64_t wal_file_id = dbfull()->TEST_LogfileNumber(); std::string fname = LogFileName(dbname_, wal_file_id); uint64_t offset_to_corrupt; ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt)); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index f72937792..c70b5c053 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1413,8 +1413,12 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) { // fit in L3 but will overlap with compaction so will be added // to L2 but a compaction will trivially move it to L3 // and break LSM consistency - ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); - ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); + static std::atomic called = {false}; + if (!called) { + called = true; + ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); + ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); + } }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/flush_job.cc b/db/flush_job.cc index b53b229e4..0e4657984 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -185,7 +185,8 @@ void FlushJob::PickMemTable() { base_->Ref(); // it is likely that we do not need this reference } -Status FlushJob::Run(FileMetaData* file_meta) { +Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, + FileMetaData* file_meta) { db_mutex_->AssertHeld(); assert(pick_memtable_called); AutoThreadOperationStageUpdater stage_run( @@ -226,7 +227,7 @@ Status FlushJob::Run(FileMetaData* file_meta) { TEST_SYNC_POINT("FlushJob::InstallResults"); // Replace immutable memtable with the generated Table s = cfd_->imm()->InstallMemtableFlushResults( - cfd_, mutable_cf_options_, mems_, versions_, db_mutex_, + cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, log_buffer_); } diff --git a/db/flush_job.h b/db/flush_job.h index 81a8de306..c3115c4a6 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -22,6 +22,7 @@ #include "db/internal_stats.h" #include "db/job_context.h" #include "db/log_writer.h" +#include "db/logs_with_prep_tracker.h" #include "db/memtable_list.h" #include "db/snapshot_impl.h" #include "db/version_edit.h" @@ -42,6 +43,7 @@ namespace rocksdb { +class DBImpl; class MemTable; class SnapshotChecker; class TableCache; @@ -71,7 +73,8 @@ class FlushJob { // Require db_mutex held. // Once PickMemTable() is called, either Run() or Cancel() has to be called. void PickMemTable(); - Status Run(FileMetaData* file_meta = nullptr); + Status Run(LogsWithPrepTracker* prep_tracker = nullptr, + FileMetaData* file_meta = nullptr); void Cancel(); TableProperties GetTableProperties() const { return table_properties_; } diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index a2309afe2..4e94001ec 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -150,7 +150,7 @@ TEST_F(FlushJobTest, NonEmpty) { FileMetaData fd; mutex_.Lock(); flush_job.PickMemTable(); - ASSERT_OK(flush_job.Run(&fd)); + ASSERT_OK(flush_job.Run(nullptr, &fd)); mutex_.Unlock(); db_options_.statistics->histogramData(FLUSH_TIME, &hist); ASSERT_GT(hist.average, 0.0); diff --git a/db/internal_stats.h b/db/internal_stats.h index 1cd2b6764..4655c8951 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -19,8 +19,8 @@ class ColumnFamilyData; namespace rocksdb { -class MemTableList; class DBImpl; +class MemTableList; // Config for retrieving a property's value. struct DBPropertyInfo { diff --git a/db/logs_with_prep_tracker.cc b/db/logs_with_prep_tracker.cc new file mode 100644 index 000000000..1082dc102 --- /dev/null +++ b/db/logs_with_prep_tracker.cc @@ -0,0 +1,67 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "db/logs_with_prep_tracker.h" + +#include "port/likely.h" + +namespace rocksdb { +void LogsWithPrepTracker::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { + assert(log != 0); + std::lock_guard lock(prepared_section_completed_mutex_); + auto it = prepared_section_completed_.find(log); + if (UNLIKELY(it == prepared_section_completed_.end())) { + prepared_section_completed_[log] = 1; + } else { + it->second += 1; + } +} + +void LogsWithPrepTracker::MarkLogAsContainingPrepSection(uint64_t log) { + assert(log != 0); + std::lock_guard lock(logs_with_prep_mutex_); + + auto rit = logs_with_prep_.rbegin(); + bool updated = false; + // Most probably the last log is the one that is being marked for + // having a prepare section; so search from the end. + for (; rit != logs_with_prep_.rend() && rit->log >= log; ++rit) { + if (rit->log == log) { + rit->cnt++; + updated = true; + break; + } + } + if (!updated) { + // We are either at the start, or at a position with rit->log < log + logs_with_prep_.insert(rit.base(), {log, 1}); + } +} + +uint64_t LogsWithPrepTracker::FindMinLogContainingOutstandingPrep() { + std::lock_guard lock(logs_with_prep_mutex_); + auto it = logs_with_prep_.begin(); + // start with the smallest log + for (; it != logs_with_prep_.end();) { + auto min_log = it->log; + { + std::lock_guard lock2(prepared_section_completed_mutex_); + auto completed_it = prepared_section_completed_.find(min_log); + if (completed_it == prepared_section_completed_.end() || + completed_it->second < it->cnt) { + return min_log; + } + assert(completed_it != prepared_section_completed_.end() && + completed_it->second == it->cnt); + prepared_section_completed_.erase(completed_it); + } + // erase from beginning in vector is not efficient but this function is not + // on the fast path. + it = logs_with_prep_.erase(it); + } + // no such log found + return 0; +} +} // namespace rocksdb diff --git a/db/logs_with_prep_tracker.h b/db/logs_with_prep_tracker.h new file mode 100644 index 000000000..639d8f806 --- /dev/null +++ b/db/logs_with_prep_tracker.h @@ -0,0 +1,61 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace rocksdb { + +// This class is used to track the log files with outstanding prepare entries. +class LogsWithPrepTracker { + public: + // Called when a transaction prepared in `log` has been committed or aborted. + void MarkLogAsHavingPrepSectionFlushed(uint64_t log); + // Called when a transaction is prepared in `log`. + void MarkLogAsContainingPrepSection(uint64_t log); + // Return the earliest log file with outstanding prepare entries. + uint64_t FindMinLogContainingOutstandingPrep(); + size_t TEST_PreparedSectionCompletedSize() { + return prepared_section_completed_.size(); + } + size_t TEST_LogsWithPrepSize() { return logs_with_prep_.size(); } + + private: + // REQUIRES: logs_with_prep_mutex_ held + // + // sorted list of log numbers still containing prepared data. + // this is used by FindObsoleteFiles to determine which + // flushed logs we must keep around because they still + // contain prepared data which has not been committed or rolled back + struct LogCnt { + uint64_t log; // the log number + uint64_t cnt; // number of prepared sections in the log + }; + std::vector logs_with_prep_; + std::mutex logs_with_prep_mutex_; + + // REQUIRES: prepared_section_completed_mutex_ held + // + // to be used in conjunction with logs_with_prep_. + // once a transaction with data in log L is committed or rolled back + // rather than updating logs_with_prep_ directly we keep track of that + // in prepared_section_completed_ which maps LOG -> instance_count. This helps + // avoiding contention between a commit thread and the prepare threads. + // + // when trying to determine the minimum log still active we first + // consult logs_with_prep_. while that root value maps to + // an equal value in prepared_section_completed_ we erase the log from + // both logs_with_prep_ and prepared_section_completed_. + std::unordered_map prepared_section_completed_; + std::mutex prepared_section_completed_mutex_; + +}; +} // namespace rocksdb diff --git a/db/memtable_list.cc b/db/memtable_list.cc index e3cd64cfe..7cb208beb 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -12,6 +12,7 @@ #include #include #include +#include "db/db_impl.h" #include "db/memtable.h" #include "db/version_set.h" #include "monitoring/thread_status_util.h" @@ -322,9 +323,10 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - const autovector& mems, VersionSet* vset, InstrumentedMutex* mu, - uint64_t file_number, autovector* to_delete, - Directory* db_directory, LogBuffer* log_buffer) { + const autovector& mems, LogsWithPrepTracker* prep_tracker, + VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); @@ -361,6 +363,7 @@ Status MemTableList::InstallMemtableFlushResults( uint64_t batch_file_number = 0; size_t batch_count = 0; autovector edit_list; + autovector memtables_to_flush; // enumerate from the last (earliest) element to see how many batch finished for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { MemTable* m = *it; @@ -373,11 +376,20 @@ Status MemTableList::InstallMemtableFlushResults( "[%s] Level-0 commit table #%" PRIu64 " started", cfd->GetName().c_str(), m->file_number_); edit_list.push_back(&m->edit_); + memtables_to_flush.push_back(m); } batch_count++; } if (batch_count > 0) { + if (vset->db_options()->allow_2pc) { + assert(edit_list.size() > 0); + // We piggyback the information of earliest log file to keep in the + // manifest entry for the last file flushed. + edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep( + vset, *cfd, edit_list, memtables_to_flush, prep_tracker)); + } + // this can release and reacquire the mutex. s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory); @@ -468,13 +480,21 @@ void MemTableList::InstallNewVersion() { } } -uint64_t MemTableList::GetMinLogContainingPrepSection() { +uint64_t MemTableList::PrecomputeMinLogContainingPrepSection( + const autovector& memtables_to_flush) { uint64_t min_log = 0; for (auto& m : current_->memlist_) { - // this mem has been flushed it no longer - // needs to hold on the its prep section - if (m->flush_completed_) { + // Assume the list is very short, we can live with O(m*n). We can optimize + // if the performance has some problem. + bool should_skip = false; + for (MemTable* m_to_flush : memtables_to_flush) { + if (m == m_to_flush) { + should_skip = true; + break; + } + } + if (should_skip) { continue; } diff --git a/db/memtable_list.h b/db/memtable_list.h index c2ac65a2f..7fd9de7ad 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -13,6 +13,7 @@ #include #include "db/dbformat.h" +#include "db/logs_with_prep_tracker.h" #include "db/memtable.h" #include "db/range_del_aggregator.h" #include "monitoring/instrumented_mutex.h" @@ -210,9 +211,10 @@ class MemTableList { // Commit a successful flush in the manifest file Status InstallMemtableFlushResults( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - const autovector& m, VersionSet* vset, InstrumentedMutex* mu, - uint64_t file_number, autovector* to_delete, - Directory* db_directory, LogBuffer* log_buffer); + const autovector& m, LogsWithPrepTracker* prep_tracker, + VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). @@ -243,7 +245,10 @@ class MemTableList { size_t* current_memory_usage() { return ¤t_memory_usage_; } - uint64_t GetMinLogContainingPrepSection(); + // Returns the min log containing the prep section after memtables listsed in + // `memtables_to_flush` are flushed and their status is persisted in manifest. + uint64_t PrecomputeMinLogContainingPrepSection( + const autovector& memtables_to_flush); uint64_t GetEarliestMemTableID() const { auto& memlist = current_->memlist_; diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 30e516663..a565eba3c 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -82,10 +82,10 @@ class MemTableListTest : public testing::Test { // Create dummy mutex. InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); - - return list->InstallMemtableFlushResults(cfd, mutable_cf_options, m, - &versions, &mutex, 1, to_delete, - nullptr, &log_buffer); + LogsWithPrepTracker dummy_prep_tracker; + return list->InstallMemtableFlushResults( + cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, 1, + to_delete, nullptr, &log_buffer); } }; diff --git a/db/version_edit.cc b/db/version_edit.cc index ebfc10584..dd84d0986 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -30,6 +30,7 @@ enum Tag { kNewFile = 7, // 8 was used for large value refs kPrevLogNumber = 9, + kMinLogNumberToKeep = 10, // these are new formats divergent from open source leveldb kNewFile2 = 100, @@ -44,6 +45,11 @@ enum Tag { enum CustomTag { kTerminate = 1, // The end of customized fields kNeedCompaction = 2, + // Since Manifest is not entirely currently forward-compatible, and the only + // forward-compatbile part is the CutsomtTag of kNewFile, we currently encode + // kMinLogNumberToKeep as part of a CustomTag as a hack. This should be + // removed when manifest becomes forward-comptabile. + kMinLogNumberToKeepHack = 3, kPathId = 65, }; // If this bit for the custom tag is set, opening DB should fail if @@ -63,12 +69,14 @@ void VersionEdit::Clear() { last_sequence_ = 0; next_file_number_ = 0; max_column_family_ = 0; + min_log_number_to_keep_ = 0; has_comparator_ = false; has_log_number_ = false; has_prev_log_number_ = false; has_next_file_number_ = false; has_last_sequence_ = false; has_max_column_family_ = false; + has_min_log_number_to_keep_ = false; deleted_files_.clear(); new_files_.clear(); column_family_ = 0; @@ -97,19 +105,19 @@ bool VersionEdit::EncodeTo(std::string* dst) const { if (has_max_column_family_) { PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_); } - for (const auto& deleted : deleted_files_) { PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */, deleted.second /* file number */); } + bool min_log_num_written = false; for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; if (!f.smallest.Valid() || !f.largest.Valid()) { return false; } bool has_customized_fields = false; - if (f.marked_for_compaction) { + if (f.marked_for_compaction || has_min_log_number_to_keep_) { PutVarint32(dst, kNewFile4); has_customized_fields = true; } else if (f.fd.GetPathId() == 0) { @@ -165,6 +173,13 @@ bool VersionEdit::EncodeTo(std::string* dst) const { char p = static_cast(1); PutLengthPrefixedSlice(dst, Slice(&p, 1)); } + if (has_min_log_number_to_keep_ && !min_log_num_written) { + PutVarint32(dst, CustomTag::kMinLogNumberToKeepHack); + std::string varint_log_number; + PutFixed64(&varint_log_number, min_log_number_to_keep_); + PutLengthPrefixedSlice(dst, Slice(varint_log_number)); + min_log_num_written = true; + } TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields", dst); @@ -218,6 +233,9 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { uint64_t number; uint32_t path_id = 0; uint64_t file_size; + // Since this is the only forward-compatible part of the code, we hack new + // extension into this record. When we do, we set this boolean to distinguish + // the record from the normal NewFile records. if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) && GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) && GetInternalKey(input, &f.largest) && @@ -252,6 +270,14 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { } f.marked_for_compaction = (field[0] == 1); break; + case kMinLogNumberToKeepHack: + // This is a hack to encode kMinLogNumberToKeep in a + // forward-compatbile fashion. + if (!GetFixed64(&field, &min_log_number_to_keep_)) { + return "deleted log number malformatted"; + } + has_min_log_number_to_keep_ = true; + break; default: if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) { // Should not proceed if cannot understand it @@ -331,6 +357,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kMinLogNumberToKeep: + if (GetVarint64(&input, &min_log_number_to_keep_)) { + has_min_log_number_to_keep_ = true; + } else { + msg = "min log number to kee"; + } + break; + case kCompactPointer: if (GetLevel(&input, &level, &msg) && GetInternalKey(&input, &key)) { @@ -475,6 +509,10 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append("\n NextFileNumber: "); AppendNumberTo(&r, next_file_number_); } + if (has_min_log_number_to_keep_) { + r.append("\n MinLogNumberToKeep: "); + AppendNumberTo(&r, min_log_number_to_keep_); + } if (has_last_sequence_) { r.append("\n LastSeq: "); AppendNumberTo(&r, last_sequence_); @@ -582,6 +620,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { if (has_max_column_family_) { jw << "MaxColumnFamily" << max_column_family_; } + if (has_min_log_number_to_keep_) { + jw << "MinLogNumberToKeep" << min_log_number_to_keep_; + } jw.EndObject(); diff --git a/db/version_edit.h b/db/version_edit.h index 391e61434..5b858391e 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -199,6 +199,14 @@ class VersionEdit { has_max_column_family_ = true; max_column_family_ = max_column_family; } + void SetMinLogNumberToKeep(uint64_t num) { + has_min_log_number_to_keep_ = true; + min_log_number_to_keep_ = num; + } + + bool has_log_number() { return has_log_number_; } + + uint64_t log_number() { return log_number_; } // Add the specified file at the specified number. // REQUIRES: This version has not been saved (see VersionSet::SaveTo) @@ -285,6 +293,8 @@ class VersionEdit { uint64_t prev_log_number_; uint64_t next_file_number_; uint32_t max_column_family_; + // The most recent WAL log number that is deleted + uint64_t min_log_number_to_keep_; SequenceNumber last_sequence_; bool has_comparator_; bool has_log_number_; @@ -292,6 +302,7 @@ class VersionEdit { bool has_next_file_number_; bool has_last_sequence_; bool has_max_column_family_; + bool has_min_log_number_to_keep_; DeletedFileSet deleted_files_; std::vector> new_files_; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 338bb36f6..0dd6a76ca 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -181,6 +181,16 @@ TEST_F(VersionEditTest, ColumnFamilyTest) { TestEncodeDecode(edit); } +TEST_F(VersionEditTest, MinLogNumberToKeep) { + VersionEdit edit; + edit.SetMinLogNumberToKeep(13); + TestEncodeDecode(edit); + + edit.Clear(); + edit.SetMinLogNumberToKeep(23); + TestEncodeDecode(edit); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index b5f8e3d29..3a753604a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2663,16 +2663,16 @@ struct VersionSet::ManifestWriter { }; VersionSet::VersionSet(const std::string& dbname, - const ImmutableDBOptions* db_options, + const ImmutableDBOptions* _db_options, const EnvOptions& storage_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller) : column_family_set_( - new ColumnFamilySet(dbname, db_options, storage_options, table_cache, + new ColumnFamilySet(dbname, _db_options, storage_options, table_cache, write_buffer_manager, write_controller)), - env_(db_options->env), + env_(_db_options->env), dbname_(dbname), - db_options_(db_options), + db_options_(_db_options), next_file_number_(2), manifest_file_number_(0), // Filled by Recover() options_file_number_(0), @@ -2957,16 +2957,26 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } } else { uint64_t max_log_number_in_batch = 0; + uint64_t min_log_number_to_keep = 0; for (auto& e : batch_edits) { if (e->has_log_number_) { max_log_number_in_batch = std::max(max_log_number_in_batch, e->log_number_); } + if (e->has_min_log_number_to_keep_) { + min_log_number_to_keep = + std::max(min_log_number_to_keep, e->min_log_number_to_keep_); + } } if (max_log_number_in_batch != 0) { assert(column_family_data->GetLogNumber() <= max_log_number_in_batch); column_family_data->SetLogNumber(max_log_number_in_batch); } + if (min_log_number_to_keep != 0) { + // Should only be set in 2PC mode. + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); + } + AppendVersion(column_family_data, v); } @@ -3122,6 +3132,7 @@ Status VersionSet::Recover( uint64_t log_number = 0; uint64_t previous_log_number = 0; uint32_t max_column_family = 0; + uint64_t min_log_number_to_keep = 0; std::unordered_map builders; // add default column family @@ -3262,6 +3273,11 @@ Status VersionSet::Recover( max_column_family = edit.max_column_family_; } + if (edit.has_min_log_number_to_keep_) { + min_log_number_to_keep = + std::max(min_log_number_to_keep, edit.min_log_number_to_keep_); + } + if (edit.has_last_sequence_) { last_sequence = edit.last_sequence_; have_last_sequence = true; @@ -3284,6 +3300,9 @@ Status VersionSet::Recover( column_family_set_->UpdateMaxColumnFamily(max_column_family); + // When reading DB generated using old release, min_log_number_to_keep=0. + // All log files will be scanned for potential prepare entries. + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); MarkFileNumberUsed(previous_log_number); MarkFileNumberUsed(log_number); } @@ -3355,11 +3374,12 @@ Status VersionSet::Recover( "manifest_file_number is %lu, next_file_number is %lu, " "last_sequence is %lu, log_number is %lu," "prev_log_number is %lu," - "max_column_family is %u\n", + "max_column_family is %u," + "min_log_number_to_keep is %lu\n", manifest_filename.c_str(), (unsigned long)manifest_file_number_, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)log_number, (unsigned long)prev_log_number_, - column_family_set_->GetMaxColumnFamily()); + column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); for (auto cfd : *column_family_set_) { if (cfd->IsDropped()) { @@ -3647,6 +3667,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, cfd->SetLogNumber(edit.log_number_); } + if (edit.has_prev_log_number_) { previous_log_number = edit.prev_log_number_; have_prev_log_number = true; @@ -3665,6 +3686,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, if (edit.has_max_column_family_) { column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_); } + + if (edit.has_min_log_number_to_keep_) { + MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_); + } } } file_reader.reset(); @@ -3723,10 +3748,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, printf( "next_file_number %lu last_sequence " - "%lu prev_log_number %lu max_column_family %u\n", + "%lu prev_log_number %lu max_column_family %u min_log_number_to_keep " + "%" PRIu64 "\n", (unsigned long)next_file_number_.load(), (unsigned long)last_sequence, (unsigned long)previous_log_number, - column_family_set_->GetMaxColumnFamily()); + column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); } return s; @@ -3741,6 +3767,14 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { } } +// Called only either from ::LogAndApply which is protected by mutex or during +// recovery which is single-threaded. +void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { + if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) { + min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed); + } +} + Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? diff --git a/db/version_set.h b/db/version_set.h index 3a6830489..563054d17 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -802,6 +802,10 @@ class VersionSet { uint64_t current_next_file_number() const { return next_file_number_.load(); } + uint64_t min_log_number_to_keep_2pc() const { + return min_log_number_to_keep_2pc_.load(); + } + // Allocate and return a new file number uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } @@ -849,15 +853,31 @@ class VersionSet { // REQUIRED: this is only called during single-threaded recovery or repair. void MarkFileNumberUsed(uint64_t number); + // Mark the specified log number as deleted + // REQUIRED: this is only called during single-threaded recovery or repair, or + // from ::LogAndApply where the global mutex is held. + void MarkMinLogNumberToKeep2PC(uint64_t number); + // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. uint64_t prev_log_number() const { return prev_log_number_; } - // Returns the minimum log number such that all - // log numbers less than or equal to it can be deleted - uint64_t MinLogNumber() const { + // Returns the minimum log number which still has data not flushed to any SST + // file. + // In non-2PC mode, all the log numbers smaller than this number can be safely + // deleted. + uint64_t MinLogNumberWithUnflushedData() const { + return PreComputeMinLogNumberWithUnflushedData(nullptr); + } + // Returns the minimum log number which still has data not flushed to any SST + // file, except data from `cfd_to_skip`. + uint64_t PreComputeMinLogNumberWithUnflushedData( + const ColumnFamilyData* cfd_to_skip) const { uint64_t min_log_num = std::numeric_limits::max(); for (auto cfd : *column_family_set_) { + if (cfd == cfd_to_skip) { + continue; + } // It's safe to ignore dropped column families here: // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST. if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) { @@ -908,6 +928,8 @@ class VersionSet { new_options.writable_file_max_buffer_size; } + const ImmutableDBOptions* db_options() const { return db_options_; } + static uint64_t GetNumLiveVersions(Version* dummy_versions); static uint64_t GetTotalSstFilesSize(Version* dummy_versions); @@ -946,6 +968,10 @@ class VersionSet { const std::string dbname_; const ImmutableDBOptions* const db_options_; std::atomic next_file_number_; + // Any log number equal or lower than this should be ignored during recovery, + // and is qualified for being deleted in 2PC mode. In non-2PC mode, this + // number is ignored. + std::atomic min_log_number_to_keep_2pc_ = {0}; uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t pending_manifest_file_number_; diff --git a/src.mk b/src.mk index 0369658a6..1a1a7b58c 100644 --- a/src.mk +++ b/src.mk @@ -33,6 +33,7 @@ LIB_SOURCES = \ db/flush_scheduler.cc \ db/forward_iterator.cc \ db/internal_stats.cc \ + db/logs_with_prep_tracker.cc \ db/log_reader.cc \ db/log_writer.cc \ db/malloc_stats.cc \ diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index 3a7d801aa..befa19f04 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -197,7 +197,8 @@ Status PessimisticTransaction::Prepare() { s = PrepareInternal(); if (s.ok()) { assert(log_number_ != 0); - dbimpl_->MarkLogAsContainingPrepSection(log_number_); + dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( + log_number_); txn_state_.store(PREPARED); } } else if (txn_state_ == LOCKS_STOLEN) { @@ -284,7 +285,8 @@ Status PessimisticTransaction::Commit() { // to determine what prep logs must be kept around, // not the prep section heap. assert(log_number_ > 0); - dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); + dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( + log_number_); txn_db_impl_->UnregisterTransaction(this); Clear(); @@ -341,7 +343,8 @@ Status PessimisticTransaction::Rollback() { if (s.ok()) { // we do not need to keep our prepared section around assert(log_number_ > 0); - dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); + dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( + log_number_); Clear(); txn_state_.store(ROLLEDBACK); } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index d6e19ec0d..54c386d62 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -900,6 +900,7 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { switch (txn_db_options.write_policy) { case WRITE_COMMITTED: // but now our memtable should be referencing the prep section + ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep()); ASSERT_EQ(log_containing_prep, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); break; @@ -925,6 +926,7 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { } // after memtable flush we can now relese the log + ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep); ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); delete txn; @@ -1279,6 +1281,8 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { // but now our memtable should be referencing the prep section ASSERT_EQ(log_containing_prep, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep()); + break; case WRITE_PREPARED: case WRITE_UNPREPARED: @@ -1289,9 +1293,15 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { assert(false); } + // Add a dummy record to memtable before a flush. Otherwise, the + // memtable will be empty and flush will be skipped. + s = db->Put(write_options, Slice("foo3"), Slice("bar3")); + ASSERT_OK(s); + db_impl->TEST_FlushMemTable(true); - // after memtable flush we can now relese the log + // after memtable flush we can now release the log + ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep); ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); delete txn; @@ -1805,14 +1815,14 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { assert(false); } - ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); + ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog()); // request a flush for all column families such that the earliest // alive log file can be killed db_impl->TEST_SwitchWAL(); // log cannot be flushed because txn2 has not been commited ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed()); - ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog()); + ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog()); // assert that cfa has a flush requested ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested()); @@ -1836,7 +1846,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) { ASSERT_OK(s); db_impl->TEST_SwitchWAL(); - ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); + ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog()); // we should see that cfb now has a flush requested ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());