diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index e1bd48f0b..4d2619c14 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -1890,17 +1890,14 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
 }
 #endif  // !ROCKSDB_LITE
 
-class DBBasicTestWithParallelIO
-    : public DBTestBase,
-      public testing::WithParamInterface<
-          std::tuple<bool, bool, bool, bool, uint32_t>> {
+class DBBasicTestMultiGet : public DBTestBase {
  public:
-  DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
-    bool compressed_cache = std::get<0>(GetParam());
-    bool uncompressed_cache = std::get<1>(GetParam());
-    compression_enabled_ = std::get<2>(GetParam());
-    fill_cache_ = std::get<3>(GetParam());
-    uint32_t compression_parallel_threads = std::get<4>(GetParam());
+  DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
+                      bool uncompressed_cache, bool compression_enabled,
+                      bool fill_cache, uint32_t compression_parallel_threads)
+      : DBTestBase(test_dir) {
+    compression_enabled_ = compression_enabled;
+    fill_cache_ = fill_cache;
 
     if (compressed_cache) {
       std::shared_ptr<Cache> cache = NewLRUCache(1048576);
@@ -1960,22 +1957,43 @@ class DBBasicTestWithParallelIO
     }
     Reopen(options);
 
-    std::string zero_str(128, '\0');
-    for (int i = 0; i < 100; ++i) {
-      // Make the value compressible. A purely random string doesn't compress
-      // and the resultant data block will not be compressed
-      values_.emplace_back(RandomString(&rnd, 128) + zero_str);
-      assert(Put(Key(i), values_[i]) == Status::OK());
+    if (num_cfs > 1) {
+      for (int cf = 0; cf < num_cfs; ++cf) {
+        cf_names_.emplace_back("cf" + std::to_string(cf));
+      }
+      CreateColumnFamilies(cf_names_, options);
+      cf_names_.emplace_back("default");
     }
-    Flush();
-    for (int i = 0; i < 100; ++i) {
-      // block cannot gain space by compression
-      uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
-      std::string tmp_key = "a" + Key(i);
-      assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
+    std::string zero_str(128, '\0');
+    for (int cf = 0; cf < num_cfs; ++cf) {
+      for (int i = 0; i < 100; ++i) {
+        // Make the value compressible. A purely random string doesn't compress
+        // and the resultant data block will not be compressed
+        values_.emplace_back(RandomString(&rnd, 128) + zero_str);
+        assert(((num_cfs == 1) ? Put(Key(i), values_[i])
+                               : Put(cf, Key(i), values_[i])) == Status::OK());
+      }
+      if (num_cfs == 1) {
+        Flush();
+      } else {
+        dbfull()->Flush(FlushOptions(), handles_[cf]);
+      }
+
+      for (int i = 0; i < 100; ++i) {
+        // block cannot gain space by compression
+        uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
+        std::string tmp_key = "a" + Key(i);
+        assert(((num_cfs == 1) ? Put(tmp_key, uncompressable_values_[i])
+                               : Put(cf, tmp_key, uncompressable_values_[i])) ==
+               Status::OK());
+      }
+      if (num_cfs == 1) {
+        Flush();
+      } else {
+        dbfull()->Flush(FlushOptions(), handles_[cf]);
+      }
     }
-    Flush();
   }
 
   bool CheckValue(int i, const std::string& value) {
@@ -1992,6 +2010,8 @@ class DBBasicTestWithParallelIO
     return false;
   }
 
+  const std::vector<std::string>& GetCFNames() const { return cf_names_; }
+
   int num_lookups() { return uncompressed_cache_->num_lookups(); }
   int num_found() { return uncompressed_cache_->num_found(); }
   int num_inserts() { return uncompressed_cache_->num_inserts(); }
@@ -2008,7 +2028,7 @@ class DBBasicTestWithParallelIO
   static void SetUpTestCase() {}
   static void TearDownTestCase() {}
 
- private:
+ protected:
   class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
    public:
     MyFlushBlockPolicyFactory() {}
@@ -2143,6 +2163,19 @@ class DBBasicTestWithParallelIO
   std::vector<std::string> values_;
   std::vector<std::string> uncompressable_values_;
   bool fill_cache_;
+  std::vector<std::string> cf_names_;
+};
+
+class DBBasicTestWithParallelIO
+    : public DBBasicTestMultiGet,
+      public testing::WithParamInterface<
+          std::tuple<bool, bool, bool, bool, uint32_t>> {
+ public:
+  DBBasicTestWithParallelIO()
+      : DBBasicTestMultiGet("/db_basic_test_with_parallel_io", 1,
+                            std::get<0>(GetParam()), std::get<1>(GetParam()),
+                            std::get<2>(GetParam()), std::get<3>(GetParam()),
+                            std::get<4>(GetParam())) {}
 };
 
 TEST_P(DBBasicTestWithParallelIO, MultiGet) {
@@ -2363,6 +2396,254 @@ INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
                                            ::testing::Bool(), ::testing::Bool(),
                                            ::testing::Values(1, 4)));
 
+// A test class for intercepting random reads and injecting artificial
+// delays. Used for testing the deadline/timeout feature
+class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
+ public:
+  DBBasicTestMultiGetDeadline()
+      : DBBasicTestMultiGet("db_basic_test_multiget_deadline" /*Test dir*/,
+                            10 /*# of column families*/,
+                            false /*compressed cache enabled*/,
+                            true /*uncompressed cache enabled*/,
+                            true /*compression enabled*/,
+                            true /*ReadOptions.fill_cache*/,
+                            1 /*# of parallel compression threads*/) {}
+
+  // Forward declaration
+  class DeadlineFS;
+
+  class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper {
+   public:
+    DeadlineRandomAccessFile(DeadlineFS& fs,
+                             std::unique_ptr<FSRandomAccessFile>& file)
+        : FSRandomAccessFileWrapper(file.get()),
+          fs_(fs),
+          file_(std::move(file)) {}
+
+    IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
+                  Slice* result, char* scratch,
+                  IODebugContext* dbg) const override {
+      int delay;
+      if (fs_.ShouldDelay(&delay)) {
+        Env::Default()->SleepForMicroseconds(delay);
+      }
+      return FSRandomAccessFileWrapper::Read(offset, len, opts, result,
+                                             scratch, dbg);
+    }
+
+    IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
+                       const IOOptions& options,
+                       IODebugContext* dbg) override {
+      int delay;
+      if (fs_.ShouldDelay(&delay)) {
+        Env::Default()->SleepForMicroseconds(delay);
+      }
+      return FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options,
+                                                  dbg);
+    }
+
+   private:
+    DeadlineFS& fs_;
+    std::unique_ptr<FSRandomAccessFile> file_;
+  };
+
+  class DeadlineFS : public FileSystemWrapper {
+   public:
+    DeadlineFS() : FileSystemWrapper(FileSystem::Default()) {}
+    ~DeadlineFS() = default;
+
+    IOStatus NewRandomAccessFile(const std::string& fname,
+                                 const FileOptions& opts,
+                                 std::unique_ptr<FSRandomAccessFile>* result,
+                                 IODebugContext* dbg) override {
+      std::unique_ptr<FSRandomAccessFile> file;
+      IOStatus s;
+
+      s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
+      result->reset(new DeadlineRandomAccessFile(*this, file));
+      return s;
+    }
+
+    // Set a vector of {IO counter, delay in microseconds} pairs that control
+    // when to inject a delay and the duration of the delay
+    void SetDelaySequence(const std::vector<std::pair<int, int>>&& seq) {
+      int total_delay = 0;
+      for (auto& seq_iter : seq) {
+        // Ensure no individual delay is > 500ms
+        ASSERT_LT(seq_iter.second, 500000);
+        total_delay += seq_iter.second;
+      }
+      // ASSERT total delay is < 1s. This is mainly to keep the test from
+      // timing out in CI test frameworks
+      ASSERT_LT(total_delay, 1000000);
+      delay_seq_ = seq;
+      delay_idx_ = 0;
+      io_count_ = 0;
+    }
+
+    // Increment the IO counter and, if a delay is due for this IO, return
+    // its duration in microseconds through *delay
+    bool ShouldDelay(int* delay) {
+      if (delay_idx_ < delay_seq_.size() &&
+          delay_seq_[delay_idx_].first == io_count_++) {
+        *delay = delay_seq_[delay_idx_].second;
+        delay_idx_++;
+        return true;
+      }
+      return false;
+    }
+
+   private:
+    std::vector<std::pair<int, int>> delay_seq_;
+    size_t delay_idx_;
+    int io_count_;
+  };
+
+  inline void CheckStatus(std::vector<Status>& statuses, size_t num_ok) {
+    for (size_t i = 0; i < statuses.size(); ++i) {
+      if (i < num_ok) {
+        EXPECT_OK(statuses[i]);
+      } else {
+        EXPECT_EQ(statuses[i], Status::TimedOut());
+      }
+    }
+  }
+};
+
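To make the fixture's contract concrete before the test: `DeadlineFS` counts every `Read`/`MultiRead` it intercepts, and a sequence entry `{n, d}` means "sleep `d` microseconds before serving the `n`-th IO (0-based)". A usage sketch only, with values mirroring the test below (each call resets the counters):

```cpp
fs->SetDelaySequence({{0, 200000}});  // stall the very first IO by 200ms
fs->SetDelaySequence({{2, 200000}});  // or instead: stall only the third IO
```
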
+TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
+  std::shared_ptr<DBBasicTestMultiGetDeadline::DeadlineFS> fs(
+      new DBBasicTestMultiGetDeadline::DeadlineFS());
+  std::unique_ptr<Env> env = NewCompositeEnv(fs);
+  Options options = CurrentOptions();
+
+  std::shared_ptr<Cache> cache = NewLRUCache(1048576);
+  BlockBasedTableOptions table_options;
+  table_options.block_cache = cache;
+  options.table_factory.reset(new BlockBasedTableFactory(table_options));
+  options.env = env.get();
+  ReopenWithColumnFamilies(GetCFNames(), options);
+
+  // Test the non-batched version of MultiGet with multiple column
+  // families
+  std::vector<std::string> key_str;
+  size_t i;
+  for (i = 0; i < 5; ++i) {
+    key_str.emplace_back(Key(static_cast<int>(i)));
+  }
+  std::vector<ColumnFamilyHandle*> cfs(key_str.size());
+  std::vector<Slice> keys(key_str.size());
+  std::vector<std::string> values(key_str.size());
+  for (i = 0; i < key_str.size(); ++i) {
+    cfs[i] = handles_[i];
+    keys[i] = Slice(key_str[i].data(), key_str[i].size());
+  }
+  // Delay the first IO by 200ms
+  fs->SetDelaySequence({{0, 200000}});
+
+  ReadOptions ro;
+  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+  std::vector<Status> statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
+  std::cout << "Non-batched MultiGet";
+  // The first key is successful because we check after the lookup, but
+  // subsequent keys fail due to deadline exceeded
+  CheckStatus(statuses, 1);
+
+  // Clear the cache
+  cache->SetCapacity(0);
+  cache->SetCapacity(1048576);
+  // Test non-batched MultiGet with multiple column families and an IO
+  // delay in one of the middle CFs
+  key_str.clear();
+  for (i = 0; i < 10; ++i) {
+    key_str.emplace_back(Key(static_cast<int>(i)));
+  }
+  cfs.resize(key_str.size());
+  keys.resize(key_str.size());
+  values.resize(key_str.size());
+  for (i = 0; i < key_str.size(); ++i) {
+    // 2 keys per CF
+    cfs[i] = handles_[i / 2];
+    keys[i] = Slice(key_str[i].data(), key_str[i].size());
+  }
+  fs->SetDelaySequence({{1, 200000}});
+  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+  statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
+  std::cout << "Non-batched 2";
+  CheckStatus(statuses, 3);
+
+  // Test batched MultiGet with an IO delay in the first data block read.
+  // Both keys in the first CF should succeed as they're in the same data
+  // block and would form one batch, and we check for deadline between
+  // batches.
+  std::vector<PinnableSlice> pin_values(keys.size());
+  cache->SetCapacity(0);
+  cache->SetCapacity(1048576);
+  statuses.clear();
+  statuses.resize(keys.size());
+  fs->SetDelaySequence({{0, 200000}});
+  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+  dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
+                     pin_values.data(), statuses.data());
+  std::cout << "Batched 1";
+  CheckStatus(statuses, 2);
+
+  // Similar to the previous one, but an IO delay in the third CF data block
+  // read
+  for (PinnableSlice& value : pin_values) {
+    value.Reset();
+  }
+  cache->SetCapacity(0);
+  cache->SetCapacity(1048576);
+  statuses.clear();
+  statuses.resize(keys.size());
+  fs->SetDelaySequence({{2, 200000}});
+  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+  dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
+                     pin_values.data(), statuses.data());
+  std::cout << "Batched 2";
+  CheckStatus(statuses, 6);
+
+  // Similar to the previous one, but an IO delay in the last but one CF
+  for (PinnableSlice& value : pin_values) {
+    value.Reset();
+  }
+  cache->SetCapacity(0);
+  cache->SetCapacity(1048576);
+  statuses.clear();
+  statuses.resize(keys.size());
+  fs->SetDelaySequence({{3, 200000}});
+  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+  dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
+                     pin_values.data(), statuses.data());
+  std::cout << "Batched 3";
+  CheckStatus(statuses, 8);
+
+  // Test batched MultiGet with a single CF and lots of keys. Inject a delay
+  // into the second batch of keys. As each batch is 32, the first 64 keys,
+  // i.e. the first two batches, should succeed and the rest should time out
+  for (PinnableSlice& value : pin_values) {
+    value.Reset();
+  }
+  cache->SetCapacity(0);
+  cache->SetCapacity(1048576);
+  key_str.clear();
+  for (i = 0; i < 100; ++i) {
+    key_str.emplace_back(Key(static_cast<int>(i)));
+  }
+  keys.resize(key_str.size());
+  pin_values.clear();
+  pin_values.resize(key_str.size());
+  for (i = 0; i < key_str.size(); ++i) {
+    keys[i] = Slice(key_str[i].data(), key_str[i].size());
+  }
+  statuses.clear();
+  statuses.resize(keys.size());
+  fs->SetDelaySequence({{1, 200000}});
+  ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
+  dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(),
+                     pin_values.data(), statuses.data());
+  std::cout << "Batched single CF";
+  CheckStatus(statuses, 64);
+  Close();
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
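Taken together, the tests above exercise the following user-facing pattern. A minimal sketch, assuming a `db`, `env`, column family handles `cfs`, and key/value vectors already set up as in the test; the deadline is absolute, i.e. the current `Env::NowMicros()` time plus a relative timeout:

```cpp
ReadOptions ro;
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000 /*10ms*/};
std::vector<Status> statuses = db->MultiGet(ro, cfs, keys, &values);
for (const Status& s : statuses) {
  if (s.IsTimedOut()) {
    // This key was not looked up before the deadline expired; the caller
    // can retry with a fresh deadline or give up.
  }
}
```
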
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index cb033c0cf..a51a122f5 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1529,6 +1529,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
                        GetImplOptions& get_impl_options) {
   assert(get_impl_options.value != nullptr ||
          get_impl_options.merge_operands != nullptr);
+  // We will eventually support deadline for Get requests too, but safeguard
+  // for now
+  if (read_options.deadline != std::chrono::microseconds::zero()) {
+    return Status::NotSupported("ReadOptions deadline is not supported");
+  }
+
   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
   StopWatch sw(env_, stats_, DB_GET);
   PERF_TIMER_GUARD(get_snapshot_time);
@@ -1760,14 +1766,16 @@ std::vector<Status> DBImpl::MultiGet(
   // s is both in/out. When in, s could either be OK or MergeInProgress.
   // merge_operands will contain the sequence of merges in the latter case.
   size_t num_found = 0;
-  for (size_t i = 0; i < num_keys; ++i) {
+  size_t keys_read;
+  for (keys_read = 0; keys_read < num_keys; ++keys_read) {
     merge_context.Clear();
-    Status& s = stat_list[i];
-    std::string* value = &(*values)[i];
-    std::string* timestamp = timestamps ? &(*timestamps)[i] : nullptr;
+    Status& s = stat_list[keys_read];
+    std::string* value = &(*values)[keys_read];
+    std::string* timestamp = timestamps ? &(*timestamps)[keys_read] : nullptr;
 
-    LookupKey lkey(keys[i], consistent_seqnum, read_options.timestamp);
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
+    LookupKey lkey(keys[keys_read], consistent_seqnum, read_options.timestamp);
+    auto cfh =
+        static_cast_with_check<ColumnFamilyHandleImpl>(column_family[keys_read]);
     SequenceNumber max_covering_tombstone_seq = 0;
     auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
     assert(mgd_iter != multiget_cf_data.end());
@@ -1803,6 +1811,22 @@ std::vector<Status> DBImpl::MultiGet(
       bytes_read += value->size();
       num_found++;
     }
+
+    if (read_options.deadline.count() &&
+        env_->NowMicros() >
+            static_cast<uint64_t>(read_options.deadline.count())) {
+      break;
+    }
+  }
+
+  if (keys_read < num_keys) {
+    // The only reason to break out of the loop is when the deadline is
+    // exceeded
+    assert(env_->NowMicros() >
+           static_cast<uint64_t>(read_options.deadline.count()));
+    for (++keys_read; keys_read < num_keys; ++keys_read) {
+      stat_list[keys_read] = Status::TimedOut();
+    }
   }
 
   // Post processing (decrement reference counts and record statistics)
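Condensed, the non-batched loop above evaluates the deadline after each lookup, so the key in flight when time runs out keeps its real status and only the keys not yet attempted become `TimedOut`. A simplified sketch of that control flow, with `LookupOneKey` as a hypothetical stand-in for the real per-key logic:

```cpp
size_t keys_read;
for (keys_read = 0; keys_read < num_keys; ++keys_read) {
  stat_list[keys_read] = LookupOneKey(keys[keys_read]);  // hypothetical helper
  if (read_options.deadline.count() &&
      env_->NowMicros() >
          static_cast<uint64_t>(read_options.deadline.count())) {
    break;  // deadline exceeded; stop looking up further keys
  }
}
// Mark everything after the last attempted key as timed out. If the loop
// completed, keys_read == num_keys and nothing is marked.
for (++keys_read; keys_read < num_keys; ++keys_read) {
  stat_list[keys_read] = Status::TimedOut();
}
```
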
@@ -2012,14 +2036,31 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
       read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
       &consistent_seqnum);
 
-  for (auto cf_iter = multiget_cf_data.begin();
-       cf_iter != multiget_cf_data.end(); ++cf_iter) {
-    MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys,
-                 cf_iter->super_version, consistent_seqnum, nullptr, nullptr);
+  Status s;
+  auto cf_iter = multiget_cf_data.begin();
+  for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
+    s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
+                     &sorted_keys, cf_iter->super_version, consistent_seqnum,
+                     nullptr, nullptr);
+    if (!s.ok()) {
+      break;
+    }
+  }
+  if (!s.ok()) {
+    assert(s.IsTimedOut());
+    for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) {
+      for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys;
+           ++i) {
+        *sorted_keys[i]->s = Status::TimedOut();
+      }
+    }
+  }
+
+  for (const auto& iter : multiget_cf_data) {
     if (!unref_only) {
-      ReturnAndCleanupSuperVersion(cf_iter->cfd, cf_iter->super_version);
+      ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version);
     } else {
-      cf_iter->cfd->GetSuperVersion()->Unref();
+      iter.cfd->GetSuperVersion()->Unref();
     }
   }
 }
@@ -2160,14 +2201,24 @@ void DBImpl::MultiGetWithCallback(
     consistent_seqnum = callback->max_visible_seq();
   }
 
-  MultiGetImpl(read_options, 0, num_keys, sorted_keys,
-               multiget_cf_data[0].super_version, consistent_seqnum, nullptr,
-               nullptr);
+  Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
+                          multiget_cf_data[0].super_version, consistent_seqnum,
+                          nullptr, nullptr);
+  assert(s.ok() || s.IsTimedOut());
   ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
                                multiget_cf_data[0].super_version);
 }
 
-void DBImpl::MultiGetImpl(
+// The actual implementation of batched MultiGet. Parameters -
+// start_key - Index in the sorted_keys vector to start processing from
+// num_keys - Number of keys to look up, starting with sorted_keys[start_key]
+// sorted_keys - The entire batch of sorted keys for this CF
+//
+// The per-key status is returned in the KeyContext structures pointed to by
+// sorted_keys. An overall Status is also returned, with the only possible
+// values being Status::OK() and Status::TimedOut(). The latter indicates
+// that the call exceeded read_options.deadline
+Status DBImpl::MultiGetImpl(
     const ReadOptions& read_options, size_t start_key, size_t num_keys,
     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
     SuperVersion* super_version, SequenceNumber snapshot,
@@ -2180,7 +2231,15 @@ void DBImpl::MultiGetImpl(
   // s is both in/out. When in, s could either be OK or MergeInProgress.
   // merge_operands will contain the sequence of merges in the latter case.
   size_t keys_left = num_keys;
+  Status s;
   while (keys_left) {
+    if (read_options.deadline.count() &&
+        env_->NowMicros() >
+            static_cast<uint64_t>(read_options.deadline.count())) {
+      s = Status::TimedOut();
+      break;
+    }
+
     size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
                             ? MultiGetContext::MAX_BATCH_SIZE
                             : keys_left;
@@ -2223,13 +2282,21 @@ void DBImpl::MultiGetImpl(
   PERF_TIMER_GUARD(get_post_process_time);
   size_t num_found = 0;
   uint64_t bytes_read = 0;
-  for (size_t i = start_key; i < start_key + num_keys; ++i) {
+  for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
     KeyContext* key = (*sorted_keys)[i];
     if (key->s->ok()) {
       bytes_read += key->value->size();
       num_found++;
     }
   }
+  if (keys_left) {
+    assert(s.IsTimedOut());
+    for (size_t i = start_key + num_keys - keys_left;
+         i < start_key + num_keys; ++i) {
+      KeyContext* key = (*sorted_keys)[i];
+      *key->s = Status::TimedOut();
+    }
+  }
 
   RecordTick(stats_, NUMBER_MULTIGET_CALLS);
   RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
@@ -2238,6 +2305,8 @@ void DBImpl::MultiGetImpl(
   RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
   PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
   PERF_TIMER_STOP(get_post_process_time);
+
+  return s;
 }
 
 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
@@ -2526,6 +2595,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
     return NewErrorIterator(
         Status::NotSupported("Managed iterator is not supported anymore."));
   }
+  // We will eventually support deadline for iterators too, but safeguard
+  // for now
+  if (read_options.deadline != std::chrono::microseconds::zero()) {
+    return NewErrorIterator(
+        Status::NotSupported("ReadOptions deadline is not supported"));
+  }
   Iterator* result = nullptr;
   if (read_options.read_tier == kPersistedTier) {
     return NewErrorIterator(Status::NotSupported(
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 9e33955d6..5f1a0dbf9 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1769,7 +1769,7 @@ class DBImpl : public DB {
   // to have acquired the SuperVersion and pass in a snapshot sequence number
   // in order to construct the LookupKeys. The start_key and num_keys specify
   // the range of keys in the sorted_keys vector for a single column family.
-  void MultiGetImpl(
+  Status MultiGetImpl(
       const ReadOptions& read_options, size_t start_key, size_t num_keys,
       autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
       SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback,
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 1806536fb..16c09f161 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1346,6 +1346,15 @@ struct ReadOptions {
   const Slice* timestamp;
   const Slice* iter_start_ts;
 
+  // Deadline for completing the read request (only MultiGet for now), in
+  // microseconds. It should be set to a number of microseconds since a
+  // fixed point in time, identical to that used by the system clock. The
+  // best way is to use env->NowMicros() + some timeout. This is best
+  // effort. The call may exceed the deadline if there is IO involved and
+  // the file system doesn't support deadlines, or because the deadline is
+  // checked periodically rather than for every key when processing a batch
+  std::chrono::microseconds deadline;
+
   ReadOptions();
   ReadOptions(bool cksum, bool cache);
 };
diff --git a/options/options.cc b/options/options.cc
index 980841127..0e2094d4c 100644
--- a/options/options.cc
+++ b/options/options.cc
@@ -607,7 +607,8 @@ ReadOptions::ReadOptions()
       ignore_range_deletions(false),
       iter_start_seqnum(0),
       timestamp(nullptr),
-      iter_start_ts(nullptr) {}
+      iter_start_ts(nullptr),
+      deadline(std::chrono::microseconds::zero()) {}
 
 ReadOptions::ReadOptions(bool cksum, bool cache)
     : snapshot(nullptr),
@@ -628,6 +629,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
       ignore_range_deletions(false),
       iter_start_seqnum(0),
       timestamp(nullptr),
-      iter_start_ts(nullptr) {}
+      iter_start_ts(nullptr),
+      deadline(std::chrono::microseconds::zero()) {}
 
 }  // namespace ROCKSDB_NAMESPACE
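Since `deadline` is an absolute timestamp on the `Env::NowMicros()` clock rather than a relative timeout, call sites can wrap the arithmetic in a small helper. A hypothetical convenience function, not part of this patch:

```cpp
// Convert a relative timeout in microseconds into the absolute deadline
// that ReadOptions expects, using the same clock as Env::NowMicros().
std::chrono::microseconds DeadlineAfter(Env* env, uint64_t timeout_us) {
  return std::chrono::microseconds{env->NowMicros() + timeout_us};
}

// Usage: ro.deadline = DeadlineAfter(env, 10000);  // 10ms from now
```
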