Implement deadline support for MultiGet (#6710)

Summary:
Initial implementation of ReadOptions.deadline for MultiGet. If the request takes longer than the deadline, the keys not yet found will be returned with Status::TimedOut(). This
implementation enforces the deadline in DBImpl, which is fairly high
level. Its best effort and may not check the deadline after every key
lookup, but may do so after a batch of keys.

In subsequent stages, we will extend this to passing a timeout down to the FileSystem.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6710

Test Plan: Add new unit tests

Reviewed By: riversand963

Differential Revision: D21149158

Pulled By: anand1976

fbshipit-source-id: 9f44eecffeb40873f5034ed59a66d21f9f88879e
main
anand76 6 years ago committed by Facebook GitHub Bot
parent 6ee66cf8f0
commit c1ccd6b6af
  1. 307
      db/db_basic_test.cc
  2. 109
      db/db_impl/db_impl.cc
  3. 2
      db/db_impl/db_impl.h
  4. 9
      include/rocksdb/options.h
  5. 6
      options/options.cc

@ -1890,17 +1890,14 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
}
#endif // !ROCKSDB_LITE
class DBBasicTestWithParallelIO
: public DBTestBase,
public testing::WithParamInterface<
std::tuple<bool, bool, bool, bool, uint32_t>> {
class DBBasicTestMultiGet : public DBTestBase {
public:
DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
bool compressed_cache = std::get<0>(GetParam());
bool uncompressed_cache = std::get<1>(GetParam());
compression_enabled_ = std::get<2>(GetParam());
fill_cache_ = std::get<3>(GetParam());
uint32_t compression_parallel_threads = std::get<4>(GetParam());
DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
bool uncompressed_cache, bool compression_enabled,
bool fill_cache, uint32_t compression_parallel_threads)
: DBTestBase(test_dir) {
compression_enabled_ = compression_enabled;
fill_cache_ = fill_cache;
if (compressed_cache) {
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
@ -1960,22 +1957,43 @@ class DBBasicTestWithParallelIO
}
Reopen(options);
if (num_cfs > 1) {
for (int cf = 0; cf < num_cfs; ++cf) {
cf_names_.emplace_back("cf" + std::to_string(cf));
}
CreateColumnFamilies(cf_names_, options);
cf_names_.emplace_back("default");
}
std::string zero_str(128, '\0');
for (int cf = 0; cf < num_cfs; ++cf) {
for (int i = 0; i < 100; ++i) {
// Make the value compressible. A purely random string doesn't compress
// and the resultant data block will not be compressed
values_.emplace_back(RandomString(&rnd, 128) + zero_str);
assert(Put(Key(i), values_[i]) == Status::OK());
assert(((num_cfs == 1) ? Put(Key(i), values_[i])
: Put(cf, Key(i), values_[i])) == Status::OK());
}
if (num_cfs == 1) {
Flush();
} else {
dbfull()->Flush(FlushOptions(), handles_[cf]);
}
for (int i = 0; i < 100; ++i) {
// block cannot gain space by compression
uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
std::string tmp_key = "a" + Key(i);
assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
assert(((num_cfs == 1) ? Put(tmp_key, uncompressable_values_[i])
: Put(cf, tmp_key, uncompressable_values_[i])) ==
Status::OK());
}
if (num_cfs == 1) {
Flush();
} else {
dbfull()->Flush(FlushOptions(), handles_[cf]);
}
}
}
bool CheckValue(int i, const std::string& value) {
@ -1992,6 +2010,8 @@ class DBBasicTestWithParallelIO
return false;
}
const std::vector<std::string>& GetCFNames() const { return cf_names_; }
int num_lookups() { return uncompressed_cache_->num_lookups(); }
int num_found() { return uncompressed_cache_->num_found(); }
int num_inserts() { return uncompressed_cache_->num_inserts(); }
@ -2008,7 +2028,7 @@ class DBBasicTestWithParallelIO
static void SetUpTestCase() {}
static void TearDownTestCase() {}
private:
protected:
class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
public:
MyFlushBlockPolicyFactory() {}
@ -2143,6 +2163,19 @@ class DBBasicTestWithParallelIO
std::vector<std::string> values_;
std::vector<std::string> uncompressable_values_;
bool fill_cache_;
std::vector<std::string> cf_names_;
};
class DBBasicTestWithParallelIO
: public DBBasicTestMultiGet,
public testing::WithParamInterface<
std::tuple<bool, bool, bool, bool, uint32_t>> {
public:
DBBasicTestWithParallelIO()
: DBBasicTestMultiGet("/db_basic_test_with_parallel_io", 1,
std::get<0>(GetParam()), std::get<1>(GetParam()),
std::get<2>(GetParam()), std::get<3>(GetParam()),
std::get<4>(GetParam())) {}
};
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
@ -2363,6 +2396,254 @@ INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
::testing::Bool(), ::testing::Bool(),
::testing::Values(1, 4)));
// A test class for intercepting random reads and injecting artificial
// delays. Used for testing the deadline/timeout feature
class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
public:
DBBasicTestMultiGetDeadline()
: DBBasicTestMultiGet("db_basic_test_multiget_deadline" /*Test dir*/,
10 /*# of column families*/,
false /*compressed cache enabled*/,
true /*uncompressed cache enabled*/,
true /*compression enabled*/,
true /*ReadOptions.fill_cache*/,
1 /*# of parallel compression threads*/) {}
// Forward declaration
class DeadlineFS;
class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper {
public:
DeadlineRandomAccessFile(DeadlineFS& fs,
std::unique_ptr<FSRandomAccessFile>& file)
: FSRandomAccessFileWrapper(file.get()),
fs_(fs),
file_(std::move(file)) {}
IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
Slice* result, char* scratch, IODebugContext* dbg) const override {
int delay;
if (fs_.ShouldDelay(&delay)) {
Env::Default()->SleepForMicroseconds(delay);
}
return FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch,
dbg);
}
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options, IODebugContext* dbg) override {
int delay;
if (fs_.ShouldDelay(&delay)) {
Env::Default()->SleepForMicroseconds(delay);
}
return FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg);
}
private:
DeadlineFS& fs_;
std::unique_ptr<FSRandomAccessFile> file_;
};
class DeadlineFS : public FileSystemWrapper {
public:
DeadlineFS() : FileSystemWrapper(FileSystem::Default()) {}
~DeadlineFS() = default;
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
std::unique_ptr<FSRandomAccessFile> file;
IOStatus s;
s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
result->reset(new DeadlineRandomAccessFile(*this, file));
return s;
}
// Set a vector of {IO counter, delay in microseconds} pairs that control
// when to inject a delay and duration of the delay
void SetDelaySequence(const std::vector<std::pair<int, int>>&& seq) {
int total_delay = 0;
for (auto& seq_iter : seq) {
// Ensure no individual delay is > 500ms
ASSERT_LT(seq_iter.second, 500000);
total_delay += seq_iter.second;
}
// ASSERT total delay is < 1s. This is mainly to keep the test from
// timing out in CI test frameworks
ASSERT_LT(total_delay, 1000000);
delay_seq_ = seq;
delay_idx_ = 0;
io_count_ = 0;
}
// Increment the IO counter and return a delay in microseconds
bool ShouldDelay(int* delay) {
if (delay_idx_ < delay_seq_.size() &&
delay_seq_[delay_idx_].first == io_count_++) {
*delay = delay_seq_[delay_idx_].second;
delay_idx_++;
return true;
}
return false;
}
private:
std::vector<std::pair<int, int>> delay_seq_;
size_t delay_idx_;
int io_count_;
};
inline void CheckStatus(std::vector<Status>& statuses, size_t num_ok) {
for (size_t i = 0; i < statuses.size(); ++i) {
if (i < num_ok) {
EXPECT_OK(statuses[i]);
} else {
EXPECT_EQ(statuses[i], Status::TimedOut());
}
}
}
};
TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
std::shared_ptr<DBBasicTestMultiGetDeadline::DeadlineFS> fs(
new DBBasicTestMultiGetDeadline::DeadlineFS());
std::unique_ptr<Env> env = NewCompositeEnv(fs);
Options options = CurrentOptions();
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
options.env = env.get();
ReopenWithColumnFamilies(GetCFNames(), options);
// Test the non-batched version of MultiGet with multiple column
// families
std::vector<std::string> key_str;
size_t i;
for (i = 0; i < 5; ++i) {
key_str.emplace_back(Key(static_cast<int>(i)));
}
std::vector<ColumnFamilyHandle*> cfs(key_str.size());
;
std::vector<Slice> keys(key_str.size());
std::vector<std::string> values(key_str.size());
for (i = 0; i < key_str.size(); ++i) {
cfs[i] = handles_[i];
keys[i] = Slice(key_str[i].data(), key_str[i].size());
}
// Delay the first IO by 200ms
fs->SetDelaySequence({{0, 200000}});
ReadOptions ro;
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
std::vector<Status> statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
std::cout << "Non-batched MultiGet";
// The first key is successful because we check after the lookup, but
// subsequent keys fail due to deadline exceeded
CheckStatus(statuses, 1);
// Clear the cache
cache->SetCapacity(0);
cache->SetCapacity(1048576);
// Test non-batched Multiget with multiple column families and
// introducing an IO delay in one of the middle CFs
key_str.clear();
for (i = 0; i < 10; ++i) {
key_str.emplace_back(Key(static_cast<int>(i)));
}
cfs.resize(key_str.size());
keys.resize(key_str.size());
values.resize(key_str.size());
for (i = 0; i < key_str.size(); ++i) {
// 2 keys per CF
cfs[i] = handles_[i / 2];
keys[i] = Slice(key_str[i].data(), key_str[i].size());
}
fs->SetDelaySequence({{1, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
std::cout << "Non-batched 2";
CheckStatus(statuses, 3);
// Test batched MultiGet with an IO delay in the first data block read.
// Both keys in the first CF should succeed as they're in the same data
// block and would form one batch, and we check for deadline between
// batches.
std::vector<PinnableSlice> pin_values(keys.size());
cache->SetCapacity(0);
cache->SetCapacity(1048576);
statuses.clear();
statuses.resize(keys.size());
fs->SetDelaySequence({{0, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched 1";
CheckStatus(statuses, 2);
// Similar to the previous one, but an IO delay in the third CF data block
// read
for (PinnableSlice& value : pin_values) {
value.Reset();
}
cache->SetCapacity(0);
cache->SetCapacity(1048576);
statuses.clear();
statuses.resize(keys.size());
fs->SetDelaySequence({{2, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched 2";
CheckStatus(statuses, 6);
// Similar to the previous one, but an IO delay in the last but one CF
for (PinnableSlice& value : pin_values) {
value.Reset();
}
cache->SetCapacity(0);
cache->SetCapacity(1048576);
statuses.clear();
statuses.resize(keys.size());
fs->SetDelaySequence({{3, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched 3";
CheckStatus(statuses, 8);
// Test batched MultiGet with single CF and lots of keys. Inject delay
// into the second batch of keys. As each batch is 32, the first 64 keys,
// i.e first two batches, should succeed and the rest should time out
for (PinnableSlice& value : pin_values) {
value.Reset();
}
cache->SetCapacity(0);
cache->SetCapacity(1048576);
key_str.clear();
for (i = 0; i < 100; ++i) {
key_str.emplace_back(Key(static_cast<int>(i)));
}
keys.resize(key_str.size());
pin_values.clear();
pin_values.resize(key_str.size());
for (i = 0; i < key_str.size(); ++i) {
keys[i] = Slice(key_str[i].data(), key_str[i].size());
}
statuses.clear();
statuses.resize(keys.size());
fs->SetDelaySequence({{1, 200000}});
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched single CF";
CheckStatus(statuses, 64);
Close();
}
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

@ -1529,6 +1529,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
get_impl_options.merge_operands != nullptr);
// We will eventually support deadline for Get requests too, but safeguard
// for now
if (read_options.deadline != std::chrono::microseconds::zero()) {
return Status::NotSupported("ReadOptions deadline is not supported");
}
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
@ -1760,14 +1766,16 @@ std::vector<Status> DBImpl::MultiGet(
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
size_t num_found = 0;
for (size_t i = 0; i < num_keys; ++i) {
size_t keys_read;
for (keys_read = 0; keys_read < num_keys; ++keys_read) {
merge_context.Clear();
Status& s = stat_list[i];
std::string* value = &(*values)[i];
std::string* timestamp = timestamps ? &(*timestamps)[i] : nullptr;
Status& s = stat_list[keys_read];
std::string* value = &(*values)[keys_read];
std::string* timestamp = timestamps ? &(*timestamps)[keys_read] : nullptr;
LookupKey lkey(keys[i], consistent_seqnum, read_options.timestamp);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
LookupKey lkey(keys[keys_read], consistent_seqnum, read_options.timestamp);
auto cfh =
static_cast_with_check<ColumnFamilyHandleImpl>(column_family[keys_read]);
SequenceNumber max_covering_tombstone_seq = 0;
auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
assert(mgd_iter != multiget_cf_data.end());
@ -1803,6 +1811,22 @@ std::vector<Status> DBImpl::MultiGet(
bytes_read += value->size();
num_found++;
}
if (read_options.deadline.count() &&
env_->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count())) {
break;
}
}
if (keys_read < num_keys) {
// The only reason to break out of the loop is when the deadline is
// exceeded
assert(env_->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count()));
for (++keys_read; keys_read < num_keys; ++keys_read) {
stat_list[keys_read] = Status::TimedOut();
}
}
// Post processing (decrement reference counts and record statistics)
@ -2012,14 +2036,31 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum);
for (auto cf_iter = multiget_cf_data.begin();
cf_iter != multiget_cf_data.end(); ++cf_iter) {
MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys,
cf_iter->super_version, consistent_seqnum, nullptr, nullptr);
Status s;
auto cf_iter = multiget_cf_data.begin();
for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
&sorted_keys, cf_iter->super_version, consistent_seqnum,
nullptr, nullptr);
if (!s.ok()) {
break;
}
}
if (!s.ok()) {
assert(s.IsTimedOut());
for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) {
for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys;
++i) {
*sorted_keys[i]->s = Status::TimedOut();
}
}
}
for (const auto& iter : multiget_cf_data) {
if (!unref_only) {
ReturnAndCleanupSuperVersion(cf_iter->cfd, cf_iter->super_version);
ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version);
} else {
cf_iter->cfd->GetSuperVersion()->Unref();
iter.cfd->GetSuperVersion()->Unref();
}
}
}
@ -2160,14 +2201,24 @@ void DBImpl::MultiGetWithCallback(
consistent_seqnum = callback->max_visible_seq();
}
MultiGetImpl(read_options, 0, num_keys, sorted_keys,
multiget_cf_data[0].super_version, consistent_seqnum, nullptr,
nullptr);
Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
multiget_cf_data[0].super_version, consistent_seqnum,
nullptr, nullptr);
assert(s.ok() || s.IsTimedOut());
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
multiget_cf_data[0].super_version);
}
void DBImpl::MultiGetImpl(
// The actual implementation of batched MultiGet. Parameters -
// start_key - Index in the sorted_keys vector to start processing from
// num_keys - Number of keys to lookup, starting with sorted_keys[start_key]
// sorted_keys - The entire batch of sorted keys for this CF
//
// The per key status is returned in the KeyContext structures pointed to by
// sorted_keys. An overall Status is also returned, with the only possible
// values being Status::OK() and Status::TimedOut(). The latter indicates
// that the call exceeded read_options.deadline
Status DBImpl::MultiGetImpl(
const ReadOptions& read_options, size_t start_key, size_t num_keys,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
SuperVersion* super_version, SequenceNumber snapshot,
@ -2180,7 +2231,15 @@ void DBImpl::MultiGetImpl(
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
size_t keys_left = num_keys;
Status s;
while (keys_left) {
if (read_options.deadline.count() &&
env_->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count())) {
s = Status::TimedOut();
break;
}
size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
? MultiGetContext::MAX_BATCH_SIZE
: keys_left;
@ -2223,13 +2282,21 @@ void DBImpl::MultiGetImpl(
PERF_TIMER_GUARD(get_post_process_time);
size_t num_found = 0;
uint64_t bytes_read = 0;
for (size_t i = start_key; i < start_key + num_keys; ++i) {
for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
KeyContext* key = (*sorted_keys)[i];
if (key->s->ok()) {
bytes_read += key->value->size();
num_found++;
}
}
if (keys_left) {
assert(s.IsTimedOut());
for (size_t i = start_key + num_keys - keys_left; i < start_key + num_keys;
++i) {
KeyContext* key = (*sorted_keys)[i];
*key->s = Status::TimedOut();
}
}
RecordTick(stats_, NUMBER_MULTIGET_CALLS);
RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
@ -2238,6 +2305,8 @@ void DBImpl::MultiGetImpl(
RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
PERF_TIMER_STOP(get_post_process_time);
return s;
}
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
@ -2526,6 +2595,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
return NewErrorIterator(
Status::NotSupported("Managed iterator is not supported anymore."));
}
// We will eventually support deadline for iterators too, but safeguard
// for now
if (read_options.deadline != std::chrono::microseconds::zero()) {
return NewErrorIterator(
Status::NotSupported("ReadOptions deadline is not supported"));
}
Iterator* result = nullptr;
if (read_options.read_tier == kPersistedTier) {
return NewErrorIterator(Status::NotSupported(

@ -1769,7 +1769,7 @@ class DBImpl : public DB {
// to have acquired the SuperVersion and pass in a snapshot sequence number
// in order to construct the LookupKeys. The start_key and num_keys specify
// the range of keys in the sorted_keys vector for a single column family.
void MultiGetImpl(
Status MultiGetImpl(
const ReadOptions& read_options, size_t start_key, size_t num_keys,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback,

@ -1346,6 +1346,15 @@ struct ReadOptions {
const Slice* timestamp;
const Slice* iter_start_ts;
// Deadline for completing the read request (only MultiGet for now) in us.
// It should be set to some number of microseconds since a fixed point in
// time, identical to that used by system time. The best way is to use
// env->NowMicros() + some timeout. This is best efforts. The call may
// exceed the deadline if there is IO involved and the file system doesn't
// support deadlines, or due to checking for deadline periodically rather
// than for every key if processing a batch
std::chrono::microseconds deadline;
ReadOptions();
ReadOptions(bool cksum, bool cache);
};

@ -607,7 +607,8 @@ ReadOptions::ReadOptions()
ignore_range_deletions(false),
iter_start_seqnum(0),
timestamp(nullptr),
iter_start_ts(nullptr) {}
iter_start_ts(nullptr),
deadline(std::chrono::microseconds::zero()) {}
ReadOptions::ReadOptions(bool cksum, bool cache)
: snapshot(nullptr),
@ -628,6 +629,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
ignore_range_deletions(false),
iter_start_seqnum(0),
timestamp(nullptr),
iter_start_ts(nullptr) {}
iter_start_ts(nullptr),
deadline(std::chrono::microseconds::zero()) {}
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save