From c2029f9716b3a4d2d7da687fd9fea6bb59bc50e4 Mon Sep 17 00:00:00 2001 From: Jermy Li Date: Thu, 12 Dec 2019 19:02:51 -0800 Subject: [PATCH] Support concurrent CF iteration and drop (#6147) Summary: Closing a ColumnFamilyHandle while iterators on it are still unreleased can easily cause a core dump, especially when iterator release is controlled by the Java GC (when using JNI). This patch supports concurrent CF iteration and drop: iterators (actually the SuperVersion) now hold a ColumnFamilyData reference to prevent the CF from being released too early. Fixes https://github.com/facebook/rocksdb/issues/5982 Pull Request resolved: https://github.com/facebook/rocksdb/pull/6147 Differential Revision: D18926378 fbshipit-source-id: 1dff6d068c603d012b81446812368bfee95a5e15 --- db/column_family.cc | 70 ++++++++++++++++---------- db/column_family.h | 10 +++- db/compaction/compaction.cc | 4 +- db/db_filesnapshot.cc | 2 +- db/db_impl/db_impl.cc | 15 ++---- db/db_impl/db_impl_compaction_flush.cc | 26 +++------- db/db_impl/db_impl_open.cc | 2 +- db/db_impl/db_impl_write.cc | 10 ++-- db/flush_scheduler.cc | 8 +-- db/trim_history_scheduler.cc | 9 +--- db/version_set.cc | 13 ++--- 11 files changed, 79 insertions(+), 90 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index f66759818..7dd02913a 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -60,11 +60,8 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options(); JobContext job_context(0); mutex_->Lock(); - if (cfd_->Unref()) { - bool dropped = cfd_->IsDropped(); - - delete cfd_; - + bool dropped = cfd_->IsDropped(); + if (cfd_->UnrefAndTryDelete()) { if (dropped) { db_->FindObsoleteFiles(&job_context, false, true); } @@ -439,13 +436,18 @@ void SuperVersion::Cleanup() { to_delete.push_back(m); } current->Unref(); + if (cfd->Unref()) { + delete cfd; + } } -void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, - Version* new_current) { +void 
SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem, + MemTableListVersion* new_imm, Version* new_current) { + cfd = new_cfd; mem = new_mem; imm = new_imm; current = new_current; + cfd->Ref(); mem->Ref(); imm->Ref(); current->Ref(); @@ -581,21 +583,7 @@ ColumnFamilyData::~ColumnFamilyData() { // compaction_queue_ and we destroyed it assert(!queued_for_flush_); assert(!queued_for_compaction_); - - if (super_version_ != nullptr) { - // Release SuperVersion reference kept in ThreadLocalPtr. - // This must be done outside of mutex_ since unref handler can lock mutex. - super_version_->db_mutex->Unlock(); - local_sv_.reset(); - super_version_->db_mutex->Lock(); - - bool is_last_reference __attribute__((__unused__)); - is_last_reference = super_version_->Unref(); - assert(is_last_reference); - super_version_->Cleanup(); - delete super_version_; - super_version_ = nullptr; - } + assert(super_version_ == nullptr); if (dummy_versions_ != nullptr) { // List must be empty @@ -615,6 +603,36 @@ ColumnFamilyData::~ColumnFamilyData() { } } +bool ColumnFamilyData::UnrefAndTryDelete() { + int old_refs = refs_.fetch_sub(1); + assert(old_refs > 0); + + if (old_refs == 1) { + assert(super_version_ == nullptr); + delete this; + return true; + } + + if (old_refs == 2 && super_version_ != nullptr) { + // Only the super_version_ holds me + SuperVersion* sv = super_version_; + super_version_ = nullptr; + // Release SuperVersion reference kept in ThreadLocalPtr. + // This must be done outside of mutex_ since unref handler can lock mutex. 
+ sv->db_mutex->Unlock(); + local_sv_.reset(); + sv->db_mutex->Lock(); + + if (sv->Unref()) { + // May delete this ColumnFamilyData after calling Cleanup() + sv->Cleanup(); + delete sv; + return true; + } + } + return false; +} + void ColumnFamilyData::SetDropped() { // can't drop default CF assert(id_ != 0); @@ -1169,7 +1187,7 @@ void ColumnFamilyData::InstallSuperVersion( SuperVersion* new_superversion = sv_context->new_superversion.release(); new_superversion->db_mutex = db_mutex; new_superversion->mutable_cf_options = mutable_cf_options; - new_superversion->Init(mem_, imm_.current(), current_); + new_superversion->Init(this, mem_, imm_.current(), current_); SuperVersion* old_superversion = super_version_; super_version_ = new_superversion; ++super_version_number_; @@ -1344,14 +1362,12 @@ ColumnFamilySet::~ColumnFamilySet() { // cfd destructor will delete itself from column_family_data_ auto cfd = column_family_data_.begin()->second; bool last_ref __attribute__((__unused__)); - last_ref = cfd->Unref(); + last_ref = cfd->UnrefAndTryDelete(); assert(last_ref); - delete cfd; } bool dummy_last_ref __attribute__((__unused__)); - dummy_last_ref = dummy_cfd_->Unref(); + dummy_last_ref = dummy_cfd_->UnrefAndTryDelete(); assert(dummy_last_ref); - delete dummy_cfd_; } ColumnFamilyData* ColumnFamilySet::GetDefault() const { diff --git a/db/column_family.h b/db/column_family.h index 135504ea2..88ca364d6 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -198,6 +198,7 @@ class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl { struct SuperVersion { // Accessing members of this class is not thread-safe and requires external // synchronization (ie db mutex held or on write thread). + ColumnFamilyData* cfd; MemTable* mem; MemTableListVersion* imm; Version* current; @@ -221,8 +222,8 @@ struct SuperVersion { // that needs to be deleted in to_delete vector. 
Unrefing those // objects needs to be done in the mutex void Cleanup(); - void Init(MemTable* new_mem, MemTableListVersion* new_imm, - Version* new_current); + void Init(ColumnFamilyData* new_cfd, MemTable* new_mem, + MemTableListVersion* new_imm, Version* new_current); // The value of dummy is not actually used. kSVInUse takes its address as a // mark in the thread local storage to indicate the SuperVersion is in use @@ -288,6 +289,11 @@ class ColumnFamilyData { return old_refs == 1; } + // UnrefAndTryDelete() decrements the reference count and frees this object + // if needed; returns true if it was freed, else false. It must only be + // called while holding the DB mutex, or during single-threaded recovery. + bool UnrefAndTryDelete(); + // SetDropped() can only be called under following conditions: // 1) Holding a DB mutex, // 2) from single-threaded write thread, AND diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index d83bb7197..7f10f28fb 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -275,9 +275,7 @@ Compaction::~Compaction() { input_version_->Unref(); } if (cfd_ != nullptr) { - if (cfd_->Unref()) { - delete cfd_; - } + cfd_->UnrefAndTryDelete(); } } diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index dd5f8f67f..8717b3559 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -102,7 +102,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, TEST_SYNC_POINT("DBImpl::GetLiveFiles:1"); TEST_SYNC_POINT("DBImpl::GetLiveFiles:2"); mutex_.Lock(); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!status.ok()) { break; } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index bb5d3d263..3a3bb886b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -330,7 +330,7 @@ Status DBImpl::ResumeImpl() { mutex_.Unlock(); s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery); mutex_.Lock(); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!s.ok()) { break; } @@ -418,7 
+418,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { mutex_.Unlock(); FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); mutex_.Lock(); - cfd->Unref(); + cfd->UnrefAndTryDelete(); } } } @@ -475,17 +475,12 @@ Status DBImpl::CloseHelper() { while (!flush_queue_.empty()) { const FlushRequest& flush_req = PopFirstFromFlushQueue(); for (const auto& iter : flush_req) { - ColumnFamilyData* cfd = iter.first; - if (cfd->Unref()) { - delete cfd; - } + iter.first->UnrefAndTryDelete(); } } while (!compaction_queue_.empty()) { auto cfd = PopFirstFromCompactionQueue(); - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) { @@ -4303,7 +4298,7 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) { } } for (auto cfd : cfd_list) { - cfd->Unref(); + cfd->UnrefAndTryDelete(); } } return s; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 2336b3e92..99c67ea49 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1618,12 +1618,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } s = WaitForFlushMemTables(cfds, flush_memtable_ids, (flush_reason == FlushReason::kErrorRecovery)); + InstrumentedMutexLock lock_guard(&mutex_); for (auto* tmp_cfd : cfds) { - if (tmp_cfd->Unref()) { - // Only one thread can reach here. 
- InstrumentedMutexLock lock_guard(&mutex_); - delete tmp_cfd; - } + tmp_cfd->UnrefAndTryDelete(); } } TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished"); @@ -1683,7 +1680,7 @@ Status DBImpl::AtomicFlushMemTables( } cfd->Ref(); s = SwitchMemtable(cfd, &context); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!s.ok()) { break; } @@ -1723,12 +1720,9 @@ Status DBImpl::AtomicFlushMemTables( } s = WaitForFlushMemTables(cfds, flush_memtable_ids, (flush_reason == FlushReason::kErrorRecovery)); + InstrumentedMutexLock lock_guard(&mutex_); for (auto* cfd : cfds) { - if (cfd->Unref()) { - // Only one thread can reach here. - InstrumentedMutexLock lock_guard(&mutex_); - delete cfd; - } + cfd->UnrefAndTryDelete(); } } return s; @@ -2209,16 +2203,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, *reason = bg_flush_args[0].cfd_->GetFlushReason(); for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { arg.cfd_ = nullptr; } } } for (auto cfd : column_families_not_to_flush) { - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } return status; } @@ -2547,10 +2538,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // reference). // This will all happen under a mutex so we don't have to be afraid of // somebody else deleting it. - if (cfd->Unref()) { + if (cfd->UnrefAndTryDelete()) { // This was the last reference of the column family, so no need to // compact. 
- delete cfd; return Status::OK(); } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 9ca0a940c..05a46af83 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -920,7 +920,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, ColumnFamilyData* cfd; while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { - cfd->Unref(); + cfd->UnrefAndTryDelete(); // If this asserts, it means that InsertInto failed in // filtering updates to already-flushed column families assert(cfd->GetLogNumber() <= log_number); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 9d38fb74c..8065f0a79 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1256,7 +1256,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { for (const auto cfd : cfds) { cfd->Ref(); status = SwitchMemtable(cfd, write_context); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!status.ok()) { break; } @@ -1335,7 +1335,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { } cfd->Ref(); status = SwitchMemtable(cfd, write_context); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!status.ok()) { break; } @@ -1525,8 +1525,7 @@ Status DBImpl::TrimMemtableHistory(WriteContext* context) { assert(context->superversion_context.new_superversion.get() != nullptr); cfd->InstallSuperVersion(&context->superversion_context, &mutex_); - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } } @@ -1558,8 +1557,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { if (!cfd->mem()->IsEmpty()) { status = SwitchMemtable(cfd, context); } - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } if (!status.ok()) { diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc index cbcb5ce49..e40366408 100644 --- a/db/flush_scheduler.cc +++ b/db/flush_scheduler.cc @@ -60,9 +60,7 @@ ColumnFamilyData* 
FlushScheduler::TakeNextColumnFamily() { } // no longer relevant, retry - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } } @@ -80,9 +78,7 @@ bool FlushScheduler::Empty() { void FlushScheduler::Clear() { ColumnFamilyData* cfd; while ((cfd = TakeNextColumnFamily()) != nullptr) { - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } assert(head_.load(std::memory_order_relaxed) == nullptr); } diff --git a/db/trim_history_scheduler.cc b/db/trim_history_scheduler.cc index a213ac65f..7451ba47d 100644 --- a/db/trim_history_scheduler.cc +++ b/db/trim_history_scheduler.cc @@ -34,10 +34,7 @@ ColumnFamilyData* TrimHistoryScheduler::TakeNextColumnFamily() { // success return cfd; } - if (cfd->Unref()) { - // no longer relevant, retry - delete cfd; - } + cfd->UnrefAndTryDelete(); } } @@ -49,9 +46,7 @@ bool TrimHistoryScheduler::Empty() { void TrimHistoryScheduler::Clear() { ColumnFamilyData* cfd; while ((cfd = TakeNextColumnFamily()) != nullptr) { - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } assert(Empty()); } diff --git a/db/version_set.cc b/db/version_set.cc index 444996e40..f9627f81d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3888,9 +3888,7 @@ Status VersionSet::ProcessManifestWrites( } else if (first_writer.edit_list.front()->is_column_family_drop_) { assert(batch_edits.size() == 1); first_writer.cfd->SetDropped(); - if (first_writer.cfd->Unref()) { - delete first_writer.cfd; - } + first_writer.cfd->UnrefAndTryDelete(); } else { // Each version in versions corresponds to a column family. // For each column family, update its log number indicating that logs @@ -4168,8 +4166,7 @@ Status VersionSet::ApplyOneVersionEditToBuilder( builders.erase(builder); cfd = column_family_set_->GetColumnFamily(edit.column_family_); assert(cfd != nullptr); - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } else { // who else can have reference to cfd!? 
@@ -4781,8 +4778,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, comparators.erase(edit.column_family_); cfd = column_family_set_->GetColumnFamily(edit.column_family_); assert(cfd != nullptr); - cfd->Unref(); - delete cfd; + cfd->UnrefAndTryDelete(); cfd = nullptr; } else { if (!cf_in_builders) { @@ -5808,8 +5804,7 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( // secondary instance. (Is it possible that the ref count for cfd is 0 but // the ref count for its versions is higher than 0?) cfd->SetDropped(); - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } active_version_builders_.erase(builder_iter);