Speed up rocksDB close call.

Summary:
On RocksDB, when there are multiple instances doing
flushes/compactions in the background, the close call takes a long time
because the flushes/compactions need to complete before the database can
shut down. If another instance is using the background threads and the compaction for this instance is in the queue since it has been scheduled, we still cannot shutdown. We now remove the scheduled background tasks which have not yet started running, so that shutdown is speeded up.

Test Plan: DB Test added.

Reviewers: yhchiang, igor, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D33741
main
Venkatesh Radhakrishnan 10 years ago
parent a7aba2ef6b
commit b2b3086524
  1. 27
      db/db_impl.cc
  2. 61
      db/db_test.cc
  3. 12
      hdfs/env_hdfs.h
  4. 22
      include/rocksdb/env.h
  5. 45
      util/env_posix.cc
  6. 108
      util/env_test.cc

@ -258,14 +258,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
void DBImpl::CancelAllBackgroundWork(bool wait) {
shutting_down_.store(true, std::memory_order_release);
if (!wait) {
return;
}
// Wait for background work to finish
while (bg_compaction_scheduled_ || bg_flush_scheduled_ || notifying_events_) {
bg_cv_.Wait();
}
}
DBImpl::~DBImpl() {
@ -285,6 +277,17 @@ DBImpl::~DBImpl() {
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
}
CancelAllBackgroundWork(true);
mutex_.Unlock();
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
mutex_.Lock();
bg_compaction_scheduled_ -= compactions_unscheduled;
bg_flush_scheduled_ -= flushes_unscheduled;
// Wait for background work to finish
while (bg_compaction_scheduled_ || bg_flush_scheduled_ || notifying_events_) {
bg_cv_.Wait();
}
listeners_.clear();
flush_scheduler_.Clear();
@ -1718,7 +1721,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
} else {
manual_compaction_ = &manual;
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this);
}
}
@ -1793,7 +1796,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bg_flush_scheduled_ < db_options_.max_background_flushes) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
}
if (db_options_.max_background_flushes == 0 &&
@ -1808,14 +1811,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
unscheduled_compactions_--;
}
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this);
}
while (bg_compaction_scheduled_ < db_options_.max_background_compactions &&
unscheduled_compactions_ > 0) {
bg_compaction_scheduled_++;
unscheduled_compactions_--;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW, this);
}
}

@ -1105,13 +1105,15 @@ class DBTest {
return sst_count;
}
void GenerateNewFile(Random* rnd, int* key_idx) {
void GenerateNewFile(Random* rnd, int* key_idx, bool nowait = false) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(*key_idx), RandomString(rnd, (i == 10) ? 1 : 10000)));
(*key_idx)++;
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
if (!nowait) {
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
}
std::string IterStatus(Iterator* iter) {
@ -11699,6 +11701,59 @@ TEST(DBTest, DeleteObsoleteFilesPendingOutputs) {
ASSERT_TRUE(!env_->FileExists(dbname_ + "/" + file_on_L2));
}
TEST(DBTest, CloseSpeedup) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024;
options.max_write_buffer_number = 16;
// Block background threads
env_->SetBackgroundThreads(1, Env::LOW);
env_->SetBackgroundThreads(1, Env::HIGH);
SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
SleepingBackgroundTask sleeping_task_high;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high,
Env::Priority::HIGH);
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames);
// Delete archival files.
for (size_t i = 0; i < filenames.size(); ++i) {
env_->DeleteFile(dbname_ + "/" + filenames[i]);
}
env_->DeleteDir(dbname_);
DestroyAndReopen(options);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
env_->SetBackgroundThreads(1, Env::LOW);
env_->SetBackgroundThreads(1, Env::HIGH);
Random rnd(301);
int key_idx = 0;
// First three 110KB files are not going to level 2
// After that, (100K, 200K)
for (int num = 0; num < 5; num++) {
GenerateNewFile(&rnd, &key_idx, true);
}
ASSERT_EQ(0, GetSstFileCount(dbname_));
Close();
ASSERT_EQ(0, GetSstFileCount(dbname_));
// Unblock background threads
sleeping_task_high.WakeUp();
sleeping_task_high.WaitUntilDone();
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
Destroy(options);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -103,8 +103,12 @@ class HdfsEnv : public Env {
std::shared_ptr<Logger>* result);
virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW) {
posixEnv->Schedule(function, arg, pri);
Priority pri = LOW, void* tag = nullptr) {
posixEnv->Schedule(function, arg, pri, tag);
}
virtual int UnSchedule(void* arg, Priority pri) {
posixEnv->UnSchedule(arg, pri);
}
virtual void StartThread(void (*function)(void* arg), void* arg) {
@ -322,7 +326,9 @@ class HdfsEnv : public Env {
}
virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW) override {}
Priority pri = LOW, void* tag = nullptr) override {}
virtual int UnSchedule(void* arg, Priority pri) override { return 0; }
virtual void StartThread(void (*function)(void* arg), void* arg) override {}

@ -225,10 +225,12 @@ class Env {
// added to the same Env may run concurrently in different threads.
// I.e., the caller may not assume that background work items are
// serialized.
virtual void Schedule(
void (*function)(void* arg),
void* arg,
Priority pri = LOW) = 0;
virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW, void* tag = nullptr) = 0;
// Arrange to remove jobs for given arg from the queue_ if they are not
// already scheduled. Caller is expected to have exclusive lock on arg.
virtual int UnSchedule(void* arg, Priority pri) { return 0; }
// Start a new thread, invoking "function(arg)" within the new thread.
// When "function(arg)" returns, the thread will be destroyed.
@ -804,10 +806,18 @@ class EnvWrapper : public Env {
Status LockFile(const std::string& f, FileLock** l) override {
return target_->LockFile(f, l);
}
Status UnlockFile(FileLock* l) override { return target_->UnlockFile(l); }
void Schedule(void (*f)(void*), void* a, Priority pri) override {
return target_->Schedule(f, a, pri);
void Schedule(void (*f)(void* arg), void* a, Priority pri,
void* tag = nullptr) override {
return target_->Schedule(f, a, pri, tag);
}
int UnSchedule(void* tag, Priority pri) {
return target_->UnSchedule(tag, pri);
}
void StartThread(void (*f)(void*), void* a) override {
return target_->StartThread(f, a);
}

@ -41,6 +41,7 @@
#include "util/random.h"
#include "util/iostats_context_imp.h"
#include "util/rate_limiter.h"
#include "util/sync_point.h"
#include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
@ -1344,8 +1345,10 @@ class PosixEnv : public Env {
return result;
}
virtual void Schedule(void (*function)(void*), void* arg,
Priority pri = LOW) override;
virtual void Schedule(void (*function)(void* arg1), void* arg,
Priority pri = LOW, void* tag = nullptr) override;
virtual int UnSchedule(void* arg, Priority pri) override;
virtual void StartThread(void (*function)(void* arg), void* arg) override;
@ -1765,7 +1768,7 @@ class PosixEnv : public Env {
}
}
void Schedule(void (*function)(void*), void* arg) {
void Schedule(void (*function)(void* arg1), void* arg, void* tag) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
@ -1779,6 +1782,7 @@ class PosixEnv : public Env {
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
queue_.back().tag = tag;
queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed);
@ -1794,13 +1798,37 @@ class PosixEnv : public Env {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
int UnSchedule(void* arg) {
int count = 0;
PthreadCall("lock", pthread_mutex_lock(&mu_));
// Remove from priority queue
BGQueue::iterator it = queue_.begin();
while (it != queue_.end()) {
if (arg == (*it).tag) {
it = queue_.erase(it);
count++;
} else {
it++;
}
}
queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed);
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return count;
}
unsigned int GetQueueLen() const {
return queue_len_.load(std::memory_order_relaxed);
}
private:
// Entry per Schedule() call
struct BGItem { void* arg; void (*function)(void*); };
struct BGItem {
void* arg;
void (*function)(void*);
void* tag;
};
typedef std::deque<BGItem> BGQueue;
pthread_mutex_t mu_;
@ -1836,9 +1864,14 @@ PosixEnv::PosixEnv() : checkedDiskForMmap_(false),
thread_status_updater_ = CreateThreadStatusUpdater();
}
void PosixEnv::Schedule(void (*function)(void*), void* arg, Priority pri) {
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
void* tag) {
assert(pri >= Priority::LOW && pri <= Priority::HIGH);
thread_pools_[pri].Schedule(function, arg);
thread_pools_[pri].Schedule(function, arg, tag);
}
int PosixEnv::UnSchedule(void* arg, Priority pri) {
return thread_pools_[pri].UnSchedule(arg);
}
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {

@ -49,6 +49,46 @@ static void SetBool(void* ptr) {
->store(true, std::memory_order_relaxed);
}
class SleepingBackgroundTask {
public:
explicit SleepingBackgroundTask()
: bg_cv_(&mutex_), should_sleep_(true), sleeping_(false) {}
void DoSleep() {
MutexLock l(&mutex_);
sleeping_ = true;
while (should_sleep_) {
bg_cv_.Wait();
}
sleeping_ = false;
bg_cv_.SignalAll();
}
void WakeUp() {
MutexLock l(&mutex_);
should_sleep_ = false;
bg_cv_.SignalAll();
while (sleeping_) {
bg_cv_.Wait();
}
}
bool IsSleeping() {
MutexLock l(&mutex_);
return sleeping_;
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
port::Mutex mutex_;
port::CondVar bg_cv_; // Signalled when background work finishes
bool should_sleep_;
bool sleeping_;
};
TEST(EnvPosixTest, RunImmediately) {
std::atomic<bool> called(false);
env_->Schedule(&SetBool, &called);
@ -56,6 +96,34 @@ TEST(EnvPosixTest, RunImmediately) {
ASSERT_TRUE(called.load(std::memory_order_relaxed));
}
TEST(EnvPosixTest, UnSchedule) {
std::atomic<bool> called(false);
env_->SetBackgroundThreads(1, Env::LOW);
/* Block the low priority queue */
SleepingBackgroundTask sleeping_task, sleeping_task1;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::LOW);
/* Schedule another task */
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task1,
Env::Priority::LOW, &sleeping_task1);
/* Remove it with a different tag */
ASSERT_EQ(0, env_->UnSchedule(&called, Env::Priority::LOW));
/* Remove it from the queue with the right tag */
ASSERT_EQ(1, env_->UnSchedule(&sleeping_task1, Env::Priority::LOW));
// Unblock background thread
sleeping_task.WakeUp();
/* Schedule another task */
env_->Schedule(&SetBool, &called);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(called.load(std::memory_order_relaxed));
}
TEST(EnvPosixTest, RunMany) {
std::atomic<int> last_id(0);
@ -240,46 +308,6 @@ TEST(EnvPosixTest, TwoPools) {
}
TEST(EnvPosixTest, DecreaseNumBgThreads) {
class SleepingBackgroundTask {
public:
explicit SleepingBackgroundTask()
: bg_cv_(&mutex_), should_sleep_(true), sleeping_(false) {}
void DoSleep() {
MutexLock l(&mutex_);
sleeping_ = true;
while (should_sleep_) {
bg_cv_.Wait();
}
sleeping_ = false;
bg_cv_.SignalAll();
}
void WakeUp() {
MutexLock l(&mutex_);
should_sleep_ = false;
bg_cv_.SignalAll();
while (sleeping_) {
bg_cv_.Wait();
}
}
bool IsSleeping() {
MutexLock l(&mutex_);
return sleeping_;
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
port::Mutex mutex_;
port::CondVar bg_cv_; // Signalled when background work finishes
bool should_sleep_;
bool sleeping_;
};
std::vector<SleepingBackgroundTask> tasks(10);
// Set number of thread to 1 first.

Loading…
Cancel
Save