Ankit Gupta 11 years ago
commit 260842a7f8
  1. 9
      db/db_filesnapshot.cc
  2. 2
      include/rocksdb/db.h
  3. 100
      util/env_posix.cc
  4. 191
      util/env_test.cc
  5. 4
      utilities/backupable/backupable_db.cc
  6. 1
      utilities/backupable/backupable_db_test.cc

@ -29,8 +29,11 @@ Status DBImpl::DisableFileDeletions() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
++disable_delete_obsolete_files_; ++disable_delete_obsolete_files_;
if (disable_delete_obsolete_files_ == 1) { if (disable_delete_obsolete_files_ == 1) {
// if not, it has already been disabled, so don't log anything
Log(options_.info_log, "File Deletions Disabled"); Log(options_.info_log, "File Deletions Disabled");
} else {
Log(options_.info_log,
"File Deletions Disabled, but already disabled. Counter: %d",
disable_delete_obsolete_files_);
} }
return Status::OK(); return Status::OK();
} }
@ -50,6 +53,10 @@ Status DBImpl::EnableFileDeletions(bool force) {
Log(options_.info_log, "File Deletions Enabled"); Log(options_.info_log, "File Deletions Enabled");
should_purge_files = true; should_purge_files = true;
FindObsoleteFiles(deletion_state, true); FindObsoleteFiles(deletion_state, true);
} else {
Log(options_.info_log,
"File Deletions Enable, but not really enabled. Counter: %d",
disable_delete_obsolete_files_);
} }
} }
if (should_purge_files) { if (should_purge_files) {

@ -396,7 +396,7 @@ class DB {
// times have the same effect as calling it once. // times have the same effect as calling it once.
virtual Status DisableFileDeletions() = 0; virtual Status DisableFileDeletions() = 0;
// Allow compactions to delete obselete files. // Allow compactions to delete obsolete files.
// If force == true, the call to EnableFileDeletions() will guarantee that // If force == true, the call to EnableFileDeletions() will guarantee that
// file deletions are enabled after the call, even if DisableFileDeletions() // file deletions are enabled after the call, even if DisableFileDeletions()
// was called multiple times before. // was called multiple times before.

@ -1470,17 +1470,54 @@ class PosixEnv : public Env {
} }
} }
void BGThread() { // Return true if there is at least one thread needs to terminate.
bool HasExcessiveThread() {
return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
}
// Return true iff the current thread is the excessive thread to terminate.
// Always terminate the running thread that is added last, even if there are
// more than one thread to terminate.
bool IsLastExcessiveThread(size_t thread_id) {
return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
}
// Is one of the threads to terminate.
bool IsExcessiveThread(size_t thread_id) {
return static_cast<int>(thread_id) >= total_threads_limit_;
}
void BGThread(size_t thread_id) {
while (true) { while (true) {
// Wait until there is an item that is ready to run // Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty() && !exit_all_threads_) { // Stop waiting if the thread needs to do work or needs to terminate.
while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
(queue_.empty() || IsExcessiveThread(thread_id))) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
} }
if (exit_all_threads_) { // mechanism to let BG threads exit safely if (exit_all_threads_) { // mechanism to let BG threads exit safely
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); PthreadCall("unlock", pthread_mutex_unlock(&mu_));
break; break;
} }
if (IsLastExcessiveThread(thread_id)) {
// Current thread is the last generated one and is excessive.
// We always terminate excessive thread in the reverse order of
// generation time.
auto terminating_thread = bgthreads_.back();
pthread_detach(terminating_thread);
bgthreads_.pop_back();
if (HasExcessiveThread()) {
// There is still at least more excessive thread to terminate.
WakeUpAllThreads();
}
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
// TODO(sdong): temp logging. Need to help debugging. Remove it when
// the feature is proved to be stable.
fprintf(stdout, "Bg thread %zu terminates %llx\n", thread_id,
static_cast<long long unsigned int>(terminating_thread));
break;
}
void (*function)(void*) = queue_.front().function; void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg; void* arg = queue_.front().arg;
queue_.pop_front(); queue_.pop_front();
@ -1491,36 +1528,50 @@ class PosixEnv : public Env {
} }
} }
// Helper struct for passing arguments when creating threads.
struct BGThreadMetadata {
ThreadPool* thread_pool_;
size_t thread_id_; // Thread count in the thread.
explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
: thread_pool_(thread_pool), thread_id_(thread_id) {}
};
static void* BGThreadWrapper(void* arg) { static void* BGThreadWrapper(void* arg) {
reinterpret_cast<ThreadPool*>(arg)->BGThread(); BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
size_t thread_id = meta->thread_id_;
ThreadPool* tp = meta->thread_pool_;
delete meta;
tp->BGThread(thread_id);
return nullptr; return nullptr;
} }
void WakeUpAllThreads() {
PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
}
void SetBackgroundThreads(int num) { void SetBackgroundThreads(int num) {
PthreadCall("lock", pthread_mutex_lock(&mu_)); PthreadCall("lock", pthread_mutex_lock(&mu_));
if (num > total_threads_limit_) { if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
if (num != total_threads_limit_) {
total_threads_limit_ = num; total_threads_limit_ = num;
WakeUpAllThreads();
StartBGThreads();
} }
assert(total_threads_limit_ > 0); assert(total_threads_limit_ > 0);
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); PthreadCall("unlock", pthread_mutex_unlock(&mu_));
} }
void Schedule(void (*function)(void*), void* arg) { void StartBGThreads() {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
// Start background thread if necessary // Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) { while ((int)bgthreads_.size() < total_threads_limit_) {
pthread_t t; pthread_t t;
PthreadCall( PthreadCall(
"create thread", "create thread",
pthread_create(&t, pthread_create(&t, nullptr, &ThreadPool::BGThreadWrapper,
nullptr, new BGThreadMetadata(this, bgthreads_.size())));
&ThreadPool::BGThreadWrapper,
this));
// Set the thread name to aid debugging // Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
@ -1534,6 +1585,17 @@ class PosixEnv : public Env {
bgthreads_.push_back(t); bgthreads_.push_back(t);
} }
}
void Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
if (exit_all_threads_) {
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
return;
}
StartBGThreads();
// Add to priority queue // Add to priority queue
queue_.push_back(BGItem()); queue_.push_back(BGItem());
@ -1541,8 +1603,14 @@ class PosixEnv : public Env {
queue_.back().arg = arg; queue_.back().arg = arg;
queue_len_.store(queue_.size(), std::memory_order_relaxed); queue_len_.store(queue_.size(), std::memory_order_relaxed);
// always wake up at least one waiting thread. if (!HasExcessiveThread()) {
// Wake up at least one waiting thread.
PthreadCall("signal", pthread_cond_signal(&bgsignal_)); PthreadCall("signal", pthread_cond_signal(&bgsignal_));
} else {
// Need to wake up all threads to make sure the one woken
// up is not the one to terminate.
WakeUpAllThreads();
}
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); PthreadCall("unlock", pthread_mutex_unlock(&mu_));
} }

@ -200,6 +200,197 @@ TEST(EnvPosixTest, TwoPools) {
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
} }
TEST(EnvPosixTest, DecreaseNumBgThreads) {
class SleepingBackgroundTask {
public:
explicit SleepingBackgroundTask()
: bg_cv_(&mutex_), should_sleep_(true), sleeping_(false) {}
void DoSleep() {
MutexLock l(&mutex_);
sleeping_ = true;
while (should_sleep_) {
bg_cv_.Wait();
}
sleeping_ = false;
bg_cv_.SignalAll();
}
void WakeUp() {
MutexLock l(&mutex_);
should_sleep_ = false;
bg_cv_.SignalAll();
while (sleeping_) {
bg_cv_.Wait();
}
}
bool IsSleeping() {
MutexLock l(&mutex_);
return sleeping_;
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
port::Mutex mutex_;
port::CondVar bg_cv_; // Signalled when background work finishes
bool should_sleep_;
bool sleeping_;
};
std::vector<SleepingBackgroundTask> tasks(10);
// Set number of thread to 1 first.
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
// Schedule 3 tasks. 0 running; Task 1, 2 waiting.
for (size_t i = 0; i < 3; i++) {
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i],
Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
}
ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(!tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
// Increase to 2 threads. Task 0, 1 running; 2 waiting
env_->SetBackgroundThreads(2, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
// Shrink back to 1 thread. Still task 0, 1 running, 2 waiting
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
// The last task finishes. Task 0 running, 2 waiting.
tasks[1].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(!tasks[1].IsSleeping());
ASSERT_TRUE(!tasks[2].IsSleeping());
// Increase to 5 threads. Task 0 and 2 running.
env_->SetBackgroundThreads(5, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[0].IsSleeping());
ASSERT_TRUE(tasks[2].IsSleeping());
// Change number of threads a couple of times while there is no sufficient
// tasks.
env_->SetBackgroundThreads(7, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
tasks[2].WakeUp();
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
env_->SetBackgroundThreads(3, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
env_->SetBackgroundThreads(4, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
env_->SetBackgroundThreads(5, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
env_->SetBackgroundThreads(4, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
Env::Default()->SleepForMicroseconds(kDelayMicros * 50);
// Enqueue 5 more tasks. Thread pool size now is 4.
// Task 0, 3, 4, 5 running;6, 7 waiting.
for (size_t i = 3; i < 8; i++) {
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i],
Env::Priority::HIGH);
}
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[3].IsSleeping());
ASSERT_TRUE(tasks[4].IsSleeping());
ASSERT_TRUE(tasks[5].IsSleeping());
ASSERT_TRUE(!tasks[6].IsSleeping());
ASSERT_TRUE(!tasks[7].IsSleeping());
// Wake up task 0, 3 and 4. Task 5, 6, 7 running.
tasks[0].WakeUp();
tasks[3].WakeUp();
tasks[4].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
for (size_t i = 5; i < 8; i++) {
ASSERT_TRUE(tasks[i].IsSleeping());
}
// Shrink back to 1 thread. Still task 5, 6, 7 running
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(tasks[5].IsSleeping());
ASSERT_TRUE(tasks[6].IsSleeping());
ASSERT_TRUE(tasks[7].IsSleeping());
// Wake up task 6. Task 5, 7 running
tasks[6].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(tasks[5].IsSleeping());
ASSERT_TRUE(!tasks[6].IsSleeping());
ASSERT_TRUE(tasks[7].IsSleeping());
// Wake up threads 7. Task 5 running
tasks[7].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(!tasks[7].IsSleeping());
// Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running.
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[8],
Env::Priority::HIGH);
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9],
Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), 0);
ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping());
// Increase to 4 threads. Task 5, 8, 9 running.
env_->SetBackgroundThreads(4, Env::Priority::HIGH);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
ASSERT_TRUE(tasks[8].IsSleeping());
ASSERT_TRUE(tasks[9].IsSleeping());
// Shrink to 1 thread
env_->SetBackgroundThreads(1, Env::Priority::HIGH);
// Wake up thread 9.
tasks[9].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(!tasks[9].IsSleeping());
ASSERT_TRUE(tasks[8].IsSleeping());
// Wake up thread 8
tasks[8].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(!tasks[8].IsSleeping());
// Wake up the last thread
tasks[5].WakeUp();
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(!tasks[5].IsSleeping());
}
#ifdef OS_LINUX #ifdef OS_LINUX
// To make sure the Env::GetUniqueId() related tests work correctly, The files // To make sure the Env::GetUniqueId() related tests work correctly, The files
// should be stored in regular storage like "hard disk" or "flash device". // should be stored in regular storage like "hard disk" or "flash device".

@ -426,7 +426,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
s = db->GetSortedWalFiles(live_wal_files); s = db->GetSortedWalFiles(live_wal_files);
} }
if (!s.ok()) { if (!s.ok()) {
db->EnableFileDeletions(); db->EnableFileDeletions(false);
return s; return s;
} }
@ -495,7 +495,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
} }
// we copied all the files, enable file deletions // we copied all the files, enable file deletions
db->EnableFileDeletions(); db->EnableFileDeletions(false);
if (s.ok()) { if (s.ok()) {
// move tmp private backup to real backup folder // move tmp private backup to real backup folder

@ -930,7 +930,6 @@ TEST(BackupableDBTest, RateLimiting) {
auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) /
backupable_options_->restore_rate_limit; backupable_options_->restore_rate_limit;
ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time); ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time);
ASSERT_LT(restore_time, 2.5 * rate_limited_restore_time);
AssertBackupConsistency(0, 0, 100000, 100010); AssertBackupConsistency(0, 0, 100000, 100010);
} }

Loading…
Cancel
Save