added public api to schedule flush/compaction, code to prevent race with db::open

Summary:
Fixes T8781168.

Added a new function EnableAutoCompactions in db.h to be publicly
avialable.  This allows compaction to be re-enabled after disabling it via
SetOptions

Refactored code to set the dbptr earlier on in TransactionDB::Open and DB::Open
Temporarily disable auto_compaction in TransactionDB::Open until dbptr is set to
prevent race condition.

Test Plan:
Ran make all check

verified fix on myrocks side:
was able to reproduce the seg fault with
../tools/mysqltest.sh --mem --force rocksdb.drop_table

method was to manually sleep the thread after DB::Open but before TransactionDB ptr was
assigned in transaction_db_impl.cc:
  DB::Open(db_options, dbname, column_families_copy, handles, &db);
  clock_t goal = (60000 * 10) + clock();
  while (goal > clock());
  ...dbptr(aka rdb) gets assigned below

verified my changes fixed the issue.

Also added unit test 'ToggleAutoCompaction' in transaction_test.cc

Reviewers: hermanlee4, anthony

Reviewed By: anthony

Subscribers: alex, dhruba

Differential Revision: https://reviews.facebook.net/D51147
main
Alex Yang 9 years ago
parent 19b1201b2b
commit e8180f9901
  1. 23
      db/db_impl.cc
  2. 3
      db/db_impl.h
  3. 5
      db/db_test.cc
  4. 6
      include/rocksdb/db.h
  5. 12
      include/rocksdb/utilities/stackable_db.h
  6. 26
      utilities/transactions/transaction_db_impl.cc
  7. 61
      utilities/transactions/transaction_test.cc

@ -2334,6 +2334,24 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
return s; return s;
} }
Status DBImpl::EnableAutoCompaction(
const std::vector<ColumnFamilyHandle*>& column_family_handles) {
Status s;
for (auto cf_ptr : column_family_handles) {
// check options here, enable only if didn't initially disable
if (s.ok()) {
s = this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
}
}
if (s.ok()) {
InstrumentedMutexLock guard_lock(&mutex_);
MaybeScheduleFlushOrCompaction();
}
return s;
}
void DBImpl::MaybeScheduleFlushOrCompaction() { void DBImpl::MaybeScheduleFlushOrCompaction() {
mutex_.AssertHeld(); mutex_.AssertHeld();
if (!opened_successfully_) { if (!opened_successfully_) {
@ -5007,6 +5025,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
} }
TEST_SYNC_POINT("DBImpl::Open:Opened"); TEST_SYNC_POINT("DBImpl::Open:Opened");
if (s.ok()) { if (s.ok()) {
*dbptr = impl;
impl->opened_successfully_ = true; impl->opened_successfully_ = true;
impl->MaybeScheduleFlushOrCompaction(); impl->MaybeScheduleFlushOrCompaction();
} }
@ -5029,9 +5048,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
persist_options_status.ToString().c_str()); persist_options_status.ToString().c_str());
} }
} }
if (s.ok()) { if (!s.ok()) {
*dbptr = impl;
} else {
for (auto* h : *handles) { for (auto* h : *handles) {
delete h; delete h;
} }

@ -144,6 +144,9 @@ class DBImpl : public DB {
virtual Status PauseBackgroundWork() override; virtual Status PauseBackgroundWork() override;
virtual Status ContinueBackgroundWork() override; virtual Status ContinueBackgroundWork() override;
virtual Status EnableAutoCompaction(
const std::vector<ColumnFamilyHandle*>& column_family_handles) override;
using DB::SetOptions; using DB::SetOptions;
Status SetOptions( Status SetOptions(
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,

@ -5850,6 +5850,11 @@ class ModelDB: public DB {
return Status::NotSupported("Not supported operation."); return Status::NotSupported("Not supported operation.");
} }
Status EnableAutoCompaction(
const std::vector<ColumnFamilyHandle*>& column_family_handles) override {
return Status::NotSupported("Not supported operation.");
}
using DB::NumberLevels; using DB::NumberLevels;
virtual int NumberLevels(ColumnFamilyHandle* column_family) override { virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
return 1; return 1;

@ -558,6 +558,12 @@ class DB {
virtual Status PauseBackgroundWork() = 0; virtual Status PauseBackgroundWork() = 0;
virtual Status ContinueBackgroundWork() = 0; virtual Status ContinueBackgroundWork() = 0;
// This function will enable automatic compactions for the given column
// families if they were previously disabled via the disable_auto_compactions
// option.
virtual Status EnableAutoCompaction(
const std::vector<ColumnFamilyHandle*>& column_family_handles) = 0;
// Number of levels used for this DB. // Number of levels used for this DB.
virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0; virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0;
virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); } virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); }

@ -183,6 +183,11 @@ class StackableDB : public DB {
return db_->ContinueBackgroundWork(); return db_->ContinueBackgroundWork();
} }
virtual Status EnableAutoCompaction(
const std::vector<ColumnFamilyHandle*>& column_family_handles) override {
return db_->EnableAutoCompaction(column_family_handles);
}
using DB::NumberLevels; using DB::NumberLevels;
virtual int NumberLevels(ColumnFamilyHandle* column_family) override { virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
return db_->NumberLevels(column_family); return db_->NumberLevels(column_family);
@ -274,9 +279,10 @@ class StackableDB : public DB {
} }
using DB::SetOptions; using DB::SetOptions;
virtual Status SetOptions( virtual Status SetOptions(ColumnFamilyHandle* column_family_handle,
const std::unordered_map<std::string, std::string>& new_options) override { const std::unordered_map<std::string, std::string>&
return db_->SetOptions(new_options); new_options) override {
return db_->SetOptions(column_family_handle, new_options);
} }
using DB::GetPropertiesOfAllTables; using DB::GetPropertiesOfAllTables;

@ -8,6 +8,7 @@
#include "utilities/transactions/transaction_db_impl.h" #include "utilities/transactions/transaction_db_impl.h"
#include <string> #include <string>
#include <unordered_set>
#include <vector> #include <vector>
#include "db/db_impl.h" #include "db/db_impl.h"
@ -77,28 +78,45 @@ Status TransactionDB::Open(
DB* db; DB* db;
std::vector<ColumnFamilyDescriptor> column_families_copy = column_families; std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
std::vector<size_t> compaction_enabled_cf_indices;
// Enable MemTable History if not already enabled // Enable MemTable History if not already enabled
for (auto& column_family : column_families_copy) { for (size_t i = 0; i < column_families_copy.size(); i++) {
ColumnFamilyOptions* options = &column_family.options; ColumnFamilyOptions* options = &column_families_copy[i].options;
if (options->max_write_buffer_number_to_maintain == 0) { if (options->max_write_buffer_number_to_maintain == 0) {
// Setting to -1 will set the History size to max_write_buffer_number. // Setting to -1 will set the History size to max_write_buffer_number.
options->max_write_buffer_number_to_maintain = -1; options->max_write_buffer_number_to_maintain = -1;
} }
if (!options->disable_auto_compactions) {
// Disable compactions momentarily to prevent race with DB::Open
options->disable_auto_compactions = true;
compaction_enabled_cf_indices.push_back(i);
}
} }
s = DB::Open(db_options, dbname, column_families, handles, &db); s = DB::Open(db_options, dbname, column_families_copy, handles, &db);
if (s.ok()) { if (s.ok()) {
TransactionDBImpl* txn_db = new TransactionDBImpl( TransactionDBImpl* txn_db = new TransactionDBImpl(
db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options)); db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options));
*dbptr = txn_db;
for (auto cf_ptr : *handles) { for (auto cf_ptr : *handles) {
txn_db->AddColumnFamily(cf_ptr); txn_db->AddColumnFamily(cf_ptr);
} }
*dbptr = txn_db; // Re-enable compaction for the column families that initially had
// compaction enabled.
assert(column_families_copy.size() == (*handles).size());
std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
for (auto index : compaction_enabled_cf_indices) {
compaction_enabled_cf_handles.push_back((*handles)[index]);
}
s = txn_db->EnableAutoCompaction(compaction_enabled_cf_handles);
} }
return s; return s;

@ -7,7 +7,9 @@
#include <string> #include <string>
#include "db/db_impl.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "util/logging.h" #include "util/logging.h"
@ -2208,6 +2210,65 @@ TEST_F(TransactionTest, ClearSnapshotTest) {
delete txn; delete txn;
} }
TEST_F(TransactionTest, ToggleAutoCompactionTest) {
Status s;
TransactionOptions txn_options;
ColumnFamilyHandle *cfa, *cfb;
ColumnFamilyOptions cf_options;
// Create 2 new column families
s = db->CreateColumnFamily(cf_options, "CFA", &cfa);
ASSERT_OK(s);
s = db->CreateColumnFamily(cf_options, "CFB", &cfb);
ASSERT_OK(s);
delete cfa;
delete cfb;
delete db;
// open DB with three column families
std::vector<ColumnFamilyDescriptor> column_families;
// have to open default column family
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
// open the new column families
column_families.push_back(
ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
column_families.push_back(
ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
ColumnFamilyOptions* cf_opt_default = &column_families[0].options;
ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options;
ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options;
cf_opt_default->disable_auto_compactions = false;
cf_opt_cfa->disable_auto_compactions = true;
cf_opt_cfb->disable_auto_compactions = false;
std::vector<ColumnFamilyHandle*> handles;
s = TransactionDB::Open(options, txn_db_options, dbname, column_families,
&handles, &db);
ASSERT_OK(s);
auto cfh_default = reinterpret_cast<ColumnFamilyHandleImpl*>(handles[0]);
auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions();
auto cfh_a = reinterpret_cast<ColumnFamilyHandleImpl*>(handles[1]);
auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions();
auto cfh_b = reinterpret_cast<ColumnFamilyHandleImpl*>(handles[2]);
auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions();
ASSERT_EQ(opt_default.disable_auto_compactions, false);
ASSERT_EQ(opt_a.disable_auto_compactions, true);
ASSERT_EQ(opt_b.disable_auto_compactions, false);
for (auto handle : handles) {
delete handle;
}
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save