Introduce bottom-pri thread pool for large universal compactions

Summary:
When we had a single thread pool for compactions, a thread could be busy for a long time (minutes) executing a compaction involving the bottom level. In multi-instance setups, the entire thread pool could be consumed by such bottom-level compactions. Then, top-level compactions (e.g., a few L0 files) would be blocked for a long time ("head-of-line blocking"). Such top-level compactions are critical to prevent compaction stalls as they can quickly reduce the number of L0 files / sorted runs.

This diff introduces a bottom-priority queue for universal compactions including the bottom level. This alleviates the head-of-line blocking situation for fast, top-level compactions.

- Added `Env::Priority::BOTTOM` thread pool. This feature is only enabled if user explicitly configures it to have a positive number of threads.
- Changed `ThreadPoolImpl`'s default thread limit from one to zero. This change is invisible to users as we call `IncBackgroundThreadsIfNeeded` on the low-pri/high-pri pools during `DB::Open` with values of at least one. It is necessary, though, for bottom-pri to start with zero threads so the feature is disabled by default.
- Split `ManualCompaction` into two parts: `ManualCompactionState` and `PrepickedCompaction`. `PrepickedCompaction` is used for any compaction that's picked outside of its execution thread, either manual or automatic.
- Forward universal compactions involving last level to the bottom pool (worker thread's entry point is `BGWorkBottomCompaction`).
- Track `bg_bottom_compaction_scheduled_` so we can wait for bottom-level compactions to finish. We don't count them against the background jobs limits. So users of this feature will get an extra compaction for free.
Closes https://github.com/facebook/rocksdb/pull/2580

Differential Revision: D5422916

Pulled By: ajkr

fbshipit-source-id: a74bd11f1ea4933df3739b16808bb21fcd512333
main
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent 0b814ba92d
commit cc01985db0
  1. 1
      HISTORY.md
  2. 11
      db/db_impl.cc
  3. 36
      db/db_impl.h
  4. 143
      db/db_impl_compaction_flush.cc
  5. 4
      db/db_impl_debug.cc
  6. 97
      db/db_universal_compaction_test.cc
  7. 8
      db/version_set.cc
  8. 1
      db/version_set.h
  9. 12
      env/env_posix.cc
  10. 14
      env/env_test.cc
  11. 2
      include/rocksdb/env.h
  12. 1
      memtable/inlineskiplist_test.cc
  13. 1
      memtable/skiplist_test.cc
  14. 6
      tools/db_bench_tool.cc
  15. 6
      util/threadpool_imp.cc

@ -3,6 +3,7 @@
### New Features ### New Features
* Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators. * Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators.
* Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1. * Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1.
* Universal compactions including the bottom level can be executed in a dedicated thread pool. This alleviates head-of-line blocking in the compaction queue, which causes write stalling, particularly in multi-instance use cases. Users can enable this feature via `Env::SetBackgroundThreads(N, Env::Priority::BOTTOM)`, where `N > 0`.
### Bug Fixes ### Bug Fixes
* Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`. * Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`.

@ -168,6 +168,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
last_batch_group_size_(0), last_batch_group_size_(0),
unscheduled_flushes_(0), unscheduled_flushes_(0),
unscheduled_compactions_(0), unscheduled_compactions_(0),
bg_bottom_compaction_scheduled_(0),
bg_compaction_scheduled_(0), bg_compaction_scheduled_(0),
num_running_compactions_(0), num_running_compactions_(0),
bg_flush_scheduled_(0), bg_flush_scheduled_(0),
@ -242,7 +243,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
return; return;
} }
// Wait for background work to finish // Wait for background work to finish
while (bg_compaction_scheduled_ || bg_flush_scheduled_) { while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_) {
bg_cv_.Wait(); bg_cv_.Wait();
} }
} }
@ -252,15 +254,18 @@ DBImpl::~DBImpl() {
// marker. After this we do a variant of the waiting and unschedule work // marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true)) // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
CancelAllBackgroundWork(false); CancelAllBackgroundWork(false);
int bottom_compactions_unscheduled =
env_->UnSchedule(this, Env::Priority::BOTTOM);
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW); int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH); int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
mutex_.Lock(); mutex_.Lock();
bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
bg_compaction_scheduled_ -= compactions_unscheduled; bg_compaction_scheduled_ -= compactions_unscheduled;
bg_flush_scheduled_ -= flushes_unscheduled; bg_flush_scheduled_ -= flushes_unscheduled;
// Wait for background work to finish // Wait for background work to finish
while (bg_compaction_scheduled_ || bg_flush_scheduled_ || while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_purge_scheduled_) { bg_flush_scheduled_ || bg_purge_scheduled_) {
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob"); TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
bg_cv_.Wait(); bg_cv_.Wait();
} }

@ -658,6 +658,7 @@ class DBImpl : public DB {
} }
}; };
struct PrepickedCompaction;
struct PurgeFileInfo; struct PurgeFileInfo;
// Recover the descriptor from persistent storage. May do a significant // Recover the descriptor from persistent storage. May do a significant
@ -799,14 +800,19 @@ class DBImpl : public DB {
void SchedulePendingPurge(std::string fname, FileType type, uint64_t number, void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
uint32_t path_id, int job_id); uint32_t path_id, int job_id);
static void BGWorkCompaction(void* arg); static void BGWorkCompaction(void* arg);
// Runs a pre-chosen universal compaction involving bottom level in a
// separate, bottom-pri thread pool.
static void BGWorkBottomCompaction(void* arg);
static void BGWorkFlush(void* db); static void BGWorkFlush(void* db);
static void BGWorkPurge(void* arg); static void BGWorkPurge(void* arg);
static void UnscheduleCallback(void* arg); static void UnscheduleCallback(void* arg);
void BackgroundCallCompaction(void* arg); void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
Env::Priority bg_thread_pri);
void BackgroundCallFlush(); void BackgroundCallFlush();
void BackgroundCallPurge(); void BackgroundCallPurge();
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer, void* m = 0); LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context, Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer); LogBuffer* log_buffer);
@ -1059,6 +1065,10 @@ class DBImpl : public DB {
int unscheduled_flushes_; int unscheduled_flushes_;
int unscheduled_compactions_; int unscheduled_compactions_;
// count how many background compactions are running or have been scheduled in
// the BOTTOM pool
int bg_bottom_compaction_scheduled_;
// count how many background compactions are running or have been scheduled // count how many background compactions are running or have been scheduled
int bg_compaction_scheduled_; int bg_compaction_scheduled_;
@ -1075,7 +1085,7 @@ class DBImpl : public DB {
int bg_purge_scheduled_; int bg_purge_scheduled_;
// Information for a manual compaction // Information for a manual compaction
struct ManualCompaction { struct ManualCompactionState {
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
int input_level; int input_level;
int output_level; int output_level;
@ -1091,13 +1101,21 @@ class DBImpl : public DB {
InternalKey* manual_end; // how far we are compacting InternalKey* manual_end; // how far we are compacting
InternalKey tmp_storage; // Used to keep track of compaction progress InternalKey tmp_storage; // Used to keep track of compaction progress
InternalKey tmp_storage1; // Used to keep track of compaction progress InternalKey tmp_storage1; // Used to keep track of compaction progress
};
struct PrepickedCompaction {
// background compaction takes ownership of `compaction`.
Compaction* compaction; Compaction* compaction;
// caller retains ownership of `manual_compaction_state` as it is reused
// across background compactions.
ManualCompactionState* manual_compaction_state; // nullptr if non-manual
}; };
std::deque<ManualCompaction*> manual_compaction_dequeue_; std::deque<ManualCompactionState*> manual_compaction_dequeue_;
struct CompactionArg { struct CompactionArg {
// caller retains ownership of `db`.
DBImpl* db; DBImpl* db;
ManualCompaction* m; // background compaction takes ownership of `prepicked_compaction`.
PrepickedCompaction* prepicked_compaction;
}; };
// Have we encountered a background error in paranoid mode? // Have we encountered a background error in paranoid mode?
@ -1231,11 +1249,11 @@ class DBImpl : public DB {
bool HasPendingManualCompaction(); bool HasPendingManualCompaction();
bool HasExclusiveManualCompaction(); bool HasExclusiveManualCompaction();
void AddManualCompaction(ManualCompaction* m); void AddManualCompaction(ManualCompactionState* m);
void RemoveManualCompaction(ManualCompaction* m); void RemoveManualCompaction(ManualCompactionState* m);
bool ShouldntRunManualCompaction(ManualCompaction* m); bool ShouldntRunManualCompaction(ManualCompactionState* m);
bool HaveManualCompaction(ColumnFamilyData* cfd); bool HaveManualCompaction(ColumnFamilyData* cfd);
bool MCOverlap(ManualCompaction* m, ManualCompaction* m1); bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;

@ -612,7 +612,8 @@ Status DBImpl::CompactFilesImpl(
Status DBImpl::PauseBackgroundWork() { Status DBImpl::PauseBackgroundWork() {
InstrumentedMutexLock guard_lock(&mutex_); InstrumentedMutexLock guard_lock(&mutex_);
bg_compaction_paused_++; bg_compaction_paused_++;
while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) { while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
bg_flush_scheduled_ > 0) {
bg_cv_.Wait(); bg_cv_.Wait();
} }
bg_work_paused_++; bg_work_paused_++;
@ -808,7 +809,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
bool scheduled = false; bool scheduled = false;
bool manual_conflict = false; bool manual_conflict = false;
ManualCompaction manual; ManualCompactionState manual;
manual.cfd = cfd; manual.cfd = cfd;
manual.input_level = input_level; manual.input_level = input_level;
manual.output_level = output_level; manual.output_level = output_level;
@ -858,7 +859,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
AddManualCompaction(&manual); AddManualCompaction(&manual);
TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_); TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
if (exclusive) { if (exclusive) {
while (bg_compaction_scheduled_ > 0) { while (bg_bottom_compaction_scheduled_ > 0 ||
bg_compaction_scheduled_ > 0) {
TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled"); TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
ROCKS_LOG_INFO( ROCKS_LOG_INFO(
immutable_db_options_.info_log, immutable_db_options_.info_log,
@ -878,14 +880,14 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
while (!manual.done) { while (!manual.done) {
assert(HasPendingManualCompaction()); assert(HasPendingManualCompaction());
manual_conflict = false; manual_conflict = false;
Compaction* compaction;
if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) || if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
scheduled || scheduled ||
((manual.manual_end = &manual.tmp_storage1)&&( ((manual.manual_end = &manual.tmp_storage1) &&
(manual.compaction = manual.cfd->CompactRange( ((compaction = manual.cfd->CompactRange(
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level, *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
manual.output_level, manual.output_path_id, manual.begin, manual.output_level, manual.output_path_id, manual.begin,
manual.end, &manual.manual_end, &manual_conflict)) == manual.end, &manual.manual_end, &manual_conflict)) == nullptr) &&
nullptr) &&
manual_conflict)) { manual_conflict)) {
// exclusive manual compactions should not see a conflict during // exclusive manual compactions should not see a conflict during
// CompactRange // CompactRange
@ -898,14 +900,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
manual.incomplete = false; manual.incomplete = false;
} }
} else if (!scheduled) { } else if (!scheduled) {
if (manual.compaction == nullptr) { if (compaction == nullptr) {
manual.done = true; manual.done = true;
bg_cv_.SignalAll(); bg_cv_.SignalAll();
continue; continue;
} }
ca = new CompactionArg; ca = new CompactionArg;
ca->db = this; ca->db = this;
ca->m = &manual; ca->prepicked_compaction = new PrepickedCompaction;
ca->prepicked_compaction->manual_compaction_state = &manual;
ca->prepicked_compaction->compaction = compaction;
manual.incomplete = false; manual.incomplete = false;
bg_compaction_scheduled_++; bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
@ -1047,7 +1051,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
unscheduled_compactions_ > 0) { unscheduled_compactions_ > 0) {
CompactionArg* ca = new CompactionArg; CompactionArg* ca = new CompactionArg;
ca->db = this; ca->db = this;
ca->m = nullptr; ca->prepicked_compaction = nullptr;
bg_compaction_scheduled_++; bg_compaction_scheduled_++;
unscheduled_compactions_--; unscheduled_compactions_--;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
@ -1152,7 +1156,23 @@ void DBImpl::BGWorkCompaction(void* arg) {
delete reinterpret_cast<CompactionArg*>(arg); delete reinterpret_cast<CompactionArg*>(arg);
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW); IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
TEST_SYNC_POINT("DBImpl::BGWorkCompaction"); TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(ca.m); auto prepicked_compaction =
static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(
prepicked_compaction, Env::Priority::LOW);
delete prepicked_compaction;
}
void DBImpl::BGWorkBottomCompaction(void* arg) {
CompactionArg ca = *(static_cast<CompactionArg*>(arg));
delete static_cast<CompactionArg*>(arg);
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
auto* prepicked_compaction = ca.prepicked_compaction;
assert(prepicked_compaction && prepicked_compaction->compaction &&
!prepicked_compaction->manual_compaction_state);
ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
delete prepicked_compaction;
} }
void DBImpl::BGWorkPurge(void* db) { void DBImpl::BGWorkPurge(void* db) {
@ -1165,8 +1185,11 @@ void DBImpl::BGWorkPurge(void* db) {
void DBImpl::UnscheduleCallback(void* arg) { void DBImpl::UnscheduleCallback(void* arg) {
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg)); CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
delete reinterpret_cast<CompactionArg*>(arg); delete reinterpret_cast<CompactionArg*>(arg);
if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) { if (ca.prepicked_compaction != nullptr) {
delete ca.m->compaction; if (ca.prepicked_compaction->compaction != nullptr) {
delete ca.prepicked_compaction->compaction;
}
delete ca.prepicked_compaction;
} }
TEST_SYNC_POINT("DBImpl::UnscheduleCallback"); TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
} }
@ -1293,9 +1316,9 @@ void DBImpl::BackgroundCallFlush() {
} }
} }
void DBImpl::BackgroundCallCompaction(void* arg) { void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
Env::Priority bg_thread_pri) {
bool made_progress = false; bool made_progress = false;
ManualCompaction* m = reinterpret_cast<ManualCompaction*>(arg);
JobContext job_context(next_job_id_.fetch_add(1), true); JobContext job_context(next_job_id_.fetch_add(1), true);
TEST_SYNC_POINT("BackgroundCallCompaction:0"); TEST_SYNC_POINT("BackgroundCallCompaction:0");
MaybeDumpStats(); MaybeDumpStats();
@ -1313,9 +1336,11 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
auto pending_outputs_inserted_elem = auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs(); CaptureCurrentFileNumberInPendingOutputs();
assert(bg_compaction_scheduled_); assert((bg_thread_pri == Env::Priority::BOTTOM &&
Status s = bg_bottom_compaction_scheduled_) ||
BackgroundCompaction(&made_progress, &job_context, &log_buffer, m); (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
prepicked_compaction);
TEST_SYNC_POINT("BackgroundCallCompaction:1"); TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (!s.ok() && !s.IsShutdownInProgress()) { if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background compaction in // Wait a little bit before retrying background compaction in
@ -1361,17 +1386,24 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
assert(num_running_compactions_ > 0); assert(num_running_compactions_ > 0);
num_running_compactions_--; num_running_compactions_--;
bg_compaction_scheduled_--; if (bg_thread_pri == Env::Priority::LOW) {
bg_compaction_scheduled_--;
} else {
assert(bg_thread_pri == Env::Priority::BOTTOM);
bg_bottom_compaction_scheduled_--;
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
// See if there's more work to be done // See if there's more work to be done
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
if (made_progress || bg_compaction_scheduled_ == 0 || if (made_progress ||
(bg_compaction_scheduled_ == 0 &&
bg_bottom_compaction_scheduled_ == 0) ||
HasPendingManualCompaction()) { HasPendingManualCompaction()) {
// signal if // signal if
// * made_progress -- need to wakeup DelayWrite // * made_progress -- need to wakeup DelayWrite
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * HasPendingManualCompaction -- need to wakeup RunManualCompaction // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is // If none of this is true, there is no need to signal since nobody is
// waiting for it // waiting for it
@ -1386,14 +1418,23 @@ void DBImpl::BackgroundCallCompaction(void* arg) {
Status DBImpl::BackgroundCompaction(bool* made_progress, Status DBImpl::BackgroundCompaction(bool* made_progress,
JobContext* job_context, JobContext* job_context,
LogBuffer* log_buffer, void* arg) { LogBuffer* log_buffer,
ManualCompaction* manual_compaction = PrepickedCompaction* prepicked_compaction) {
reinterpret_cast<ManualCompaction*>(arg); ManualCompactionState* manual_compaction =
prepicked_compaction == nullptr
? nullptr
: prepicked_compaction->manual_compaction_state;
*made_progress = false; *made_progress = false;
mutex_.AssertHeld(); mutex_.AssertHeld();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start"); TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
bool is_manual = (manual_compaction != nullptr); bool is_manual = (manual_compaction != nullptr);
unique_ptr<Compaction> c;
if (prepicked_compaction != nullptr &&
prepicked_compaction->compaction != nullptr) {
c.reset(prepicked_compaction->compaction);
}
bool is_prepicked = is_manual || c;
// (manual_compaction->in_progress == false); // (manual_compaction->in_progress == false);
bool trivial_move_disallowed = bool trivial_move_disallowed =
@ -1410,7 +1451,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
manual_compaction->status = status; manual_compaction->status = status;
manual_compaction->done = true; manual_compaction->done = true;
manual_compaction->in_progress = false; manual_compaction->in_progress = false;
delete manual_compaction->compaction;
manual_compaction = nullptr; manual_compaction = nullptr;
} }
return status; return status;
@ -1421,13 +1461,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
manual_compaction->in_progress = true; manual_compaction->in_progress = true;
} }
unique_ptr<Compaction> c;
// InternalKey manual_end_storage; // InternalKey manual_end_storage;
// InternalKey* manual_end = &manual_end_storage; // InternalKey* manual_end = &manual_end_storage;
if (is_manual) { if (is_manual) {
ManualCompaction* m = manual_compaction; ManualCompactionState* m = manual_compaction;
assert(m->in_progress); assert(m->in_progress);
c.reset(std::move(m->compaction));
if (!c) { if (!c) {
m->done = true; m->done = true;
m->manual_end = nullptr; m->manual_end = nullptr;
@ -1449,7 +1487,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
? "(end)" ? "(end)"
: m->manual_end->DebugString().c_str())); : m->manual_end->DebugString().c_str()));
} }
} else if (!compaction_queue_.empty()) { } else if (!is_prepicked && !compaction_queue_.empty()) {
if (HaveManualCompaction(compaction_queue_.front())) { if (HaveManualCompaction(compaction_queue_.front())) {
// Can't compact right now, but try again later // Can't compact right now, but try again later
TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict"); TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
@ -1601,6 +1639,28 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Clear Instrument // Clear Instrument
ThreadStatusUtil::ResetThreadStatus(); ThreadStatusUtil::ResetThreadStatus();
} else if (c->column_family_data()->ioptions()->compaction_style ==
kCompactionStyleUniversal &&
!is_prepicked && c->output_level() > 0 &&
c->output_level() ==
c->column_family_data()
->current()
->storage_info()
->MaxOutputLevel(
immutable_db_options_.allow_ingest_behind) &&
env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
// Forward universal compactions involving last level to the bottom pool
// if it exists, such that long-running compactions can't block short-
// lived ones, like L0->L0s.
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
CompactionArg* ca = new CompactionArg;
ca->db = this;
ca->prepicked_compaction = new PrepickedCompaction;
ca->prepicked_compaction->compaction = c.release();
ca->prepicked_compaction->manual_compaction_state = nullptr;
++bg_bottom_compaction_scheduled_;
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
this, &DBImpl::UnscheduleCallback);
} else { } else {
int output_level __attribute__((unused)) = c->output_level(); int output_level __attribute__((unused)) = c->output_level();
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
@ -1664,7 +1724,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
} }
if (is_manual) { if (is_manual) {
ManualCompaction* m = manual_compaction; ManualCompactionState* m = manual_compaction;
if (!status.ok()) { if (!status.ok()) {
m->status = status; m->status = status;
m->done = true; m->done = true;
@ -1707,13 +1767,13 @@ bool DBImpl::HasPendingManualCompaction() {
return (!manual_compaction_dequeue_.empty()); return (!manual_compaction_dequeue_.empty());
} }
void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) { void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
manual_compaction_dequeue_.push_back(m); manual_compaction_dequeue_.push_back(m);
} }
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) { void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
// Remove from queue // Remove from queue
std::deque<ManualCompaction*>::iterator it = std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin(); manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) { while (it != manual_compaction_dequeue_.end()) {
if (m == (*it)) { if (m == (*it)) {
@ -1726,16 +1786,17 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
return; return;
} }
bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) { bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
if (num_running_ingest_file_ > 0) { if (num_running_ingest_file_ > 0) {
// We need to wait for other IngestExternalFile() calls to finish // We need to wait for other IngestExternalFile() calls to finish
// before running a manual compaction. // before running a manual compaction.
return true; return true;
} }
if (m->exclusive) { if (m->exclusive) {
return (bg_compaction_scheduled_ > 0); return (bg_bottom_compaction_scheduled_ > 0 ||
bg_compaction_scheduled_ > 0);
} }
std::deque<ManualCompaction*>::iterator it = std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin(); manual_compaction_dequeue_.begin();
bool seen = false; bool seen = false;
while (it != manual_compaction_dequeue_.end()) { while (it != manual_compaction_dequeue_.end()) {
@ -1756,7 +1817,7 @@ bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) { bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
// Remove from priority queue // Remove from priority queue
std::deque<ManualCompaction*>::iterator it = std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin(); manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) { while (it != manual_compaction_dequeue_.end()) {
if ((*it)->exclusive) { if ((*it)->exclusive) {
@ -1774,7 +1835,7 @@ bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
bool DBImpl::HasExclusiveManualCompaction() { bool DBImpl::HasExclusiveManualCompaction() {
// Remove from priority queue // Remove from priority queue
std::deque<ManualCompaction*>::iterator it = std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin(); manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) { while (it != manual_compaction_dequeue_.end()) {
if ((*it)->exclusive) { if ((*it)->exclusive) {
@ -1785,7 +1846,7 @@ bool DBImpl::HasExclusiveManualCompaction() {
return false; return false;
} }
bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) { bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
if ((m->exclusive) || (m1->exclusive)) { if ((m->exclusive) || (m1->exclusive)) {
return true; return true;
} }

@ -112,7 +112,9 @@ Status DBImpl::TEST_WaitForCompact() {
// OR flush to finish. // OR flush to finish.
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
while ((bg_compaction_scheduled_ || bg_flush_scheduled_) && bg_error_.ok()) { while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_) &&
bg_error_.ok()) {
bg_cv_.Wait(); bg_cv_.Wait();
} }
return bg_error_; return bg_error_;

@ -1370,6 +1370,103 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
Destroy(options); Destroy(options);
} }
// Checks which thread pool executes a full universal compaction once a
// one-thread BOTTOM-priority pool is configured, both with and without
// allow_ingest_behind. The number of entries into the bottom-pri worker is
// counted through the "DBImpl::BGWorkBottomCompaction" sync point.
// NOTE(review): the BOTTOM pool size set on Env::Default() below is not
// restored anywhere in this test -- confirm later tests do not depend on it.
TEST_P(DBTestUniversalCompaction, FullCompactionInBottomPriThreadPool) {
const int kNumFilesTrigger = 3;
// Give the bottom-pri pool one thread before any DB is opened.
Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
for (bool allow_ingest_behind : {false, true}) {
Options options = CurrentOptions();
options.allow_ingest_behind = allow_ingest_behind;
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
// Trigger compaction if size amplification exceeds 110%
options.compaction_options_universal.max_size_amplification_percent = 110;
DestroyAndReopen(options);
// Incremented each time the bottom-pri worker entry point's sync point is
// hit; reset for every allow_ingest_behind setting.
int num_bottom_pri_compactions = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BGWorkBottomCompaction",
[&](void* arg) { ++num_bottom_pri_compactions; });
SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
// Write kNumFilesTrigger files; before each write the number of sorted runs
// must equal the number of files written so far.
for (int num = 0; num < kNumFilesTrigger; num++) {
ASSERT_EQ(NumSortedRuns(), num);
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
}
dbfull()->TEST_WaitForCompact();
if (allow_ingest_behind || num_levels_ > 1) {
// allow_ingest_behind increases number of levels while sanitizing.
ASSERT_EQ(1, num_bottom_pri_compactions);
} else {
// for single-level universal, everything's bottom level so nothing should
// be executed in bottom-pri thread pool.
ASSERT_EQ(0, num_bottom_pri_compactions);
}
// Verify that size amplification did occur
ASSERT_EQ(NumSortedRuns(), 1);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
}
// Checks that a bottom-pri (full) compaction and a low-pri (partial)
// compaction can be in flight at the same time. Sync-point dependencies hold
// the bottom-pri worker at its entry point until a low-pri compaction has
// reached "DBImpl::BackgroundCompaction:NonTrivial".
TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
if (num_levels_ == 1) {
// for single-level universal, everything's bottom level so nothing should
// be executed in bottom-pri thread pool.
return;
}
const int kNumFilesTrigger = 3;
// Give the bottom-pri pool one thread before the DB is opened.
Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = num_levels_;
options.write_buffer_size = 100 << 10; // 100KB
options.target_file_size_base = 32 << 10; // 32KB
options.level0_file_num_compaction_trigger = kNumFilesTrigger;
// Trigger compaction if size amplification exceeds 110%
options.compaction_options_universal.max_size_amplification_percent = 110;
DestroyAndReopen(options);
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{// wait for the full compaction to be picked before adding files intended
// for the second one.
{"DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"},
// the full (bottom-pri) compaction waits until a partial (low-pri)
// compaction has started to verify they can run in parallel.
{"DBImpl::BackgroundCompaction:NonTrivial",
"DBImpl::BGWorkBottomCompaction"}});
SyncPoint::GetInstance()->EnableProcessing();
Random rnd(301);
// Two rounds of kNumFilesTrigger files: round 0 feeds the full compaction,
// round 1 feeds the partial one.
for (int i = 0; i < 2; ++i) {
for (int num = 0; num < kNumFilesTrigger; num++) {
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
// use no_wait above because that one waits for flush and compaction. We
// don't want to wait for compaction because the full compaction is
// intentionally blocked while more files are flushed.
dbfull()->TEST_WaitForFlushMemTable();
}
if (i == 0) {
// Blocks here until the first compaction has been forwarded to the
// bottom-pri pool (first dependency loaded above).
TEST_SYNC_POINT(
"DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0");
}
}
dbfull()->TEST_WaitForCompact();
// First compaction should output to bottom level. Second should output to L0
// since older L0 files pending compaction prevent it from being placed lower.
ASSERT_EQ(NumSortedRuns(), 2);
ASSERT_GT(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(num_levels_ - 1), 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction, INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction,
::testing::Combine(::testing::Values(1, 3, 5), ::testing::Combine(::testing::Values(1, 3, 5),
::testing::Bool())); ::testing::Bool()));

@ -1227,6 +1227,14 @@ int VersionStorageInfo::MaxInputLevel() const {
return 0; return 0;
} }
// Returns the highest-numbered level a compaction may write its output to.
// Normally that is the last level (num_levels() - 1). When
// allow_ingest_behind is true the result is one level shallower
// (num_levels() - 2), which requires at least two levels.
// NOTE(review): presumably the last level is kept free for files ingested
// behind existing data -- inferred from the parameter name, confirm.
int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
  const int last_level = num_levels() - 1;
  if (!allow_ingest_behind) {
    return last_level;
  }
  // Stepping back one level only makes sense if there is more than one.
  assert(num_levels() > 1);
  return last_level - 1;
}
void VersionStorageInfo::EstimateCompactionBytesNeeded( void VersionStorageInfo::EstimateCompactionBytesNeeded(
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options) {
// Only implemented for level-based compaction // Only implemented for level-based compaction

@ -147,6 +147,7 @@ class VersionStorageInfo {
} }
int MaxInputLevel() const; int MaxInputLevel() const;
int MaxOutputLevel(bool allow_ingest_behind) const;
// Return level number that has idx'th highest score // Return level number that has idx'th highest score
int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; } int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; }

12
env/env_posix.cc vendored

@ -761,23 +761,23 @@ class PosixEnv : public Env {
// Allow increasing the number of worker threads. // Allow increasing the number of worker threads.
virtual void SetBackgroundThreads(int num, Priority pri) override { virtual void SetBackgroundThreads(int num, Priority pri) override {
assert(pri >= Priority::LOW && pri <= Priority::HIGH); assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].SetBackgroundThreads(num); thread_pools_[pri].SetBackgroundThreads(num);
} }
virtual int GetBackgroundThreads(Priority pri) override { virtual int GetBackgroundThreads(Priority pri) override {
assert(pri >= Priority::LOW && pri <= Priority::HIGH); assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
return thread_pools_[pri].GetBackgroundThreads(); return thread_pools_[pri].GetBackgroundThreads();
} }
// Allow increasing the number of worker threads. // Allow increasing the number of worker threads.
virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
assert(pri >= Priority::LOW && pri <= Priority::HIGH); assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
} }
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override { virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override {
assert(pool >= Priority::LOW && pool <= Priority::HIGH); assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
#ifdef OS_LINUX #ifdef OS_LINUX
thread_pools_[pool].LowerIOPriority(); thread_pools_[pool].LowerIOPriority();
#endif #endif
@ -883,7 +883,7 @@ PosixEnv::PosixEnv()
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
void* tag, void (*unschedFunction)(void* arg)) { void* tag, void (*unschedFunction)(void* arg)) {
assert(pri >= Priority::LOW && pri <= Priority::HIGH); assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
} }
@ -892,7 +892,7 @@ int PosixEnv::UnSchedule(void* arg, Priority pri) {
} }
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
assert(pri >= Priority::LOW && pri <= Priority::HIGH); assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
return thread_pools_[pri].GetQueueLen(); return thread_pools_[pri].GetQueueLen();
} }

14
env/env_test.cc vendored

@ -125,12 +125,14 @@ static void SetBool(void* ptr) {
reinterpret_cast<std::atomic<bool>*>(ptr)->store(true); reinterpret_cast<std::atomic<bool>*>(ptr)->store(true);
} }
TEST_P(EnvPosixTestWithParam, RunImmediately) { TEST_F(EnvPosixTest, RunImmediately) {
std::atomic<bool> called(false); for (int pri = Env::BOTTOM; pri < Env::TOTAL; ++pri) {
env_->Schedule(&SetBool, &called); std::atomic<bool> called(false);
Env::Default()->SleepForMicroseconds(kDelayMicros); env_->SetBackgroundThreads(1, static_cast<Env::Priority>(pri));
ASSERT_TRUE(called.load()); env_->Schedule(&SetBool, &called, static_cast<Env::Priority>(pri));
WaitThreadPoolsEmpty(); Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(called.load());
}
} }
TEST_P(EnvPosixTestWithParam, UnSchedule) { TEST_P(EnvPosixTestWithParam, UnSchedule) {

@ -283,7 +283,7 @@ class Env {
virtual Status UnlockFile(FileLock* lock) = 0; virtual Status UnlockFile(FileLock* lock) = 0;
// Priority for scheduling job in thread pool // Priority for scheduling job in thread pool
enum Priority { LOW, HIGH, TOTAL }; enum Priority { BOTTOM, LOW, HIGH, TOTAL };
// Priority for requesting bytes in rate limiter scheduler // Priority for requesting bytes in rate limiter scheduler
enum IOPriority { enum IOPriority {

@ -571,6 +571,7 @@ static void RunConcurrentRead(int run) {
fprintf(stderr, "Run %d of %d\n", i, N); fprintf(stderr, "Run %d of %d\n", i, N);
} }
TestState state(seed + 1); TestState state(seed + 1);
Env::Default()->SetBackgroundThreads(1);
Env::Default()->Schedule(ConcurrentReader, &state); Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING); state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; ++k) { for (int k = 0; k < kSize; ++k) {

@ -363,6 +363,7 @@ static void RunConcurrent(int run) {
fprintf(stderr, "Run %d of %d\n", i, N); fprintf(stderr, "Run %d of %d\n", i, N);
} }
TestState state(seed + 1); TestState state(seed + 1);
Env::Default()->SetBackgroundThreads(1);
Env::Default()->Schedule(ConcurrentReader, &state); Env::Default()->Schedule(ConcurrentReader, &state);
state.Wait(TestState::RUNNING); state.Wait(TestState::RUNNING);
for (int k = 0; k < kSize; k++) { for (int k = 0; k < kSize; k++) {

@ -318,6 +318,10 @@ DEFINE_int32(max_background_jobs,
"The maximum number of concurrent background jobs that can occur " "The maximum number of concurrent background jobs that can occur "
"in parallel."); "in parallel.");
DEFINE_int32(num_bottom_pri_threads, 0,
"The number of threads in the bottom-priority thread pool (used "
"by universal compaction only).");
DEFINE_int32(max_background_compactions, DEFINE_int32(max_background_compactions,
rocksdb::Options().max_background_compactions, rocksdb::Options().max_background_compactions,
"The maximum number of concurrent background compactions" "The maximum number of concurrent background compactions"
@ -5242,6 +5246,8 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes, FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes,
rocksdb::Env::Priority::HIGH); rocksdb::Env::Priority::HIGH);
FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
rocksdb::Env::Priority::BOTTOM);
// Choose a location for the test database if none given with --db=<path> // Choose a location for the test database if none given with --db=<path>
if (FLAGS_db.empty()) { if (FLAGS_db.empty()) {

@ -123,11 +123,11 @@ private:
inline inline
ThreadPoolImpl::Impl::Impl() ThreadPoolImpl::Impl::Impl()
: :
low_io_priority_(false), low_io_priority_(false),
priority_(Env::LOW), priority_(Env::LOW),
env_(nullptr), env_(nullptr),
total_threads_limit_(1), total_threads_limit_(0),
queue_len_(), queue_len_(),
exit_all_threads_(false), exit_all_threads_(false),
wait_for_jobs_to_complete_(false), wait_for_jobs_to_complete_(false),
@ -372,7 +372,7 @@ int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
return count; return count;
} }
ThreadPoolImpl::ThreadPoolImpl() : ThreadPoolImpl::ThreadPoolImpl() :
impl_(new Impl()) { impl_(new Impl()) {
} }

Loading…
Cancel
Save