From f8d5443efec67ead51b103e5faa00f708a48da22 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 6 Feb 2014 11:44:50 -0800 Subject: [PATCH] [CF] Thread-safety guarantees for ColumnFamilySet Summary: Revised thread-safety guarantees and implemented a way to spinlock the object. Test Plan: make check Reviewers: dhruba, haobo, sdong, kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D15975 --- db/column_family.cc | 11 ++++++++++- db/column_family.h | 11 +++++++++++ db/db_impl.cc | 26 ++++++++++++++++++-------- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index e11872b24..6f396f29f 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -247,7 +247,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, db_name_(dbname), db_options_(db_options), storage_options_(storage_options), - table_cache_(table_cache) { + table_cache_(table_cache), + spin_lock_(ATOMIC_FLAG_INIT) { // initialize linked list dummy_cfd_->prev_.store(dummy_cfd_); dummy_cfd_->next_.store(dummy_cfd_); @@ -332,6 +333,14 @@ void ColumnFamilySet::DropColumnFamily(uint32_t id) { next->prev_.store(prev); } +void ColumnFamilySet::Lock() { + // spin lock + while (spin_lock_.test_and_set(std::memory_order_acquire)) { + } +} + +void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); } + bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { current_ = column_family_set_->GetColumnFamily(column_family_id); handle_.id = column_family_id; diff --git a/db/column_family.h b/db/column_family.h index 9e42d70f8..0aa97699a 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -12,6 +12,7 @@ #include #include #include +#include #include "rocksdb/options.h" #include "rocksdb/env.h" @@ -220,6 +221,15 @@ class ColumnFamilySet { iterator begin() { return iterator(dummy_cfd_->next()); } iterator end() { return iterator(dummy_cfd_); } + // ColumnFamilySet has interesting thread-safety requirements + // * CreateColumnFamily() or DropColumnFamily() -- need to Lock() and also + // execute inside of DB mutex + // * Iterate -- without any locks + // * GetDefault(), GetColumnFamily(), Exists(), GetID(), + // GetNextColumnFamilyID() -- either inside of DB mutex or call Lock() + void Lock(); + void Unlock(); + private: std::unordered_map column_families_; std::unordered_map column_family_data_; @@ -233,6 +243,7 @@ class ColumnFamilySet { const DBOptions* const db_options_; const EnvOptions storage_options_; Cache* table_cache_; + std::atomic_flag spin_lock_; }; class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { diff --git a/db/db_impl.cc b/db/db_impl.cc index 37900eb48..d346d915b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -903,6 +903,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } WriteBatchInternal::SetContents(&batch, record); + // No need to lock ColumnFamilySet here since its under a DB mutex status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), log_number); @@ -921,7 +922,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->mem()->ApproximateMemoryUsage() > cfd->options()->write_buffer_size) { - // If this asserts, it means that ColumnFamilyMemTablesImpl failed in + // If this asserts, it means that InsertInto failed in // filtering updates to already-flushed column families assert(cfd->GetLogNumber() <= log_number); auto iter = version_edits.find(cfd->GetID()); @@ -950,7 +951,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, // Column family cfd has already flushed the data // from log_number. Memtable has to be empty because // we filter the updates based on log_number - // (in ColumnFamilyMemTablesImpl) + // (in WriteBatch::InsertInto) assert(cfd->mem()->GetFirstSequenceNumber() == 0); assert(edit->NumEntries() == 0); continue; @@ -2963,7 +2964,8 @@ std::vector DBImpl::MultiGet( Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle* handle) { - MutexLock l(&mutex_); + // whenever we are writing to column family set, we have to lock + versions_->GetColumnFamilySet()->Lock(); if (versions_->GetColumnFamilySet()->Exists(column_family_name)) { return Status::InvalidArgument("Column family already exists"); } @@ -2971,11 +2973,16 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, edit.AddColumnFamily(column_family_name); handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); edit.SetColumnFamily(handle->id); + + mutex_.Lock(); Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_); if (s.ok()) { // add to internal data structures versions_->CreateColumnFamily(options, &edit); } + mutex_.Unlock(); + + versions_->GetColumnFamilySet()->Unlock(); Log(options_.info_log, "Created column family %s\n", column_family_name.c_str()); return s; @@ -2985,21 +2992,25 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { if (column_family.id == 0) { return Status::InvalidArgument("Can't drop default column family"); } - mutex_.Lock(); + // whenever we are writing to column family set, we have to lock + versions_->GetColumnFamilySet()->Lock(); if (!versions_->GetColumnFamilySet()->Exists(column_family.id)) { return Status::NotFound("Column family not found"); } VersionEdit edit; edit.DropColumnFamily(); edit.SetColumnFamily(column_family.id); + mutex_.Lock(); Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_); if (s.ok()) { // remove from internal data structures versions_->DropColumnFamily(&edit); } + versions_->GetColumnFamilySet()->Unlock(); DeletionState deletion_state; FindObsoleteFiles(deletion_state, false, true); mutex_.Unlock(); + PurgeObsoleteFiles(deletion_state); Log(options_.info_log, "Dropped column family with id %u\n", column_family.id); @@ -3193,12 +3204,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } if (status.ok()) { - // TODO(icanadi) this accesses column_family_set_ without any lock. - // We'll need to add a spinlock for reading that we also lock when we - // write to a column family (only on column family add/drop, which is - // a very rare action) + // reading the column family set outside of DB mutex -- should lock + versions_->GetColumnFamilySet()->Lock(); status = WriteBatchInternal::InsertInto( updates, column_family_memtables_.get(), 0, this, false); + versions_->GetColumnFamilySet()->Unlock(); if (!status.ok()) { // Panic for in-memory corruptions