Fix a race condition in persisting options

Summary:
This patch fix a race condition in persisting options which will cause a crash when:

* Thread A obtain cf options and start to persist options based on that cf options.
* Thread B kicks in and finish DropColumnFamily and delete cf_handle.
* Thread A wakes up and tries to finish the persisting options and crashes.

Test Plan: Add a test in column_family_test that can reproduce the crash

Reviewers: anthony, IslamAbdelRahman, rven, kradhakrishnan, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D51609
main
Yueh-Hsuan Chiang 9 years ago
parent f276c3a821
commit 2fa3ed5180
  1. 79
      db/column_family_test.cc
  2. 114
      db/db_impl.cc
  3. 10
      db/db_impl.h
  4. 2
      db/write_thread.cc
  5. 2
      util/options_parser.cc

@ -68,7 +68,9 @@ class ColumnFamilyTest : public testing::Test {
void Close() { void Close() {
for (auto h : handles_) { for (auto h : handles_) {
delete h; if (h) {
delete h;
}
} }
handles_.clear(); handles_.clear();
names_.clear(); names_.clear();
@ -1260,6 +1262,81 @@ TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
namespace {
std::atomic<int> test_stage(0);
const int kMainThreadStartPersistingOptionsFile = 1;
const int kChildThreadFinishDroppingColumnFamily = 2;
const int kChildThreadWaitingMainThreadPersistOptions = 3;
void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id,
std::vector<Comparator*> comparators) {
while (test_stage < kMainThreadStartPersistingOptionsFile) {
Env::Default()->SleepForMicroseconds(100);
}
cf_test->DropColumnFamilies({cf_id});
delete comparators[cf_id];
comparators[cf_id] = nullptr;
test_stage = kChildThreadFinishDroppingColumnFamily;
}
} // namespace
TEST_F(ColumnFamilyTest, CreateAndDropRace) {
const int kCfCount = 5;
std::vector<ColumnFamilyOptions> cf_opts;
std::vector<Comparator*> comparators;
for (int i = 0; i < kCfCount; ++i) {
cf_opts.emplace_back();
comparators.push_back(new test::SimpleSuffixReverseComparator());
cf_opts.back().comparator = comparators.back();
}
db_options_.create_if_missing = true;
db_options_.create_missing_column_families = true;
auto main_thread_id = std::this_thread::get_id();
rocksdb::SyncPoint::GetInstance()->SetCallBack("PersistRocksDBOptions:start",
[&](void* arg) {
auto current_thread_id = std::this_thread::get_id();
// If it's the main thread hitting this sync-point, then it
// will be blocked until some other thread update the test_stage.
if (main_thread_id == current_thread_id) {
test_stage = kMainThreadStartPersistingOptionsFile;
while (test_stage < kChildThreadFinishDroppingColumnFamily) {
Env::Default()->SleepForMicroseconds(100);
}
}
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::EnterUnbatched:Wait", [&](void* arg) {
// This means a thread doing DropColumnFamily() is waiting for
// other thread to finish persisting options.
// In such case, we update the test_stage to unblock the main thread.
test_stage = kChildThreadWaitingMainThreadPersistOptions;
// Note that based on the test setting, this must not be the
// main thread.
ASSERT_NE(main_thread_id, std::this_thread::get_id());
});
// Create a database with four column families
Open({"default", "one", "two", "three"},
{cf_opts[0], cf_opts[1], cf_opts[2], cf_opts[3]});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Start a thread that will drop the first column family
// and its comparator
std::thread drop_cf_thread(DropSingleColumnFamily, this, 1, comparators);
DropColumnFamilies({2});
drop_cf_thread.join();
Close();
Destroy();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -1930,24 +1930,21 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
MutableCFOptions new_options; MutableCFOptions new_options;
Status s; Status s;
Status persist_options_status;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
s = cfd->SetOptions(options_map); s = cfd->SetOptions(options_map);
if (s.ok()) { if (s.ok()) {
new_options = *cfd->GetLatestMutableCFOptions(); new_options = *cfd->GetLatestMutableCFOptions();
} }
} if (s.ok()) {
if (s.ok()) { // Persist RocksDB options under the single write thread
Status persist_options_status = WriteOptionsFile(); WriteThread::Writer w;
if (!persist_options_status.ok()) { write_thread_.EnterUnbatched(&w, &mutex_);
if (db_options_.fail_if_options_file_error) {
s = Status::IOError( persist_options_status = WriteOptionsFile();
"SetOptions succeeded, but unable to persist options",
persist_options_status.ToString()); write_thread_.ExitUnbatched(&w);
}
Warn(db_options_.info_log,
"Unable to persist options in SetOptions() -- %s",
persist_options_status.ToString().c_str());
} }
} }
@ -1963,6 +1960,16 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
db_options_.info_log, "[%s] SetOptions succeeded", db_options_.info_log, "[%s] SetOptions succeeded",
cfd->GetName().c_str()); cfd->GetName().c_str());
new_options.Dump(db_options_.info_log.get()); new_options.Dump(db_options_.info_log.get());
if (!persist_options_status.ok()) {
if (db_options_.fail_if_options_file_error) {
s = Status::IOError(
"SetOptions succeeded, but unable to persist options",
persist_options_status.ToString());
}
Warn(db_options_.info_log,
"Unable to persist options in SetOptions() -- %s",
persist_options_status.ToString().c_str());
}
} else { } else {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"[%s] SetOptions failed", cfd->GetName().c_str()); "[%s] SetOptions failed", cfd->GetName().c_str());
@ -3446,6 +3453,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name, const std::string& column_family_name,
ColumnFamilyHandle** handle) { ColumnFamilyHandle** handle) {
Status s; Status s;
Status persist_options_status;
*handle = nullptr; *handle = nullptr;
s = CheckCompressionSupported(cf_options); s = CheckCompressionSupported(cf_options);
@ -3478,6 +3486,12 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
s = versions_->LogAndApply( s = versions_->LogAndApply(
nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit, nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit,
&mutex_, directories_.GetDbDir(), false, &cf_options); &mutex_, directories_.GetDbDir(), false, &cf_options);
if (s.ok()) {
// If the column family was created successfully, we then persist
// the updated RocksDB options under the same single write thread
persist_options_status = WriteOptionsFile();
}
write_thread_.ExitUnbatched(&w); write_thread_.ExitUnbatched(&w);
} }
if (s.ok()) { if (s.ok()) {
@ -3505,7 +3519,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
// this is outside the mutex // this is outside the mutex
if (s.ok()) { if (s.ok()) {
Status persist_options_status = WriteOptionsFile(); NewThreadStatusCfInfo(
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
if (!persist_options_status.ok()) { if (!persist_options_status.ok()) {
if (db_options_.fail_if_options_file_error) { if (db_options_.fail_if_options_file_error) {
s = Status::IOError( s = Status::IOError(
@ -3517,8 +3532,6 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
"Unable to persist options in CreateColumnFamily() -- %s", "Unable to persist options in CreateColumnFamily() -- %s",
persist_options_status.ToString().c_str()); persist_options_status.ToString().c_str());
} }
NewThreadStatusCfInfo(
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
} }
return s; return s;
} }
@ -3537,6 +3550,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
edit.SetColumnFamily(cfd->GetID()); edit.SetColumnFamily(cfd->GetID());
Status s; Status s;
Status options_persist_status;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
if (cfd->IsDropped()) { if (cfd->IsDropped()) {
@ -3548,6 +3562,11 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
write_thread_.EnterUnbatched(&w, &mutex_); write_thread_.EnterUnbatched(&w, &mutex_);
s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_); &edit, &mutex_);
if (s.ok()) {
// If the column family was dropped successfully, we then persist
// the updated RocksDB options under the same single write thread
options_persist_status = WriteOptionsFile();
}
write_thread_.ExitUnbatched(&w); write_thread_.ExitUnbatched(&w);
} }
@ -3574,7 +3593,9 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size * max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
mutable_cf_options->max_write_buffer_number; mutable_cf_options->max_write_buffer_number;
auto options_persist_status = WriteOptionsFile(); Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Dropped column family with id %u\n", cfd->GetID());
if (!options_persist_status.ok()) { if (!options_persist_status.ok()) {
if (db_options_.fail_if_options_file_error) { if (db_options_.fail_if_options_file_error) {
s = Status::IOError( s = Status::IOError(
@ -3586,9 +3607,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
"Unable to persist options in DropColumnFamily() -- %s", "Unable to persist options in DropColumnFamily() -- %s",
options_persist_status.ToString().c_str()); options_persist_status.ToString().c_str());
} }
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Dropped column family with id %u\n",
cfd->GetID());
} else { } else {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Dropping column family with id %u FAILED -- %s\n", "Dropping column family with id %u FAILED -- %s\n",
@ -5024,7 +5042,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
} }
} }
TEST_SYNC_POINT("DBImpl::Open:Opened"); TEST_SYNC_POINT("DBImpl::Open:Opened");
Status persist_options_status;
if (s.ok()) { if (s.ok()) {
// Persist RocksDB Options before scheduling the compaction.
// The WriteOptionsFile() will release and lock the mutex internally.
persist_options_status = impl->WriteOptionsFile();
*dbptr = impl; *dbptr = impl;
impl->opened_successfully_ = true; impl->opened_successfully_ = true;
impl->MaybeScheduleFlushOrCompaction(); impl->MaybeScheduleFlushOrCompaction();
@ -5035,8 +5058,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p", Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p",
impl); impl);
LogFlush(impl->db_options_.info_log); LogFlush(impl->db_options_.info_log);
auto persist_options_status = impl->WriteOptionsFile();
if (!persist_options_status.ok()) { if (!persist_options_status.ok()) {
if (db_options.fail_if_options_file_error) { if (db_options.fail_if_options_file_error) {
s = Status::IOError( s = Status::IOError(
@ -5165,38 +5186,34 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
Status DBImpl::WriteOptionsFile() { Status DBImpl::WriteOptionsFile() {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
std::string file_name; mutex_.AssertHeld();
Status s = WriteOptionsToTempFile(&file_name);
if (!s.ok()) {
return s;
}
s = RenameTempFileToOptionsFile(file_name);
return s;
#else
return Status::OK();
#endif // !ROCKSDB_LITE
}
Status DBImpl::WriteOptionsToTempFile(std::string* file_name) {
#ifndef ROCKSDB_LITE
std::vector<std::string> cf_names; std::vector<std::string> cf_names;
std::vector<ColumnFamilyOptions> cf_opts; std::vector<ColumnFamilyOptions> cf_opts;
{
InstrumentedMutexLock l(&mutex_); // This part requires mutex to protect the column family options
// This part requires mutex to protect the column family options for (auto cfd : *versions_->GetColumnFamilySet()) {
for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) {
if (cfd->IsDropped()) { continue;
continue;
}
cf_names.push_back(cfd->GetName());
cf_opts.push_back(BuildColumnFamilyOptions(
*cfd->options(), *cfd->GetLatestMutableCFOptions()));
} }
cf_names.push_back(cfd->GetName());
cf_opts.push_back(BuildColumnFamilyOptions(
*cfd->options(), *cfd->GetLatestMutableCFOptions()));
} }
*file_name = TempOptionsFileName(GetName(), versions_->NewFileNumber());
Status s = PersistRocksDBOptions(GetDBOptions(), cf_names, cf_opts, // Unlock during expensive operations. New writes cannot get here
*file_name, GetEnv()); // because the single write thread ensures all new writes get queued.
mutex_.Unlock();
std::string file_name =
TempOptionsFileName(GetName(), versions_->NewFileNumber());
Status s = PersistRocksDBOptions(GetDBOptions(), cf_names, cf_opts, file_name,
GetEnv());
if (s.ok()) {
s = RenameTempFileToOptionsFile(file_name);
}
mutex_.Lock();
return s; return s;
#else #else
return Status::OK(); return Status::OK();
@ -5224,8 +5241,6 @@ void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
Status DBImpl::DeleteObsoleteOptionsFiles() { Status DBImpl::DeleteObsoleteOptionsFiles() {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
options_files_mutex_.AssertHeld();
std::vector<std::string> filenames; std::vector<std::string> filenames;
// use ordered map to store keep the filenames sorted from the newest // use ordered map to store keep the filenames sorted from the newest
// to the oldest. // to the oldest.
@ -5257,7 +5272,6 @@ Status DBImpl::DeleteObsoleteOptionsFiles() {
Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) { Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
InstrumentedMutexLock l(&options_files_mutex_);
Status s; Status s;
std::string options_file_name = std::string options_file_name =
OptionsFileName(GetName(), versions_->NewFileNumber()); OptionsFileName(GetName(), versions_->NewFileNumber());

@ -405,10 +405,14 @@ class DBImpl : public DB {
SuperVersion* super_version, SuperVersion* super_version,
Arena* arena); Arena* arena);
// The following options file related functions should not be // Except in DB::Open(), WriteOptionsFile can only be called when:
// called while DB mutex is held. // 1. WriteThread::Writer::EnterUnbatched() is used.
// 2. db_mutex is held
Status WriteOptionsFile(); Status WriteOptionsFile();
Status WriteOptionsToTempFile(std::string* file_name);
// The following two functions can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used.
// 2. db_mutex is NOT held
Status RenameTempFileToOptionsFile(const std::string& file_name); Status RenameTempFileToOptionsFile(const std::string& file_name);
Status DeleteObsoleteOptionsFiles(); Status DeleteObsoleteOptionsFiles();

@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
#include "db/write_thread.h" #include "db/write_thread.h"
#include "util/sync_point.h"
namespace rocksdb { namespace rocksdb {
@ -188,6 +189,7 @@ void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
LinkOne(w, &wait_needed); LinkOne(w, &wait_needed);
if (wait_needed) { if (wait_needed) {
mu->Unlock(); mu->Unlock();
TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
Await(w); Await(w);
mu->Lock(); mu->Lock();
} }

@ -17,6 +17,7 @@
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "util/options_helper.h" #include "util/options_helper.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/sync_point.h"
#include "port/port.h" #include "port/port.h"
@ -34,6 +35,7 @@ Status PersistRocksDBOptions(const DBOptions& db_opt,
const std::vector<std::string>& cf_names, const std::vector<std::string>& cf_names,
const std::vector<ColumnFamilyOptions>& cf_opts, const std::vector<ColumnFamilyOptions>& cf_opts,
const std::string& file_name, Env* env) { const std::string& file_name, Env* env) {
TEST_SYNC_POINT("PersistRocksDBOptions:start");
if (cf_names.size() != cf_opts.size()) { if (cf_names.size() != cf_opts.size()) {
return Status::InvalidArgument( return Status::InvalidArgument(
"cf_names.size() and cf_opts.size() must be the same"); "cf_names.size() and cf_opts.size() must be the same");

Loading…
Cancel
Save