From 5aea98ddd8eda9865ace5082a108f183d39e48bd Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 14 Jul 2015 09:35:48 +0200 Subject: [PATCH] Deprecate WriteOptions::timeout_hint_us Summary: In one of our recent meetings, we discussed deprecating features that are not being actively used. One of those features, at least within Facebook, is timeout_hint. The feature is really nicely implemented, but if nobody needs it, we should remove it from our code-base (until we get a valid use-case). Some arguments: * Less code == better icache hit rate, smaller builds, simpler code * The motivation for adding timeout_hint_us was to work-around RocksDB's stall issue. However, we're currently addressing the stall issue itself (see @sdong's recent work on stall write_rate), so we should never see sharp lock-ups in the future. * Nobody is using the feature within Facebook's code-base. Googling for `timeout_hint_us` also doesn't yield any users. Test Plan: make check Reviewers: anthony, kradhakrishnan, sdong, yhchiang Reviewed By: yhchiang Subscribers: sdong, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41937 --- HISTORY.md | 5 + db/db_impl.cc | 84 +++-------- db/db_impl.h | 2 +- db/db_impl_debug.cc | 4 +- db/db_test.cc | 263 +++++++++-------------------------- db/listener_test.cc | 18 +-- db/write_thread.cc | 50 +------ db/write_thread.h | 6 +- include/rocksdb/options.h | 10 +- include/rocksdb/statistics.h | 1 - include/rocksdb/status.h | 6 - util/status.cc | 3 - 12 files changed, 106 insertions(+), 346 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index c67620383..1a31479e3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,10 @@ # Rocksdb Change Log +## Unreleased + +### Public API changes +* Deprecated WriteOptions::timeout_hint_us. We no longer support write timeout. If you really need this option, talk to us and we might consider returning it. + ## 3.12.0 (7/2/2015) ### New Features * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. diff --git a/db/db_impl.cc b/db/db_impl.cc index ff679c69a..afae447df 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2019,8 +2019,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } WriteThread::Writer w(&mutex_); - s = write_thread_.EnterWriteThread(&w, 0); - assert(s.ok() && !w.done); // No timeout and nobody should do our job + write_thread_.EnterWriteThread(&w); + assert(!w.done); // Nobody should do our job // SwitchMemtable() will release and reacquire mutex // during execution @@ -3014,8 +3014,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, Options opt(db_options_, cf_options); { // write thread WriteThread::Writer w(&mutex_); - s = write_thread_.EnterWriteThread(&w, 0); - assert(s.ok() && !w.done); // No timeout and nobody should do our job + write_thread_.EnterWriteThread(&w); + assert(!w.done); // Nobody should do our job // LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object s = versions_->LogAndApply( @@ -3076,8 +3076,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { if (s.ok()) { // we drop column family from a single write thread WriteThread::Writer w(&mutex_); - s = write_thread_.EnterWriteThread(&w, 0); - assert(s.ok() && !w.done); // No timeout and nobody should do our job + write_thread_.EnterWriteThread(&w); + assert(!w.done); // Nobody should do our job s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_); write_thread_.ExitWriteThread(&w, &w, s); @@ -3364,6 +3364,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } + if (write_options.timeout_hint_us != 0) { + return Status::InvalidArgument("timeout_hint_us is deprecated"); + } Status status; bool xfunc_attempted_write = false; @@ -3384,16 +3387,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, w.in_batch_group = false; w.done = false; w.has_callback = (callback != nullptr) ? true : false; - w.timeout_hint_us = write_options.timeout_hint_us; - - uint64_t expiration_time = 0; - bool has_timeout = false; - if (w.timeout_hint_us == 0) { - w.timeout_hint_us = WriteThread::kNoTimeOut; - } else { - expiration_time = env_->NowMicros() + w.timeout_hint_us; - has_timeout = true; - } if (!write_options.disableWAL) { RecordTick(stats_, WRITE_WITH_WAL); @@ -3408,13 +3401,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1); } - status = write_thread_.EnterWriteThread(&w, expiration_time); - assert(status.ok() || status.IsTimedOut()); - if (status.IsTimedOut()) { - mutex_.Unlock(); - RecordTick(stats_, WRITE_TIMEDOUT); - return Status::TimedOut(); - } + write_thread_.EnterWriteThread(&w); if (w.done) { // write was done by someone else default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, 1); @@ -3500,15 +3487,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // for previous one. It might create a fairness issue that expiration // might happen for smaller writes but larger writes can go through. // Can optimize it if it is an issue. - status = DelayWrite(last_batch_group_size_, expiration_time); + status = DelayWrite(last_batch_group_size_); PERF_TIMER_START(write_pre_and_post_process_time); } - if (UNLIKELY(status.ok() && has_timeout && - env_->NowMicros() > expiration_time)) { - status = Status::TimedOut(); - } - uint64_t last_sequence = versions_->LastSequence(); WriteThread::Writer* last_writer = &w; autovector write_batch_group; @@ -3627,7 +3609,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.Lock(); } - if (db_options_.paranoid_checks && !status.ok() && !status.IsTimedOut() && + if (db_options_.paranoid_checks && !status.ok() && !status.IsBusy() && bg_error_.ok()) { bg_error_ = status; // stop compaction & fail any further writes } @@ -3637,56 +3619,30 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.Unlock(); - if (status.IsTimedOut()) { - RecordTick(stats_, WRITE_TIMEDOUT); - } - return status; } // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) { +Status DBImpl::DelayWrite(uint64_t num_bytes) { uint64_t time_delayed = 0; bool delayed = false; - bool timed_out = false; { StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed); - bool has_timeout = (expiration_time > 0); auto delay = write_controller_.GetDelay(env_, num_bytes); if (delay > 0) { mutex_.Unlock(); delayed = true; - // hopefully we don't have to sleep more than 2 billion microseconds TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep"); - if (has_timeout) { - auto time_now = env_->NowMicros(); - if (time_now + delay >= expiration_time) { - if (expiration_time > time_now) { - env_->SleepForMicroseconds( - static_cast(expiration_time - time_now)); - } - timed_out = true; - } - } - if (!timed_out) { - env_->SleepForMicroseconds(static_cast(delay)); - } + // hopefully we don't have to sleep more than 2 billion microseconds + env_->SleepForMicroseconds(static_cast(delay)); mutex_.Lock(); } - while (!timed_out && bg_error_.ok() && write_controller_.IsStopped()) { + while (bg_error_.ok() && write_controller_.IsStopped()) { delayed = true; - if (has_timeout) { - TEST_SYNC_POINT("DBImpl::DelayWrite:TimedWait"); - bg_cv_.TimedWait(expiration_time); - if (env_->NowMicros() > expiration_time) { - timed_out = true; - break; - } - } else { - bg_cv_.Wait(); - } + TEST_SYNC_POINT("DBImpl::DelayWrite:Wait"); + bg_cv_.Wait(); } } if (delayed) { @@ -3695,10 +3651,6 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) { RecordTick(stats_, STALL_MICROS, time_delayed); } - if (timed_out) { - return Status::TimedOut(); - } - return bg_error_; } diff --git a/db/db_impl.h b/db/db_impl.h index 711211fa6..39c52fe29 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -437,7 +437,7 @@ class DBImpl : public DB { // num_bytes: for slowdown case, delay time is calculated based on // `num_bytes` going through. - Status DelayWrite(uint64_t num_bytes, uint64_t expiration_time); + Status DelayWrite(uint64_t num_bytes); Status ScheduleFlushes(WriteContext* context); diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 66177ed7a..e99b5983f 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -132,8 +132,8 @@ void DBImpl::TEST_UnlockMutex() { void* DBImpl::TEST_BeginWrite() { auto w = new WriteThread::Writer(&mutex_); - Status s = write_thread_.EnterWriteThread(w, 0); - assert(s.ok() && !w->done); // No timeout and nobody should do our job + write_thread_.EnterWriteThread(w); + assert(!w->done); // Nobody should do our job return reinterpret_cast(w); } diff --git a/db/db_test.cc b/db/db_test.cc index 9b4a83951..b2745b8b7 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1998,6 +1998,16 @@ class SleepingBackgroundTask { bg_cv_.Wait(); } } + bool WokenUp() { + MutexLock l(&mutex_); + return should_sleep_ == false; + } + + void Reset() { + MutexLock l(&mutex_); + should_sleep_ = true; + done_with_sleep_ = false; + } static void DoSleepTask(void* arg) { reinterpret_cast(arg)->DoSleep(); @@ -9152,157 +9162,15 @@ TEST_F(DBTest, FIFOCompactionTest) { } } +// verify that we correctly deprecated timeout_hint_us TEST_F(DBTest, SimpleWriteTimeoutTest) { - // Block compaction thread, which will also block the flushes because - // max_background_flushes == 0, so flushes are getting executed by the - // compaction thread - env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, - Env::Priority::LOW); - - Options options; - options.env = env_; - options.create_if_missing = true; - options.write_buffer_size = 100000; - options.max_background_flushes = 0; - options.max_write_buffer_number = 2; - options.max_total_wal_size = std::numeric_limits::max(); WriteOptions write_opt; write_opt.timeout_hint_us = 0; - DestroyAndReopen(options); - // fill the two write buffers - ASSERT_OK(Put(Key(1), Key(1) + std::string(100000, 'v'), write_opt)); - ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt)); - // As the only two write buffers are full in this moment, the third - // Put is expected to be timed-out. - write_opt.timeout_hint_us = 50; - ASSERT_TRUE( - Put(Key(3), Key(3) + std::string(100000, 'v'), write_opt).IsTimedOut()); - - sleeping_task_low.WakeUp(); - sleeping_task_low.WaitUntilDone(); + ASSERT_OK(Put(Key(1), Key(1) + std::string(100, 'v'), write_opt)); + write_opt.timeout_hint_us = 10; + ASSERT_NOK(Put(Key(1), Key(1) + std::string(100, 'v'), write_opt)); } -// Multi-threaded Timeout Test -namespace { - -static const int kValueSize = 1000; -static const int kWriteBufferSize = 100000; - -struct TimeoutWriterState { - int id; - DB* db; - std::atomic done; - std::map success_kvs; -}; - -static void RandomTimeoutWriter(void* arg) { - TimeoutWriterState* state = reinterpret_cast(arg); - static const uint64_t kTimerBias = 50; - int thread_id = state->id; - DB* db = state->db; - - Random rnd(1000 + thread_id); - WriteOptions write_opt; - write_opt.timeout_hint_us = 500; - int timeout_count = 0; - int num_keys = kNumKeys * 5; - - for (int k = 0; k < num_keys; ++k) { - int key = k + thread_id * num_keys; - std::string value = DBTestBase::RandomString(&rnd, kValueSize); - // only the second-half is randomized - if (k > num_keys / 2) { - switch (rnd.Next() % 5) { - case 0: - write_opt.timeout_hint_us = 500 * thread_id; - break; - case 1: - write_opt.timeout_hint_us = num_keys - k; - break; - case 2: - write_opt.timeout_hint_us = 1; - break; - default: - write_opt.timeout_hint_us = 0; - state->success_kvs.insert({key, value}); - } - } - - uint64_t time_before_put = db->GetEnv()->NowMicros(); - Status s = db->Put(write_opt, DBTestBase::Key(key), value); - uint64_t put_duration = db->GetEnv()->NowMicros() - time_before_put; - if (write_opt.timeout_hint_us == 0 || - put_duration + kTimerBias < write_opt.timeout_hint_us) { - ASSERT_OK(s); - } - if (s.IsTimedOut()) { - timeout_count++; - ASSERT_GT(put_duration + kTimerBias, write_opt.timeout_hint_us); - } - } - - state->done = true; -} - -TEST_F(DBTest, MTRandomTimeoutTest) { - Options options; - options.env = env_; - options.create_if_missing = true; - options.max_write_buffer_number = 2; - options.compression = kNoCompression; - options.level0_slowdown_writes_trigger = 10; - options.level0_stop_writes_trigger = 20; - options.write_buffer_size = kWriteBufferSize; - DestroyAndReopen(options); - - TimeoutWriterState thread_states[kNumThreads]; - for (int tid = 0; tid < kNumThreads; ++tid) { - thread_states[tid].id = tid; - thread_states[tid].db = db_; - thread_states[tid].done = false; - env_->StartThread(RandomTimeoutWriter, &thread_states[tid]); - } - - for (int tid = 0; tid < kNumThreads; ++tid) { - while (thread_states[tid].done == false) { - env_->SleepForMicroseconds(100000); - } - } - - Flush(); - - for (int tid = 0; tid < kNumThreads; ++tid) { - auto& success_kvs = thread_states[tid].success_kvs; - for (auto it = success_kvs.begin(); it != success_kvs.end(); ++it) { - ASSERT_EQ(Get(Key(it->first)), it->second); - } - } -} - -TEST_F(DBTest, Level0StopWritesTest) { - Options options = CurrentOptions(); - options.level0_slowdown_writes_trigger = 2; - options.level0_stop_writes_trigger = 4; - options.disable_auto_compactions = true; - options.max_mem_compaction_level = 0; - Reopen(options); - - // create 4 level0 tables - for (int i = 0; i < 4; ++i) { - Put("a", "b"); - Flush(); - } - - WriteOptions woptions; - woptions.timeout_hint_us = 30 * 1000; // 30 ms - Status s = Put("a", "b", woptions); - ASSERT_TRUE(s.IsTimedOut()); -} - -} // anonymous namespace - /* * This test is not reliable enough as it heavily depends on disk behavior. */ @@ -9896,8 +9764,8 @@ TEST_F(DBTest, DynamicMemtableOptions) { // max_background_flushes == 0, so flushes are getting executed by the // compaction thread env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low1; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1, + SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); // Start from scratch and disable compaction/flush. Flush can only happen // during compaction but trigger is pretty high @@ -9905,28 +9773,24 @@ TEST_F(DBTest, DynamicMemtableOptions) { options.disable_auto_compactions = true; DestroyAndReopen(options); - // Put until timeout, bounded by 256 puts. We should see timeout at ~128KB + // Put until writes are stopped, bounded by 256 puts. We should see stop at + // ~128KB int count = 0; Random rnd(301); - WriteOptions wo; - wo.timeout_hint_us = 100000; // Reasonabley long timeout to make sure sleep - // triggers but not forever. - std::atomic sleep_count(0); rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::DelayWrite:TimedWait", - [&](void* arg) { sleep_count.fetch_add(1); }); + "DBImpl::DelayWrite:Wait", + [&](void* arg) { sleeping_task_low.WakeUp(); }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) { + while (!sleeping_task_low.WokenUp() && count < 256) { + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions())); count++; } - ASSERT_GT(sleep_count.load(), 0); ASSERT_GT(static_cast(count), 128 * 0.8); ASSERT_LT(static_cast(count), 128 * 1.2); - sleeping_task_low1.WakeUp(); - sleeping_task_low1.WaitUntilDone(); + sleeping_task_low.WaitUntilDone(); // Increase ASSERT_OK(dbfull()->SetOptions({ @@ -9935,23 +9799,21 @@ TEST_F(DBTest, DynamicMemtableOptions) { // Clean up memtable and L0 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); - SleepingBackgroundTask sleeping_task_low2; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, + sleeping_task_low.Reset(); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); count = 0; - sleep_count.store(0); - while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { + while (!sleeping_task_low.WokenUp() && count < 1024) { + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions())); count++; } - ASSERT_GT(sleep_count.load(), 0); -// Windows fails this test. Will tune in the future and figure out -// approp number + // Windows fails this test. Will tune in the future and figure out + // approp number #ifndef OS_WIN ASSERT_GT(static_cast(count), 512 * 0.8); ASSERT_LT(static_cast(count), 512 * 1.2); #endif - sleeping_task_low2.WakeUp(); - sleeping_task_low2.WaitUntilDone(); + sleeping_task_low.WaitUntilDone(); // Decrease ASSERT_OK(dbfull()->SetOptions({ @@ -9960,24 +9822,22 @@ TEST_F(DBTest, DynamicMemtableOptions) { // Clean up memtable and L0 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); - SleepingBackgroundTask sleeping_task_low3; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3, + sleeping_task_low.Reset(); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); count = 0; - sleep_count.store(0); - while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { + while (!sleeping_task_low.WokenUp() && count < 1024) { + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions())); count++; } - ASSERT_GT(sleep_count.load(), 0); -// Windows fails this test. Will tune in the future and figure out -// approp number + // Windows fails this test. Will tune in the future and figure out + // approp number #ifndef OS_WIN ASSERT_GT(static_cast(count), 256 * 0.8); ASSERT_LT(static_cast(count), 266 * 1.2); #endif - sleeping_task_low3.WakeUp(); - sleeping_task_low3.WaitUntilDone(); + sleeping_task_low.WaitUntilDone(); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } @@ -10760,50 +10620,61 @@ TEST_F(DBTest, DynamicCompactionOptions) { // Test level0_stop_writes_trigger. // Clean up memtable and L0. Block compaction threads. If continue to write - // and flush memtables. We should see put timeout after 8 memtable flushes + // and flush memtables. We should see put stop after 8 memtable flushes // since level0_stop_writes_trigger = 8 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); // Block compaction - SleepingBackgroundTask sleeping_task_low1; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1, + SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DelayWrite:Wait", + [&](void* arg) { sleeping_task_low.WakeUp(); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); int count = 0; Random rnd(301); WriteOptions wo; - wo.timeout_hint_us = 10000; - while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) { + while (count < 64) { + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo)); + if (sleeping_task_low.WokenUp()) { + break; + } dbfull()->TEST_FlushMemTable(true); count++; } // Stop trigger = 8 ASSERT_EQ(count, 8); // Unblock - sleeping_task_low1.WakeUp(); - sleeping_task_low1.WaitUntilDone(); + sleeping_task_low.WaitUntilDone(); // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0. // Block compaction thread again. Perform the put and memtable flushes - // until we see timeout after 6 memtable flushes. + // until we see the stop after 6 memtable flushes. ASSERT_OK(dbfull()->SetOptions({ {"level0_stop_writes_trigger", "6"} })); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(NumTableFilesAtLevel(0), 0); - // Block compaction - SleepingBackgroundTask sleeping_task_low2; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, + // Block compaction again + sleeping_task_low.Reset(); + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); count = 0; - while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) { + while (count < 64) { + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo)); + if (sleeping_task_low.WokenUp()) { + break; + } dbfull()->TEST_FlushMemTable(true); count++; } ASSERT_EQ(count, 6); // Unblock - sleeping_task_low2.WakeUp(); - sleeping_task_low2.WaitUntilDone(); + sleeping_task_low.WaitUntilDone(); // Test disable_auto_compactions // Compaction thread is unblocked but auto compaction is disabled. Write @@ -10818,7 +10689,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { for (int i = 0; i < 4; ++i) { ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); - // Wait for compaction so that put won't timeout + // Wait for compaction so that put won't stop dbfull()->TEST_FlushMemTable(true); } dbfull()->TEST_WaitForCompact(); @@ -10834,7 +10705,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { for (int i = 0; i < 4; ++i) { ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); - // Wait for compaction so that put won't timeout + // Wait for compaction so that put won't stop dbfull()->TEST_FlushMemTable(true); } dbfull()->TEST_WaitForCompact(); @@ -10877,6 +10748,8 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_EQ(NumTableFilesAtLevel(0), 1); ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_EQ(NumTableFilesAtLevel(2), 1); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } TEST_F(DBTest, FileCreationRandomFailure) { @@ -12265,10 +12138,6 @@ TEST_F(DBTest, DelayedWriteRate) { // Spread the size range to more. size_t entry_size = rand_num * rand_num * rand_num; WriteOptions wo; - // For a small chance, set a timeout. - if (rnd.Uniform(20) == 6) { - wo.timeout_hint_us = 1500; - } Put(Key(i), std::string(entry_size, 'x'), wo); estimated_total_size += entry_size + 20; // Ocassionally sleep a while diff --git a/db/listener_test.cc b/db/listener_test.cc index cc7c4547c..f15f74b2d 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -438,10 +438,13 @@ TEST_F(EventListenerTest, DisableBGCompaction) { options.enable_thread_tracking = true; #endif // ROCKSDB_USING_THREAD_STATUS TestFlushListener* listener = new TestFlushListener(options.env); + const int kCompactionTrigger = 1; const int kSlowdownTrigger = 5; - const int kStopTrigger = 10; + const int kStopTrigger = 100; + options.level0_file_num_compaction_trigger = kCompactionTrigger; options.level0_slowdown_writes_trigger = kSlowdownTrigger; options.level0_stop_writes_trigger = kStopTrigger; + options.max_write_buffer_number = 10; options.listeners.emplace_back(listener); // BG compaction is disabled. Number of L0 files will simply keeps // increasing in this test. @@ -450,18 +453,17 @@ TEST_F(EventListenerTest, DisableBGCompaction) { options.write_buffer_size = 100000; // Small write buffer CreateAndReopenWithCF({"pikachu"}, &options); - WriteOptions wopts; - wopts.timeout_hint_us = 100000; ColumnFamilyMetaData cf_meta; db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); + // keep writing until writes are forced to stop. - for (int i = 0; static_cast(cf_meta.file_count) < kStopTrigger; ++i) { - Put(1, ToString(i), std::string(100000, 'x'), wopts); - db_->Flush(FlushOptions()); + for (int i = 0; static_cast(cf_meta.file_count) < kSlowdownTrigger * 10; + ++i) { + Put(1, ToString(i), std::string(10000, 'x'), WriteOptions()); + db_->Flush(FlushOptions(), handles_[1]); db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); } - ASSERT_GE(listener->slowdown_count, kStopTrigger - kSlowdownTrigger); - ASSERT_GE(listener->stop_count, 1); + ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9); } } // namespace rocksdb diff --git a/db/write_thread.cc b/db/write_thread.cc index c77df9253..7a9d16003 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -7,8 +7,7 @@ namespace rocksdb { -Status WriteThread::EnterWriteThread(WriteThread::Writer* w, - uint64_t expiration_time) { +void WriteThread::EnterWriteThread(WriteThread::Writer* w) { // the following code block pushes the current writer "w" into the writer // queue "writers_" and wait until one of the following conditions met: // 1. the job of "w" has been done by some other writers. @@ -16,48 +15,9 @@ Status WriteThread::EnterWriteThread(WriteThread::Writer* w, // 3. "w" timed-out. writers_.push_back(w); - bool timed_out = false; while (!w->done && w != writers_.front()) { - if (expiration_time == 0) { - w->cv.Wait(); - } else if (w->cv.TimedWait(expiration_time)) { - if (w->in_batch_group) { - // then it means the front writer is currently doing the - // write on behalf of this "timed-out" writer. Then it - // should wait until the write completes. - expiration_time = 0; - } else { - timed_out = true; - break; - } - } - } - - if (timed_out) { -#ifndef NDEBUG - bool found = false; -#endif - for (auto iter = writers_.begin(); iter != writers_.end(); iter++) { - if (*iter == w) { - writers_.erase(iter); -#ifndef NDEBUG - found = true; -#endif - break; - } - } -#ifndef NDEBUG - assert(found); -#endif - // writers_.front() might still be in cond_wait without a time-out. - // As a result, we need to signal it to wake it up. Otherwise no - // one else will wake him up, and RocksDB will hang. - if (!writers_.empty()) { - writers_.front()->cv.Signal(); - } - return Status::TimedOut(); + w->cv.Wait(); } - return Status::OK(); } void WriteThread::ExitWriteThread(WriteThread::Writer* w, @@ -128,12 +88,6 @@ size_t WriteThread::BuildBatchGroup( break; } - if (w->timeout_hint_us < first->timeout_hint_us) { - // Do not include those writes with shorter timeout. Otherwise, we might - // execute a write that should instead be aborted because of timeout. - break; - } - if (w->has_callback) { // Do not include writes which may be aborted if the callback does not // succeed. diff --git a/db/write_thread.h b/db/write_thread.h index 31870d9e1..a4a392246 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -18,7 +18,6 @@ namespace rocksdb { class WriteThread { public: - static const uint64_t kNoTimeOut = port::kMaxUint64; // Information kept for every waiting writer struct Writer { Status status; @@ -28,7 +27,6 @@ class WriteThread { bool in_batch_group; bool done; bool has_callback; - uint64_t timeout_hint_us; InstrumentedCondVar cv; explicit Writer(InstrumentedMutex* mu) @@ -38,7 +36,6 @@ class WriteThread { in_batch_group(false), done(false), has_callback(false), - timeout_hint_us(kNoTimeOut), cv(mu) {} }; @@ -52,10 +49,9 @@ class WriteThread { // for examples), so check it via w.done before applying changes. // // Writer* w: writer to be placed in the queue - // uint64_t expiration_time: maximum time to be in the queue // See also: ExitWriteThread // REQUIRES: db mutex held - Status EnterWriteThread(Writer* w, uint64_t expiration_time); + void EnterWriteThread(Writer* w); // After doing write job, we need to remove already used writers from // writers_ queue and notify head of the queue about it. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 8d2b23a88..8cb0d1215 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1212,15 +1212,7 @@ struct WriteOptions { // and the write may got lost after a crash. bool disableWAL; - // If non-zero, then associated write waiting longer than the specified - // time MAY be aborted and returns Status::TimedOut. A write that takes - // less than the specified time is guaranteed to not fail with - // Status::TimedOut. - // - // The number of times a write call encounters a timeout is recorded in - // Statistics.WRITE_TIMEDOUT - // - // Default: 0 + // The option is deprecated. It's not used anymore. uint64_t timeout_hint_us; // If true and if user is trying to write to column families that don't exist diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 82405276d..b7644f1e3 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -206,7 +206,6 @@ const std::vector> TickersNameMap = { {WAL_FILE_BYTES, "rocksdb.wal.bytes"}, {WRITE_DONE_BY_SELF, "rocksdb.write.self"}, {WRITE_DONE_BY_OTHER, "rocksdb.write.other"}, - {WRITE_TIMEDOUT, "rocksdb.write.timedout"}, {WRITE_WITH_WAL, "rocksdb.write.wal"}, {FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"}, {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"}, diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index b50be2042..9fe0c9a85 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -68,12 +68,6 @@ class Status { const Slice& msg2 = Slice()) { return Status(kShutdownInProgress, msg, msg2); } - static Status TimedOut() { - return Status(kTimedOut); - } - static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) { - return Status(kTimedOut, msg, msg2); - } static Status Aborted() { return Status(kAborted); } diff --git a/util/status.cc b/util/status.cc index 3fe292dd3..d956eb476 100644 --- a/util/status.cc +++ b/util/status.cc @@ -67,9 +67,6 @@ std::string Status::ToString() const { case kShutdownInProgress: type = "Shutdown in progress: "; break; - case kTimedOut: - type = "Operation timed out: "; - break; case kAborted: type = "Operation aborted: "; break;