Ankit Gupta 11 years ago
commit bc708e0012
  1. 3
      HISTORY.md
  2. 158
      db/db_impl.cc
  3. 9
      db/db_impl.h
  4. 123
      db/db_test.cc
  5. 2
      db/version_set.cc
  6. 34
      include/rocksdb/iostats_context.h
  7. 13
      include/rocksdb/options.h
  8. 4
      include/rocksdb/statistics.h
  9. 11
      include/rocksdb/status.h
  10. 5
      port/port_posix.cc
  11. 6
      table/plain_table_key_coding.cc
  12. 8
      util/env_posix.cc
  13. 30
      util/iostats_context.cc
  14. 32
      util/iostats_context_imp.h
  15. 3
      util/status.cc
  16. 2
      util/stop_watch.h

@ -4,7 +4,8 @@
### New Features ### New Features
* HashLinklist reduces performance outlier caused by skewed bucket by switching data in the bucket from linked list to skip list. Add parameter threshold_use_skiplist in NewHashLinkListRepFactory(). * HashLinklist reduces performance outlier caused by skewed bucket by switching data in the bucket from linked list to skip list. Add parameter threshold_use_skiplist in NewHashLinkListRepFactory().
* RocksDB is now able to reclaim storage space more effectively during the compaction process. This is done by compensating the size of each deletion entry by 2X the average value size, which makes it easier for deletion entries to trigger compaction.
* Add a time-out API to write. WriteOptions now has a field called timeout_hint_us. When timeout_hint_us is set to a non-zero value, any write associated with it may be aborted (returning Status::TimedOut) if it runs longer than timeout_hint_us microseconds, and any write that completes earlier than the specified time-out is guaranteed not to be aborted due to the time-out condition.
## 3.2.0 (06/20/2014) ## 3.2.0 (06/20/2014)

@ -65,6 +65,7 @@
#include "util/log_buffer.h" #include "util/log_buffer.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/perf_context_imp.h" #include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -80,7 +81,9 @@ struct DBImpl::Writer {
WriteBatch* batch; WriteBatch* batch;
bool sync; bool sync;
bool disableWAL; bool disableWAL;
bool in_batch_group;
bool done; bool done;
uint64_t timeout_hint_us;
port::CondVar cv; port::CondVar cv;
explicit Writer(port::Mutex* mu) : cv(mu) { } explicit Writer(port::Mutex* mu) : cv(mu) { }
@ -1602,6 +1605,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
// true, mark DB read-only // true, mark DB read-only
bg_error_ = s; bg_error_ = s;
} }
RecordFlushIOStats();
return s; return s;
} }
@ -1918,11 +1922,28 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
} }
} }
void DBImpl::RecordFlushIOStats() {
RecordTick(options_.statistics.get(), FLUSH_WRITE_BYTES,
iostats_context.bytes_written);
IOSTATS_RESET(bytes_written);
}
// Moves the calling thread's accumulated I/O byte counts into the compaction
// tickers: bytes_read goes to COMPACT_READ_BYTES and bytes_written goes to
// COMPACT_WRITE_BYTES. Each counter is cleared right after it is reported.
void DBImpl::RecordCompactionIOStats() {
  auto* stats = options_.statistics.get();

  // Read side.
  const uint64_t read_bytes = IOSTATS(bytes_read);
  RecordTick(stats, COMPACT_READ_BYTES, read_bytes);
  IOSTATS_RESET(bytes_read);

  // Write side.
  const uint64_t written_bytes = IOSTATS(bytes_written);
  RecordTick(stats, COMPACT_WRITE_BYTES, written_bytes);
  IOSTATS_RESET(bytes_written);
}
void DBImpl::BGWorkFlush(void* db) { void DBImpl::BGWorkFlush(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush(); reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
} }
void DBImpl::BGWorkCompaction(void* db) { void DBImpl::BGWorkCompaction(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction(); reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
} }
@ -2022,6 +2043,7 @@ void DBImpl::BackgroundCallFlush() {
// that case, all DB variables will be dealloacated and referencing them // that case, all DB variables will be dealloacated and referencing them
// will cause trouble. // will cause trouble.
} }
RecordFlushIOStats();
} }
void DBImpl::BackgroundCallCompaction() { void DBImpl::BackgroundCallCompaction() {
@ -2557,6 +2579,7 @@ Status DBImpl::ProcessKeyValueCompaction(
while (input->Valid() && !shutting_down_.Acquire_Load() && while (input->Valid() && !shutting_down_.Acquire_Load() &&
!cfd->IsDropped()) { !cfd->IsDropped()) {
RecordCompactionIOStats();
// FLUSH preempts compaction // FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on // TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on // compacting column family. we should also check if flush is necessary on
@ -2815,6 +2838,8 @@ Status DBImpl::ProcessKeyValueCompaction(
} }
} }
RecordCompactionIOStats();
return status; return status;
} }
@ -3122,22 +3147,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
for (int i = 0; i < compact->compaction->num_input_files(0); i++) { for (int i = 0; i < compact->compaction->num_input_files(0); i++) {
stats.bytes_readn += compact->compaction->input(0, i)->fd.GetFileSize(); stats.bytes_readn += compact->compaction->input(0, i)->fd.GetFileSize();
RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
compact->compaction->input(0, i)->fd.GetFileSize());
} }
for (int i = 0; i < compact->compaction->num_input_files(1); i++) { for (int i = 0; i < compact->compaction->num_input_files(1); i++) {
stats.bytes_readnp1 += compact->compaction->input(1, i)->fd.GetFileSize(); stats.bytes_readnp1 += compact->compaction->input(1, i)->fd.GetFileSize();
RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
compact->compaction->input(1, i)->fd.GetFileSize());
} }
for (int i = 0; i < num_output_files; i++) { for (int i = 0; i < num_output_files; i++) {
stats.bytes_written += compact->outputs[i].file_size; stats.bytes_written += compact->outputs[i].file_size;
RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES,
compact->outputs[i].file_size);
} }
RecordCompactionIOStats();
LogFlush(options_.info_log); LogFlush(options_.info_log);
mutex_.Lock(); mutex_.Lock();
cfd->internal_stats()->AddCompactionStats(compact->compaction->output_level(), cfd->internal_stats()->AddCompactionStats(compact->compaction->output_level(),
@ -3729,13 +3750,41 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
w.batch = my_batch; w.batch = my_batch;
w.sync = options.sync; w.sync = options.sync;
w.disableWAL = options.disableWAL; w.disableWAL = options.disableWAL;
w.in_batch_group = false;
w.done = false;
w.timeout_hint_us = options.timeout_hint_us;
uint64_t expiration_time = 0;
if (w.timeout_hint_us == 0) {
w.timeout_hint_us = kNoTimeOut;
} else {
expiration_time = env_->NowMicros() + w.timeout_hint_us;
}
w.done = false; w.done = false;
StopWatch sw(env_, options_.statistics.get(), DB_WRITE, false);
mutex_.Lock(); mutex_.Lock();
// The following code block pushes the current writer "w" into the writer
// queue "writers_" and waits until one of the following conditions is met:
// 1. the job of "w" has been done by some other writer.
// 2. "w" becomes the first writer in "writers_".
// 3. "w" timed out.
writers_.push_back(&w); writers_.push_back(&w);
bool timed_out = false;
while (!w.done && &w != writers_.front()) { while (!w.done && &w != writers_.front()) {
w.cv.Wait(); if (expiration_time == 0) {
w.cv.Wait();
} else if (w.cv.TimedWait(expiration_time)) {
if (w.in_batch_group) {
// then it means the front writer is currently doing the
// write on behalf of this "timed-out" writer. Then it
// should wait until the write completes.
expiration_time = 0;
} else {
timed_out = true;
break;
}
}
} }
if (!options.disableWAL) { if (!options.disableWAL) {
@ -3746,10 +3795,39 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
mutex_.Unlock(); mutex_.Unlock();
RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1); RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1);
return w.status; return w.status;
} else if (timed_out) {
#ifndef NDEBUG
bool found = false;
#endif
for (auto iter = writers_.begin(); iter != writers_.end(); iter++) {
if (*iter == &w) {
writers_.erase(iter);
#ifndef NDEBUG
found = true;
#endif
break;
}
}
#ifndef NDEBUG
assert(found);
#endif
// writers_.front() might still be in cond_wait without a time-out.
// As a result, we need to signal it to wake it up. Otherwise no
// one else will wake it up, and RocksDB will hang.
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
mutex_.Unlock();
RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1);
return Status::TimedOut();
} else { } else {
RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1); RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1);
} }
// Once reaches this point, the current writer "w" will try to do its write
// job. It may also pick up some of the remaining writers in the "writers_"
// when it finds suitable, and finish them in the same write batch.
// This is how a write job could be done by the other writer.
assert(!single_column_family_mode_ || assert(!single_column_family_mode_ ||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
@ -3774,8 +3852,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (LIKELY(single_column_family_mode_)) { if (LIKELY(single_column_family_mode_)) {
// fast path // fast path
status = MakeRoomForWrite(default_cf_handle_->cfd(), my_batch == nullptr, status = MakeRoomForWrite(
&superversions_to_free, &logs_to_free); default_cf_handle_->cfd(), my_batch == nullptr,
&superversions_to_free, &logs_to_free,
expiration_time);
} else { } else {
// refcounting cfd in iteration // refcounting cfd in iteration
bool dead_cfd = false; bool dead_cfd = false;
@ -3786,8 +3866,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
(flush_column_family_if_log_file != 0 && (flush_column_family_if_log_file != 0 &&
cfd->GetLogNumber() <= flush_column_family_if_log_file); cfd->GetLogNumber() <= flush_column_family_if_log_file);
// May temporarily unlock and wait. // May temporarily unlock and wait.
status = MakeRoomForWrite(cfd, force_flush, &superversions_to_free, status = MakeRoomForWrite(
&logs_to_free); cfd, force_flush, &superversions_to_free, &logs_to_free,
expiration_time);
if (cfd->Unref()) { if (cfd->Unref()) {
dead_cfd = true; dead_cfd = true;
} }
@ -3883,11 +3964,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
} }
} }
} }
if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) { if (options_.paranoid_checks && !status.ok() &&
!status.IsTimedOut() && bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes bg_error_ = status; // stop compaction & fail any further writes
} }
while (true) { // Pop out the current writer and all writers being pushed before the
// current writer from the writer queue.
while (!writers_.empty()) {
Writer* ready = writers_.front(); Writer* ready = writers_.front();
writers_.pop_front(); writers_.pop_front();
if (ready != &w) { if (ready != &w) {
@ -3904,6 +3988,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
} }
mutex_.Unlock(); mutex_.Unlock();
if (status.IsTimedOut()) {
RecordTick(options_.statistics.get(), WRITE_TIMEDOUT, 1);
}
for (auto& sv : superversions_to_free) { for (auto& sv : superversions_to_free) {
delete sv; delete sv;
} }
@ -3915,6 +4003,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
return status; return status;
} }
// This function will be called only when the first writer succeeds.
// All writers in the to-be-built batch group will be processed.
//
// REQUIRES: Writer list must be non-empty // REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-nullptr batch // REQUIRES: First writer must have a non-nullptr batch
void DBImpl::BuildBatchGroup(Writer** last_writer, void DBImpl::BuildBatchGroup(Writer** last_writer,
@ -3950,6 +4041,12 @@ void DBImpl::BuildBatchGroup(Writer** last_writer,
break; break;
} }
if (w->timeout_hint_us < first->timeout_hint_us) {
// Do not include those writes with shorter timeout. Otherwise, we might
// execute a write that should instead be aborted because of timeout.
break;
}
if (w->batch != nullptr) { if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch); size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) { if (size > max_size) {
@ -3959,6 +4056,7 @@ void DBImpl::BuildBatchGroup(Writer** last_writer,
write_batch_group->push_back(w->batch); write_batch_group->push_back(w->batch);
} }
w->in_batch_group = true;
*last_writer = w; *last_writer = w;
} }
} }
@ -4000,7 +4098,8 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
Status DBImpl::MakeRoomForWrite( Status DBImpl::MakeRoomForWrite(
ColumnFamilyData* cfd, bool force, ColumnFamilyData* cfd, bool force,
autovector<SuperVersion*>* superversions_to_free, autovector<SuperVersion*>* superversions_to_free,
autovector<log::Writer*>* logs_to_free) { autovector<log::Writer*>* logs_to_free,
uint64_t expiration_time) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(!writers_.empty()); assert(!writers_.empty());
bool allow_delay = !force; bool allow_delay = !force;
@ -4013,12 +4112,16 @@ Status DBImpl::MakeRoomForWrite(
// might generate a tight feedback loop, constantly scheduling more background // might generate a tight feedback loop, constantly scheduling more background
// work, even if additional background work is not needed // work, even if additional background work is not needed
bool schedule_background_work = true; bool schedule_background_work = true;
bool has_timeout = (expiration_time > 0);
while (true) { while (true) {
if (!bg_error_.ok()) { if (!bg_error_.ok()) {
// Yield previous error // Yield previous error
s = bg_error_; s = bg_error_;
break; break;
} else if (has_timeout && env_->NowMicros() > expiration_time) {
s = Status::TimedOut();
break;
} else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) { } else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) {
// We are getting close to hitting a hard limit on the number of // We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several // L0 files. Rather than delaying a single write by several
@ -4063,7 +4166,11 @@ Status DBImpl::MakeRoomForWrite(
{ {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, options_.statistics.get(),
STALL_MEMTABLE_COMPACTION_COUNT); STALL_MEMTABLE_COMPACTION_COUNT);
bg_cv_.Wait(); if (!has_timeout) {
bg_cv_.Wait();
} else {
bg_cv_.TimedWait(expiration_time);
}
stall = sw.ElapsedMicros(); stall = sw.ElapsedMicros();
} }
RecordTick(options_.statistics.get(), RecordTick(options_.statistics.get(),
@ -4078,10 +4185,15 @@ Status DBImpl::MakeRoomForWrite(
{ {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, options_.statistics.get(),
STALL_L0_NUM_FILES_COUNT); STALL_L0_NUM_FILES_COUNT);
bg_cv_.Wait(); if (!has_timeout) {
bg_cv_.Wait();
} else {
bg_cv_.TimedWait(expiration_time);
}
stall = sw.ElapsedMicros(); stall = sw.ElapsedMicros();
} }
RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); RecordTick(options_.statistics.get(),
STALL_L0_NUM_FILES_MICROS, stall);
cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_NUM_FILES,
stall); stall);
} else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) { } else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) {
@ -4112,18 +4224,18 @@ Status DBImpl::MakeRoomForWrite(
score = cfd->current()->MaxCompactionScore(); score = cfd->current()->MaxCompactionScore();
// Delay a write when the compaction score for any level is too large. // Delay a write when the compaction score for any level is too large.
// TODO: add statistics // TODO: add statistics
uint64_t slowdown = SlowdownAmount(score, cfd->options()->soft_rate_limit,
cfd->options()->hard_rate_limit);
mutex_.Unlock(); mutex_.Unlock();
{ {
StopWatch sw(env_, options_.statistics.get(), StopWatch sw(env_, options_.statistics.get(),
SOFT_RATE_LIMIT_DELAY_COUNT); SOFT_RATE_LIMIT_DELAY_COUNT);
env_->SleepForMicroseconds( env_->SleepForMicroseconds(slowdown);
SlowdownAmount(score, cfd->options()->soft_rate_limit, slowdown = sw.ElapsedMicros();
cfd->options()->hard_rate_limit)); rate_limit_delay_millis += slowdown;
rate_limit_delay_millis += sw.ElapsedMicros();
} }
allow_soft_rate_limit_delay = false; allow_soft_rate_limit_delay = false;
mutex_.Lock(); mutex_.Lock();
} else { } else {
unique_ptr<WritableFile> lfile; unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr; log::Writer* new_log = nullptr;

@ -10,6 +10,7 @@
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <limits>
#include <set> #include <set>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -28,6 +29,7 @@
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/stats_logger.h" #include "util/stats_logger.h"
#include "util/stop_watch.h"
#include "util/thread_local.h" #include "util/thread_local.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
@ -345,7 +347,8 @@ class DBImpl : public DB {
Status MakeRoomForWrite(ColumnFamilyData* cfd, Status MakeRoomForWrite(ColumnFamilyData* cfd,
bool force /* flush even if there is room? */, bool force /* flush even if there is room? */,
autovector<SuperVersion*>* superversions_to_free, autovector<SuperVersion*>* superversions_to_free,
autovector<log::Writer*>* logs_to_free); autovector<log::Writer*>* logs_to_free,
uint64_t expiration_time);
void BuildBatchGroup(Writer** last_writer, void BuildBatchGroup(Writer** last_writer,
autovector<WriteBatch*>* write_batch_group); autovector<WriteBatch*>* write_batch_group);
@ -356,6 +359,9 @@ class DBImpl : public DB {
// Wait for memtable flushed // Wait for memtable flushed
Status WaitForFlushMemTable(ColumnFamilyData* cfd); Status WaitForFlushMemTable(ColumnFamilyData* cfd);
void RecordFlushIOStats();
void RecordCompactionIOStats();
void MaybeScheduleLogDBDeployStats(); void MaybeScheduleLogDBDeployStats();
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -578,6 +584,7 @@ class DBImpl : public DB {
bool flush_on_destroy_; // Used when disableWAL is true. bool flush_on_destroy_; // Used when disableWAL is true.
static const int KEEP_LOG_FILE_NUM = 1000; static const int KEEP_LOG_FILE_NUM = 1000;
static const uint64_t kNoTimeOut = std::numeric_limits<uint64_t>::max();
std::string db_absolute_path_; std::string db_absolute_path_;
// count of the number of contiguous delaying writes // count of the number of contiguous delaying writes

@ -7001,6 +7001,129 @@ TEST(DBTest, FIFOCompactionTest) {
} }
} }
} }
// Single-threaded check of WriteOptions::timeout_hint_us: after two large
// writes issued without a time-out, a third write with a 300us hint must
// come back as Status::TimedOut.
TEST(DBTest, SimpleWriteTimeoutTest) {
  // NOTE(review): the option values below (two write buffers, zero
  // background flushes, merge threshold above the buffer count) look chosen
  // so that full buffers are never drained -- confirm against the options
  // documentation.
  Options opts;
  opts.env = env_;
  opts.create_if_missing = true;
  opts.write_buffer_size = 100000;
  opts.max_write_buffer_number = 2;
  opts.min_write_buffer_number_to_merge = 3;
  opts.max_background_flushes = 0;
  opts.max_total_wal_size = std::numeric_limits<uint64_t>::max();
  DestroyAndReopen(&opts);

  // Every value is its key followed by this padding.
  const std::string padding(100000, 'v');

  // Fill the two write buffers; a hint of 0 means "no time-out".
  WriteOptions wopts;
  wopts.timeout_hint_us = 0;
  ASSERT_OK(Put(Key(1), Key(1) + padding, wopts));
  ASSERT_OK(Put(Key(2), Key(2) + padding, wopts));

  // Both write buffers are full at this point, so the third Put is
  // expected to time out.
  wopts.timeout_hint_us = 300;
  ASSERT_TRUE(Put(Key(3), Key(3) + padding, wopts).IsTimedOut());
}
// Multi-threaded Timeout Test
namespace {
static const int kValueSize = 1000;
static const int kWriteBufferSize = 100000;
// Per-thread state handed to RandomTimeoutWriter() by MTRandomTimeoutTest.
struct TimeoutWriterState {
// Index of the writer thread; RandomTimeoutWriter() uses it to seed its
// random generator and to compute the keys it writes.
int id;
// Database the writer thread issues its Puts against.
DB* db;
// Set to true by the writer thread once it has issued all of its writes;
// the test polls it to know when the thread has finished.
std::atomic<bool> done;
// Key/value pairs recorded by the writer for Puts issued with
// timeout_hint_us == 0; the test reads them back after all writers finish.
std::map<int, std::string> success_kvs;
};
// Thread body for MTRandomTimeoutTest. `arg` must point to a
// TimeoutWriterState.
//
// Issues kNumKeys * 5 Puts against state->db. Keys are derived from the
// thread id and the loop index. The first half of the writes use a fixed
// 500us time-out hint; for the second half the hint is picked at random
// before each write. Writes issued from the "no time-out" branch are recorded
// in state->success_kvs so the test can read them back later.
//
// For every Put the function checks that:
//  * a write with no time-out, or one that finished (with kTimerBias slack)
//    before its hint expired, succeeded;
//  * a write that reported TimedOut really took about as long as its hint.
//
// Sets state->done to true once the loop has completed.
static void RandomTimeoutWriter(void* arg) {
  TimeoutWriterState* state = static_cast<TimeoutWriterState*>(arg);
  // Slack, in microseconds, allowed when comparing a measured Put duration
  // against its time-out hint.
  static const uint64_t kTimerBias = 50;
  int thread_id = state->id;
  DB* db = state->db;

  Random rnd(1000 + thread_id);
  WriteOptions write_opt = WriteOptions();
  write_opt.timeout_hint_us = 500;
  int num_keys = kNumKeys * 5;

  for (int k = 0; k < num_keys; ++k) {
    int key = k + thread_id * num_keys;
    std::string value = RandomString(&rnd, kValueSize);
    // only the second-half is randomized
    if (k > num_keys / 2) {
      switch (rnd.Next() % 5) {
        case 0:
          // Evaluates to 0 (no time-out) for thread 0.
          write_opt.timeout_hint_us = 500 * thread_id;
          break;
        case 1:
          // Shrinks toward 1 as k approaches num_keys.
          write_opt.timeout_hint_us = num_keys - k;
          break;
        case 2:
          write_opt.timeout_hint_us = 1;
          break;
        default:
          // No time-out: the write must succeed, so remember it for the
          // read-back check done by the test.
          write_opt.timeout_hint_us = 0;
          state->success_kvs.insert({key, value});
          break;
      }
    }

    uint64_t time_before_put = db->GetEnv()->NowMicros();
    Status s = db->Put(write_opt, Key(key), value);
    uint64_t put_duration = db->GetEnv()->NowMicros() - time_before_put;

    if (write_opt.timeout_hint_us == 0 ||
        put_duration + kTimerBias < write_opt.timeout_hint_us) {
      ASSERT_OK(s);
    }
    if (s.IsTimedOut()) {
      ASSERT_GT(put_duration + kTimerBias, write_opt.timeout_hint_us);
    }
  }

  state->done = true;
}
// Multi-threaded time-out test: starts kNumThreads RandomTimeoutWriter
// threads against one DB, waits for all of them to finish, flushes, and then
// checks that every key/value pair a writer recorded in success_kvs can be
// read back.
TEST(DBTest, MTRandomTimeoutTest) {
  Options opts;
  opts.env = env_;
  opts.create_if_missing = true;
  opts.write_buffer_size = kWriteBufferSize;
  opts.max_write_buffer_number = 2;
  opts.compression = kNoCompression;
  opts.level0_slowdown_writes_trigger = 10;
  opts.level0_stop_writes_trigger = 20;
  DestroyAndReopen(&opts);

  // One state object per writer thread; ids are assigned in array order.
  TimeoutWriterState writers[kNumThreads];
  int next_id = 0;
  for (auto& writer : writers) {
    writer.id = next_id++;
    writer.db = db_;
    writer.done = false;
    env_->StartThread(RandomTimeoutWriter, &writer);
  }

  // Poll each writer's completion flag, sleeping 100ms between checks.
  for (auto& writer : writers) {
    while (!writer.done) {
      env_->SleepForMicroseconds(100000);
    }
  }

  Flush();

  // Every write recorded in success_kvs must be readable.
  for (auto& writer : writers) {
    for (const auto& kv : writer.success_kvs) {
      ASSERT_EQ(Get(Key(kv.first)), kv.second);
    }
  }
}
} // anonymous namespace
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -1472,7 +1472,7 @@ class VersionSet::Builder {
} }
} }
if (!found) { if (!found) {
fprintf(stderr, "not found %ld\n", number); fprintf(stderr, "not found %" PRIu64 "\n", number);
} }
assert(found); assert(found);
#endif #endif

@ -0,0 +1,34 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
#define INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_
#include <stdint.h>
#include <string>
// A thread local context for gathering io-stats efficiently and transparently.
namespace rocksdb {
// Plain counters describing the I/O done by one thread. The single instance,
// `iostats_context`, is declared below with __thread storage, so each thread
// sees its own copy.
struct IOStatsContext {
// Restores every field to its initial value.
// NOTE(review): the definition in util/iostats_context.cc zeroes the two
// byte counters but sets thread_pool_id to Env::Priority::TOTAL, not zero.
void Reset();
// Returns a human-readable listing of the fields, each rendered as
// "name = value, ".
std::string ToString() const;
// The thread pool id. DBImpl::BGWorkFlush and DBImpl::BGWorkCompaction set
// it to Env::Priority::HIGH and Env::Priority::LOW respectively.
uint64_t thread_pool_id;
// Number of bytes that have been written.
uint64_t bytes_written;
// Number of bytes that have been read.
uint64_t bytes_read;
};
extern __thread IOStatsContext iostats_context;
} // namespace rocksdb
#endif // INCLUDE_ROCKSDB_IOSTATS_CONTEXT_H_

@ -975,7 +975,18 @@ struct WriteOptions {
// and the write may got lost after a crash. // and the write may got lost after a crash.
bool disableWAL; bool disableWAL;
WriteOptions() : sync(false), disableWAL(false) {} // If non-zero, then associated write waiting longer than the specified
// time MAY be aborted and returns Status::TimedOut. A write that takes
// less than the specified time is guaranteed to not fail with
// Status::TimedOut.
//
// The number of times a write call encounters a timeout is recorded in
// Statistics.WRITE_TIMEDOUT
//
// Default: 0
uint64_t timeout_hint_us;
WriteOptions() : sync(false), disableWAL(false), timeout_hint_us(0) {}
}; };
// Options that control flush operations // Options that control flush operations

@ -115,9 +115,11 @@ enum Tickers {
// head of the writers queue. // head of the writers queue.
WRITE_DONE_BY_SELF, WRITE_DONE_BY_SELF,
WRITE_DONE_BY_OTHER, WRITE_DONE_BY_OTHER,
WRITE_TIMEDOUT, // Number of writes ending up with timed-out.
WRITE_WITH_WAL, // Number of Write calls that request WAL WRITE_WITH_WAL, // Number of Write calls that request WAL
COMPACT_READ_BYTES, // Bytes read during compaction COMPACT_READ_BYTES, // Bytes read during compaction
COMPACT_WRITE_BYTES, // Bytes written during compaction COMPACT_WRITE_BYTES, // Bytes written during compaction
FLUSH_WRITE_BYTES, // Bytes written during flush
// Number of table's properties loaded directly from file, without creating // Number of table's properties loaded directly from file, without creating
// table reader object. // table reader object.
@ -176,7 +178,9 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{WAL_FILE_BYTES, "rocksdb.wal.bytes"}, {WAL_FILE_BYTES, "rocksdb.wal.bytes"},
{WRITE_DONE_BY_SELF, "rocksdb.write.self"}, {WRITE_DONE_BY_SELF, "rocksdb.write.self"},
{WRITE_DONE_BY_OTHER, "rocksdb.write.other"}, {WRITE_DONE_BY_OTHER, "rocksdb.write.other"},
{WRITE_TIMEDOUT, "rocksdb.write.timedout"},
{WRITE_WITH_WAL, "rocksdb.write.wal"}, {WRITE_WITH_WAL, "rocksdb.write.wal"},
{FLUSH_WRITE_BYTES, "rocksdb.flush.write.bytes"},
{COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"}, {COMPACT_READ_BYTES, "rocksdb.compact.read.bytes"},
{COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},
{NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,

@ -65,6 +65,12 @@ class Status {
const Slice& msg2 = Slice()) { const Slice& msg2 = Slice()) {
return Status(kShutdownInProgress, msg, msg2); return Status(kShutdownInProgress, msg, msg2);
} }
// Returns a Status built from the kTimedOut code alone (no message slices).
static Status TimedOut() {
return Status(kTimedOut);
}
// Returns a Status built from the kTimedOut code together with `msg` and the
// optional `msg2` (an empty Slice by default).
static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kTimedOut, msg, msg2);
}
// Returns true iff the status indicates success. // Returns true iff the status indicates success.
bool ok() const { return code() == kOk; } bool ok() const { return code() == kOk; }
@ -93,6 +99,8 @@ class Status {
// Returns true iff the status indicates Incomplete // Returns true iff the status indicates Incomplete
bool IsShutdownInProgress() const { return code() == kShutdownInProgress; } bool IsShutdownInProgress() const { return code() == kShutdownInProgress; }
// Returns true iff the status indicates a time-out (code() == kTimedOut).
bool IsTimedOut() const { return code() == kTimedOut; }
// Return a string representation of this status suitable for printing. // Return a string representation of this status suitable for printing.
// Returns the string "OK" for success. // Returns the string "OK" for success.
std::string ToString() const; std::string ToString() const;
@ -106,7 +114,8 @@ class Status {
kIOError = 5, kIOError = 5,
kMergeInProgress = 6, kMergeInProgress = 6,
kIncomplete = 7, kIncomplete = 7,
kShutdownInProgress = 8 kShutdownInProgress = 8,
kTimedOut = 9
}; };
Code code() const { Code code() const {

@ -20,11 +20,12 @@
namespace rocksdb { namespace rocksdb {
namespace port { namespace port {
static void PthreadCall(const char* label, int result) { static int PthreadCall(const char* label, int result) {
if (result != 0) { if (result != 0 && result != ETIMEDOUT) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
abort(); abort();
} }
return result;
} }
Mutex::Mutex(bool adaptive) { Mutex::Mutex(bool adaptive) {

@ -198,7 +198,8 @@ Status PlainTableKeyDecoder::NextPlainEncodingKey(
user_key_size = static_cast<size_t>(tmp_size); user_key_size = static_cast<size_t>(tmp_size);
*bytes_read = key_ptr - start; *bytes_read = key_ptr - start;
} }
bool decoded_internal_key_valid; // dummy initial value to avoid compiler complain
bool decoded_internal_key_valid = true;
Slice decoded_internal_key; Slice decoded_internal_key;
Status s = Status s =
ReadInternalKey(key_ptr, limit, user_key_size, parsed_key, bytes_read, ReadInternalKey(key_ptr, limit, user_key_size, parsed_key, bytes_read,
@ -227,7 +228,8 @@ Status PlainTableKeyDecoder::NextPrefixEncodingKey(
bool expect_suffix = false; bool expect_suffix = false;
do { do {
size_t size = 0; size_t size = 0;
bool decoded_internal_key_valid; // dummy initial value to avoid compiler complain
bool decoded_internal_key_valid = true;
const char* pos = DecodeSize(key_ptr, limit, &entry_type, &size); const char* pos = DecodeSize(key_ptr, limit, &entry_type, &size);
if (pos == nullptr) { if (pos == nullptr) {
return Status::Corruption("Unexpected EOF when reading size of the key"); return Status::Corruption("Unexpected EOF when reading size of the key");

@ -40,6 +40,7 @@
#include "util/logging.h" #include "util/logging.h"
#include "util/posix_logger.h" #include "util/posix_logger.h"
#include "util/random.h" #include "util/random.h"
#include "util/iostats_context_imp.h"
#include <signal.h> #include <signal.h>
// Get nano time for mach systems // Get nano time for mach systems
@ -178,6 +179,7 @@ class PosixSequentialFile: public SequentialFile {
do { do {
r = fread_unlocked(scratch, 1, n, file_); r = fread_unlocked(scratch, 1, n, file_);
} while (r == 0 && ferror(file_) && errno == EINTR); } while (r == 0 && ferror(file_) && errno == EINTR);
IOSTATS_ADD(bytes_read, r);
*result = Slice(scratch, r); *result = Slice(scratch, r);
if (r < n) { if (r < n) {
if (feof(file_)) { if (feof(file_)) {
@ -241,6 +243,7 @@ class PosixRandomAccessFile: public RandomAccessFile {
do { do {
r = pread(fd_, scratch, n, static_cast<off_t>(offset)); r = pread(fd_, scratch, n, static_cast<off_t>(offset));
} while (r < 0 && errno == EINTR); } while (r < 0 && errno == EINTR);
IOSTATS_ADD_IF_POSITIVE(bytes_read, r);
*result = Slice(scratch, (r < 0) ? 0 : r); *result = Slice(scratch, (r < 0) ? 0 : r);
if (r < 0) { if (r < 0) {
// An error: return a non-ok status // An error: return a non-ok status
@ -488,6 +491,7 @@ class PosixMmapFile : public WritableFile {
size_t n = (left <= avail) ? left : avail; size_t n = (left <= avail) ? left : avail;
memcpy(dst_, src, n); memcpy(dst_, src, n);
IOSTATS_ADD(bytes_written, n);
dst_ += n; dst_ += n;
src += n; src += n;
left -= n; left -= n;
@ -694,6 +698,7 @@ class PosixWritableFile : public WritableFile {
} }
return IOError(filename_, errno); return IOError(filename_, errno);
} }
IOSTATS_ADD(bytes_written, done);
TEST_KILL_RANDOM(rocksdb_kill_odds); TEST_KILL_RANDOM(rocksdb_kill_odds);
left -= done; left -= done;
@ -744,6 +749,7 @@ class PosixWritableFile : public WritableFile {
} }
return IOError(filename_, errno); return IOError(filename_, errno);
} }
IOSTATS_ADD(bytes_written, done);
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
left -= done; left -= done;
src += done; src += done;
@ -877,6 +883,7 @@ class PosixRandomRWFile : public RandomRWFile {
} }
return IOError(filename_, errno); return IOError(filename_, errno);
} }
IOSTATS_ADD(bytes_written, done);
left -= done; left -= done;
src += done; src += done;
@ -890,6 +897,7 @@ class PosixRandomRWFile : public RandomRWFile {
char* scratch) const { char* scratch) const {
Status s; Status s;
ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset)); ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
IOSTATS_ADD_IF_POSITIVE(bytes_read, r);
*result = Slice(scratch, (r < 0) ? 0 : r); *result = Slice(scratch, (r < 0) ? 0 : r);
if (r < 0) { if (r < 0) {
s = IOError(filename_, errno); s = IOError(filename_, errno);

@ -0,0 +1,30 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <sstream>
#include "rocksdb/env.h"
#include "util/iostats_context_imp.h"
namespace rocksdb {
__thread IOStatsContext iostats_context;
// Puts the context back into its initial state: both byte counters are
// zeroed and the thread pool id is set to Env::Priority::TOTAL.
void IOStatsContext::Reset() {
  bytes_written = 0;
  bytes_read = 0;
  thread_pool_id = Env::Priority::TOTAL;
}
#define OUTPUT(counter) #counter << " = " << counter << ", "
// Formats the fields in the order thread_pool_id, bytes_read, bytes_written.
// Each field is emitted through OUTPUT() as "name = value, ", so the result
// ends with a trailing ", ".
std::string IOStatsContext::ToString() const {
  std::ostringstream out;
  out << OUTPUT(thread_pool_id);
  out << OUTPUT(bytes_read);
  out << OUTPUT(bytes_written);
  return out.str();
}
} // namespace rocksdb

@ -0,0 +1,32 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#pragma once
#include "rocksdb/iostats_context.h"
// Helper macros for reading and updating `iostats_context`, the IOStatsContext
// instance declared in rocksdb/iostats_context.h. `metric` is always the name
// of a field of that struct.

// Increment a specific counter by the specified value.
#define IOSTATS_ADD(metric, value) \
  (iostats_context.metric += (value))

// Increment a counter only when `value` is positive.
// The body is wrapped in do { } while (0) so the expansion is exactly one
// statement: it can be used as the un-braced body of an if/else without
// breaking the else or capturing it. Callers must end it with a semicolon.
// Note that `value` is evaluated twice when it is positive, so it should not
// have side effects.
#define IOSTATS_ADD_IF_POSITIVE(metric, value) \
  do {                                         \
    if ((value) > 0) {                         \
      IOSTATS_ADD(metric, value);              \
    }                                          \
  } while (0)

// Reset a specific counter to zero.
#define IOSTATS_RESET(metric) \
  (iostats_context.metric = 0)

// Reset all counters via IOStatsContext::Reset().
#define IOSTATS_RESET_ALL() \
  (iostats_context.Reset())

// Record which thread pool the current thread belongs to.
#define IOSTATS_SET_THREAD_POOL_ID(value) \
  (iostats_context.thread_pool_id = (value))

// Read back the thread pool id recorded for the current thread.
#define IOSTATS_THREAD_POOL_ID() \
  (iostats_context.thread_pool_id)

// Read the current value of a specific counter.
#define IOSTATS(metric) \
  (iostats_context.metric)

@ -68,6 +68,9 @@ std::string Status::ToString() const {
case kShutdownInProgress: case kShutdownInProgress:
type = "Shutdown in progress: "; type = "Shutdown in progress: ";
break; break;
case kTimedOut:
type = "Operation timed out: ";
break;
default: default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
static_cast<int>(code())); static_cast<int>(code()));

@ -24,7 +24,7 @@ class StopWatch {
uint64_t ElapsedMicros() { uint64_t ElapsedMicros() const {
return env_->NowMicros() - start_time_; return env_->NowMicros() - start_time_;
} }

Loading…
Cancel
Save