Simplify column family concurrency

Summary:
This patch changes concurrency guarantees around ColumnFamilySet::column_families_ and ColumnFamilySet::column_families_data_.

Before:
* When mutating: lock DB mutex and spin lock
* When reading: lock DB mutex OR spin lock

After:
* When mutating: lock DB mutex and be in write thread
* When reading: lock DB mutex or be in write thread

That way, we eliminate the spin lock that protects these hash maps and  simplify concurrency. That means we don't need to lock the spin lock during writing, since writing is mutually exclusive with column family create/drop (the only operations that mutate those hash maps).

With these new restrictions, I also needed to move column family create to the write thread (column family drop was already in the write thread).

Even though we don't need to lock the spin lock during write, impact on performance should be minimal -- the spin lock is almost never busy, so locking it is almost free.

This addresses task t5116919.

Test Plan:
make check

Stress test with lots and lots of column family drop and create:

   time ./db_stress --threads=30 --ops_per_thread=5000000 --max_key=5000 --column_families=200 --clear_column_family_one_in=100000 --verify_before_write=0  --reopen=15 --max_background_compactions=10 --max_background_flushes=10 --db=/fast-rocksdb-tmp/db_stress/

Reviewers: yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D30651
main
Igor Canadi 10 years ago
parent 07aa4e0e35
commit 7731d51c82
  1. 45
      db/column_family.cc
  2. 54
      db/column_family.h
  3. 14
      db/db_impl.cc
  4. 9
      db/write_batch.cc

@ -306,9 +306,10 @@ ColumnFamilyData::~ColumnFamilyData() {
prev->next_ = next; prev->next_ = next;
next->prev_ = prev; next->prev_ = prev;
// it's nullptr for dummy CFD if (!dropped_ && column_family_set_ != nullptr) {
if (column_family_set_ != nullptr) { // If it's dropped, it's already removed from column family set
// remove from column_family_set // If column_family_set_ == nullptr, this is dummy CFD and not in
// ColumnFamilySet
column_family_set_->RemoveColumnFamily(this); column_family_set_->RemoveColumnFamily(this);
} }
@ -353,6 +354,16 @@ ColumnFamilyData::~ColumnFamilyData() {
} }
} }
void ColumnFamilyData::SetDropped() {
// can't drop default CF
assert(id_ != 0);
dropped_ = true;
write_controller_token_.reset();
// remove from column_family_set
column_family_set_->RemoveColumnFamily(this);
}
void ColumnFamilyData::RecalculateWriteStallConditions( void ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options) {
if (current_ != nullptr) { if (current_ != nullptr) {
@ -635,8 +646,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
env_options_(env_options), env_options_(env_options),
table_cache_(table_cache), table_cache_(table_cache),
write_buffer_(write_buffer), write_buffer_(write_buffer),
write_controller_(write_controller), write_controller_(write_controller) {
spin_lock_(ATOMIC_FLAG_INIT) {
// initialize linked list // initialize linked list
dummy_cfd_->prev_ = dummy_cfd_; dummy_cfd_->prev_ = dummy_cfd_;
dummy_cfd_->next_ = dummy_cfd_; dummy_cfd_->next_ = dummy_cfd_;
@ -693,7 +703,7 @@ size_t ColumnFamilySet::NumberOfColumnFamilies() const {
return column_families_.size(); return column_families_.size();
} }
// under a DB mutex // under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions, const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) { const ColumnFamilyOptions& options) {
@ -702,10 +712,8 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
new ColumnFamilyData(id, name, dummy_versions, table_cache_, new ColumnFamilyData(id, name, dummy_versions, table_cache_,
write_buffer_, options, db_options_, write_buffer_, options, db_options_,
env_options_, this); env_options_, this);
Lock();
column_families_.insert({name, id}); column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd}); column_family_data_.insert({id, new_cfd});
Unlock();
max_column_family_ = std::max(max_column_family_, id); max_column_family_ = std::max(max_column_family_, id);
// add to linked list // add to linked list
new_cfd->next_ = dummy_cfd_; new_cfd->next_ = dummy_cfd_;
@ -719,14 +727,6 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
return new_cfd; return new_cfd;
} }
void ColumnFamilySet::Lock() {
// spin lock
while (spin_lock_.test_and_set(std::memory_order_acquire)) {
}
}
void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); }
// REQUIRES: DB mutex held // REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() { void ColumnFamilySet::FreeDeadColumnFamilies() {
autovector<ColumnFamilyData*> to_delete; autovector<ColumnFamilyData*> to_delete;
@ -741,30 +741,21 @@ void ColumnFamilySet::FreeDeadColumnFamilies() {
} }
} }
// under a DB mutex // under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID()); auto cfd_iter = column_family_data_.find(cfd->GetID());
assert(cfd_iter != column_family_data_.end()); assert(cfd_iter != column_family_data_.end());
Lock();
column_family_data_.erase(cfd_iter); column_family_data_.erase(cfd_iter);
column_families_.erase(cfd->GetName()); column_families_.erase(cfd->GetName());
Unlock();
} }
// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
if (column_family_id == 0) { if (column_family_id == 0) {
// optimization for common case // optimization for common case
current_ = column_family_set_->GetDefault(); current_ = column_family_set_->GetDefault();
} else { } else {
// maybe outside of db mutex, should lock
column_family_set_->Lock();
current_ = column_family_set_->GetColumnFamily(column_family_id); current_ = column_family_set_->GetColumnFamily(column_family_id);
column_family_set_->Unlock();
// TODO(icanadi) Maybe remove column family from the hash table when it's
// dropped?
if (current_ != nullptr && current_->IsDropped()) {
current_ = nullptr;
}
} }
handle_.SetCFD(current_); handle_.SetCFD(current_);
return current_ != nullptr; return current_ != nullptr;

@ -123,8 +123,7 @@ extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
class ColumnFamilySet; class ColumnFamilySet;
// This class keeps all the data that a column family needs. It's mosly dumb and // This class keeps all the data that a column family needs.
// used just to provide access to metadata.
// Most methods require DB mutex held, unless otherwise noted // Most methods require DB mutex held, unless otherwise noted
class ColumnFamilyData { class ColumnFamilyData {
public: public:
@ -145,7 +144,10 @@ class ColumnFamilyData {
return --refs_ == 0; return --refs_ == 0;
} }
// This can only be called from single-threaded VersionSet::LogAndApply() // SetDropped() can only be called under following conditions:
// 1) Holding a DB mutex,
// 2) from single-threaded write thread, AND
// 3) from single-threaded VersionSet::LogAndApply()
// After dropping column family no other operation on that column family // After dropping column family no other operation on that column family
// will be executed. All the files and memory will be, however, kept around // will be executed. All the files and memory will be, however, kept around
// until client drops the column family handle. That way, client can still // until client drops the column family handle. That way, client can still
@ -153,17 +155,12 @@ class ColumnFamilyData {
// Column family can be dropped and still alive. In that state: // Column family can be dropped and still alive. In that state:
// *) Column family is not included in the iteration. // *) Column family is not included in the iteration.
// *) Compaction and flush is not executed on the dropped column family. // *) Compaction and flush is not executed on the dropped column family.
// *) Client can continue writing and reading from column family. However, all // *) Client can continue reading from column family. Writes will fail unless
// writes stay in the current memtable. // WriteOptions::ignore_missing_column_families is true
// When the dropped column family is unreferenced, then we: // When the dropped column family is unreferenced, then we:
// *) delete all memory associated with that column family // *) delete all memory associated with that column family
// *) delete all the files associated with that column family // *) delete all the files associated with that column family
void SetDropped() { void SetDropped();
// can't drop default CF
assert(id_ != 0);
dropped_ = true;
write_controller_token_.reset();
}
bool IsDropped() const { return dropped_; } bool IsDropped() const { return dropped_; }
// thread-safe // thread-safe
@ -348,18 +345,21 @@ class ColumnFamilyData {
}; };
// ColumnFamilySet has interesting thread-safety requirements // ColumnFamilySet has interesting thread-safety requirements
// * CreateColumnFamily() or RemoveColumnFamily() -- need to protect by DB // * CreateColumnFamily() or RemoveColumnFamily() -- need to be protected by DB
// mutex. Inside, column_family_data_ and column_families_ will be protected // mutex AND executed in the write thread.
// by Lock() and Unlock(). CreateColumnFamily() should ONLY be called from // CreateColumnFamily() should ONLY be called from VersionSet::LogAndApply() AND
// VersionSet::LogAndApply() in the normal runtime. It is also called // single-threaded write thread. It is also called during Recovery and in
// during Recovery and in DumpManifest(). RemoveColumnFamily() is called // DumpManifest().
// from ColumnFamilyData destructor // RemoveColumnFamily() is only called from SetDropped(). DB mutex needs to be
// held and it needs to be executed from the write thread. SetDropped() also
// guarantees that it will be called only from single-threaded LogAndApply(),
// but this condition is not that important.
// * Iteration -- hold DB mutex, but you can release it in the body of // * Iteration -- hold DB mutex, but you can release it in the body of
// iteration. If you release DB mutex in body, reference the column // iteration. If you release DB mutex in body, reference the column
// family before the mutex and unreference after you unlock, since the column // family before the mutex and unreference after you unlock, since the column
// family might get dropped when the DB mutex is released // family might get dropped when the DB mutex is released
// * GetDefault() -- thread safe // * GetDefault() -- thread safe
// * GetColumnFamily() -- either inside of DB mutex or call Lock() <-> Unlock() // * GetColumnFamily() -- either inside of DB mutex or from a write thread
// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily(), // * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily(),
// NumberOfColumnFamilies -- inside of DB mutex // NumberOfColumnFamilies -- inside of DB mutex
class ColumnFamilySet { class ColumnFamilySet {
@ -410,9 +410,6 @@ class ColumnFamilySet {
iterator begin() { return iterator(dummy_cfd_->next_); } iterator begin() { return iterator(dummy_cfd_->next_); }
iterator end() { return iterator(dummy_cfd_); } iterator end() { return iterator(dummy_cfd_); }
void Lock();
void Unlock();
// REQUIRES: DB mutex held // REQUIRES: DB mutex held
// Don't call while iterating over ColumnFamilySet // Don't call while iterating over ColumnFamilySet
void FreeDeadColumnFamilies(); void FreeDeadColumnFamilies();
@ -424,9 +421,12 @@ class ColumnFamilySet {
void RemoveColumnFamily(ColumnFamilyData* cfd); void RemoveColumnFamily(ColumnFamilyData* cfd);
// column_families_ and column_family_data_ need to be protected: // column_families_ and column_family_data_ need to be protected:
// * when mutating: 1. DB mutex locked first, 2. spinlock locked second // * when mutating both conditions have to be satisfied:
// * when reading, either: 1. lock DB mutex, or 2. lock spinlock // 1. DB mutex locked
// (if both, respect the ordering to avoid deadlock!) // 2. thread currently in single-threaded write thread
// * when reading, at least one condition needs to be satisfied:
// 1. DB mutex locked
// 2. accessed from a single-threaded write thread
std::unordered_map<std::string, uint32_t> column_families_; std::unordered_map<std::string, uint32_t> column_families_;
std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_; std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
@ -444,7 +444,6 @@ class ColumnFamilySet {
Cache* table_cache_; Cache* table_cache_;
WriteBuffer* write_buffer_; WriteBuffer* write_buffer_;
WriteController* write_controller_; WriteController* write_controller_;
std::atomic_flag spin_lock_;
}; };
// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access // We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access
@ -459,17 +458,22 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
// sets current_ to ColumnFamilyData with column_family_id // sets current_ to ColumnFamilyData with column_family_id
// returns false if column family doesn't exist // returns false if column family doesn't exist
// REQUIRES: under a DB mutex OR from a write thread
bool Seek(uint32_t column_family_id) override; bool Seek(uint32_t column_family_id) override;
// Returns log number of the selected column family // Returns log number of the selected column family
// REQUIRES: under a DB mutex OR from a write thread
uint64_t GetLogNumber() const override; uint64_t GetLogNumber() const override;
// REQUIRES: Seek() called first // REQUIRES: Seek() called first
// REQUIRES: under a DB mutex OR from a write thread
virtual MemTable* GetMemTable() const override; virtual MemTable* GetMemTable() const override;
// Returns column family handle for the selected column family // Returns column family handle for the selected column family
// REQUIRES: under a DB mutex OR from a write thread
virtual ColumnFamilyHandle* GetColumnFamilyHandle() override; virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;
// REQUIRES: under a DB mutex OR from a write thread
virtual void CheckMemtableFull() override; virtual void CheckMemtableFull() override;
private: private:

@ -2579,9 +2579,17 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
// LogAndApply will both write the creation in MANIFEST and create // LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object // ColumnFamilyData object
Options opt(db_options_, cf_options); Options opt(db_options_, cf_options);
s = versions_->LogAndApply(nullptr, { // write thread
MutableCFOptions(opt, ImmutableCFOptions(opt)), WriteThread::Writer w(&mutex_);
&edit, &mutex_, db_directory_.get(), false, &cf_options); s = write_thread_.EnterWriteThread(&w, 0);
assert(s.ok() && !w.done); // No timeout and nobody should do our job
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
s = versions_->LogAndApply(
nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit,
&mutex_, db_directory_.get(), false, &cf_options);
write_thread_.ExitWriteThread(&w, &w, s);
}
if (s.ok()) { if (s.ok()) {
single_column_family_mode_ = false; single_column_family_mode_ = false;
auto* cfd = auto* cfd =

@ -280,6 +280,8 @@ void WriteBatch::PutLogData(const Slice& blob) {
} }
namespace { namespace {
// This class can *only* be used from a single-threaded write thread, because it
// calls ColumnFamilyMemTablesImpl::Seek()
class MemTableInserter : public WriteBatch::Handler { class MemTableInserter : public WriteBatch::Handler {
public: public:
SequenceNumber sequence_; SequenceNumber sequence_;
@ -305,6 +307,8 @@ class MemTableInserter : public WriteBatch::Handler {
} }
bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
// We are only allowed to call this from a single-threaded write thread
// (or while holding DB mutex)
bool found = cf_mems_->Seek(column_family_id); bool found = cf_mems_->Seek(column_family_id);
if (!found) { if (!found) {
if (ignore_missing_column_families_) { if (ignore_missing_column_families_) {
@ -485,6 +489,11 @@ class MemTableInserter : public WriteBatch::Handler {
}; };
} // namespace } // namespace
// This function can only be called in these conditions:
// 1) During Recovery()
// 2) during Write(), in a single-threaded write thread
// The reason is that it calles ColumnFamilyMemTablesImpl::Seek(), which needs
// to be called from a single-threaded write thread (or while holding DB mutex)
Status WriteBatchInternal::InsertInto(const WriteBatch* b, Status WriteBatchInternal::InsertInto(const WriteBatch* b,
ColumnFamilyMemTables* memtables, ColumnFamilyMemTables* memtables,
bool ignore_missing_column_families, bool ignore_missing_column_families,

Loading…
Cancel
Save