Enable iterating column families with a concurrent writer

Summary:
Sometimes we iterate through column families, and unlock the mutex in the body of the iteration. While mutex is unlocked, some column family might be created or dropped. We need to be able to continue iterating through column families even though our current column family got dropped.

This diff implements circular linked lists that connect all column families. It then uses the link list to enable iterating through linked lists. Even if the column family is dropped, its next_ pointer still can be used to advance to another alive column family.

Test Plan: make check

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15603
main
Igor Canadi 11 years ago
parent 6973bb1722
commit 9ca638a86d
  1. 45
      db/column_family.cc
  2. 29
      db/column_family.h
  3. 7
      db/db_impl.cc

@ -69,6 +69,8 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
imm_(options.min_write_buffer_number_to_merge),
super_version_(nullptr),
super_version_number_(0),
next_(nullptr),
prev_(nullptr),
log_number_(0),
need_slowdown_for_num_level0_files_(false) {}
@ -80,9 +82,11 @@ ColumnFamilyData::~ColumnFamilyData() {
super_version_->Cleanup();
delete super_version_;
}
if (dummy_versions_ != nullptr) {
// List must be empty
assert(dummy_versions_->next_ == dummy_versions_);
delete dummy_versions_;
}
if (mem_ != nullptr) {
delete mem_->Unref();
@ -123,7 +127,13 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
return nullptr;
}
ColumnFamilySet::ColumnFamilySet() : max_column_family_(0) {}
ColumnFamilySet::ColumnFamilySet()
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, ColumnFamilyOptions())) {
// initialize linked list
dummy_cfd_->prev_.store(dummy_cfd_);
dummy_cfd_->next_.store(dummy_cfd_);
}
ColumnFamilySet::~ColumnFamilySet() {
for (auto& cfd : column_family_data_) {
@ -132,12 +142,14 @@ ColumnFamilySet::~ColumnFamilySet() {
for (auto& cfd : droppped_column_families_) {
delete cfd;
}
delete dummy_cfd_;
}
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
auto ret = GetColumnFamily(0);
assert(ret != nullptr); // default column family should always exist
return ret;
auto cfd = GetColumnFamily(0);
// default column family should always exist
assert(cfd != nullptr);
return cfd;
}
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
@ -176,16 +188,29 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
new ColumnFamilyData(id, name, dummy_versions, options);
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
// add to linked list
new_cfd->next_.store(dummy_cfd_);
auto prev = dummy_cfd_->prev_.load();
new_cfd->prev_.store(prev);
prev->next_.store(new_cfd);
dummy_cfd_->prev_.store(new_cfd);
return new_cfd;
}
void ColumnFamilySet::DropColumnFamily(uint32_t id) {
auto cfd = column_family_data_.find(id);
assert(cfd != column_family_data_.end());
column_families_.erase(cfd->second->GetName());
cfd->second->current()->Unref();
droppped_column_families_.push_back(cfd->second);
column_family_data_.erase(cfd);
assert(id != 0);
auto cfd_iter = column_family_data_.find(id);
assert(cfd_iter != column_family_data_.end());
auto cfd = cfd_iter->second;
column_families_.erase(cfd->GetName());
cfd->current()->Unref();
droppped_column_families_.push_back(cfd);
column_family_data_.erase(cfd_iter);
// remove from linked list
auto prev = cfd->prev_.load();
auto next = cfd->next_.load();
prev->next_.store(next);
next->prev_.store(prev);
}
MemTable* ColumnFamilyMemTablesImpl::GetMemTable(uint32_t column_family_id) {

@ -92,6 +92,10 @@ class ColumnFamilyData {
}
private:
friend class ColumnFamilySet;
ColumnFamilyData* next() { return next_.load(); }
uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
@ -107,6 +111,11 @@ class ColumnFamilyData {
// changes.
std::atomic<uint64_t> super_version_number_;
// pointers for a circular linked list. we use it to support iterations
// that can be concurrent with writes
std::atomic<ColumnFamilyData*> next_;
std::atomic<ColumnFamilyData*> prev_;
// This is the earliest log file number that contains data from this
// Column Family. All earlier log files must be ignored and not
// recovered from
@ -123,18 +132,19 @@ class ColumnFamilySet {
public:
class iterator {
public:
explicit iterator(
std::unordered_map<uint32_t, ColumnFamilyData*>::iterator itr)
: itr_(itr) {}
explicit iterator(ColumnFamilyData* cfd)
: current_(cfd) {}
iterator& operator++() {
++itr_;
current_ = current_->next();
return *this;
}
bool operator!=(const iterator& other) { return this->itr_ != other.itr_; }
ColumnFamilyData* operator*() { return itr_->second; }
bool operator!=(const iterator& other) {
return this->current_ != other.current_;
}
ColumnFamilyData* operator*() { return current_; }
private:
std::unordered_map<uint32_t, ColumnFamilyData*>::iterator itr_;
ColumnFamilyData* current_;
};
ColumnFamilySet();
@ -159,8 +169,8 @@ class ColumnFamilySet {
const ColumnFamilyOptions& options);
void DropColumnFamily(uint32_t id);
iterator begin() { return iterator(column_family_data_.begin()); }
iterator end() { return iterator(column_family_data_.end()); }
iterator begin() { return iterator(dummy_cfd_->next()); }
iterator end() { return iterator(dummy_cfd_); }
private:
std::unordered_map<std::string, uint32_t> column_families_;
@ -169,6 +179,7 @@ class ColumnFamilySet {
// all of column family data members (options for example)
std::vector<ColumnFamilyData*> droppped_column_families_;
uint32_t max_column_family_;
ColumnFamilyData* dummy_cfd_;
};
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {

@ -2948,13 +2948,6 @@ std::vector<Status> DBImpl::MultiGet(
return statList;
}
// TODO(icanadi) creating column family while writing will cause a data race.
// In write code path, we iterate through column families and call
// MakeRoomForWrite() for each. MakeRoomForWrite() can unlock the mutex
// and wait (delay the write). If we create or drop a column family when
// that mutex is unlocked for delay, that's bad.
// Solution TODO: enable iteration by chaining column families in
// circular linked lists
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle* handle) {

Loading…
Cancel
Save