From d060421c77d3475be45ebcddb2969649d1e666fd Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Thu, 1 Mar 2018 20:33:41 -0800 Subject: [PATCH] Fix a leak in prepared_section_completed_ Summary: The zeroed entries were not removed from prepared_section_completed_ map. This patch adds a unit test to show the problem and fixes that by refactoring the code. The new code is more efficient since i) it uses two separate mutexes to avoid contention between commit and prepare threads, ii) it uses a sorted vector for maintaining unique log entries with prepare which avoids a very large heap with many duplicate entries. Closes https://github.com/facebook/rocksdb/pull/3545 Differential Revision: D7106071 Pulled By: maysamyabandeh fbshipit-source-id: b3ae17cb6cd37ef10b6b35e0086c15c758768a48 --- HISTORY.md | 3 + db/db_impl.h | 37 ++++++---- db/db_impl_debug.cc | 6 ++ db/db_impl_files.cc | 81 +++++++++++----------- tools/db_stress.cc | 1 + utilities/transactions/transaction_test.cc | 36 ++++++++++ 6 files changed, 110 insertions(+), 54 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 3a7fed228..c9318e027 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,9 @@ * Avoid unnecessarily flushing in `CompactRange()` when the range specified by the user does not overlap unflushed memtables. * Add "rocksdb.live-sst-files-size" DB property to return total bytes of all SST files belong to the latest LSM tree. +### Bug Fixes +* Fix a leak in prepared_section_completed_ where the zeroed entries would not be removed from the map. + ## 5.12.0 (2/14/2018) ### Public API Change * Iterator::SeekForPrev is now a pure virtual method. This is to prevent user who implement the Iterator interface fail to implement SeekForPrev by mistake. 
diff --git a/db/db_impl.h b/db/db_impl.h index e42accfeb..ff61577c4 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -14,7 +14,6 @@ #include #include #include -#include #include #include #include @@ -433,6 +432,8 @@ class DBImpl : public DB { uint64_t TEST_FindMinLogContainingOutstandingPrep(); uint64_t TEST_FindMinPrepLogReferencedByMemTable(); + size_t TEST_PreparedSectionCompletedSize(); + size_t TEST_LogsWithPrepSize(); int TEST_BGCompactionsAllowed() const; int TEST_BGFlushesAllowed() const; @@ -1298,27 +1299,33 @@ class DBImpl : public DB { // Indicate DB was opened successfully bool opened_successfully_; - // minimum log number still containing prepared data. + // REQUIRES: logs_with_prep_mutex_ held + // + // sorted list of log numbers still containing prepared data. // this is used by FindObsoleteFiles to determine which // flushed logs we must keep around because they still - // contain prepared data which has not been flushed or rolled back - std::priority_queue, std::greater> - min_log_with_prep_; + // contain prepared data which has not been committed or rolled back + struct LogCnt { + uint64_t log; // the log number + uint64_t cnt; // number of prepared sections in the log + }; + std::vector logs_with_prep_; + std::mutex logs_with_prep_mutex_; - // to be used in conjunction with min_log_with_prep_. + // REQUIRES: prepared_section_completed_mutex_ held + // + // to be used in conjunction with logs_with_prep_. // once a transaction with data in log L is committed or rolled back - // rather than removing the value from the heap we add that value - // to prepared_section_completed_ which maps LOG -> instance_count - // since a log could contain multiple prepared sections + // rather than updating logs_with_prep_ directly we keep track of that + // in prepared_section_completed_ which maps LOG -> instance_count. This helps + // avoiding contention between a commit thread and the prepare threads. 
// // when trying to determine the minimum log still active we first - // consult min_log_with_prep_. while that root value maps to - // a value > 0 in prepared_section_completed_ we decrement the - // instance_count for that log and pop the root value in - // min_log_with_prep_. This will work the same as a min_heap - // where we are deleteing arbitrary elements and the up heaping. + // consult logs_with_prep_. while that root value maps to + // an equal value in prepared_section_completed_ we erase the log from + // both logs_with_prep_ and prepared_section_completed_. std::unordered_map prepared_section_completed_; - std::mutex prep_heap_mutex_; + std::mutex prepared_section_completed_mutex_; // Callback for compaction to check if a key is visible to a snapshot. // REQUIRES: mutex held diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 32c072b8f..3ae271168 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -186,6 +186,12 @@ uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() { return FindMinLogContainingOutstandingPrep(); } +size_t DBImpl::TEST_PreparedSectionCompletedSize() { + return prepared_section_completed_.size(); +} + +size_t DBImpl::TEST_LogsWithPrepSize() { return logs_with_prep_.size(); } + uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { return FindMinPrepLogReferencedByMemTable(); } diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 48bcb48aa..d572ac7a6 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -48,58 +48,61 @@ uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() { return min_log; } -// TODO(myabandeh): Avoid using locks void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) { assert(log != 0); - std::lock_guard lock(prep_heap_mutex_); + std::lock_guard lock(prepared_section_completed_mutex_); auto it = prepared_section_completed_.find(log); - assert(it != prepared_section_completed_.end()); - it->second += 1; + if (UNLIKELY(it == prepared_section_completed_.end())) { + 
prepared_section_completed_[log] = 1; + } else { + it->second += 1; + } } -// TODO(myabandeh): Avoid using locks void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) { assert(log != 0); - std::lock_guard lock(prep_heap_mutex_); - min_log_with_prep_.push(log); - auto it = prepared_section_completed_.find(log); - if (it == prepared_section_completed_.end()) { - prepared_section_completed_[log] = 0; + std::lock_guard lock(logs_with_prep_mutex_); + + auto rit = logs_with_prep_.rbegin(); + bool updated = false; + // Most probably the last log is the one that is being marked for + having a prepare section; so search from the end. + for (; rit != logs_with_prep_.rend() && rit->log >= log; ++rit) { + if (rit->log == log) { + rit->cnt++; + updated = true; + break; + } + } + if (!updated) { + // We are either at the start, or at a position with rit->log < log + logs_with_prep_.insert(rit.base(), {log, 1}); + } } uint64_t DBImpl::FindMinLogContainingOutstandingPrep() { - - if (!allow_2pc()) { - return 0; - } - - std::lock_guard lock(prep_heap_mutex_); - uint64_t min_log = 0; - - // first we look in the prepared heap where we keep - // track of transactions that have been prepared (written to WAL) - // but not yet committed. - while (!min_log_with_prep_.empty()) { - min_log = min_log_with_prep_.top(); - - auto it = prepared_section_completed_.find(min_log); - - // value was marked as 'deleted' from heap - if (it != prepared_section_completed_.end() && it->second > 0) { - it->second -= 1; - min_log_with_prep_.pop(); - - // back to squere one... 
- min_log = 0; - continue; - } else { - // found a valid value - break; + std::lock_guard lock(logs_with_prep_mutex_); + auto it = logs_with_prep_.begin(); + // start with the smallest log + for (; it != logs_with_prep_.end();) { + auto min_log = it->log; + { + std::lock_guard lock2(prepared_section_completed_mutex_); + auto completed_it = prepared_section_completed_.find(min_log); + if (completed_it == prepared_section_completed_.end() || + completed_it->second < it->cnt) { + return min_log; + } + assert(completed_it != prepared_section_completed_.end() && + completed_it->second == it->cnt); + prepared_section_completed_.erase(completed_it); } + // erase from beginning in vector is not efficient but this function is not + // on the fast path. + it = logs_with_prep_.erase(it); } - - return min_log; + // no such log found + return 0; } uint64_t DBImpl::MinLogNumberToKeep() { diff --git a/tools/db_stress.cc b/tools/db_stress.cc index d7d3405ea..847c1b250 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -40,6 +40,7 @@ int main() { #include #include #include +#include #include #include "db/db_impl.h" diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 7df5afa7c..2015d314c 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -770,6 +770,42 @@ TEST_P(TransactionTest, CommitTimeBatchFailTest) { delete txn1; } +TEST_P(TransactionTest, LogMarkLeakTest) { + TransactionOptions txn_options; + WriteOptions write_options; + options.write_buffer_size = 1024; + ReOpenNoDelete(); + Random rnd(47); + std::vector txns; + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + // At the beginning there should be no log containing prepare data + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + for (size_t i = 0; i < 100; i++) { + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn->SetName("xid" + ToString(i))); + 
ASSERT_OK(txn->Put(Slice("foo" + ToString(i)), Slice("bar"))); + ASSERT_OK(txn->Prepare()); + ASSERT_GT(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + if (rnd.OneIn(5)) { + txns.push_back(txn); + } else { + ASSERT_OK(txn->Commit()); + delete txn; + } + db_impl->TEST_FlushMemTable(true); + } + for (auto txn : txns) { + ASSERT_OK(txn->Commit()); + delete txn; + } + // At the end there should be no log left containing prepare data + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + // Make sure that the underlying data structures are properly truncated and + // cause no leak + ASSERT_EQ(db_impl->TEST_PreparedSectionCompletedSize(), 0); + ASSERT_EQ(db_impl->TEST_LogsWithPrepSize(), 0); +} + TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { for (bool cwb4recovery : {true, false}) { ReOpen();