[CF] Delete SuperVersion in a special function

Summary: Added a function DeleteSuperVersion that can be called in DBImpl destructor before PurgingObsoleteFiles. That way, PurgeObsoleteFiles will be able to delete all files held by alive super versions.

Test Plan: column_family_test with valgrind

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16545
main
Igor Canadi 11 years ago
parent 9d0577a6be
commit 335b207974
  1. 41
      db/column_family.cc
  2. 5
      db/column_family.h
  3. 6
      db/db_impl.cc

@ -222,28 +222,6 @@ ColumnFamilyData::~ColumnFamilyData() {
prev->next_ = next; prev->next_ = next;
next->prev_ = prev; next->prev_ = prev;
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
// It also needs to be done after FlushMemTable, which can trigger local_sv_
// access.
auto sv = static_cast<SuperVersion*>(local_sv_->Get());
if (sv != nullptr) {
auto mutex = sv->db_mutex;
mutex->Unlock();
delete local_sv_;
mutex->Lock();
} else {
delete local_sv_;
}
if (super_version_ != nullptr) {
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
}
// it's nullptr for dummy CFD // it's nullptr for dummy CFD
if (column_family_set_ != nullptr) { if (column_family_set_ != nullptr) {
// remove from column_family_set // remove from column_family_set
@ -254,6 +232,8 @@ ColumnFamilyData::~ColumnFamilyData() {
current_->Unref(); current_->Unref();
} }
DeleteSuperVersion();
if (dummy_versions_ != nullptr) { if (dummy_versions_ != nullptr) {
// List must be empty // List must be empty
assert(dummy_versions_->next_ == dummy_versions_); assert(dummy_versions_->next_ == dummy_versions_);
@ -330,6 +310,23 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {
} }
} }
void ColumnFamilyData::DeleteSuperVersion() {
if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
super_version_->db_mutex->Unlock();
local_sv_.reset();
super_version_->db_mutex->Lock();
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
super_version_ = nullptr;
}
}
ColumnFamilySet::ColumnFamilySet(const std::string& dbname, ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options, const DBOptions* db_options,
const EnvOptions& storage_options, const EnvOptions& storage_options,

@ -180,6 +180,9 @@ class ColumnFamilyData {
port::Mutex* db_mutex); port::Mutex* db_mutex);
void ResetThreadLocalSuperVersions(); void ResetThreadLocalSuperVersions();
// REQUIRED: db mutex held
// Do not access column family after calling this method
void DeleteSuperVersion();
// A Flag indicating whether write needs to slowdown because of there are // A Flag indicating whether write needs to slowdown because of there are
// too many number of level0 files. // too many number of level0 files.
@ -227,7 +230,7 @@ class ColumnFamilyData {
// Thread's local copy of SuperVersion pointer // Thread's local copy of SuperVersion pointer
// This needs to be destructed before mutex_ // This needs to be destructed before mutex_
ThreadLocalPtr* local_sv_; std::unique_ptr<ThreadLocalPtr> local_sv_;
// pointers for a circular linked list. we use it to support iterations // pointers for a circular linked list. we use it to support iterations
// that can be concurrent with writes // that can be concurrent with writes

@ -300,6 +300,10 @@ DBImpl::~DBImpl() {
bg_cv_.Wait(); bg_cv_.Wait();
} }
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->DeleteSuperVersion();
}
if (options_.allow_thread_local) { if (options_.allow_thread_local) {
// Clean up obsolete files due to SuperVersion release. // Clean up obsolete files due to SuperVersion release.
// (1) Need to delete to obsolete files before closing because RepairDB() // (1) Need to delete to obsolete files before closing because RepairDB()
@ -329,9 +333,11 @@ DBImpl::~DBImpl() {
env_->UnlockFile(db_lock_); env_->UnlockFile(db_lock_);
} }
mutex_.Lock();
// versions need to be destroyed before table_cache since it can hold // versions need to be destroyed before table_cache since it can hold
// references to table_cache. // references to table_cache.
versions_.reset(); versions_.reset();
mutex_.Unlock();
LogFlush(options_.info_log); LogFlush(options_.info_log);
} }

Loading…
Cancel
Save