Fix a leak in prepared_section_completed_

Summary:
Zeroed entries were never removed from the prepared_section_completed_ map. This patch adds a unit test that exposes the problem and fixes it by refactoring the code. The new code is also more efficient since i) it uses two separate mutexes to avoid contention between commit and prepare threads, and ii) it maintains unique log entries with prepare counts in a sorted vector, which avoids a very large heap full of duplicate entries.
Closes https://github.com/facebook/rocksdb/pull/3545

Differential Revision: D7106071

Pulled By: maysamyabandeh

fbshipit-source-id: b3ae17cb6cd37ef10b6b35e0086c15c758768a48
Branch: main
Author: Maysam Yabandeh (committed by Facebook Github Bot)
Parent: bf937cf15b
Commit: d060421c77
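
Before the diffs, a minimal standalone model of the pre-patch bookkeeping may help make the leak concrete. This is a simplified sketch (free functions, no locking, shortened setup), not the RocksDB code itself: each prepare seeds a zero-valued map entry that no code path ever erases, so the map grows by one entry per WAL ever marked.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <unordered_map>
#include <vector>

// Simplified model of the pre-patch bookkeeping.
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
    min_log_with_prep;
std::unordered_map<uint64_t, uint64_t> prepared_section_completed;

// Called at prepare time: pushes onto the heap and seeds a zero-valued
// map entry that no code path ever erases -- the leak.
void MarkLogAsContainingPrepSection(uint64_t log) {
  min_log_with_prep.push(log);
  if (prepared_section_completed.find(log) == prepared_section_completed.end()) {
    prepared_section_completed[log] = 0;
  }
}

// Called at commit/rollback time: counts completed sections per log.
void MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
  prepared_section_completed[log] += 1;
}

uint64_t FindMinLogContainingOutstandingPrep() {
  while (!min_log_with_prep.empty()) {
    uint64_t min_log = min_log_with_prep.top();
    auto it = prepared_section_completed.find(min_log);
    if (it != prepared_section_completed.end() && it->second > 0) {
      it->second -= 1;  // decremented back to zero, but never erased
      min_log_with_prep.pop();
      continue;
    }
    return min_log;  // log with an outstanding prepared section
  }
  return 0;
}

int main() {
  for (uint64_t log = 1; log <= 100; log++) {
    MarkLogAsContainingPrepSection(log);
    MarkLogAsHavingPrepSectionFlushed(log);  // every section completes
  }
  assert(FindMinLogContainingOutstandingPrep() == 0);
  // The heap is drained, yet 100 zeroed entries remain in the map:
  assert(prepared_section_completed.size() == 100);
  return 0;
}
```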
Changed files:
1. HISTORY.md (3)
2. db/db_impl.h (37)
3. db/db_impl_debug.cc (6)
4. db/db_impl_files.cc (81)
5. tools/db_stress.cc (1)
6. utilities/transactions/transaction_test.cc (36)

HISTORY.md
@@ -7,6 +7,9 @@
 * Avoid unnecessarily flushing in `CompactRange()` when the range specified by the user does not overlap unflushed memtables.
 * Add "rocksdb.live-sst-files-size" DB property to return total bytes of all SST files belonging to the latest LSM tree.
 
+### Bug Fixes
+* Fix a leak in prepared_section_completed_ where the zeroed entries would not be removed from the map.
+
 ## 5.12.0 (2/14/2018)
 ### Public API Change
 * Iterator::SeekForPrev is now a pure virtual method. This is to prevent users who implement the Iterator interface from failing to implement SeekForPrev by mistake.

db/db_impl.h
@@ -14,7 +14,6 @@
 #include <limits>
 #include <list>
 #include <map>
-#include <queue>
 #include <set>
 #include <string>
 #include <utility>
@@ -433,6 +432,8 @@ class DBImpl : public DB {
   uint64_t TEST_FindMinLogContainingOutstandingPrep();
   uint64_t TEST_FindMinPrepLogReferencedByMemTable();
+  size_t TEST_PreparedSectionCompletedSize();
+  size_t TEST_LogsWithPrepSize();
 
   int TEST_BGCompactionsAllowed() const;
   int TEST_BGFlushesAllowed() const;
@@ -1298,27 +1299,33 @@ class DBImpl : public DB {
   // Indicate DB was opened successfully
   bool opened_successfully_;
 
-  // minimum log number still containing prepared data.
+  // REQUIRES: logs_with_prep_mutex_ held
+  //
+  // sorted list of log numbers still containing prepared data.
   // this is used by FindObsoleteFiles to determine which
   // flushed logs we must keep around because they still
-  // contain prepared data which has not been flushed or rolled back
-  std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
-      min_log_with_prep_;
+  // contain prepared data which has not been committed or rolled back
+  struct LogCnt {
+    uint64_t log;  // the log number
+    uint64_t cnt;  // number of prepared sections in the log
+  };
+  std::vector<LogCnt> logs_with_prep_;
+  std::mutex logs_with_prep_mutex_;
 
-  // to be used in conjunction with min_log_with_prep_.
+  // REQUIRES: prepared_section_completed_mutex_ held
+  //
+  // to be used in conjunction with logs_with_prep_.
   // once a transaction with data in log L is committed or rolled back
-  // rather than removing the value from the heap we add that value
-  // to prepared_section_completed_ which maps LOG -> instance_count
-  // since a log could contain multiple prepared sections
+  // rather than updating logs_with_prep_ directly we keep track of that
+  // in prepared_section_completed_ which maps LOG -> instance_count. This
+  // helps avoid contention between a commit thread and the prepare threads.
   //
   // when trying to determine the minimum log still active we first
-  // consult min_log_with_prep_. while that root value maps to
-  // a value > 0 in prepared_section_completed_ we decrement the
-  // instance_count for that log and pop the root value in
-  // min_log_with_prep_. This will work the same as a min_heap
-  // where we are deleting arbitrary elements and then up-heaping.
+  // consult logs_with_prep_. while its head entry maps to
+  // an equal value in prepared_section_completed_ we erase the log from
+  // both logs_with_prep_ and prepared_section_completed_.
   std::unordered_map<uint64_t, uint64_t> prepared_section_completed_;
-  std::mutex prep_heap_mutex_;
+  std::mutex prepared_section_completed_mutex_;
 
   // Callback for compaction to check if a key is visible to a snapshot.
   // REQUIRES: mutex held
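
For contrast with the model above, here is a minimal sketch of the patched scheme described by these header comments. It is again a simplification, not the RocksDB code: single-threaded (the two mutexes are omitted), and prepares are assumed to arrive with non-decreasing log numbers so the insert is a plain append, whereas the real MarkLogAsContainingPrepSection searches from the back to handle the general case. The key difference is that a fully completed log is erased from both containers.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Simplified model of the patched bookkeeping.
struct LogCnt {
  uint64_t log;  // the log number
  uint64_t cnt;  // number of prepared sections in the log
};
std::vector<LogCnt> logs_with_prep;  // kept sorted by log
std::unordered_map<uint64_t, uint64_t> prepared_section_completed;

void MarkLogAsContainingPrepSection(uint64_t log) {
  if (!logs_with_prep.empty() && logs_with_prep.back().log == log) {
    logs_with_prep.back().cnt++;  // duplicates collapse into a count
  } else {
    logs_with_prep.push_back({log, 1});  // logs assumed to arrive in order
  }
}

void MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
  prepared_section_completed[log] += 1;
}

uint64_t FindMinLogContainingOutstandingPrep() {
  auto it = logs_with_prep.begin();
  while (it != logs_with_prep.end()) {
    auto completed_it = prepared_section_completed.find(it->log);
    if (completed_it == prepared_section_completed.end() ||
        completed_it->second < it->cnt) {
      return it->log;  // this log still has outstanding prepared sections
    }
    // Fully completed: purge the log from *both* containers.
    prepared_section_completed.erase(completed_it);
    it = logs_with_prep.erase(it);
  }
  return 0;
}

int main() {
  for (uint64_t log = 1; log <= 100; log++) {
    MarkLogAsContainingPrepSection(log);
    MarkLogAsHavingPrepSectionFlushed(log);  // every section completes
  }
  assert(FindMinLogContainingOutstandingPrep() == 0);
  // Unlike the old scheme, both structures end up fully drained:
  assert(logs_with_prep.empty());
  assert(prepared_section_completed.empty());
  return 0;
}
```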

db/db_impl_debug.cc
@@ -186,6 +186,12 @@ uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() {
   return FindMinLogContainingOutstandingPrep();
 }
 
+size_t DBImpl::TEST_PreparedSectionCompletedSize() {
+  return prepared_section_completed_.size();
+}
+
+size_t DBImpl::TEST_LogsWithPrepSize() { return logs_with_prep_.size(); }
+
 uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
   return FindMinPrepLogReferencedByMemTable();
 }

db/db_impl_files.cc
@@ -48,58 +48,61 @@ uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
   return min_log;
 }
 
+// TODO(myabandeh): Avoid using locks
 void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
   assert(log != 0);
-  std::lock_guard<std::mutex> lock(prep_heap_mutex_);
+  std::lock_guard<std::mutex> lock(prepared_section_completed_mutex_);
   auto it = prepared_section_completed_.find(log);
-  assert(it != prepared_section_completed_.end());
-  it->second += 1;
+  if (UNLIKELY(it == prepared_section_completed_.end())) {
+    prepared_section_completed_[log] = 1;
+  } else {
+    it->second += 1;
+  }
 }
 
+// TODO(myabandeh): Avoid using locks
 void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
   assert(log != 0);
-  std::lock_guard<std::mutex> lock(prep_heap_mutex_);
-  min_log_with_prep_.push(log);
-
-  auto it = prepared_section_completed_.find(log);
-  if (it == prepared_section_completed_.end()) {
-    prepared_section_completed_[log] = 0;
+  std::lock_guard<std::mutex> lock(logs_with_prep_mutex_);
+
+  auto rit = logs_with_prep_.rbegin();
+  bool updated = false;
+  // Most probably the last log is the one that is being marked for
+  // having a prepare section; so search from the end.
+  for (; rit != logs_with_prep_.rend() && rit->log >= log; ++rit) {
+    if (rit->log == log) {
+      rit->cnt++;
+      updated = true;
+      break;
+    }
+  }
+  if (!updated) {
+    // We are either at the start, or at a position with rit->log < log
+    logs_with_prep_.insert(rit.base(), {log, 1});
   }
 }
 
 uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
-  if (!allow_2pc()) {
-    return 0;
-  }
-
-  std::lock_guard<std::mutex> lock(prep_heap_mutex_);
-  uint64_t min_log = 0;
-
-  // first we look in the prepared heap where we keep
-  // track of transactions that have been prepared (written to WAL)
-  // but not yet committed.
-  while (!min_log_with_prep_.empty()) {
-    min_log = min_log_with_prep_.top();
-
-    auto it = prepared_section_completed_.find(min_log);
-
-    // value was marked as 'deleted' from heap
-    if (it != prepared_section_completed_.end() && it->second > 0) {
-      it->second -= 1;
-      min_log_with_prep_.pop();
-
-      // back to square one...
-      min_log = 0;
-      continue;
-    } else {
-      // found a valid value
-      break;
-    }
-  }
-
-  return min_log;
+  std::lock_guard<std::mutex> lock(logs_with_prep_mutex_);
+  auto it = logs_with_prep_.begin();
+  // start with the smallest log
+  for (; it != logs_with_prep_.end();) {
+    auto min_log = it->log;
+    {
+      std::lock_guard<std::mutex> lock2(prepared_section_completed_mutex_);
+      auto completed_it = prepared_section_completed_.find(min_log);
+      if (completed_it == prepared_section_completed_.end() ||
+          completed_it->second < it->cnt) {
+        return min_log;
+      }
+      assert(completed_it != prepared_section_completed_.end() &&
             completed_it->second == it->cnt);
+      prepared_section_completed_.erase(completed_it);
+    }
+    // erasing from the beginning of a vector is not efficient, but this
+    // function is not on the fast path.
+    it = logs_with_prep_.erase(it);
+  }
+  // no such log found
+  return 0;
 }
 
 uint64_t DBImpl::MinLogNumberToKeep() {
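
One subtle point in the new MarkLogAsContainingPrepSection is the insertion through a reverse iterator: for a reverse iterator rit, rit.base() is the forward iterator one past the element *rit refers to, so inserting at rit.base() places the new entry immediately after the first element (scanning from the back) whose log number is smaller, which keeps the vector sorted. A small standalone demonstration of that idiom:

```cpp
#include <cassert>
#include <vector>

int main() {
  std::vector<int> v = {10, 20, 40};
  // Walk from the back past every element >= 30.
  auto rit = v.rbegin();
  while (rit != v.rend() && *rit >= 30) ++rit;
  // rit now refers to 20; rit.base() is the forward iterator one past it,
  // so inserting there places 30 between 20 and 40, keeping v sorted.
  // (If every element were >= 30, rit.base() would equal v.begin().)
  v.insert(rit.base(), 30);
  assert((v == std::vector<int>{10, 20, 30, 40}));
  return 0;
}
```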

tools/db_stress.cc
@@ -40,6 +40,7 @@ int main() {
 #include <algorithm>
 #include <chrono>
 #include <exception>
+#include <queue>
 #include <thread>
 
 #include "db/db_impl.h"

utilities/transactions/transaction_test.cc
@@ -770,6 +770,42 @@ TEST_P(TransactionTest, CommitTimeBatchFailTest) {
   delete txn1;
 }
 
+TEST_P(TransactionTest, LogMarkLeakTest) {
+  TransactionOptions txn_options;
+  WriteOptions write_options;
+  options.write_buffer_size = 1024;
+  ReOpenNoDelete();
+  Random rnd(47);
+  std::vector<Transaction*> txns;
+  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
+  // At the beginning there should be no log containing prepare data
+  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+  for (size_t i = 0; i < 100; i++) {
+    Transaction* txn = db->BeginTransaction(write_options, txn_options);
+    ASSERT_OK(txn->SetName("xid" + ToString(i)));
+    ASSERT_OK(txn->Put(Slice("foo" + ToString(i)), Slice("bar")));
+    ASSERT_OK(txn->Prepare());
+    ASSERT_GT(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+    if (rnd.OneIn(5)) {
+      txns.push_back(txn);
+    } else {
+      ASSERT_OK(txn->Commit());
+      delete txn;
+    }
+    db_impl->TEST_FlushMemTable(true);
+  }
+  for (auto txn : txns) {
+    ASSERT_OK(txn->Commit());
+    delete txn;
+  }
+  // At the end there should be no log left containing prepare data
+  ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
+  // Make sure that the underlying data structures are properly truncated
+  // and cause no leak
+  ASSERT_EQ(db_impl->TEST_PreparedSectionCompletedSize(), 0);
+  ASSERT_EQ(db_impl->TEST_LogsWithPrepSize(), 0);
+}
+
 TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
   for (bool cwb4recovery : {true, false}) {
     ReOpen();
