Add bulk create/drop column family API

Summary:
Adding DB::CreateColumnFamilie() and DB::DropColumnFamilies() to bulk create/drop column families. This is to address the problem creating/dropping 1k column families takes minutes. The bottleneck is we persist options files for every single column family create/drop, and it parses the persisted options file for verification, which take a lot CPU time.

The new APIs simply create/drop column families individually, and persist options file once at the end. This improves create 1k column families to within ~0.1s. Further improvement can be merge manifest write to one IO.
Closes https://github.com/facebook/rocksdb/pull/2248

Differential Revision: D5001578

Pulled By: yiwu-arbug

fbshipit-source-id: d4e00bda671451e0b314c13e12ad194b1704aa03
main
Yi Wu 8 years ago committed by Facebook Github Bot
parent 40af2381ec
commit 2cd00773c7
  1. 4
      HISTORY.md
  2. 32
      db/column_family_test.cc
  3. 206
      db/db_impl.cc
  4. 24
      db/db_impl.h
  5. 14
      db/db_impl_open.cc
  6. 27
      include/rocksdb/db.h
  7. 18
      include/rocksdb/utilities/stackable_db.h

@ -1,13 +1,13 @@
# Rocksdb Change Log
## Unreleased
### Public API Change
* Introduce WriteBatch::PopSavePoint to pop the most recent save point explicitly
### New Features
* FIFO compaction to support Intra L0 compaction too with CompactionOptionsFIFO.allow_compaction=true.
* DB::ResetStats() to reset internal stats.
* Statistics::Reset() to reset user stats.
* ldb add option --try_load_options, which will open DB with its own option file.
* Introduce WriteBatch::PopSavePoint to pop the most recent save point explicitly.
* Support dynamically change `max_open_files` option via SetDBOptions()
* Added DB::CreateColumnFamilie() and DB::DropColumnFamilies() to bulk create/drop column families.
## 5.4.0 (04/11/2017)
### Public API Change

@ -676,6 +676,38 @@ TEST_F(ColumnFamilyTest, AddDrop) {
std::vector<std::string>({"default", "four", "three"}));
}
TEST_F(ColumnFamilyTest, BulkAddDrop) {
constexpr int kNumCF = 1000;
ColumnFamilyOptions cf_options;
WriteOptions write_options;
Open();
std::vector<std::string> cf_names;
std::vector<ColumnFamilyHandle*> cf_handles;
for (int i = 1; i <= kNumCF; i++) {
cf_names.push_back("cf1-" + ToString(i));
}
ASSERT_OK(db_->CreateColumnFamilies(cf_options, cf_names, &cf_handles));
for (int i = 1; i <= kNumCF; i++) {
ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar"));
}
ASSERT_OK(db_->DropColumnFamilies(cf_handles));
std::vector<ColumnFamilyDescriptor> cf_descriptors;
cf_handles.clear();
for (int i = 1; i <= kNumCF; i++) {
cf_descriptors.emplace_back("cf2-" + ToString(i), ColumnFamilyOptions());
}
ASSERT_OK(db_->CreateColumnFamilies(cf_descriptors, &cf_handles));
for (int i = 1; i <= kNumCF; i++) {
ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar"));
}
ASSERT_OK(db_->DropColumnFamilies(cf_handles));
Close();
std::vector<std::string> families;
ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families));
std::sort(families.begin(), families.end());
ASSERT_TRUE(families == std::vector<std::string>({"default"}));
}
TEST_F(ColumnFamilyTest, DropTest) {
// first iteration - dont reopen DB before dropping
// second iteration - reopen DB before dropping

@ -516,9 +516,8 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
delete old_sv;
write_thread_.EnterUnbatched(&w, &mutex_);
persist_options_status = WriteOptionsFile();
write_thread_.ExitUnbatched(&w);
persist_options_status = WriteOptionsFile(
false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
}
}
@ -534,14 +533,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
"[%s] SetOptions() succeeded", cfd->GetName().c_str());
new_options.Dump(immutable_db_options_.info_log.get());
if (!persist_options_status.ok()) {
if (immutable_db_options_.fail_if_options_file_error) {
s = Status::IOError(
"SetOptions() succeeded, but unable to persist options",
persist_options_status.ToString());
}
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to persist options in SetOptions() -- %s",
persist_options_status.ToString().c_str());
s = persist_options_status;
}
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
@ -596,7 +588,8 @@ Status DBImpl::SetDBOptions(
purge_wal_status.ToString().c_str());
}
}
persist_options_status = WriteOptionsFile();
persist_options_status = WriteOptionsFile(
false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
write_thread_.ExitUnbatched(&w);
}
}
@ -1126,8 +1119,76 @@ std::vector<Status> DBImpl::MultiGet(
}
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name,
const std::string& column_family,
ColumnFamilyHandle** handle) {
assert(handle != nullptr);
Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
if (s.ok()) {
s = WriteOptionsFile(true /*need_mutex_lock*/,
true /*need_enter_write_thread*/);
}
return s;
}
Status DBImpl::CreateColumnFamilies(
const ColumnFamilyOptions& cf_options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) {
assert(handles != nullptr);
handles->clear();
size_t num_cf = column_family_names.size();
Status s;
bool success_once = false;
for (size_t i = 0; i < num_cf; i++) {
ColumnFamilyHandle* handle;
s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
if (!s.ok()) {
break;
}
handles->push_back(handle);
success_once = true;
}
if (success_once) {
Status persist_options_status = WriteOptionsFile(
true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
if (s.ok() && !persist_options_status.ok()) {
s = persist_options_status;
}
}
return s;
}
Status DBImpl::CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) {
assert(handles != nullptr);
handles->clear();
size_t num_cf = column_families.size();
Status s;
bool success_once = false;
for (size_t i = 0; i < num_cf; i++) {
ColumnFamilyHandle* handle;
s = CreateColumnFamilyImpl(column_families[i].options,
column_families[i].name, &handle);
if (!s.ok()) {
break;
}
handles->push_back(handle);
success_once = true;
}
if (success_once) {
Status persist_options_status = WriteOptionsFile(
true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
if (s.ok() && !persist_options_status.ok()) {
s = persist_options_status;
}
}
return s;
}
Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {
Status s;
Status persist_options_status;
*handle = nullptr;
@ -1164,12 +1225,6 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
&mutex_, directories_.GetDbDir(), false,
&cf_options);
if (s.ok()) {
// If the column family was created successfully, we then persist
// the updated RocksDB options under the same single write thread
persist_options_status = WriteOptionsFile();
}
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
@ -1199,22 +1254,42 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
if (s.ok()) {
NewThreadStatusCfInfo(
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
if (!persist_options_status.ok()) {
if (immutable_db_options_.fail_if_options_file_error) {
s = Status::IOError(
"ColumnFamily has been created, but unable to persist"
"options in CreateColumnFamily()",
persist_options_status.ToString().c_str());
}
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to persist options in CreateColumnFamily() -- %s",
persist_options_status.ToString().c_str());
}
}
return s;
}
Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
assert(column_family != nullptr);
Status s = DropColumnFamilyImpl(column_family);
if (s.ok()) {
s = WriteOptionsFile(true /*need_mutex_lock*/,
true /*need_enter_write_thread*/);
}
return s;
}
Status DBImpl::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) {
Status s;
bool success_once = false;
for (auto* handle : column_families) {
s = DropColumnFamilyImpl(handle);
if (!s.ok()) {
break;
}
success_once = true;
}
if (success_once) {
Status persist_options_status = WriteOptionsFile(
true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
if (s.ok() && !persist_options_status.ok()) {
s = persist_options_status;
}
}
return s;
}
Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
if (cfd->GetID() == 0) {
@ -1228,7 +1303,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
edit.SetColumnFamily(cfd->GetID());
Status s;
Status options_persist_status;
{
InstrumentedMutexLock l(&mutex_);
if (cfd->IsDropped()) {
@ -1240,11 +1314,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
write_thread_.EnterUnbatched(&w, &mutex_);
s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_);
if (s.ok()) {
// If the column family was dropped successfully, we then persist
// the updated RocksDB options under the same single write thread
options_persist_status = WriteOptionsFile();
}
write_thread_.ExitUnbatched(&w);
}
@ -1273,18 +1342,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
mutable_cf_options->max_write_buffer_number;
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Dropped column family with id %u\n", cfd->GetID());
if (!options_persist_status.ok()) {
if (immutable_db_options_.fail_if_options_file_error) {
s = Status::IOError(
"ColumnFamily has been dropped, but unable to persist "
"options in DropColumnFamily()",
options_persist_status.ToString().c_str());
}
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to persist options in DropColumnFamily() -- %s",
options_persist_status.ToString().c_str());
}
} else {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Dropping column family with id %u FAILED -- %s\n",
@ -2112,9 +2169,29 @@ Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
ColumnFamilyHandle** handle) {
return Status::NotSupported("");
}
Status DB::CreateColumnFamilies(
const ColumnFamilyOptions& cf_options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) {
return Status::NotSupported("");
}
Status DB::CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) {
return Status::NotSupported("");
}
Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) {
return Status::NotSupported("");
}
Status DB::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) {
return Status::NotSupported("");
}
Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
delete column_family;
return Status::OK();
@ -2221,9 +2298,18 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
return result;
}
Status DBImpl::WriteOptionsFile() {
Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
bool need_enter_write_thread) {
#ifndef ROCKSDB_LITE
mutex_.AssertHeld();
WriteThread::Writer w;
if (need_mutex_lock) {
mutex_.Lock();
} else {
mutex_.AssertHeld();
}
if (need_enter_write_thread) {
write_thread_.EnterUnbatched(&w, &mutex_);
}
std::vector<std::string> cf_names;
std::vector<ColumnFamilyOptions> cf_opts;
@ -2251,11 +2337,23 @@ Status DBImpl::WriteOptionsFile() {
if (s.ok()) {
s = RenameTempFileToOptionsFile(file_name);
}
mutex_.Lock();
return s;
#else
return Status::OK();
// restore lock
if (!need_mutex_lock) {
mutex_.Lock();
}
if (need_enter_write_thread) {
write_thread_.ExitUnbatched(&w);
}
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unnable to persist options -- %s", s.ToString().c_str());
if (immutable_db_options_.fail_if_options_file_error) {
return Status::IOError("Unable to persist options.",
s.ToString().c_str());
}
}
#endif // !ROCKSDB_LITE
return Status::OK();
}
#ifndef ROCKSDB_LITE

@ -102,10 +102,19 @@ class DBImpl : public DB {
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
ColumnFamilyHandle** handle) override;
virtual Status CreateColumnFamilies(
const ColumnFamilyOptions& cf_options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) override;
virtual Status CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) override;
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
virtual Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) override;
// Returns false if key doesn't exist in the database and true if it may.
// If value_found is not passed in as null, then return the value if found in
@ -551,9 +560,10 @@ class DBImpl : public DB {
RangeDelAggregator* range_del_agg);
// Except in DB::Open(), WriteOptionsFile can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used.
// 2. db_mutex is held
Status WriteOptionsFile();
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
// If need_enter_write_thread = false, the method will enter write thread.
Status WriteOptionsFile(bool need_mutex_lock, bool need_enter_write_thread);
// The following two functions can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used.
@ -636,6 +646,12 @@ class DBImpl : public DB {
const Status CreateArchivalDirectory();
Status CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
const std::string& cf_name,
ColumnFamilyHandle** handle);
Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family);
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
// Delete obsolete files and log status and information of file deletion

@ -1034,7 +1034,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) {
// Persist RocksDB Options before scheduling the compaction.
// The WriteOptionsFile() will release and lock the mutex internally.
persist_options_status = impl->WriteOptionsFile();
persist_options_status = impl->WriteOptionsFile(
false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
*dbptr = impl;
impl->opened_successfully_ = true;
@ -1067,14 +1068,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
LogFlush(impl->immutable_db_options_.info_log);
if (!persist_options_status.ok()) {
if (db_options.fail_if_options_file_error) {
s = Status::IOError(
"DB::Open() failed --- Unable to persist Options file",
persist_options_status.ToString());
}
ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
"Unable to persist options in DB::Open() -- %s",
persist_options_status.ToString().c_str());
s = Status::IOError(
"DB::Open() failed --- Unable to persist Options file",
persist_options_status.ToString());
}
}
if (!s.ok()) {

@ -179,10 +179,37 @@ class DB {
const std::string& column_family_name,
ColumnFamilyHandle** handle);
// Bulk create column families with the same column family options.
// Return the handles of the column families through the argument handles.
// In case of error, the request may succeed partially, and handles will
// contain column family handles that it managed to create, and have size
// equal to the number of created column families.
virtual Status CreateColumnFamilies(
const ColumnFamilyOptions& options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles);
// Bulk create column families.
// Return the handles of the column families through the argument handles.
// In case of error, the request may succeed partially, and handles will
// contain column family handles that it managed to create, and have size
// equal to the number of created column families.
virtual Status CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles);
// Drop a column family specified by column_family handle. This call
// only records a drop record in the manifest and prevents the column
// family from flushing and compacting.
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family);
// Bulk drop column families. This call only records drop records in the
// manifest and prevents the column families from flushing and compacting.
// In case of error, the request may succeed partially. User may call
// ListColumnFamilies to check the result.
virtual Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families);
// Close a column family specified by column_family handle and destroy
// the column family handle specified to avoid double deletion. This call
// deletes the column family handle by default. Use this method to

@ -37,10 +37,28 @@ class StackableDB : public DB {
return db_->CreateColumnFamily(options, column_family_name, handle);
}
virtual Status CreateColumnFamilies(
const ColumnFamilyOptions& options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) override {
return db_->CreateColumnFamilies(options, column_family_names, handles);
}
virtual Status CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) override {
return db_->CreateColumnFamilies(column_families, handles);
}
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override {
return db_->DropColumnFamily(column_family);
}
virtual Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) override {
return db_->DropColumnFamilies(column_families);
}
virtual Status DestroyColumnFamilyHandle(
ColumnFamilyHandle* column_family) override {
return db_->DestroyColumnFamilyHandle(column_family);

Loading…
Cancel
Save