diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 1e1ec9757..582355ccd 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -29,8 +29,11 @@ Status DBImpl::DisableFileDeletions() { MutexLock l(&mutex_); ++disable_delete_obsolete_files_; if (disable_delete_obsolete_files_ == 1) { - // if not, it has already been disabled, so don't log anything Log(options_.info_log, "File Deletions Disabled"); + } else { + Log(options_.info_log, + "File Deletions Disabled, but already disabled. Counter: %d", + disable_delete_obsolete_files_); } return Status::OK(); } @@ -50,6 +53,10 @@ Status DBImpl::EnableFileDeletions(bool force) { Log(options_.info_log, "File Deletions Enabled"); should_purge_files = true; FindObsoleteFiles(deletion_state, true); + } else { + Log(options_.info_log, + "File Deletions Enable, but not really enabled. Counter: %d", + disable_delete_obsolete_files_); } } if (should_purge_files) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index e743b4c88..33b443f40 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -396,7 +396,7 @@ class DB { // times have the same effect as calling it once. virtual Status DisableFileDeletions() = 0; - // Allow compactions to delete obselete files. + // Allow compactions to delete obsolete files. // If force == true, the call to EnableFileDeletions() will guarantee that // file deletions are enabled after the call, even if DisableFileDeletions() // was called multiple times before. diff --git a/util/env_posix.cc b/util/env_posix.cc index 5cbd5bd00..7ffba6f53 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1470,17 +1470,54 @@ class PosixEnv : public Env { } } - void BGThread() { + // Return true if there is at least one thread that needs to terminate. + bool HasExcessiveThread() { + return static_cast(bgthreads_.size()) > total_threads_limit_; + } + + // Return true iff the current thread is the excessive thread to terminate.
+ // Always terminate the running thread that is added last, even if there is + // more than one thread to terminate. + bool IsLastExcessiveThread(size_t thread_id) { + return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; + } + + // Return true iff thread_id is one of the threads to terminate. + bool IsExcessiveThread(size_t thread_id) { + return static_cast(thread_id) >= total_threads_limit_; + } + + void BGThread(size_t thread_id) { while (true) { // Wait until there is an item that is ready to run PthreadCall("lock", pthread_mutex_lock(&mu_)); - while (queue_.empty() && !exit_all_threads_) { + // Stop waiting if the thread needs to do work or needs to terminate. + while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && + (queue_.empty() || IsExcessiveThread(thread_id))) { PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); } if (exit_all_threads_) { // mechanism to let BG threads exit safely PthreadCall("unlock", pthread_mutex_unlock(&mu_)); break; } + if (IsLastExcessiveThread(thread_id)) { + // Current thread is the last generated one and is excessive. + // We always terminate excessive threads in the reverse order of + // generation time. + auto terminating_thread = bgthreads_.back(); + pthread_detach(terminating_thread); + bgthreads_.pop_back(); + if (HasExcessiveThread()) { + // There is still at least one more excessive thread to terminate. + WakeUpAllThreads(); + } + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + // TODO(sdong): temp logging to help debugging. Remove it when + // the feature is proven to be stable. + fprintf(stdout, "Bg thread %zu terminates %llx\n", thread_id, + static_cast(terminating_thread)); + break; + } void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); @@ -1491,36 +1528,50 @@ class PosixEnv : public Env { } } + // Helper struct for passing arguments when creating threads.
+ struct BGThreadMetadata { + ThreadPool* thread_pool_; + size_t thread_id_; // Index of this thread in bgthreads_. + explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) + : thread_pool_(thread_pool), thread_id_(thread_id) {} + }; + static void* BGThreadWrapper(void* arg) { - reinterpret_cast(arg)->BGThread(); + BGThreadMetadata* meta = reinterpret_cast(arg); + size_t thread_id = meta->thread_id_; + ThreadPool* tp = meta->thread_pool_; + delete meta; + tp->BGThread(thread_id); return nullptr; } + void WakeUpAllThreads() { + PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_)); + } + void SetBackgroundThreads(int num) { PthreadCall("lock", pthread_mutex_lock(&mu_)); - if (num > total_threads_limit_) { + if (exit_all_threads_) { + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + return; + } + if (num != total_threads_limit_) { total_threads_limit_ = num; + WakeUpAllThreads(); + StartBGThreads(); } assert(total_threads_limit_ > 0); PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } - void Schedule(void (*function)(void*), void* arg) { - PthreadCall("lock", pthread_mutex_lock(&mu_)); - - if (exit_all_threads_) { - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); - return; - } + void StartBGThreads() { // Start background thread if necessary while ((int)bgthreads_.size() < total_threads_limit_) { pthread_t t; PthreadCall( - "create thread", - pthread_create(&t, - nullptr, - &ThreadPool::BGThreadWrapper, - this)); + "create thread", + pthread_create(&t, nullptr, &ThreadPool::BGThreadWrapper, + new BGThreadMetadata(this, bgthreads_.size()))); // Set the thread name to aid debugging #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) @@ -1534,6 +1585,17 @@ class PosixEnv : public Env { bgthreads_.push_back(t); } + } + + void Schedule(void (*function)(void*), void* arg) { + PthreadCall("lock", pthread_mutex_lock(&mu_)); + + if (exit_all_threads_) { + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + return; + } + + StartBGThreads(); // Add to
priority queue queue_.push_back(BGItem()); @@ -1541,8 +1603,14 @@ class PosixEnv : public Env { queue_.back().arg = arg; queue_len_.store(queue_.size(), std::memory_order_relaxed); - // always wake up at least one waiting thread. - PthreadCall("signal", pthread_cond_signal(&bgsignal_)); + if (!HasExcessiveThread()) { + // Wake up at least one waiting thread. + PthreadCall("signal", pthread_cond_signal(&bgsignal_)); + } else { + // Need to wake up all threads to make sure the one woken + // up is not the one to terminate. + WakeUpAllThreads(); + } PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } diff --git a/util/env_test.cc b/util/env_test.cc index 1ac3773b2..2abce6f3a 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -200,6 +200,197 @@ TEST(EnvPosixTest, TwoPools) { ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); } +TEST(EnvPosixTest, DecreaseNumBgThreads) { + class SleepingBackgroundTask { + public: + explicit SleepingBackgroundTask() + : bg_cv_(&mutex_), should_sleep_(true), sleeping_(false) {} + void DoSleep() { + MutexLock l(&mutex_); + sleeping_ = true; + while (should_sleep_) { + bg_cv_.Wait(); + } + sleeping_ = false; + bg_cv_.SignalAll(); + } + + void WakeUp() { + MutexLock l(&mutex_); + should_sleep_ = false; + bg_cv_.SignalAll(); + + while (sleeping_) { + bg_cv_.Wait(); + } + } + + bool IsSleeping() { + MutexLock l(&mutex_); + return sleeping_; + } + + static void DoSleepTask(void* arg) { + reinterpret_cast(arg)->DoSleep(); + } + + private: + port::Mutex mutex_; + port::CondVar bg_cv_; // Signalled by WakeUp() and when DoSleep() finishes + bool should_sleep_; + bool sleeping_; + }; + + std::vector tasks(10); + + // Set number of threads to 1 first. + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + + // Schedule 3 tasks. Task 0 running; Task 1, 2 waiting.
+ for (size_t i = 0; i < 3; i++) { + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i], + Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + } + ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(!tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Increase to 2 threads. Task 0, 1 running; 2 waiting + env_->SetBackgroundThreads(2, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Task 1 finishes. Task 0 running, 2 still waiting. + tasks[1].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(!tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Increase to 5 threads. Task 0 and 2 running. + env_->SetBackgroundThreads(5, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(tasks[2].IsSleeping()); + + // Change number of threads a couple of times while there are not enough + // tasks to keep all threads busy.
+ env_->SetBackgroundThreads(7, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + tasks[2].WakeUp(); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(3, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(5, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + Env::Default()->SleepForMicroseconds(kDelayMicros * 50); + + // Enqueue 5 more tasks. Thread pool size is now 4. + // Task 0, 3, 4, 5 running; 6, 7 waiting. + for (size_t i = 3; i < 8; i++) { + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[i], + Env::Priority::HIGH); + } + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[3].IsSleeping()); + ASSERT_TRUE(tasks[4].IsSleeping()); + ASSERT_TRUE(tasks[5].IsSleeping()); + ASSERT_TRUE(!tasks[6].IsSleeping()); + ASSERT_TRUE(!tasks[7].IsSleeping()); + + // Wake up task 0, 3 and 4. Task 5, 6, 7 running. + tasks[0].WakeUp(); + tasks[3].WakeUp(); + tasks[4].WakeUp(); + + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + for (size_t i = 5; i < 8; i++) { + ASSERT_TRUE(tasks[i].IsSleeping()); + } + + // Shrink back to 1 thread.
Still task 5, 6, 7 running + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(tasks[5].IsSleeping()); + ASSERT_TRUE(tasks[6].IsSleeping()); + ASSERT_TRUE(tasks[7].IsSleeping()); + + // Wake up task 6. Task 5, 7 running + tasks[6].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(tasks[5].IsSleeping()); + ASSERT_TRUE(!tasks[6].IsSleeping()); + ASSERT_TRUE(tasks[7].IsSleeping()); + + // Wake up task 7. Task 5 running + tasks[7].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[7].IsSleeping()); + + // Enqueue task 8 and 9. Task 5 running; one of 8, 9 might be running. + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[8], + Env::Priority::HIGH); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &tasks[9], + Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), 0); + ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping()); + + // Increase to 4 threads. Task 5, 8, 9 running. + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[8].IsSleeping()); + ASSERT_TRUE(tasks[9].IsSleeping()); + + // Shrink to 1 thread + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + + // Wake up task 9.
+ tasks[9].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[9].IsSleeping()); + ASSERT_TRUE(tasks[8].IsSleeping()); + + // Wake up task 8 + tasks[8].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[8].IsSleeping()); + + // Wake up the last sleeping task (task 5) + tasks[5].WakeUp(); + + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[5].IsSleeping()); +} + #ifdef OS_LINUX // To make sure the Env::GetUniqueId() related tests work correctly, The files // should be stored in regular storage like "hard disk" or "flash device". diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 87901e0ef..3ac1d90a1 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -426,7 +426,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { s = db->GetSortedWalFiles(live_wal_files); } if (!s.ok()) { - db->EnableFileDeletions(); + db->EnableFileDeletions(false); return s; } @@ -495,7 +495,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { } // we copied all the files, enable file deletions - db->EnableFileDeletions(); + db->EnableFileDeletions(false); if (s.ok()) { // move tmp private backup to real backup folder diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 6f0c6bc88..e6874fe5d 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -930,7 +930,6 @@ TEST(BackupableDBTest, RateLimiting) { auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / backupable_options_->restore_rate_limit; ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time); - ASSERT_LT(restore_time, 2.5 * rate_limited_restore_time); AssertBackupConsistency(0, 0, 100000, 100010); }