diff --git a/db/column_family.cc b/db/column_family.cc index f66759818..7dd02913a 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -60,11 +60,8 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options(); JobContext job_context(0); mutex_->Lock(); - if (cfd_->Unref()) { - bool dropped = cfd_->IsDropped(); - - delete cfd_; - + bool dropped = cfd_->IsDropped(); + if (cfd_->UnrefAndTryDelete()) { if (dropped) { db_->FindObsoleteFiles(&job_context, false, true); } @@ -439,13 +436,18 @@ void SuperVersion::Cleanup() { to_delete.push_back(m); } current->Unref(); + if (cfd->Unref()) { + delete cfd; + } } -void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, - Version* new_current) { +void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem, + MemTableListVersion* new_imm, Version* new_current) { + cfd = new_cfd; mem = new_mem; imm = new_imm; current = new_current; + cfd->Ref(); mem->Ref(); imm->Ref(); current->Ref(); @@ -581,21 +583,7 @@ ColumnFamilyData::~ColumnFamilyData() { // compaction_queue_ and we destroyed it assert(!queued_for_flush_); assert(!queued_for_compaction_); - - if (super_version_ != nullptr) { - // Release SuperVersion reference kept in ThreadLocalPtr. - // This must be done outside of mutex_ since unref handler can lock mutex. - super_version_->db_mutex->Unlock(); - local_sv_.reset(); - super_version_->db_mutex->Lock(); - - bool is_last_reference __attribute__((__unused__)); - is_last_reference = super_version_->Unref(); - assert(is_last_reference); - super_version_->Cleanup(); - delete super_version_; - super_version_ = nullptr; - } + assert(super_version_ == nullptr); if (dummy_versions_ != nullptr) { // List must be empty @@ -615,6 +603,36 @@ ColumnFamilyData::~ColumnFamilyData() { } } +bool ColumnFamilyData::UnrefAndTryDelete() { + int old_refs = refs_.fetch_sub(1); + assert(old_refs > 0); + + if (old_refs == 1) { + assert(super_version_ == nullptr); + delete this; + return true; + } + + if (old_refs == 2 && super_version_ != nullptr) { + // Only the super_version_ holds me + SuperVersion* sv = super_version_; + super_version_ = nullptr; + // Release SuperVersion reference kept in ThreadLocalPtr. + // This must be done outside of mutex_ since unref handler can lock mutex. + sv->db_mutex->Unlock(); + local_sv_.reset(); + sv->db_mutex->Lock(); + + if (sv->Unref()) { + // May delete this ColumnFamilyData after calling Cleanup() + sv->Cleanup(); + delete sv; + return true; + } + } + return false; +} + void ColumnFamilyData::SetDropped() { // can't drop default CF assert(id_ != 0); @@ -1169,7 +1187,7 @@ void ColumnFamilyData::InstallSuperVersion( SuperVersion* new_superversion = sv_context->new_superversion.release(); new_superversion->db_mutex = db_mutex; new_superversion->mutable_cf_options = mutable_cf_options; - new_superversion->Init(mem_, imm_.current(), current_); + new_superversion->Init(this, mem_, imm_.current(), current_); SuperVersion* old_superversion = super_version_; super_version_ = new_superversion; ++super_version_number_; @@ -1344,14 +1362,12 @@ ColumnFamilySet::~ColumnFamilySet() { // cfd destructor will delete itself from column_family_data_ auto cfd = column_family_data_.begin()->second; bool last_ref __attribute__((__unused__)); - last_ref = cfd->Unref(); + last_ref = cfd->UnrefAndTryDelete(); assert(last_ref); - delete cfd; } bool dummy_last_ref __attribute__((__unused__)); - dummy_last_ref = dummy_cfd_->Unref(); + dummy_last_ref = dummy_cfd_->UnrefAndTryDelete(); assert(dummy_last_ref); - delete dummy_cfd_; } ColumnFamilyData* ColumnFamilySet::GetDefault() const { diff --git a/db/column_family.h b/db/column_family.h index 135504ea2..88ca364d6 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -198,6 +198,7 @@ class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl { struct SuperVersion { // Accessing members of this class is not thread-safe and requires external // synchronization (ie db mutex held or on write thread). + ColumnFamilyData* cfd; MemTable* mem; MemTableListVersion* imm; Version* current; @@ -221,8 +222,8 @@ struct SuperVersion { // that needs to be deleted in to_delete vector. Unrefing those // objects needs to be done in the mutex void Cleanup(); - void Init(MemTable* new_mem, MemTableListVersion* new_imm, - Version* new_current); + void Init(ColumnFamilyData* new_cfd, MemTable* new_mem, + MemTableListVersion* new_imm, Version* new_current); // The value of dummy is not actually used. kSVInUse takes its address as a // mark in the thread local storage to indicate the SuperVersion is in use @@ -288,6 +289,11 @@ class ColumnFamilyData { return old_refs == 1; } + // UnrefAndTryDelete() decreases the reference count and do free if needed, + // return true if this is freed else false, UnrefAndTryDelete() can only + // be called while holding a DB mutex, or during single-threaded recovery. + bool UnrefAndTryDelete(); + // SetDropped() can only be called under following conditions: // 1) Holding a DB mutex, // 2) from single-threaded write thread, AND diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index d83bb7197..7f10f28fb 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -275,9 +275,7 @@ Compaction::~Compaction() { input_version_->Unref(); } if (cfd_ != nullptr) { - if (cfd_->Unref()) { - delete cfd_; - } + cfd_->UnrefAndTryDelete(); } } diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index dd5f8f67f..8717b3559 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -102,7 +102,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, TEST_SYNC_POINT("DBImpl::GetLiveFiles:1"); TEST_SYNC_POINT("DBImpl::GetLiveFiles:2"); mutex_.Lock(); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!status.ok()) { break; } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index bb5d3d263..3a3bb886b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -330,7 +330,7 @@ Status DBImpl::ResumeImpl() { mutex_.Unlock(); s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery); mutex_.Lock(); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!s.ok()) { break; } @@ -418,7 +418,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { mutex_.Unlock(); FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown); mutex_.Lock(); - cfd->Unref(); + cfd->UnrefAndTryDelete(); } } } @@ -475,17 +475,12 @@ Status DBImpl::CloseHelper() { while (!flush_queue_.empty()) { const FlushRequest& flush_req = PopFirstFromFlushQueue(); for (const auto& iter : flush_req) { - ColumnFamilyData* cfd = iter.first; - if (cfd->Unref()) { - delete cfd; - } + iter.first->UnrefAndTryDelete(); } } while (!compaction_queue_.empty()) { auto cfd = PopFirstFromCompactionQueue(); - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) { @@ -4303,7 +4298,7 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) { } } for (auto cfd : cfd_list) { - cfd->Unref(); + cfd->UnrefAndTryDelete(); } } return s; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 2336b3e92..99c67ea49 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1618,12 +1618,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } s = WaitForFlushMemTables(cfds, flush_memtable_ids, (flush_reason == FlushReason::kErrorRecovery)); + InstrumentedMutexLock lock_guard(&mutex_); for (auto* tmp_cfd : cfds) { - if (tmp_cfd->Unref()) { - // Only one thread can reach here. - InstrumentedMutexLock lock_guard(&mutex_); - delete tmp_cfd; - } + tmp_cfd->UnrefAndTryDelete(); } } TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished"); @@ -1683,7 +1680,7 @@ Status DBImpl::AtomicFlushMemTables( } cfd->Ref(); s = SwitchMemtable(cfd, &context); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!s.ok()) { break; } @@ -1723,12 +1720,9 @@ Status DBImpl::AtomicFlushMemTables( } s = WaitForFlushMemTables(cfds, flush_memtable_ids, (flush_reason == FlushReason::kErrorRecovery)); + InstrumentedMutexLock lock_guard(&mutex_); for (auto* cfd : cfds) { - if (cfd->Unref()) { - // Only one thread can reach here. - InstrumentedMutexLock lock_guard(&mutex_); - delete cfd; - } + cfd->UnrefAndTryDelete(); } } return s; @@ -2209,16 +2203,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, *reason = bg_flush_args[0].cfd_->GetFlushReason(); for (auto& arg : bg_flush_args) { ColumnFamilyData* cfd = arg.cfd_; - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { arg.cfd_ = nullptr; } } } for (auto cfd : column_families_not_to_flush) { - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } return status; } @@ -2547,10 +2538,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // reference). // This will all happen under a mutex so we don't have to be afraid of // somebody else deleting it. - if (cfd->Unref()) { + if (cfd->UnrefAndTryDelete()) { // This was the last reference of the column family, so no need to // compact. - delete cfd; return Status::OK(); } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 9ca0a940c..05a46af83 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -920,7 +920,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, ColumnFamilyData* cfd; while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { - cfd->Unref(); + cfd->UnrefAndTryDelete(); // If this asserts, it means that InsertInto failed in // filtering updates to already-flushed column families assert(cfd->GetLogNumber() <= log_number); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 9d38fb74c..8065f0a79 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1256,7 +1256,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { for (const auto cfd : cfds) { cfd->Ref(); status = SwitchMemtable(cfd, write_context); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!status.ok()) { break; } @@ -1335,7 +1335,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { } cfd->Ref(); status = SwitchMemtable(cfd, write_context); - cfd->Unref(); + cfd->UnrefAndTryDelete(); if (!status.ok()) { break; } @@ -1525,8 +1525,7 @@ Status DBImpl::TrimMemtableHistory(WriteContext* context) { assert(context->superversion_context.new_superversion.get() != nullptr); cfd->InstallSuperVersion(&context->superversion_context, &mutex_); - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } } @@ -1558,8 +1557,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { if (!cfd->mem()->IsEmpty()) { status = SwitchMemtable(cfd, context); } - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } if (!status.ok()) { diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc index cbcb5ce49..e40366408 100644 --- a/db/flush_scheduler.cc +++ b/db/flush_scheduler.cc @@ -60,9 +60,7 @@ ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { } // no longer relevant, retry - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } } @@ -80,9 +78,7 @@ bool FlushScheduler::Empty() { void FlushScheduler::Clear() { ColumnFamilyData* cfd; while ((cfd = TakeNextColumnFamily()) != nullptr) { - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } assert(head_.load(std::memory_order_relaxed) == nullptr); } diff --git a/db/trim_history_scheduler.cc b/db/trim_history_scheduler.cc index a213ac65f..7451ba47d 100644 --- a/db/trim_history_scheduler.cc +++ b/db/trim_history_scheduler.cc @@ -34,10 +34,7 @@ ColumnFamilyData* TrimHistoryScheduler::TakeNextColumnFamily() { // success return cfd; } - if (cfd->Unref()) { - // no longer relevant, retry - delete cfd; - } + cfd->UnrefAndTryDelete(); } } @@ -49,9 +46,7 @@ bool TrimHistoryScheduler::Empty() { void TrimHistoryScheduler::Clear() { ColumnFamilyData* cfd; while ((cfd = TakeNextColumnFamily()) != nullptr) { - if (cfd->Unref()) { - delete cfd; - } + cfd->UnrefAndTryDelete(); } assert(Empty()); } diff --git a/db/version_set.cc b/db/version_set.cc index 444996e40..f9627f81d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3888,9 +3888,7 @@ Status VersionSet::ProcessManifestWrites( } else if (first_writer.edit_list.front()->is_column_family_drop_) { assert(batch_edits.size() == 1); first_writer.cfd->SetDropped(); - if (first_writer.cfd->Unref()) { - delete first_writer.cfd; - } + first_writer.cfd->UnrefAndTryDelete(); } else { // Each version in versions corresponds to a column family. // For each column family, update its log number indicating that logs @@ -4168,8 +4166,7 @@ Status VersionSet::ApplyOneVersionEditToBuilder( builders.erase(builder); cfd = column_family_set_->GetColumnFamily(edit.column_family_); assert(cfd != nullptr); - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } else { // who else can have reference to cfd!? @@ -4781,8 +4778,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, comparators.erase(edit.column_family_); cfd = column_family_set_->GetColumnFamily(edit.column_family_); assert(cfd != nullptr); - cfd->Unref(); - delete cfd; + cfd->UnrefAndTryDelete(); cfd = nullptr; } else { if (!cf_in_builders) { @@ -5808,8 +5804,7 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( // secondary instance. (Is it possible that the ref count for cfd is 0 but // the ref count for its versions is higher than 0?) cfd->SetDropped(); - if (cfd->Unref()) { - delete cfd; + if (cfd->UnrefAndTryDelete()) { cfd = nullptr; } active_version_builders_.erase(builder_iter);