[CF] Thread-safety guarantees for ColumnFamilySet

Summary: Revised thread-safety guarantees and implemented a way to spinlock the object.

Test Plan: make check

Reviewers: dhruba, haobo, sdong, kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15975
main
Igor Canadi 11 years ago
parent 8fa8a708ef
commit f8d5443efe
  1. 11
      db/column_family.cc
  2. 11
      db/column_family.h
  3. 26
      db/db_impl.cc

@ -247,7 +247,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
db_name_(dbname), db_name_(dbname),
db_options_(db_options), db_options_(db_options),
storage_options_(storage_options), storage_options_(storage_options),
table_cache_(table_cache) { table_cache_(table_cache),
spin_lock_(ATOMIC_FLAG_INIT) {
// initialize linked list // initialize linked list
dummy_cfd_->prev_.store(dummy_cfd_); dummy_cfd_->prev_.store(dummy_cfd_);
dummy_cfd_->next_.store(dummy_cfd_); dummy_cfd_->next_.store(dummy_cfd_);
@ -332,6 +333,14 @@ void ColumnFamilySet::DropColumnFamily(uint32_t id) {
next->prev_.store(prev); next->prev_.store(prev);
} }
void ColumnFamilySet::Lock() {
// spin lock
while (spin_lock_.test_and_set(std::memory_order_acquire)) {
}
}
void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); }
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
current_ = column_family_set_->GetColumnFamily(column_family_id); current_ = column_family_set_->GetColumnFamily(column_family_id);
handle_.id = column_family_id; handle_.id = column_family_id;

@ -12,6 +12,7 @@
#include <unordered_map> #include <unordered_map>
#include <string> #include <string>
#include <vector> #include <vector>
#include <atomic>
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -220,6 +221,15 @@ class ColumnFamilySet {
iterator begin() { return iterator(dummy_cfd_->next()); } iterator begin() { return iterator(dummy_cfd_->next()); }
iterator end() { return iterator(dummy_cfd_); } iterator end() { return iterator(dummy_cfd_); }
// ColumnFamilySet has interesting thread-safety requirements
// * CreateColumnFamily() or DropColumnFamily() -- need to Lock() and also
// execute inside of DB mutex
// * Iterate -- without any locks
// * GetDefault(), GetColumnFamily(), Exists(), GetID(),
// GetNextColumnFamilyID() -- either inside of DB mutex or call Lock()
void Lock();
void Unlock();
private: private:
std::unordered_map<std::string, uint32_t> column_families_; std::unordered_map<std::string, uint32_t> column_families_;
std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_; std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
@ -233,6 +243,7 @@ class ColumnFamilySet {
const DBOptions* const db_options_; const DBOptions* const db_options_;
const EnvOptions storage_options_; const EnvOptions storage_options_;
Cache* table_cache_; Cache* table_cache_;
std::atomic_flag spin_lock_;
}; };
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {

@ -903,6 +903,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
} }
WriteBatchInternal::SetContents(&batch, record); WriteBatchInternal::SetContents(&batch, record);
// No need to lock ColumnFamilySet here since its under a DB mutex
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), log_number); &batch, column_family_memtables_.get(), log_number);
@ -921,7 +922,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->ApproximateMemoryUsage() > if (cfd->mem()->ApproximateMemoryUsage() >
cfd->options()->write_buffer_size) { cfd->options()->write_buffer_size) {
// If this asserts, it means that ColumnFamilyMemTablesImpl failed in // If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families // filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number); assert(cfd->GetLogNumber() <= log_number);
auto iter = version_edits.find(cfd->GetID()); auto iter = version_edits.find(cfd->GetID());
@ -950,7 +951,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// Column family cfd has already flushed the data // Column family cfd has already flushed the data
// from log_number. Memtable has to be empty because // from log_number. Memtable has to be empty because
// we filter the updates based on log_number // we filter the updates based on log_number
// (in ColumnFamilyMemTablesImpl) // (in WriteBatch::InsertInto)
assert(cfd->mem()->GetFirstSequenceNumber() == 0); assert(cfd->mem()->GetFirstSequenceNumber() == 0);
assert(edit->NumEntries() == 0); assert(edit->NumEntries() == 0);
continue; continue;
@ -2963,7 +2964,8 @@ std::vector<Status> DBImpl::MultiGet(
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name, const std::string& column_family_name,
ColumnFamilyHandle* handle) { ColumnFamilyHandle* handle) {
MutexLock l(&mutex_); // whenever we are writing to column family set, we have to lock
versions_->GetColumnFamilySet()->Lock();
if (versions_->GetColumnFamilySet()->Exists(column_family_name)) { if (versions_->GetColumnFamilySet()->Exists(column_family_name)) {
return Status::InvalidArgument("Column family already exists"); return Status::InvalidArgument("Column family already exists");
} }
@ -2971,11 +2973,16 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
edit.AddColumnFamily(column_family_name); edit.AddColumnFamily(column_family_name);
handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
edit.SetColumnFamily(handle->id); edit.SetColumnFamily(handle->id);
mutex_.Lock();
Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_); Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
if (s.ok()) { if (s.ok()) {
// add to internal data structures // add to internal data structures
versions_->CreateColumnFamily(options, &edit); versions_->CreateColumnFamily(options, &edit);
} }
mutex_.Unlock();
versions_->GetColumnFamilySet()->Unlock();
Log(options_.info_log, "Created column family %s\n", Log(options_.info_log, "Created column family %s\n",
column_family_name.c_str()); column_family_name.c_str());
return s; return s;
@ -2985,21 +2992,25 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
if (column_family.id == 0) { if (column_family.id == 0) {
return Status::InvalidArgument("Can't drop default column family"); return Status::InvalidArgument("Can't drop default column family");
} }
mutex_.Lock(); // whenever we are writing to column family set, we have to lock
versions_->GetColumnFamilySet()->Lock();
if (!versions_->GetColumnFamilySet()->Exists(column_family.id)) { if (!versions_->GetColumnFamilySet()->Exists(column_family.id)) {
return Status::NotFound("Column family not found"); return Status::NotFound("Column family not found");
} }
VersionEdit edit; VersionEdit edit;
edit.DropColumnFamily(); edit.DropColumnFamily();
edit.SetColumnFamily(column_family.id); edit.SetColumnFamily(column_family.id);
mutex_.Lock();
Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_); Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
if (s.ok()) { if (s.ok()) {
// remove from internal data structures // remove from internal data structures
versions_->DropColumnFamily(&edit); versions_->DropColumnFamily(&edit);
} }
versions_->GetColumnFamilySet()->Unlock();
DeletionState deletion_state; DeletionState deletion_state;
FindObsoleteFiles(deletion_state, false, true); FindObsoleteFiles(deletion_state, false, true);
mutex_.Unlock(); mutex_.Unlock();
PurgeObsoleteFiles(deletion_state); PurgeObsoleteFiles(deletion_state);
Log(options_.info_log, "Dropped column family with id %u\n", Log(options_.info_log, "Dropped column family with id %u\n",
column_family.id); column_family.id);
@ -3193,12 +3204,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
} }
} }
if (status.ok()) { if (status.ok()) {
// TODO(icanadi) this accesses column_family_set_ without any lock. // reading the column family set outside of DB mutex -- should lock
// We'll need to add a spinlock for reading that we also lock when we versions_->GetColumnFamilySet()->Lock();
// write to a column family (only on column family add/drop, which is
// a very rare action)
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
updates, column_family_memtables_.get(), 0, this, false); updates, column_family_memtables_.get(), 0, this, false);
versions_->GetColumnFamilySet()->Unlock();
if (!status.ok()) { if (!status.ok()) {
// Panic for in-memory corruptions // Panic for in-memory corruptions

Loading…
Cancel
Save