group multiple batch of flush into one manifest file (one call to LogAndApply)

Summary: Currently, if several flush outputs are committed together, we issue each manifest write per batch (1 batch = 1 flush = 1 sst file = 1+ continuous memtables). Each manifest write requires one fsync and one fsync to parent directory. In some cases, it becomes the bottleneck of write. We should batch them and write in one manifest write when possible.

Test Plan:
` ./db_bench -benchmarks="fillseq" -max_write_buffer_number=16 -max_background_flushes=16 -disable_auto_compactions=true -min_write_buffer_number_to_merge=1 -write_buffer_size=65536 -level0_stop_writes_trigger=10000 -level0_slowdown_writes_trigger=10000`
**Before**
```
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 4.9
Date:       Fri Jul  1 15:38:17 2016
CPU:        32 * Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz
CPUCache:   20480 KB
Keys:       16 bytes each
Values:     100 bytes each (50 bytes after compression)
Entries:    1000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    110.6 MB (estimated)
FileSize:   62.9 MB (estimated)
Write rate: 0 bytes/second
Compression: Snappy
Memtablerep: skip_list
Perf Level: 1
WARNING: Assertions are enabled; benchmarks unnecessarily slow
------------------------------------------------
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
DB path: [/tmp/rocksdbtest-112628/dbbench]
fillseq      :     166.277 micros/op 6014 ops/sec;    0.7 MB/s
```
**After**
```
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 4.9
Date:       Fri Jul  1 15:35:05 2016
CPU:        32 * Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz
CPUCache:   20480 KB
Keys:       16 bytes each
Values:     100 bytes each (50 bytes after compression)
Entries:    1000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    110.6 MB (estimated)
FileSize:   62.9 MB (estimated)
Write rate: 0 bytes/second
Compression: Snappy
Memtablerep: skip_list
Perf Level: 1
WARNING: Assertions are enabled; benchmarks unnecessarily slow
------------------------------------------------
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
DB path: [/tmp/rocksdbtest-112628/dbbench]
fillseq      :      52.328 micros/op 19110 ops/sec;    2.1 MB/s
```

Reviewers: andrewkr, IslamAbdelRahman, yhchiang, sdong

Reviewed By: sdong

Subscribers: igor, andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D60075
main
Aaron Gao 8 years ago
parent a45ee83181
commit 5aaef91d4a
  1. 31
      db/memtable_list.cc
  2. 62
      db/version_set.cc
  3. 13
      db/version_set.h

@ -306,17 +306,28 @@ Status MemTableList::InstallMemtableFlushResults(
// scan all memtables from the earliest, and commit those // scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing. Memetables // (in that order) that have finished flushing. Memetables
// are always committed in the order that they were created. // are always committed in the order that they were created.
while (!current_->memlist_.empty() && s.ok()) { uint64_t batch_file_number = 0;
MemTable* m = current_->memlist_.back(); // get the last element size_t batch_count = 0;
autovector<VersionEdit*> edit_list;
auto& memlist = current_->memlist_;
// enumerate from the last (earliest) element to see how many batch finished
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (!m->flush_completed_) { if (!m->flush_completed_) {
break; break;
} }
if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
batch_file_number = m->file_number_;
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started", LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_); cfd->GetName().c_str(), m->file_number_);
edit_list.push_back(&m->edit_);
}
batch_count++;
}
if (batch_count > 0) {
// this can release and reacquire the mutex. // this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory); s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory);
// we will be changing the version in the next code path, // we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable // so we better create a new one, since versions are immutable
@ -325,14 +336,19 @@ Status MemTableList::InstallMemtableFlushResults(
// All the later memtables that have the same filenum // All the later memtables that have the same filenum
// are part of the same batch. They can be committed now. // are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables have been flushed. uint64_t mem_id = 1; // how many memtables have been flushed.
do {
if (s.ok()) { // commit new state if (s.ok()) { // commit new state
while (batch_count-- > 0) {
MemTable* m = current_->memlist_.back();
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done", ": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id); cfd->GetName().c_str(), m->file_number_, mem_id);
assert(m->file_number_ > 0); assert(m->file_number_ > 0);
current_->Remove(m, to_delete); current_->Remove(m, to_delete);
++mem_id;
}
} else { } else {
for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) {
MemTable* m = *it;
// commit failed. setup state so that we can flush again. // commit failed. setup state so that we can flush again.
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64 LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed", ": memtable #%" PRIu64 " failed",
@ -343,10 +359,9 @@ Status MemTableList::InstallMemtableFlushResults(
num_flush_not_started_++; num_flush_not_started_++;
m->file_number_ = 0; m->file_number_ = 0;
imm_flush_needed.store(true, std::memory_order_release); imm_flush_needed.store(true, std::memory_order_release);
}
++mem_id; ++mem_id;
} while (!current_->memlist_.empty() && (nullptr != (m = current_->memlist_.back())) && }
(m->file_number_ == file_number)); }
} }
commit_in_progress_ = false; commit_in_progress_ = false;
return s; return s;

@ -2077,11 +2077,11 @@ struct VersionSet::ManifestWriter {
bool done; bool done;
InstrumentedCondVar cv; InstrumentedCondVar cv;
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
VersionEdit* edit; const autovector<VersionEdit*>& edit_list;
explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd, explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
VersionEdit* e) const autovector<VersionEdit*>& e)
: done(false), cv(mu), cfd(_cfd), edit(e) {} : done(false), cv(mu), cfd(_cfd), edit_list(e) {}
}; };
VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options, VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options,
@ -2150,20 +2150,32 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
VersionEdit* edit, InstrumentedMutex* mu, const autovector<VersionEdit*>& edit_list,
Directory* db_directory, bool new_descriptor_log, InstrumentedMutex* mu, Directory* db_directory,
bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options) { const ColumnFamilyOptions* new_cf_options) {
mu->AssertHeld(); mu->AssertHeld();
// num of edits
auto num_edits = edit_list.size();
if (num_edits == 0) {
return Status::OK();
} else if (num_edits > 1) {
// no group commits for column family add or drop
for (auto& edit : edit_list) {
assert(!edit->IsColumnFamilyManipulation());
}
}
// column_family_data can be nullptr only if this is column_family_add. // column_family_data can be nullptr only if this is column_family_add.
// in that case, we also need to specify ColumnFamilyOptions // in that case, we also need to specify ColumnFamilyOptions
if (column_family_data == nullptr) { if (column_family_data == nullptr) {
assert(edit->is_column_family_add_); assert(num_edits == 1);
assert(edit_list[0]->is_column_family_add_);
assert(new_cf_options != nullptr); assert(new_cf_options != nullptr);
} }
// queue our request // queue our request
ManifestWriter w(mu, column_family_data, edit); ManifestWriter w(mu, column_family_data, edit_list);
manifest_writers_.push_back(&w); manifest_writers_.push_back(&w);
while (!w.done && &w != manifest_writers_.front()) { while (!w.done && &w != manifest_writers_.front()) {
w.cv.Wait(); w.cv.Wait();
@ -2183,7 +2195,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
return Status::ShutdownInProgress(); return Status::ShutdownInProgress();
} }
std::vector<VersionEdit*> batch_edits; autovector<VersionEdit*> batch_edits;
Version* v = nullptr; Version* v = nullptr;
std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(nullptr); std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(nullptr);
@ -2191,24 +2203,26 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
ManifestWriter* last_writer = &w; ManifestWriter* last_writer = &w;
assert(!manifest_writers_.empty()); assert(!manifest_writers_.empty());
assert(manifest_writers_.front() == &w); assert(manifest_writers_.front() == &w);
if (edit->IsColumnFamilyManipulation()) { if (w.edit_list.front()->IsColumnFamilyManipulation()) {
// no group commits for column family add or drop // no group commits for column family add or drop
LogAndApplyCFHelper(edit); LogAndApplyCFHelper(w.edit_list.front());
batch_edits.push_back(edit); batch_edits.push_back(w.edit_list.front());
} else { } else {
v = new Version(column_family_data, this, current_version_number_++); v = new Version(column_family_data, this, current_version_number_++);
builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data)); builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data));
auto* builder = builder_guard->version_builder(); auto* builder = builder_guard->version_builder();
for (const auto& writer : manifest_writers_) { for (const auto& writer : manifest_writers_) {
if (writer->edit->IsColumnFamilyManipulation() || if (writer->edit_list.front()->IsColumnFamilyManipulation() ||
writer->cfd->GetID() != column_family_data->GetID()) { writer->cfd->GetID() != column_family_data->GetID()) {
// no group commits for column family add or drop // no group commits for column family add or drop
// also, group commits across column families are not supported // also, group commits across column families are not supported
break; break;
} }
last_writer = writer; last_writer = writer;
LogAndApplyHelper(column_family_data, builder, v, last_writer->edit, mu); for (const auto& edit : writer->edit_list) {
batch_edits.push_back(last_writer->edit); LogAndApplyHelper(column_family_data, builder, v, edit, mu);
batch_edits.push_back(edit);
}
} }
builder->SaveTo(v->storage_info()); builder->SaveTo(v->storage_info());
} }
@ -2231,7 +2245,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (new_descriptor_log) { if (new_descriptor_log) {
// if we're writing out new snapshot make sure to persist max column family // if we're writing out new snapshot make sure to persist max column family
if (column_family_set_->GetMaxColumnFamily() > 0) { if (column_family_set_->GetMaxColumnFamily() > 0) {
edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); w.edit_list.front()->SetMaxColumnFamily(
column_family_set_->GetMaxColumnFamily());
} }
} }
@ -2242,7 +2257,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
mu->Unlock(); mu->Unlock();
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
if (!edit->IsColumnFamilyManipulation() && if (!w.edit_list.front()->IsColumnFamilyManipulation() &&
db_options_->max_open_files == -1) { db_options_->max_open_files == -1) {
// unlimited table cache. Pre-load table handle now. // unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex. // Need to do it out of the mutex.
@ -2268,12 +2283,13 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(descriptor_file), opt_env_opts)); new WritableFileWriter(std::move(descriptor_file), opt_env_opts));
descriptor_log_.reset(new log::Writer(std::move(file_writer), 0, false)); descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get()); s = WriteSnapshot(descriptor_log_.get());
} }
} }
if (!edit->IsColumnFamilyManipulation()) { if (!w.edit_list.front()->IsColumnFamilyManipulation()) {
// This is cpu-heavy operations, which should be called outside mutex. // This is cpu-heavy operations, which should be called outside mutex.
v->PrepareApply(mutable_cf_options, true); v->PrepareApply(mutable_cf_options, true);
} }
@ -2315,7 +2331,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
new_manifest_file_size = descriptor_log_->file()->GetFileSize(); new_manifest_file_size = descriptor_log_->file()->GetFileSize();
} }
if (edit->is_column_family_drop_) { if (w.edit_list.front()->is_column_family_drop_) {
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0"); TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1"); TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2"); TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
@ -2335,12 +2351,12 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// Install the new version // Install the new version
if (s.ok()) { if (s.ok()) {
if (edit->is_column_family_add_) { if (w.edit_list.front()->is_column_family_add_) {
// no group commit on column family add // no group commit on column family add
assert(batch_edits.size() == 1); assert(batch_edits.size() == 1);
assert(new_cf_options != nullptr); assert(new_cf_options != nullptr);
CreateColumnFamily(*new_cf_options, edit); CreateColumnFamily(*new_cf_options, w.edit_list.front());
} else if (edit->is_column_family_drop_) { } else if (w.edit_list.front()->is_column_family_drop_) {
assert(batch_edits.size() == 1); assert(batch_edits.size() == 1);
column_family_data->SetDropped(); column_family_data->SetDropped();
if (column_family_data->Unref()) { if (column_family_data->Unref()) {
@ -2363,7 +2379,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
manifest_file_number_ = pending_manifest_file_number_; manifest_file_number_ = pending_manifest_file_number_;
manifest_file_size_ = new_manifest_file_size; manifest_file_size_ = new_manifest_file_size;
prev_log_number_ = edit->prev_log_number_; prev_log_number_ = w.edit_list.front()->prev_log_number_;
} else { } else {
std::string version_edits; std::string version_edits;
for (auto& e : batch_edits) { for (auto& e : batch_edits) {

@ -589,6 +589,19 @@ class VersionSet {
const MutableCFOptions& mutable_cf_options, VersionEdit* edit, const MutableCFOptions& mutable_cf_options, VersionEdit* edit,
InstrumentedMutex* mu, Directory* db_directory = nullptr, InstrumentedMutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false, bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr) {
autovector<VersionEdit*> edit_list;
edit_list.push_back(edit);
return LogAndApply(column_family_data, mutable_cf_options, edit_list, mu,
db_directory, new_descriptor_log, column_family_options);
}
// The batch version. If edit_list.size() > 1, caller must ensure that
// no edit in the list column family add or drop
Status LogAndApply(
ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options,
const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu,
Directory* db_directory = nullptr, bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr); const ColumnFamilyOptions* column_family_options = nullptr);
// Recover the last saved descriptor from persistent storage. // Recover the last saved descriptor from persistent storage.

Loading…
Cancel
Save