Deprecate WriteOptions::timeout_hint_us

Summary:
In one of our recent meetings, we discussed deprecating features that are not being actively used. One of those features, at least within Facebook, is timeout_hint. The feature is really nicely implemented, but if nobody needs it, we should remove it from our code-base (until we get a valid use-case). Some arguments:
* Less code == better icache hit rate, smaller builds, simpler code
* The motivation for adding timeout_hint_us was to work-around RocksDB's stall issue. However, we're currently addressing the stall issue itself (see @sdong's recent work on stall write_rate), so we should never see sharp lock-ups in the future.
* Nobody is using the feature within Facebook's code-base. Googling for `timeout_hint_us` also doesn't yield any users.

Test Plan: make check

Reviewers: anthony, kradhakrishnan, sdong, yhchiang

Reviewed By: yhchiang

Subscribers: sdong, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D41937
main
Igor Canadi 10 years ago
parent ae29495e4b
commit 5aea98ddd8
  1. 5
      HISTORY.md
  2. 84
      db/db_impl.cc
  3. 2
      db/db_impl.h
  4. 4
      db/db_impl_debug.cc
  5. 263
      db/db_test.cc
  6. 18
      db/listener_test.cc
  7. 50
      db/write_thread.cc
  8. 6
      db/write_thread.h
  9. 10
      include/rocksdb/options.h
  10. 1
      include/rocksdb/statistics.h
  11. 6
      include/rocksdb/status.h
  12. 3
      util/status.cc

@ -1,5 +1,10 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased
### Public API changes
* Deprecated WriteOptions::timeout_hint_us. We no longer support write timeout. If you really need this option, talk to us and we might consider returning it.
## 3.12.0 (7/2/2015) ## 3.12.0 (7/2/2015)
### New Features ### New Features
* Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info.

@ -2019,8 +2019,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
} }
WriteThread::Writer w(&mutex_); WriteThread::Writer w(&mutex_);
s = write_thread_.EnterWriteThread(&w, 0); write_thread_.EnterWriteThread(&w);
assert(s.ok() && !w.done); // No timeout and nobody should do our job assert(!w.done); // Nobody should do our job
// SwitchMemtable() will release and reacquire mutex // SwitchMemtable() will release and reacquire mutex
// during execution // during execution
@ -3014,8 +3014,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
Options opt(db_options_, cf_options); Options opt(db_options_, cf_options);
{ // write thread { // write thread
WriteThread::Writer w(&mutex_); WriteThread::Writer w(&mutex_);
s = write_thread_.EnterWriteThread(&w, 0); write_thread_.EnterWriteThread(&w);
assert(s.ok() && !w.done); // No timeout and nobody should do our job assert(!w.done); // Nobody should do our job
// LogAndApply will both write the creation in MANIFEST and create // LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object // ColumnFamilyData object
s = versions_->LogAndApply( s = versions_->LogAndApply(
@ -3076,8 +3076,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
if (s.ok()) { if (s.ok()) {
// we drop column family from a single write thread // we drop column family from a single write thread
WriteThread::Writer w(&mutex_); WriteThread::Writer w(&mutex_);
s = write_thread_.EnterWriteThread(&w, 0); write_thread_.EnterWriteThread(&w);
assert(s.ok() && !w.done); // No timeout and nobody should do our job assert(!w.done); // Nobody should do our job
s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_); &edit, &mutex_);
write_thread_.ExitWriteThread(&w, &w, s); write_thread_.ExitWriteThread(&w, &w, s);
@ -3364,6 +3364,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (my_batch == nullptr) { if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!"); return Status::Corruption("Batch is nullptr!");
} }
if (write_options.timeout_hint_us != 0) {
return Status::InvalidArgument("timeout_hint_us is deprecated");
}
Status status; Status status;
bool xfunc_attempted_write = false; bool xfunc_attempted_write = false;
@ -3384,16 +3387,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.in_batch_group = false; w.in_batch_group = false;
w.done = false; w.done = false;
w.has_callback = (callback != nullptr) ? true : false; w.has_callback = (callback != nullptr) ? true : false;
w.timeout_hint_us = write_options.timeout_hint_us;
uint64_t expiration_time = 0;
bool has_timeout = false;
if (w.timeout_hint_us == 0) {
w.timeout_hint_us = WriteThread::kNoTimeOut;
} else {
expiration_time = env_->NowMicros() + w.timeout_hint_us;
has_timeout = true;
}
if (!write_options.disableWAL) { if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL); RecordTick(stats_, WRITE_WITH_WAL);
@ -3408,13 +3401,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1); default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
} }
status = write_thread_.EnterWriteThread(&w, expiration_time); write_thread_.EnterWriteThread(&w);
assert(status.ok() || status.IsTimedOut());
if (status.IsTimedOut()) {
mutex_.Unlock();
RecordTick(stats_, WRITE_TIMEDOUT);
return Status::TimedOut();
}
if (w.done) { // write was done by someone else if (w.done) { // write was done by someone else
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
1); 1);
@ -3500,15 +3487,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// for previous one. It might create a fairness issue that expiration // for previous one. It might create a fairness issue that expiration
// might happen for smaller writes but larger writes can go through. // might happen for smaller writes but larger writes can go through.
// Can optimize it if it is an issue. // Can optimize it if it is an issue.
status = DelayWrite(last_batch_group_size_, expiration_time); status = DelayWrite(last_batch_group_size_);
PERF_TIMER_START(write_pre_and_post_process_time); PERF_TIMER_START(write_pre_and_post_process_time);
} }
if (UNLIKELY(status.ok() && has_timeout &&
env_->NowMicros() > expiration_time)) {
status = Status::TimedOut();
}
uint64_t last_sequence = versions_->LastSequence(); uint64_t last_sequence = versions_->LastSequence();
WriteThread::Writer* last_writer = &w; WriteThread::Writer* last_writer = &w;
autovector<WriteBatch*> write_batch_group; autovector<WriteBatch*> write_batch_group;
@ -3627,7 +3609,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
mutex_.Lock(); mutex_.Lock();
} }
if (db_options_.paranoid_checks && !status.ok() && !status.IsTimedOut() && if (db_options_.paranoid_checks && !status.ok() &&
!status.IsBusy() && bg_error_.ok()) { !status.IsBusy() && bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes bg_error_ = status; // stop compaction & fail any further writes
} }
@ -3637,56 +3619,30 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
mutex_.Unlock(); mutex_.Unlock();
if (status.IsTimedOut()) {
RecordTick(stats_, WRITE_TIMEDOUT);
}
return status; return status;
} }
// REQUIRES: mutex_ is held // REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) { Status DBImpl::DelayWrite(uint64_t num_bytes) {
uint64_t time_delayed = 0; uint64_t time_delayed = 0;
bool delayed = false; bool delayed = false;
bool timed_out = false;
{ {
StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed); StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
bool has_timeout = (expiration_time > 0);
auto delay = write_controller_.GetDelay(env_, num_bytes); auto delay = write_controller_.GetDelay(env_, num_bytes);
if (delay > 0) { if (delay > 0) {
mutex_.Unlock(); mutex_.Unlock();
delayed = true; delayed = true;
// hopefully we don't have to sleep more than 2 billion microseconds
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep"); TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
if (has_timeout) { // hopefully we don't have to sleep more than 2 billion microseconds
auto time_now = env_->NowMicros(); env_->SleepForMicroseconds(static_cast<int>(delay));
if (time_now + delay >= expiration_time) {
if (expiration_time > time_now) {
env_->SleepForMicroseconds(
static_cast<int>(expiration_time - time_now));
}
timed_out = true;
}
}
if (!timed_out) {
env_->SleepForMicroseconds(static_cast<int>(delay));
}
mutex_.Lock(); mutex_.Lock();
} }
while (!timed_out && bg_error_.ok() && write_controller_.IsStopped()) { while (bg_error_.ok() && write_controller_.IsStopped()) {
delayed = true; delayed = true;
if (has_timeout) { TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
TEST_SYNC_POINT("DBImpl::DelayWrite:TimedWait"); bg_cv_.Wait();
bg_cv_.TimedWait(expiration_time);
if (env_->NowMicros() > expiration_time) {
timed_out = true;
break;
}
} else {
bg_cv_.Wait();
}
} }
} }
if (delayed) { if (delayed) {
@ -3695,10 +3651,6 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) {
RecordTick(stats_, STALL_MICROS, time_delayed); RecordTick(stats_, STALL_MICROS, time_delayed);
} }
if (timed_out) {
return Status::TimedOut();
}
return bg_error_; return bg_error_;
} }

@ -437,7 +437,7 @@ class DBImpl : public DB {
// num_bytes: for slowdown case, delay time is calculated based on // num_bytes: for slowdown case, delay time is calculated based on
// `num_bytes` going through. // `num_bytes` going through.
Status DelayWrite(uint64_t num_bytes, uint64_t expiration_time); Status DelayWrite(uint64_t num_bytes);
Status ScheduleFlushes(WriteContext* context); Status ScheduleFlushes(WriteContext* context);

@ -132,8 +132,8 @@ void DBImpl::TEST_UnlockMutex() {
void* DBImpl::TEST_BeginWrite() { void* DBImpl::TEST_BeginWrite() {
auto w = new WriteThread::Writer(&mutex_); auto w = new WriteThread::Writer(&mutex_);
Status s = write_thread_.EnterWriteThread(w, 0); write_thread_.EnterWriteThread(w);
assert(s.ok() && !w->done); // No timeout and nobody should do our job assert(!w->done); // Nobody should do our job
return reinterpret_cast<void*>(w); return reinterpret_cast<void*>(w);
} }

@ -1998,6 +1998,16 @@ class SleepingBackgroundTask {
bg_cv_.Wait(); bg_cv_.Wait();
} }
} }
bool WokenUp() {
MutexLock l(&mutex_);
return should_sleep_ == false;
}
void Reset() {
MutexLock l(&mutex_);
should_sleep_ = true;
done_with_sleep_ = false;
}
static void DoSleepTask(void* arg) { static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep(); reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
@ -9152,157 +9162,15 @@ TEST_F(DBTest, FIFOCompactionTest) {
} }
} }
// verify that we correctly deprecated timeout_hint_us
TEST_F(DBTest, SimpleWriteTimeoutTest) { TEST_F(DBTest, SimpleWriteTimeoutTest) {
// Block compaction thread, which will also block the flushes because
// max_background_flushes == 0, so flushes are getting executed by the
// compaction thread
env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
Options options;
options.env = env_;
options.create_if_missing = true;
options.write_buffer_size = 100000;
options.max_background_flushes = 0;
options.max_write_buffer_number = 2;
options.max_total_wal_size = std::numeric_limits<uint64_t>::max();
WriteOptions write_opt; WriteOptions write_opt;
write_opt.timeout_hint_us = 0; write_opt.timeout_hint_us = 0;
DestroyAndReopen(options); ASSERT_OK(Put(Key(1), Key(1) + std::string(100, 'v'), write_opt));
// fill the two write buffers write_opt.timeout_hint_us = 10;
ASSERT_OK(Put(Key(1), Key(1) + std::string(100000, 'v'), write_opt)); ASSERT_NOK(Put(Key(1), Key(1) + std::string(100, 'v'), write_opt));
ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt));
// As the only two write buffers are full in this moment, the third
// Put is expected to be timed-out.
write_opt.timeout_hint_us = 50;
ASSERT_TRUE(
Put(Key(3), Key(3) + std::string(100000, 'v'), write_opt).IsTimedOut());
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
} }
// Multi-threaded Timeout Test
namespace {
static const int kValueSize = 1000;
static const int kWriteBufferSize = 100000;
struct TimeoutWriterState {
int id;
DB* db;
std::atomic<bool> done;
std::map<int, std::string> success_kvs;
};
static void RandomTimeoutWriter(void* arg) {
TimeoutWriterState* state = reinterpret_cast<TimeoutWriterState*>(arg);
static const uint64_t kTimerBias = 50;
int thread_id = state->id;
DB* db = state->db;
Random rnd(1000 + thread_id);
WriteOptions write_opt;
write_opt.timeout_hint_us = 500;
int timeout_count = 0;
int num_keys = kNumKeys * 5;
for (int k = 0; k < num_keys; ++k) {
int key = k + thread_id * num_keys;
std::string value = DBTestBase::RandomString(&rnd, kValueSize);
// only the second-half is randomized
if (k > num_keys / 2) {
switch (rnd.Next() % 5) {
case 0:
write_opt.timeout_hint_us = 500 * thread_id;
break;
case 1:
write_opt.timeout_hint_us = num_keys - k;
break;
case 2:
write_opt.timeout_hint_us = 1;
break;
default:
write_opt.timeout_hint_us = 0;
state->success_kvs.insert({key, value});
}
}
uint64_t time_before_put = db->GetEnv()->NowMicros();
Status s = db->Put(write_opt, DBTestBase::Key(key), value);
uint64_t put_duration = db->GetEnv()->NowMicros() - time_before_put;
if (write_opt.timeout_hint_us == 0 ||
put_duration + kTimerBias < write_opt.timeout_hint_us) {
ASSERT_OK(s);
}
if (s.IsTimedOut()) {
timeout_count++;
ASSERT_GT(put_duration + kTimerBias, write_opt.timeout_hint_us);
}
}
state->done = true;
}
TEST_F(DBTest, MTRandomTimeoutTest) {
Options options;
options.env = env_;
options.create_if_missing = true;
options.max_write_buffer_number = 2;
options.compression = kNoCompression;
options.level0_slowdown_writes_trigger = 10;
options.level0_stop_writes_trigger = 20;
options.write_buffer_size = kWriteBufferSize;
DestroyAndReopen(options);
TimeoutWriterState thread_states[kNumThreads];
for (int tid = 0; tid < kNumThreads; ++tid) {
thread_states[tid].id = tid;
thread_states[tid].db = db_;
thread_states[tid].done = false;
env_->StartThread(RandomTimeoutWriter, &thread_states[tid]);
}
for (int tid = 0; tid < kNumThreads; ++tid) {
while (thread_states[tid].done == false) {
env_->SleepForMicroseconds(100000);
}
}
Flush();
for (int tid = 0; tid < kNumThreads; ++tid) {
auto& success_kvs = thread_states[tid].success_kvs;
for (auto it = success_kvs.begin(); it != success_kvs.end(); ++it) {
ASSERT_EQ(Get(Key(it->first)), it->second);
}
}
}
TEST_F(DBTest, Level0StopWritesTest) {
Options options = CurrentOptions();
options.level0_slowdown_writes_trigger = 2;
options.level0_stop_writes_trigger = 4;
options.disable_auto_compactions = true;
options.max_mem_compaction_level = 0;
Reopen(options);
// create 4 level0 tables
for (int i = 0; i < 4; ++i) {
Put("a", "b");
Flush();
}
WriteOptions woptions;
woptions.timeout_hint_us = 30 * 1000; // 30 ms
Status s = Put("a", "b", woptions);
ASSERT_TRUE(s.IsTimedOut());
}
} // anonymous namespace
/* /*
* This test is not reliable enough as it heavily depends on disk behavior. * This test is not reliable enough as it heavily depends on disk behavior.
*/ */
@ -9896,8 +9764,8 @@ TEST_F(DBTest, DynamicMemtableOptions) {
// max_background_flushes == 0, so flushes are getting executed by the // max_background_flushes == 0, so flushes are getting executed by the
// compaction thread // compaction thread
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low1; SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1, env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
// Start from scratch and disable compaction/flush. Flush can only happen // Start from scratch and disable compaction/flush. Flush can only happen
// during compaction but trigger is pretty high // during compaction but trigger is pretty high
@ -9905,28 +9773,24 @@ TEST_F(DBTest, DynamicMemtableOptions) {
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
DestroyAndReopen(options); DestroyAndReopen(options);
// Put until timeout, bounded by 256 puts. We should see timeout at ~128KB // Put until writes are stopped, bounded by 256 puts. We should see stop at
// ~128KB
int count = 0; int count = 0;
Random rnd(301); Random rnd(301);
WriteOptions wo;
wo.timeout_hint_us = 100000; // Reasonabley long timeout to make sure sleep
// triggers but not forever.
std::atomic<int> sleep_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:TimedWait", "DBImpl::DelayWrite:Wait",
[&](void* arg) { sleep_count.fetch_add(1); }); [&](void* arg) { sleeping_task_low.WakeUp(); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) { while (!sleeping_task_low.WokenUp() && count < 256) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
count++; count++;
} }
ASSERT_GT(sleep_count.load(), 0);
ASSERT_GT(static_cast<double>(count), 128 * 0.8); ASSERT_GT(static_cast<double>(count), 128 * 0.8);
ASSERT_LT(static_cast<double>(count), 128 * 1.2); ASSERT_LT(static_cast<double>(count), 128 * 1.2);
sleeping_task_low1.WakeUp(); sleeping_task_low.WaitUntilDone();
sleeping_task_low1.WaitUntilDone();
// Increase // Increase
ASSERT_OK(dbfull()->SetOptions({ ASSERT_OK(dbfull()->SetOptions({
@ -9935,23 +9799,21 @@ TEST_F(DBTest, DynamicMemtableOptions) {
// Clean up memtable and L0 // Clean up memtable and L0
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
SleepingBackgroundTask sleeping_task_low2; sleeping_task_low.Reset();
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
count = 0; count = 0;
sleep_count.store(0); while (!sleeping_task_low.WokenUp() && count < 1024) {
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
count++; count++;
} }
ASSERT_GT(sleep_count.load(), 0); // Windows fails this test. Will tune in the future and figure out
// Windows fails this test. Will tune in the future and figure out // approp number
// approp number
#ifndef OS_WIN #ifndef OS_WIN
ASSERT_GT(static_cast<double>(count), 512 * 0.8); ASSERT_GT(static_cast<double>(count), 512 * 0.8);
ASSERT_LT(static_cast<double>(count), 512 * 1.2); ASSERT_LT(static_cast<double>(count), 512 * 1.2);
#endif #endif
sleeping_task_low2.WakeUp(); sleeping_task_low.WaitUntilDone();
sleeping_task_low2.WaitUntilDone();
// Decrease // Decrease
ASSERT_OK(dbfull()->SetOptions({ ASSERT_OK(dbfull()->SetOptions({
@ -9960,24 +9822,22 @@ TEST_F(DBTest, DynamicMemtableOptions) {
// Clean up memtable and L0 // Clean up memtable and L0
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
SleepingBackgroundTask sleeping_task_low3; sleeping_task_low.Reset();
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3, env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
count = 0; count = 0;
sleep_count.store(0); while (!sleeping_task_low.WokenUp() && count < 1024) {
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), WriteOptions()));
count++; count++;
} }
ASSERT_GT(sleep_count.load(), 0); // Windows fails this test. Will tune in the future and figure out
// Windows fails this test. Will tune in the future and figure out // approp number
// approp number
#ifndef OS_WIN #ifndef OS_WIN
ASSERT_GT(static_cast<double>(count), 256 * 0.8); ASSERT_GT(static_cast<double>(count), 256 * 0.8);
ASSERT_LT(static_cast<double>(count), 266 * 1.2); ASSERT_LT(static_cast<double>(count), 266 * 1.2);
#endif #endif
sleeping_task_low3.WakeUp(); sleeping_task_low.WaitUntilDone();
sleeping_task_low3.WaitUntilDone();
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
@ -10760,50 +10620,61 @@ TEST_F(DBTest, DynamicCompactionOptions) {
// Test level0_stop_writes_trigger. // Test level0_stop_writes_trigger.
// Clean up memtable and L0. Block compaction threads. If continue to write // Clean up memtable and L0. Block compaction threads. If continue to write
// and flush memtables. We should see put timeout after 8 memtable flushes // and flush memtables. We should see put stop after 8 memtable flushes
// since level0_stop_writes_trigger = 8 // since level0_stop_writes_trigger = 8
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Block compaction // Block compaction
SleepingBackgroundTask sleeping_task_low1; SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1, env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Wait",
[&](void* arg) { sleeping_task_low.WakeUp(); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
int count = 0; int count = 0;
Random rnd(301); Random rnd(301);
WriteOptions wo; WriteOptions wo;
wo.timeout_hint_us = 10000; while (count < 64) {
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) { ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
if (sleeping_task_low.WokenUp()) {
break;
}
dbfull()->TEST_FlushMemTable(true); dbfull()->TEST_FlushMemTable(true);
count++; count++;
} }
// Stop trigger = 8 // Stop trigger = 8
ASSERT_EQ(count, 8); ASSERT_EQ(count, 8);
// Unblock // Unblock
sleeping_task_low1.WakeUp(); sleeping_task_low.WaitUntilDone();
sleeping_task_low1.WaitUntilDone();
// Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0. // Now reduce level0_stop_writes_trigger to 6. Clear up memtables and L0.
// Block compaction thread again. Perform the put and memtable flushes // Block compaction thread again. Perform the put and memtable flushes
// until we see timeout after 6 memtable flushes. // until we see the stop after 6 memtable flushes.
ASSERT_OK(dbfull()->SetOptions({ ASSERT_OK(dbfull()->SetOptions({
{"level0_stop_writes_trigger", "6"} {"level0_stop_writes_trigger", "6"}
})); }));
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
// Block compaction // Block compaction again
SleepingBackgroundTask sleeping_task_low2; sleeping_task_low.Reset();
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
count = 0; count = 0;
while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) { while (count < 64) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
if (sleeping_task_low.WokenUp()) {
break;
}
dbfull()->TEST_FlushMemTable(true); dbfull()->TEST_FlushMemTable(true);
count++; count++;
} }
ASSERT_EQ(count, 6); ASSERT_EQ(count, 6);
// Unblock // Unblock
sleeping_task_low2.WakeUp(); sleeping_task_low.WaitUntilDone();
sleeping_task_low2.WaitUntilDone();
// Test disable_auto_compactions // Test disable_auto_compactions
// Compaction thread is unblocked but auto compaction is disabled. Write // Compaction thread is unblocked but auto compaction is disabled. Write
@ -10818,7 +10689,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
// Wait for compaction so that put won't timeout // Wait for compaction so that put won't stop
dbfull()->TEST_FlushMemTable(true); dbfull()->TEST_FlushMemTable(true);
} }
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -10834,7 +10705,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
// Wait for compaction so that put won't timeout // Wait for compaction so that put won't stop
dbfull()->TEST_FlushMemTable(true); dbfull()->TEST_FlushMemTable(true);
} }
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -10877,6 +10748,8 @@ TEST_F(DBTest, DynamicCompactionOptions) {
ASSERT_EQ(NumTableFilesAtLevel(0), 1); ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_EQ(NumTableFilesAtLevel(1), 1);
ASSERT_EQ(NumTableFilesAtLevel(2), 1); ASSERT_EQ(NumTableFilesAtLevel(2), 1);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(DBTest, FileCreationRandomFailure) { TEST_F(DBTest, FileCreationRandomFailure) {
@ -12265,10 +12138,6 @@ TEST_F(DBTest, DelayedWriteRate) {
// Spread the size range to more. // Spread the size range to more.
size_t entry_size = rand_num * rand_num * rand_num; size_t entry_size = rand_num * rand_num * rand_num;
WriteOptions wo; WriteOptions wo;
// For a small chance, set a timeout.
if (rnd.Uniform(20) == 6) {
wo.timeout_hint_us = 1500;
}
Put(Key(i), std::string(entry_size, 'x'), wo); Put(Key(i), std::string(entry_size, 'x'), wo);
estimated_total_size += entry_size + 20; estimated_total_size += entry_size + 20;
// Ocassionally sleep a while // Ocassionally sleep a while

@ -438,10 +438,13 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
options.enable_thread_tracking = true; options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env); TestFlushListener* listener = new TestFlushListener(options.env);
const int kCompactionTrigger = 1;
const int kSlowdownTrigger = 5; const int kSlowdownTrigger = 5;
const int kStopTrigger = 10; const int kStopTrigger = 100;
options.level0_file_num_compaction_trigger = kCompactionTrigger;
options.level0_slowdown_writes_trigger = kSlowdownTrigger; options.level0_slowdown_writes_trigger = kSlowdownTrigger;
options.level0_stop_writes_trigger = kStopTrigger; options.level0_stop_writes_trigger = kStopTrigger;
options.max_write_buffer_number = 10;
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
// BG compaction is disabled. Number of L0 files will simply keeps // BG compaction is disabled. Number of L0 files will simply keeps
// increasing in this test. // increasing in this test.
@ -450,18 +453,17 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
options.write_buffer_size = 100000; // Small write buffer options.write_buffer_size = 100000; // Small write buffer
CreateAndReopenWithCF({"pikachu"}, &options); CreateAndReopenWithCF({"pikachu"}, &options);
WriteOptions wopts;
wopts.timeout_hint_us = 100000;
ColumnFamilyMetaData cf_meta; ColumnFamilyMetaData cf_meta;
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
// keep writing until writes are forced to stop. // keep writing until writes are forced to stop.
for (int i = 0; static_cast<int>(cf_meta.file_count) < kStopTrigger; ++i) { for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
Put(1, ToString(i), std::string(100000, 'x'), wopts); ++i) {
db_->Flush(FlushOptions()); Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
db_->Flush(FlushOptions(), handles_[1]);
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
} }
ASSERT_GE(listener->slowdown_count, kStopTrigger - kSlowdownTrigger); ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
ASSERT_GE(listener->stop_count, 1);
} }
} // namespace rocksdb } // namespace rocksdb

@ -7,8 +7,7 @@
namespace rocksdb { namespace rocksdb {
Status WriteThread::EnterWriteThread(WriteThread::Writer* w, void WriteThread::EnterWriteThread(WriteThread::Writer* w) {
uint64_t expiration_time) {
// the following code block pushes the current writer "w" into the writer // the following code block pushes the current writer "w" into the writer
// queue "writers_" and wait until one of the following conditions met: // queue "writers_" and wait until one of the following conditions met:
// 1. the job of "w" has been done by some other writers. // 1. the job of "w" has been done by some other writers.
@ -16,48 +15,9 @@ Status WriteThread::EnterWriteThread(WriteThread::Writer* w,
// 3. "w" timed-out. // 3. "w" timed-out.
writers_.push_back(w); writers_.push_back(w);
bool timed_out = false;
while (!w->done && w != writers_.front()) { while (!w->done && w != writers_.front()) {
if (expiration_time == 0) { w->cv.Wait();
w->cv.Wait();
} else if (w->cv.TimedWait(expiration_time)) {
if (w->in_batch_group) {
// then it means the front writer is currently doing the
// write on behalf of this "timed-out" writer. Then it
// should wait until the write completes.
expiration_time = 0;
} else {
timed_out = true;
break;
}
}
}
if (timed_out) {
#ifndef NDEBUG
bool found = false;
#endif
for (auto iter = writers_.begin(); iter != writers_.end(); iter++) {
if (*iter == w) {
writers_.erase(iter);
#ifndef NDEBUG
found = true;
#endif
break;
}
}
#ifndef NDEBUG
assert(found);
#endif
// writers_.front() might still be in cond_wait without a time-out.
// As a result, we need to signal it to wake it up. Otherwise no
// one else will wake him up, and RocksDB will hang.
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return Status::TimedOut();
} }
return Status::OK();
} }
void WriteThread::ExitWriteThread(WriteThread::Writer* w, void WriteThread::ExitWriteThread(WriteThread::Writer* w,
@ -128,12 +88,6 @@ size_t WriteThread::BuildBatchGroup(
break; break;
} }
if (w->timeout_hint_us < first->timeout_hint_us) {
// Do not include those writes with shorter timeout. Otherwise, we might
// execute a write that should instead be aborted because of timeout.
break;
}
if (w->has_callback) { if (w->has_callback) {
// Do not include writes which may be aborted if the callback does not // Do not include writes which may be aborted if the callback does not
// succeed. // succeed.

@ -18,7 +18,6 @@ namespace rocksdb {
class WriteThread { class WriteThread {
public: public:
static const uint64_t kNoTimeOut = port::kMaxUint64;
// Information kept for every waiting writer // Information kept for every waiting writer
struct Writer { struct Writer {
Status status; Status status;
@ -28,7 +27,6 @@ class WriteThread {
bool in_batch_group; bool in_batch_group;
bool done; bool done;
bool has_callback; bool has_callback;
uint64_t timeout_hint_us;
InstrumentedCondVar cv; InstrumentedCondVar cv;
explicit Writer(InstrumentedMutex* mu) explicit Writer(InstrumentedMutex* mu)
@ -38,7 +36,6 @@ class WriteThread {
in_batch_group(false), in_batch_group(false),
done(false), done(false),
has_callback(false), has_callback(false),
timeout_hint_us(kNoTimeOut),
cv(mu) {} cv(mu) {}
}; };
@ -52,10 +49,9 @@ class WriteThread {
// for examples), so check it via w.done before applying changes. // for examples), so check it via w.done before applying changes.
// //
// Writer* w: writer to be placed in the queue // Writer* w: writer to be placed in the queue
// uint64_t expiration_time: maximum time to be in the queue
// See also: ExitWriteThread // See also: ExitWriteThread
// REQUIRES: db mutex held // REQUIRES: db mutex held
Status EnterWriteThread(Writer* w, uint64_t expiration_time); void EnterWriteThread(Writer* w);
// After doing write job, we need to remove already used writers from // After doing write job, we need to remove already used writers from
// writers_ queue and notify head of the queue about it. // writers_ queue and notify head of the queue about it.

@ -1212,15 +1212,7 @@ struct WriteOptions {
// and the write may got lost after a crash. // and the write may got lost after a crash.
bool disableWAL; bool disableWAL;
// If non-zero, then associated write waiting longer than the specified // The option is deprecated. It's not used anymore.
// time MAY be aborted and returns Status::TimedOut. A write that takes
// less than the specified time is guaranteed to not fail with
// Status::TimedOut.
//
// The number of times a write call encounters a timeout is recorded in
// Statistics.WRITE_TIMEDOUT
//
// Default: 0
uint64_t timeout_hint_us; uint64_t timeout_hint_us;
// If true and if user is trying to write to column families that don't exist // If true and if user is trying to write to column families that don't exist

@ -206,7 +206,6 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{WAL_FILE_BYTES, "rocksdb.wal.bytes"}, {WAL_FILE_BYTES, "rocksdb.wal.bytes"},
{WRITE_DONE_BY_SELF, "rocksdb.write.self"}, {WRITE_DONE_BY_SELF, "rocksdb.write.self"},
{WRITE_DONE_BY_OTHER, "rocksdb.write.other"}, {WRITE_DONE_BY_OTHER, "rocksdb.write.other"},
{WRITE_TIMEDOUT, "rocksdb.write.timedout"},
{WRITE_WITH_WAL, "rocksdb.write.wal"}, {WRITE_WITH_WAL, "rocksdb.write.wal"},
{FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"}, {FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"},
{COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"}, {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"},

@ -68,12 +68,6 @@ class Status {
const Slice& msg2 = Slice()) { const Slice& msg2 = Slice()) {
return Status(kShutdownInProgress, msg, msg2); return Status(kShutdownInProgress, msg, msg2);
} }
static Status TimedOut() {
return Status(kTimedOut);
}
static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kTimedOut, msg, msg2);
}
static Status Aborted() { static Status Aborted() {
return Status(kAborted); return Status(kAborted);
} }

@ -67,9 +67,6 @@ std::string Status::ToString() const {
case kShutdownInProgress: case kShutdownInProgress:
type = "Shutdown in progress: "; type = "Shutdown in progress: ";
break; break;
case kTimedOut:
type = "Operation timed out: ";
break;
case kAborted: case kAborted:
type = "Operation aborted: "; type = "Operation aborted: ";
break; break;

Loading…
Cancel
Save