Flushes should always go to HIGH priority thread pool

Summary:
This is not column-family related diff. It is in columnfamily branch because the change is significant and we want to push it with next major release (3.0).

It removes the leveldb notion of one thread pool and expands it to two thread pools by default (HIGH and LOW). Flush process is removed from compaction process and all flush threads are executed on HIGH thread pool, since we don't want long-running compactions to influence flush latency.

Test Plan: make check

Reviewers: dhruba, haobo, kailiu, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15987
main
Igor Canadi 11 years ago
parent f8d5443efe
commit 0b4ccf765c
  1. 7
      HISTORY.md
  2. 64
      db/db_impl.cc
  3. 14
      include/rocksdb/options.h
  4. 2
      util/options.cc

@ -1,6 +1,11 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased (will be released in 3.0)
* By default, max_background_flushes is 1 and flush process is
removed from background compaction process. Flush process is now always
executed in high priority thread pool.
## Unreleased (will be relased in 2.8)
* By default, checksums are verified on every read from database * By default, checksums are verified on every read from database

@ -142,6 +142,9 @@ Options SanitizeOptions(const std::string& dbname,
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
DBOptions result = src; DBOptions result = src;
ClipToRange(&result.max_open_files, 20, 1000000); ClipToRange(&result.max_open_files, 20, 1000000);
if (result.max_background_flushes == 0) {
result.max_background_flushes = 1;
}
if (result.info_log == nullptr) { if (result.info_log == nullptr) {
Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env, Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env,
@ -1704,12 +1707,16 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
is_flush_pending = true; is_flush_pending = true;
} }
} }
if (is_flush_pending && if (is_flush_pending) {
(bg_flush_scheduled_ < options_.max_background_flushes)) {
// memtable flush needed // memtable flush needed
// max_background_compactions should not be 0, because that means
// flush will never get executed
assert(options_.max_background_flushes != 0);
if (bg_flush_scheduled_ < options_.max_background_flushes) {
bg_flush_scheduled_++; bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
} }
}
bool is_compaction_needed = false; bool is_compaction_needed = false;
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->current()->NeedsCompaction()) { if (cfd->current()->NeedsCompaction()) {
@ -1718,12 +1725,10 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} }
} }
// Schedule BGWorkCompaction if there's a compaction pending (or a memtable // Schedule BGWorkCompaction if there's a compaction pending
// flush, but the HIGH pool is not enabled). Do it only if // Do it only if max_background_compactions hasn't been reached and, in case
// max_background_compactions hasn't been reached and, in case
// bg_manual_only_ > 0, if it's a manual compaction. // bg_manual_only_ > 0, if it's a manual compaction.
if ((manual_compaction_ || is_compaction_needed || if ((manual_compaction_ || is_compaction_needed) &&
(is_flush_pending && (options_.max_background_flushes <= 0))) &&
bg_compaction_scheduled_ < options_.max_background_compactions && bg_compaction_scheduled_ < options_.max_background_compactions &&
(!bg_manual_only_ || manual_compaction_)) { (!bg_manual_only_ || manual_compaction_)) {
@ -1868,41 +1873,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
*madeProgress = false; *madeProgress = false;
mutex_.AssertHeld(); mutex_.AssertHeld();
unique_ptr<Compaction> c;
bool is_manual = (manual_compaction_ != nullptr) && bool is_manual = (manual_compaction_ != nullptr) &&
(manual_compaction_->in_progress == false); (manual_compaction_->in_progress == false);
if (is_manual) {
// another thread cannot pick up the same work
manual_compaction_->in_progress = true;
}
// TODO: remove memtable flush from formal compaction
for (auto cfd : *versions_->GetColumnFamilySet()) {
while (cfd->imm()->IsFlushPending()) {
Log(options_.info_log,
"BackgroundCompaction doing FlushMemTableToOutputFile with column "
"family %d, compaction slots available %d",
cfd->GetID(),
options_.max_background_compactions - bg_compaction_scheduled_);
Status stat =
FlushMemTableToOutputFile(cfd, madeProgress, deletion_state);
if (!stat.ok()) {
if (is_manual) {
manual_compaction_->status = stat;
manual_compaction_->done = true;
manual_compaction_->in_progress = false;
manual_compaction_ = nullptr;
}
return stat;
}
}
}
unique_ptr<Compaction> c;
InternalKey manual_end_storage; InternalKey manual_end_storage;
InternalKey* manual_end = &manual_end_storage; InternalKey* manual_end = &manual_end_storage;
if (is_manual) { if (is_manual) {
ManualCompaction* m = manual_compaction_; ManualCompaction* m = manual_compaction_;
assert(m->in_progress); m->in_progress = true;
c.reset(m->cfd->CompactRange(m->input_level, m->output_level, m->begin, c.reset(m->cfd->CompactRange(m->input_level, m->output_level, m->begin,
m->end, &manual_end)); m->end, &manual_end));
if (!c) { if (!c) {
@ -2299,20 +2277,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
} }
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
// TODO: remove memtable flush from normal compaction work
if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
LogFlush(options_.info_log);
mutex_.Lock();
if (cfd->imm()->IsFlushPending()) {
FlushMemTableToOutputFile(cfd, nullptr, deletion_state);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
Slice key = input->key(); Slice key = input->key();
Slice value = input->value(); Slice value = input->value();

@ -526,13 +526,17 @@ struct DBOptions {
// regardless of this setting // regardless of this setting
uint64_t delete_obsolete_files_period_micros; uint64_t delete_obsolete_files_period_micros;
// Maximum number of concurrent background jobs, submitted to // Maximum number of concurrent background compaction jobs, submitted to
// the default LOW priority thread pool // the default LOW priority thread pool.
// If you're increasing this, also consider increasing number of threads in
// LOW priority thread pool. For more information, see
// Env::SetBackgroundThreads
// Default: 1 // Default: 1
int max_background_compactions; int max_background_compactions;
// Maximum number of concurrent background memtable flush jobs, submitted to // Maximum number of concurrent background memtable flush jobs, submitted to
// the HIGH priority thread pool. // the HIGH priority thread pool.
//
// By default, all background jobs (major compaction and memtable flush) go // By default, all background jobs (major compaction and memtable flush) go
// to the LOW priority pool. If this option is set to a positive number, // to the LOW priority pool. If this option is set to a positive number,
// memtable flush jobs will be submitted to the HIGH priority pool. // memtable flush jobs will be submitted to the HIGH priority pool.
@ -540,7 +544,11 @@ struct DBOptions {
// Without a separate pool, long running major compaction jobs could // Without a separate pool, long running major compaction jobs could
// potentially block memtable flush jobs of other db instances, leading to // potentially block memtable flush jobs of other db instances, leading to
// unnecessary Put stalls. // unnecessary Put stalls.
// Default: 0 //
// If you're increasing this, also consider increasing number of threads in
// HIGH priority thread pool. For more information, see
// Env::SetBackgroundThreads
// Default: 1
int max_background_flushes; int max_background_flushes;
// Specify the maximal size of the info log file. If the log file // Specify the maximal size of the info log file. If the log file

@ -150,7 +150,7 @@ DBOptions::DBOptions()
wal_dir(""), wal_dir(""),
delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL), delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL),
max_background_compactions(1), max_background_compactions(1),
max_background_flushes(0), max_background_flushes(1),
max_log_file_size(0), max_log_file_size(0),
log_file_time_to_roll(0), log_file_time_to_roll(0),
keep_log_file_num(1000), keep_log_file_num(1000),

Loading…
Cancel
Save