Free obsolete memtables outside the dbmutex had a memory leak.

Summary:
The commit at 27bbef1180 had a memory leak
that was detected by valgrind. The memtable that has a refcount decrement
in MemTableList::InstallMemtableFlushResults was not freed.

Test Plan: valgrind ./db_test --leak-check=full

Reviewers: igor

Reviewed By: igor

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14391
main
Dhruba Borthakur 11 years ago
parent fe754fe7e3
commit 98968ba937
  1. 31
      db/db_impl.cc
  2. 10
      db/db_impl.h
  3. 7
      db/memtablelist.cc
  4. 3
      db/memtablelist.h
  5. 2
      db/repair.cc

@ -515,6 +515,19 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// files in sst_delete_files and log_delete_files. // files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method. // It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) { void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
// free pending memtables
for (auto m : state.memtables_to_free) {
delete m;
}
// check if there is anything to do
if (!state.all_files.size() &&
!state.sst_delete_files.size() &&
!state.log_delete_files.size()) {
return;
}
// this checks if FindObsoleteFiles() was run before. If not, don't do // this checks if FindObsoleteFiles() was run before. If not, don't do
// PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
// run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
@ -1169,7 +1182,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
// Replace immutable memtable with the generated Table // Replace immutable memtable with the generated Table
s = imm_.InstallMemtableFlushResults( s = imm_.InstallMemtableFlushResults(
mems, versions_.get(), s, &mutex_, options_.info_log.get(), mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_); file_number, pending_outputs_, &deletion_state.memtables_to_free);
if (s.ok()) { if (s.ok()) {
if (madeProgress) { if (madeProgress) {
@ -1655,7 +1668,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
void DBImpl::BackgroundCallFlush() { void DBImpl::BackgroundCallFlush() {
bool madeProgress = false; bool madeProgress = false;
DeletionState deletion_state; DeletionState deletion_state(options_.max_write_buffer_number);
assert(bg_flush_scheduled_); assert(bg_flush_scheduled_);
MutexLock l(&mutex_); MutexLock l(&mutex_);
@ -1701,7 +1714,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
void DBImpl::BackgroundCallCompaction() { void DBImpl::BackgroundCallCompaction() {
bool madeProgress = false; bool madeProgress = false;
DeletionState deletion_state; DeletionState deletion_state(options_.max_write_buffer_number);
MaybeDumpStats(); MaybeDumpStats();
@ -1731,6 +1744,7 @@ void DBImpl::BackgroundCallCompaction() {
// FindObsoleteFiles(). This is because deletion_state does not catch // FindObsoleteFiles(). This is because deletion_state does not catch
// all created files if compaction failed. // all created files if compaction failed.
FindObsoleteFiles(deletion_state, !s.ok()); FindObsoleteFiles(deletion_state, !s.ok());
// delete unnecessary files if any, this is done outside the mutex // delete unnecessary files if any, this is done outside the mutex
if (deletion_state.HaveSomethingToDelete()) { if (deletion_state.HaveSomethingToDelete()) {
mutex_.Unlock(); mutex_.Unlock();
@ -2491,25 +2505,20 @@ struct IterState {
static void CleanupIteratorState(void* arg1, void* arg2) { static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1); IterState* state = reinterpret_cast<IterState*>(arg1);
std::vector<MemTable*> to_delete; DBImpl::DeletionState deletion_state(state->db->GetOptions().
to_delete.reserve(state->mem.size()); max_write_buffer_number);
state->mu->Lock(); state->mu->Lock();
for (unsigned int i = 0; i < state->mem.size(); i++) { for (unsigned int i = 0; i < state->mem.size(); i++) {
MemTable* m = state->mem[i]->Unref(); MemTable* m = state->mem[i]->Unref();
if (m != nullptr) { if (m != nullptr) {
to_delete.push_back(m); deletion_state.memtables_to_free.push_back(m);
} }
} }
state->version->Unref(); state->version->Unref();
// delete only the sst obsolete files
DBImpl::DeletionState deletion_state;
// fast path FindObsoleteFiles // fast path FindObsoleteFiles
state->db->FindObsoleteFiles(deletion_state, false, true); state->db->FindObsoleteFiles(deletion_state, false, true);
state->mu->Unlock(); state->mu->Unlock();
state->db->PurgeObsoleteFiles(deletion_state); state->db->PurgeObsoleteFiles(deletion_state);
// delete obsolete memtables outside the db-mutex
for (MemTable* m : to_delete) delete m;
delete state; delete state;
} }
} // namespace } // namespace

@ -129,10 +129,12 @@ class DBImpl : public DB {
struct DeletionState { struct DeletionState {
inline bool HaveSomethingToDelete() const { inline bool HaveSomethingToDelete() const {
return all_files.size() || return memtables_to_free.size() ||
all_files.size() ||
sst_delete_files.size() || sst_delete_files.size() ||
log_delete_files.size(); log_delete_files.size();
} }
// a list of all files that we'll consider deleting // a list of all files that we'll consider deleting
// (every once in a while this is filled up with all files // (every once in a while this is filled up with all files
// in the DB directory) // in the DB directory)
@ -147,14 +149,18 @@ class DBImpl : public DB {
// a list of log files that we need to delete // a list of log files that we need to delete
std::vector<uint64_t> log_delete_files; std::vector<uint64_t> log_delete_files;
// a list of memtables to be free
std::vector<MemTable *> memtables_to_free;
// the current manifest_file_number, log_number and prev_log_number // the current manifest_file_number, log_number and prev_log_number
// that corresponds to the set of files in 'live'. // that corresponds to the set of files in 'live'.
uint64_t manifest_file_number, log_number, prev_log_number; uint64_t manifest_file_number, log_number, prev_log_number;
DeletionState() { explicit DeletionState(const int num_memtables = 0) {
manifest_file_number = 0; manifest_file_number = 0;
log_number = 0; log_number = 0;
prev_log_number = 0; prev_log_number = 0;
memtables_to_free.reserve(num_memtables);
} }
}; };

@ -80,7 +80,8 @@ Status MemTableList::InstallMemtableFlushResults(
VersionSet* vset, Status flushStatus, VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log, port::Mutex* mu, Logger* info_log,
uint64_t file_number, uint64_t file_number,
std::set<uint64_t>& pending_outputs) { std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete) {
mu->AssertHeld(); mu->AssertHeld();
// If the flush was not successful, then just reset state. // If the flush was not successful, then just reset state.
@ -151,7 +152,9 @@ Status MemTableList::InstallMemtableFlushResults(
// executing compaction threads do not mistakenly assume that this // executing compaction threads do not mistakenly assume that this
// file is not live. // file is not live.
pending_outputs.erase(m->file_number_); pending_outputs.erase(m->file_number_);
m->Unref(); if (m->Unref() != nullptr) {
to_delete->push_back(m);
}
size_--; size_--;
} else { } else {
//commit failed. setup state so that we can flush again. //commit failed. setup state so that we can flush again.

@ -65,7 +65,8 @@ class MemTableList {
VersionSet* vset, Status flushStatus, VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log, port::Mutex* mu, Logger* info_log,
uint64_t file_number, uint64_t file_number,
std::set<uint64_t>& pending_outputs); std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete);
// New memtables are inserted at the front of the list. // New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add(). // Takes ownership of the referenced held on *m by the caller of Add().

@ -227,7 +227,7 @@ class Repairer {
table_cache_, iter, &meta, table_cache_, iter, &meta,
icmp_.user_comparator(), 0, 0, true); icmp_.user_comparator(), 0, 0, true);
delete iter; delete iter;
mem->Unref(); delete mem->Unref();
mem = nullptr; mem = nullptr;
if (status.ok()) { if (status.ok()) {
if (meta.file_size > 0) { if (meta.file_size > 0) {

Loading…
Cancel
Save