diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index a9fc33499..cc421fa08 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -299,8 +299,31 @@ TEST_F(DBSSTTest, RateLimitedDelete) { rocksdb::SyncPoint::GetInstance()->SetCallBack( "DeleteScheduler::BackgroundEmptyTrash:Wait", [&](void* arg) { penalties.push_back(*(static_cast(arg))); }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) { + // Turn timed wait into a simulated sleep + uint64_t* abs_time_us = static_cast(arg); + int64_t cur_time = 0; + env_->GetCurrentTime(&cur_time); + if (*abs_time_us > static_cast(cur_time)) { + env_->addon_time_.fetch_add(*abs_time_us - + static_cast(cur_time)); + } + + // Randomly sleep shortly + env_->addon_time_.fetch_add( + static_cast(Random::GetTLSInstance()->Uniform(10))); + + // Set wait until time to before current to force not to sleep. + int64_t real_cur_time = 0; + Env::Default()->GetCurrentTime(&real_cur_time); + *abs_time_us = static_cast(real_cur_time); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + env_->no_sleep_ = true; + env_->time_elapse_only_sleep_ = true; Options options = CurrentOptions(); options.disable_auto_compactions = true; options.env = env_; @@ -348,6 +371,7 @@ TEST_F(DBSSTTest, RateLimitedDelete) { ASSERT_EQ(expected_penlty, penalties[i]); } ASSERT_GT(time_spent_deleting, expected_penlty * 0.9); + ASSERT_LT(time_spent_deleting, expected_penlty * 1.1); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } diff --git a/util/delete_scheduler.cc b/util/delete_scheduler.cc index b403c0572..3a83f124b 100644 --- a/util/delete_scheduler.cc +++ b/util/delete_scheduler.cc @@ -38,7 +38,7 @@ DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir, DeleteScheduler::~DeleteScheduler() { { - MutexLock l(&mu_); + InstrumentedMutexLock l(&mu_); closing_ = true; cv_.SignalAll(); } @@ -74,7 +74,7 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path) { // Add file to delete queue { - MutexLock l(&mu_); + InstrumentedMutexLock l(&mu_); queue_.push(path_in_trash); pending_files_++; if (pending_files_ == 1) { @@ -85,7 +85,7 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path) { } std::map DeleteScheduler::GetBackgroundErrors() { - MutexLock l(&mu_); + InstrumentedMutexLock l(&mu_); return bg_errors_; } @@ -107,7 +107,7 @@ Status DeleteScheduler::MoveToTrash(const std::string& file_path, // TODO(tec) : Implement Env::RenameFileIfNotExist and remove // file_move_mu mutex. - MutexLock l(&file_move_mu_); + InstrumentedMutexLock l(&file_move_mu_); while (true) { s = env_->FileExists(*path_in_trash + unique_suffix); if (s.IsNotFound()) { @@ -133,7 +133,7 @@ void DeleteScheduler::BackgroundEmptyTrash() { TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash"); while (true) { - MutexLock l(&mu_); + InstrumentedMutexLock l(&mu_); while (queue_.empty() && !closing_) { cv_.Wait(); } @@ -204,7 +204,7 @@ Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash, } void DeleteScheduler::WaitForEmptyTrash() { - MutexLock l(&mu_); + InstrumentedMutexLock l(&mu_); while (pending_files_ > 0 && !closing_) { cv_.Wait(); } diff --git a/util/delete_scheduler.h b/util/delete_scheduler.h index eec118708..331507da4 100644 --- a/util/delete_scheduler.h +++ b/util/delete_scheduler.h @@ -11,6 +11,7 @@ #include #include "port/port.h" +#include "util/instrumented_mutex.h" #include "rocksdb/status.h" @@ -63,7 +64,7 @@ class DeleteScheduler { // Maximum number of bytes that should be deleted per second int64_t rate_bytes_per_sec_; // Mutex to protect queue_, pending_files_, bg_errors_, closing_ - port::Mutex mu_; + InstrumentedMutex mu_; // Queue of files in trash that need to be deleted std::queue queue_; // Number of files in trash that are waiting to be deleted @@ -76,11 +77,11 @@ class DeleteScheduler { // - pending_files_ value change from 0 => 1 // - pending_files_ value change from 1 => 0 // - closing_ value is set to true - port::CondVar cv_; + InstrumentedCondVar cv_; // Background thread running BackgroundEmptyTrash std::unique_ptr bg_thread_; // Mutex to protect threads from file name conflicts - port::Mutex file_move_mu_; + InstrumentedMutex file_move_mu_; Logger* info_log_; SstFileManagerImpl* sst_file_manager_; static const uint64_t kMicrosInSecond = 1000 * 1000LL; diff --git a/util/instrumented_mutex.cc b/util/instrumented_mutex.cc index c32a86df7..c00855943 100644 --- a/util/instrumented_mutex.cc +++ b/util/instrumented_mutex.cc @@ -5,6 +5,7 @@ #include "util/instrumented_mutex.h" #include "util/perf_context_imp.h" +#include "util/sync_point.h" #include "util/thread_status_util.h" namespace rocksdb { @@ -80,6 +81,10 @@ bool InstrumentedCondVar::TimedWaitInternal(uint64_t abs_time_us) { #ifndef NDEBUG ThreadStatusUtil::TEST_StateDelay(ThreadStatus::STATE_MUTEX_WAIT); #endif + + TEST_SYNC_POINT_CALLBACK("InstrumentedCondVar::TimedWaitInternal", + &abs_time_us); + return cond_.TimedWait(abs_time_us); }