Simplify cleanup of dead (refcount == 0) column families

main
Igor Canadi 11 years ago
parent e48348d196
commit b42ceb9598
  1. 14
      db/column_family.cc
  2. 12
      db/column_family.h
  3. 10
      db/db_filesnapshot.cc
  4. 45
      db/db_impl.cc

@ -428,6 +428,20 @@ void ColumnFamilySet::Lock() {
void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); } void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); }
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
autovector<ColumnFamilyData*> to_delete;
for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
if (cfd->refs_ == 0) {
to_delete.push_back(cfd);
}
}
for (auto cfd : to_delete) {
// this is very rare, so it's not a problem that we do it under a mutex
delete cfd;
}
}
// under a DB mutex // under a DB mutex
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID()); auto cfd_iter = column_family_data_.find(cfd->GetID());

@ -91,8 +91,7 @@ struct SuperVersion {
SuperVersion() = default; SuperVersion() = default;
~SuperVersion(); ~SuperVersion();
SuperVersion* Ref(); SuperVersion* Ref();
// Returns true if this was the last reference and caller should
// call Clenaup() and delete the object
bool Unref(); bool Unref();
// call these two methods with db mutex held // call these two methods with db mutex held
@ -133,8 +132,9 @@ class ColumnFamilyData {
void Ref() { ++refs_; } void Ref() { ++refs_; }
// will just decrease reference count to 0, but will not delete it. returns // will just decrease reference count to 0, but will not delete it. returns
// true if the ref count was decreased to zero and needs to be cleaned up by // true if the ref count was decreased to zero. in that case, it can be
// the caller // deleted by the caller immediatelly, or later, by calling
// FreeDeadColumnFamilies()
bool Unref() { bool Unref() {
assert(refs_ > 0); assert(refs_ > 0);
return --refs_ == 0; return --refs_ == 0;
@ -343,6 +343,10 @@ class ColumnFamilySet {
void Lock(); void Lock();
void Unlock(); void Unlock();
// REQUIRES: DB mutex held
// Don't call while iterating over ColumnFamilySet
void FreeDeadColumnFamilies();
private: private:
friend class ColumnFamilyData; friend class ColumnFamilyData;
// helper function that gets called from cfd destructor // helper function that gets called from cfd destructor

@ -67,23 +67,19 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
if (flush_memtable) { if (flush_memtable) {
// flush all dirty data to disk. // flush all dirty data to disk.
autovector<ColumnFamilyData*> to_delete;
Status status; Status status;
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref(); cfd->Ref();
mutex_.Unlock(); mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions()); status = FlushMemTable(cfd, FlushOptions());
mutex_.Lock(); mutex_.Lock();
if (cfd->Unref()) { cfd->Unref();
to_delete.push_back(cfd);
}
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
} }
for (auto cfd : to_delete) { versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
delete cfd;
}
if (!status.ok()) { if (!status.ok()) {
mutex_.Unlock(); mutex_.Unlock();
Log(options_.info_log, "Cannot Flush data %s\n", Log(options_.info_log, "Cannot Flush data %s\n",

@ -404,21 +404,16 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
DBImpl::~DBImpl() { DBImpl::~DBImpl() {
mutex_.Lock(); mutex_.Lock();
if (flush_on_destroy_) { if (flush_on_destroy_) {
autovector<ColumnFamilyData*> to_delete;
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->GetFirstSequenceNumber() != 0) { if (cfd->mem()->GetFirstSequenceNumber() != 0) {
cfd->Ref(); cfd->Ref();
mutex_.Unlock(); mutex_.Unlock();
FlushMemTable(cfd, FlushOptions()); FlushMemTable(cfd, FlushOptions());
mutex_.Lock(); mutex_.Lock();
if (cfd->Unref()) { cfd->Unref();
to_delete.push_back(cfd);
}
} }
} }
for (auto cfd : to_delete) { versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
delete cfd;
}
} }
// Wait for background work to finish // Wait for background work to finish
@ -1941,7 +1936,6 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
// flushing one column family reports a failure, we will continue flushing // flushing one column family reports a failure, we will continue flushing
// other column families. however, call_status will be a failure in that case. // other column families. however, call_status will be a failure in that case.
Status call_status; Status call_status;
autovector<ColumnFamilyData*> to_delete;
// refcounting in iteration // refcounting in iteration
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref(); cfd->Ref();
@ -1958,13 +1952,9 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
if (call_status.ok() && !flush_status.ok()) { if (call_status.ok() && !flush_status.ok()) {
call_status = flush_status; call_status = flush_status;
} }
if (cfd->Unref()) { cfd->Unref();
to_delete.push_back(cfd);
}
}
for (auto cfd : to_delete) {
delete cfd;
} }
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
return call_status; return call_status;
} }
@ -2105,6 +2095,8 @@ void DBImpl::BackgroundCallCompaction() {
MaybeScheduleLogDBDeployStats(); MaybeScheduleLogDBDeployStats();
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
// Previous compaction may have produced too many files in a level, // Previous compaction may have produced too many files in a level,
// So reschedule another compaction if we made progress in the // So reschedule another compaction if we made progress in the
// last compaction. // last compaction.
@ -2139,7 +2131,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
} }
// FLUSH preempts compaction // FLUSH preempts compaction
autovector<ColumnFamilyData*> to_delete;
Status flush_stat; Status flush_stat;
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
while (cfd->imm()->IsFlushPending()) { while (cfd->imm()->IsFlushPending()) {
@ -2151,9 +2142,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
cfd->Ref(); cfd->Ref();
flush_stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state, flush_stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state,
log_buffer); log_buffer);
if (cfd->Unref()) { cfd->Unref();
to_delete.push_back(cfd);
}
if (!flush_stat.ok()) { if (!flush_stat.ok()) {
if (is_manual) { if (is_manual) {
manual_compaction_->status = flush_stat; manual_compaction_->status = flush_stat;
@ -2161,18 +2150,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
manual_compaction_->in_progress = false; manual_compaction_->in_progress = false;
manual_compaction_ = nullptr; manual_compaction_ = nullptr;
} }
break; return flush_stat;
} }
} }
if (!flush_stat.ok()) {
break;
}
}
for (auto cfd : to_delete) {
delete cfd;
}
if (!flush_stat.ok()) {
return flush_stat;
} }
unique_ptr<Compaction> c; unique_ptr<Compaction> c;
@ -3840,22 +3820,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
} }
Status status; Status status;
autovector<ColumnFamilyData*> to_delete;
// refcounting cfd in iteration // refcounting cfd in iteration
bool dead_cfd = false;
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref(); cfd->Ref();
// May temporarily unlock and wait. // May temporarily unlock and wait.
status = MakeRoomForWrite(cfd, my_batch == nullptr); status = MakeRoomForWrite(cfd, my_batch == nullptr);
if (cfd->Unref()) { if (cfd->Unref()) {
to_delete.push_back(cfd); dead_cfd = true;
} }
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
} }
for (auto cfd : to_delete) { if (dead_cfd) {
delete cfd; versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
} }
uint64_t last_sequence = versions_->LastSequence(); uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w; Writer* last_writer = &w;
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions

Loading…
Cancel
Save