Reuse internal auto readahead_size at each level (except L0) for iterators (#9056)

Summary:
RocksDB does auto-readahead for iterators on noticing more than two sequential reads for a table file if the user doesn't provide readahead_size. The readahead starts at 8KB and doubles on every additional read, up to max_auto_readahead_size. However, at each level, when the iterator moves on to the next file, readahead_size starts again from 8KB.
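To make that growth pattern concrete, here is a minimal standalone sketch of the doubling behavior described above; the 8KB start mirrors the default, while the 256KB cap stands in for max_auto_readahead_size and is an assumption for illustration, not a value taken from this change.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  // Assumed values for illustration: 8KB starting readahead and a 256KB cap
  // standing in for max_auto_readahead_size.
  const size_t kInitialReadaheadSize = 8 * 1024;
  const size_t kMaxAutoReadaheadSize = 256 * 1024;

  size_t readahead_size = kInitialReadaheadSize;
  for (int sequential_read = 1; sequential_read <= 8; ++sequential_read) {
    std::printf("read %d: readahead_size = %zu KB\n", sequential_read,
                readahead_size / 1024);
    // Double on every additional sequential read, capped at the maximum.
    readahead_size = std::min(kMaxAutoReadaheadSize, readahead_size * 2);
  }
  return 0;
}
```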

This PR introduces a new ReadOptions field, "adaptive_readahead", which, when set to true, maintains readahead_size at each level. When the iterator moves from one file to another, the new file's readahead_size continues from the previous file's readahead_size instead of starting from scratch. However, if reads are not sequential, it falls back to 8KB (the default) with no prefetching for that block. A short usage sketch follows the notes below.

1. If a block is found in cache but was eligible for prefetch (i.e., the block wasn't in RocksDB's prefetch buffer), readahead_size will decrease by 8KB.
2. readahead_size is maintained for levels L1 through Ln.
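As a rough illustration of how the option is enabled, the sketch below opens an iterator with adaptive_readahead set; the database path and the scan loop are assumptions made for the example, not part of this change.

```cpp
#include <cassert>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  // "/tmp/adaptive_readahead_demo" is an arbitrary path for this sketch.
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/adaptive_readahead_demo", &db);
  assert(s.ok());

  rocksdb::ReadOptions ro;
  // Carry the internal auto-readahead size forward across files at each
  // level (L1+) during sequential iteration. This only affects RocksDB's
  // internal prefetch buffer, not file system prefetching.
  ro.adaptive_readahead = true;

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // A sequential scan benefits from the carried-forward readahead size.
  }
  assert(it->status().ok());

  it.reset();
  delete db;
  return 0;
}
```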

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9056

Test Plan:
Added new unit tests
Ran db_bench for readseq, seekrandom, seekrandomwhilewriting, and readrandom with --adaptive_readahead=true; there was no regression when the new feature was enabled.

Reviewed By: anand1976

Differential Revision: D31773640

Pulled By: akankshamahajan15

fbshipit-source-id: 7332d16258b846ae5cea773009195a5af58f8f98
Branch: main
Author: Akanksha Mahajan (committed by Facebook GitHub Bot)
parent afcd32533c
commit 17ce1ca48b
19 changed files (changed lines in parentheses):
  1. HISTORY.md (1)
  2. db/version_set.cc (13)
  3. file/file_prefetch_buffer.cc (2)
  4. file/file_prefetch_buffer.h (50)
  5. file/prefetch_test.cc (303)
  6. file/readahead_file_info.h (33)
  7. include/rocksdb/options.h (13)
  8. options/options.cc (6)
  9. table/block_based/block_based_table_iterator.cc (1)
  10. table/block_based/block_based_table_iterator.h (26)
  11. table/block_based/block_based_table_reader.cc (4)
  12. table/block_based/block_prefetcher.cc (9)
  13. table/block_based/block_prefetcher.h (19)
  14. table/block_based/partitioned_index_iterator.cc (1)
  15. table/block_based/partitioned_index_iterator.h (23)
  16. table/block_based/partitioned_index_reader.cc (1)
  17. table/internal_iterator.h (14)
  18. table/iterator_wrapper.h (8)
  19. tools/db_bench_tool.cc (12)

@@ -4,6 +4,7 @@
* Added new ChecksumType kXXH3 which is faster than kCRC32c on almost all x86\_64 hardware.
* Added a new online consistency check for BlobDB which validates that the number/total size of garbage blobs does not exceed the number/total size of all blobs in any given blob file.
* Provided support for tracking per-sst user-defined timestamp information in MANIFEST.
* Added new option "adaptive_readahead" in ReadOptions. For iterators, RocksDB does auto-readahead on noticing sequential reads and by enabling this option, readahead_size of current file (if reads are sequential) will be carried forward to next file instead of starting from the scratch at each level (except L0 level files). If reads are not sequential it will fall back to 8KB. This option is applicable only for RocksDB internal prefetch buffer and isn't supported with underlying file system prefetching.
### Bug Fixes
* Prevent a `CompactRange()` with `CompactRangeOptions::change_level == true` from possibly causing corruption to the LSM state (overlapping files within a level) when run in parallel with another manual compaction. Note that setting `force_consistency_checks == true` (the default) would cause the DB to enter read-only mode in this scenario and return `Status::Corruption`, rather than committing any corruption.

@@ -880,7 +880,8 @@ class LevelIterator final : public InternalIterator {
level_(level),
range_del_agg_(range_del_agg),
pinned_iters_mgr_(nullptr),
compaction_boundaries_(compaction_boundaries),
is_next_read_sequential_(false) {
// Empty level is not supported.
assert(flevel_ != nullptr && flevel_->num_files > 0);
}
@@ -1027,6 +1028,8 @@
// To be propagated to RangeDelAggregator in order to safely truncate range
// tombstones.
const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
bool is_next_read_sequential_;
};
void LevelIterator::Seek(const Slice& target) {
@@ -1128,7 +1131,9 @@ bool LevelIterator::NextAndGetResult(IterateResult* result) {
assert(Valid());
bool is_valid = file_iter_.NextAndGetResult(result);
if (!is_valid) {
is_next_read_sequential_ = true;
SkipEmptyFileForward();
is_next_read_sequential_ = false;
is_valid = Valid();
if (is_valid) {
result->key = key();
@@ -1195,6 +1200,12 @@ void LevelIterator::SetFileIterator(InternalIterator* iter) {
}
InternalIterator* old_iter = file_iter_.Set(iter);
// Update the read pattern for PrefetchBuffer.
if (is_next_read_sequential_) {
file_iter_.UpdateReadaheadState(old_iter);
}
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinIterator(old_iter);
} else {

@@ -161,6 +161,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
#endif
return false;
}
TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache",
&readahead_size_);
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
} else {
return false;

@@ -8,11 +8,13 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <algorithm>
#include <atomic>
#include <sstream>
#include <string>
#include "file/random_access_file_reader.h"
#include "file/readahead_file_info.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
@@ -20,6 +22,8 @@
namespace ROCKSDB_NAMESPACE {
#define DEAFULT_DECREMENT 8 * 1024
// FilePrefetchBuffer is a smart buffer to store and read data from a file.
class FilePrefetchBuffer {
public:
@@ -90,7 +94,14 @@ class FilePrefetchBuffer {
// tracked if track_min_offset = true.
size_t min_offset_read() const { return min_offset_read_; }
void UpdateReadPattern(const uint64_t& offset, const size_t& len,
bool is_adaptive_readahead = false) {
if (is_adaptive_readahead) {
// Since this block was eligible for prefetch but it was found in
// cache, so check and decrease the readahead_size by 8KB (default)
// if eligible.
DecreaseReadAheadIfEligible(offset, len);
}
prev_offset_ = offset;
prev_len_ = len;
}
@@ -104,11 +115,40 @@ class FilePrefetchBuffer {
readahead_size_ = initial_readahead_size_;
}
void GetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) {
readahead_info->readahead_size = readahead_size_;
readahead_info->num_file_reads = num_file_reads_;
}
void DecreaseReadAheadIfEligible(uint64_t offset, size_t size,
size_t value = DEAFULT_DECREMENT) {
// Decrease the readahead_size if
// - its enabled internally by RocksDB (implicit_auto_readahead_) and,
// - readahead_size is greater than 0 and,
// - this block would have called prefetch API if not found in cache for
// which conditions are:
// - few/no bytes are in buffer and,
// - block is sequential with the previous read and,
// - num_file_reads_ + 1 (including this read) >
// kMinNumFileReadsToStartAutoReadahead
if (implicit_auto_readahead_ && readahead_size_ > 0) {
if ((offset + size > buffer_offset_ + buffer_.CurrentSize()) &&
IsBlockSequential(offset) &&
(num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) {
readahead_size_ =
std::max(initial_readahead_size_,
(readahead_size_ >= value ? readahead_size_ - value : 0));
}
}
}
private:
AlignedBuffer buffer_;
uint64_t buffer_offset_;
RandomAccessFileReader* file_reader_;
size_t readahead_size_;
// FilePrefetchBuffer object won't be created from Iterator flow if
// max_readahead_size_ = 0.
size_t max_readahead_size_;
size_t initial_readahead_size_;
// The minimum `offset` ever passed to TryReadFromCache().
@@ -120,11 +160,11 @@
// can be fetched from min_offset_read().
bool track_min_offset_;
// implicit_auto_readahead is enabled by rocksdb internally after 2
// sequential IOs.
bool implicit_auto_readahead_;
uint64_t prev_offset_;
size_t prev_len_;
int64_t num_file_reads_;
};
} // namespace ROCKSDB_NAMESPACE

@@ -670,6 +670,309 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
Close();
}
class PrefetchTest1 : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool());
#ifndef ROCKSDB_LITE
TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
const int kNumKeys = 1000;
// Set options
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (GetParam()) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
int buff_prefetch_count = 0;
int readahead_carry_over_count = 0;
int num_sst_files = NumTableFilesAtLevel(2);
size_t current_readahead_size = 0;
// Test - Iterate over the keys sequentially.
{
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
// The callback checks, since reads are sequential, readahead_size doesn't
// start from 8KB when iterator moves to next file and its called
// num_sst_files-1 times (excluding for first file).
SyncPoint::GetInstance()->SetCallBack(
"BlockPrefetcher::SetReadaheadState", [&](void* arg) {
readahead_carry_over_count++;
size_t readahead_size = *reinterpret_cast<size_t*>(arg);
if (readahead_carry_over_count) {
ASSERT_GT(readahead_size, 8 * 1024);
// ASSERT_GE(readahead_size, current_readahead_size);
}
});
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
current_readahead_size = *reinterpret_cast<size_t*>(arg);
});
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
num_keys++;
}
ASSERT_GT(buff_prefetch_count, 0);
buff_prefetch_count = 0;
// For index and data blocks.
ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
Close();
}
TEST_P(PrefetchTest1, NonSequentialReads) {
const int kNumKeys = 1000;
// Set options
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (GetParam()) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int j = 0; j < 5; j++) {
for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(Flush());
}
MoveFilesToLevel(2);
int buff_prefetch_count = 0;
int set_readahead = 0;
size_t readahead_size = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"BlockPrefetcher::SetReadaheadState",
[&](void* /*arg*/) { set_readahead++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache",
[&](void* arg) { readahead_size = *reinterpret_cast<size_t*>(arg); });
SyncPoint::GetInstance()->EnableProcessing();
{
// Iterate until prefetch is done.
ReadOptions ro;
ro.adaptive_readahead = true;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->SeekToFirst();
while (iter->Valid() && buff_prefetch_count == 0) {
iter->Next();
}
ASSERT_EQ(readahead_size, 8 * 1024);
ASSERT_EQ(buff_prefetch_count, 1);
ASSERT_EQ(set_readahead, 0);
buff_prefetch_count = 0;
// Move to last file and check readahead size fallbacks to 8KB. So next
// readahead size after prefetch should be 8 * 1024;
iter->Seek(BuildKey(4004));
while (iter->Valid() && buff_prefetch_count == 0) {
iter->Next();
}
ASSERT_EQ(readahead_size, 8 * 1024);
ASSERT_EQ(set_readahead, 0);
ASSERT_EQ(buff_prefetch_count, 1);
}
Close();
}
#endif //! ROCKSDB_LITE
TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
const int kNumKeys = 2000;
// Set options
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (GetParam()) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
BlockBasedTableOptions table_options;
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2); // 8MB
table_options.block_cache = cache;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Status s = TryReopen(options);
if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
int buff_prefetch_count = 0;
size_t current_readahead_size = 0;
size_t expected_current_readahead_size = 8 * 1024;
size_t decrease_readahead_size = 8 * 1024;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->SetCallBack(
"FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
current_readahead_size = *reinterpret_cast<size_t*>(arg);
});
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
{
/*
* Reseek keys from sequential Data Blocks within same partitioned
* index. After 2 sequential reads it will prefetch the data block.
* Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data
* more initially (2 more data blocks).
*/
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
// Warm up the cache
iter->Seek(BuildKey(1011));
iter->Seek(BuildKey(1015));
iter->Seek(BuildKey(1019));
buff_prefetch_count = 0;
}
{
// After caching, blocks will be read from cache (Sequential blocks)
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
iter->Seek(BuildKey(0));
iter->Seek(BuildKey(1000));
iter->Seek(BuildKey(1004)); // Prefetch data (not in cache).
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
// Missed one sequential block but 1011 is already in buffer so
// readahead will not be reset.
iter->Seek(BuildKey(1011));
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
// Eligible to Prefetch data (not in buffer) but block is in cache so no
// prefetch will happen and will result in decrease in readahead_size.
// readahead_size will be 8 * 1024
iter->Seek(BuildKey(1015));
expected_current_readahead_size -= decrease_readahead_size;
// 1016 is the same block as 1015. So no change in readahead_size.
iter->Seek(BuildKey(1016));
// Prefetch data (not in buffer) but found in cache. So decrease
// readahead_size. Since it will 0 after decrementing so readahead_size will
// be set to initial value.
iter->Seek(BuildKey(1019));
expected_current_readahead_size = std::max(
decrease_readahead_size,
(expected_current_readahead_size >= decrease_readahead_size
? (expected_current_readahead_size - decrease_readahead_size)
: 0));
// Prefetch next sequential data.
iter->Seek(BuildKey(1022));
ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
ASSERT_EQ(buff_prefetch_count, 2);
buff_prefetch_count = 0;
}
Close();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@@ -0,0 +1,33 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cstddef>
#include <cstdint>
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
// struct ReadaheadFileInfo contains readahead information that is passed from
// one file to another file per level during iterations. This information helps
// iterators to carry forward the internal automatic prefetching readahead value
// to next file during sequential reads instead of starting from the scratch.
struct ReadaheadFileInfo {
struct ReadaheadInfo {
size_t readahead_size = 0;
int64_t num_file_reads = 0;
};
// Used by Data block iterators to update readahead info.
ReadaheadInfo data_block_readahead_info;
// Used by Index block iterators to update readahead info.
ReadaheadInfo index_block_readahead_info;
};
} // namespace ROCKSDB_NAMESPACE

@@ -1602,6 +1602,19 @@ struct ReadOptions {
// Default: std::numeric_limits<uint64_t>::max()
uint64_t value_size_soft_limit;
// For iterators, RocksDB does auto-readahead on noticing more than two
// sequential reads for a table file if user doesn't provide readahead_size.
// The readahead starts at 8KB and doubles on every additional read upto
// max_auto_readahead_size only when reads are sequential. However at each
// level, if iterator moves over next file, readahead_size starts again from
// 8KB.
//
// By enabling this option, RocksDB will do some enhancements for
// prefetching the data.
//
// Default: false
bool adaptive_readahead;
ReadOptions();
ReadOptions(bool cksum, bool cache);
};

@@ -650,7 +650,8 @@ ReadOptions::ReadOptions()
iter_start_ts(nullptr),
deadline(std::chrono::microseconds::zero()),
io_timeout(std::chrono::microseconds::zero()),
value_size_soft_limit(std::numeric_limits<uint64_t>::max()),
adaptive_readahead(false) {}
ReadOptions::ReadOptions(bool cksum, bool cache)
: snapshot(nullptr),
@@ -674,6 +675,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
iter_start_ts(nullptr),
deadline(std::chrono::microseconds::zero()),
io_timeout(std::chrono::microseconds::zero()),
value_size_soft_limit(std::numeric_limits<uint64_t>::max()),
adaptive_readahead(false) {}
} // namespace ROCKSDB_NAMESPACE

@@ -235,7 +235,6 @@ void BlockBasedTableIterator::InitDataBlock() {
block_prefetcher_.PrefetchIfNeeded(rep, data_block_handle,
read_options_.readahead_size,
is_for_compaction);
Status s;
table_->NewDataBlockIterator<DataBlockIter>(
read_options_, data_block_handle, &block_iter_, BlockType::kData,

@@ -27,11 +27,11 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
bool check_filter, bool need_upper_bound_check,
const SliceTransform* prefix_extractor, TableReaderCaller caller,
size_t compaction_readahead_size = 0, bool allow_unprepared_value = false)
: index_iter_(std::move(index_iter)),
table_(table),
read_options_(read_options),
icomp_(icomp),
user_comparator_(icomp.user_comparator()),
pinned_iters_mgr_(nullptr),
prefix_extractor_(prefix_extractor),
lookup_context_(caller),
@@ -149,6 +149,27 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
}
}
void GetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
if (block_prefetcher_.prefetch_buffer() != nullptr &&
read_options_.adaptive_readahead) {
block_prefetcher_.prefetch_buffer()->GetReadaheadState(
&(readahead_file_info->data_block_readahead_info));
if (index_iter_) {
index_iter_->GetReadaheadState(readahead_file_info);
}
}
}
void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
block_prefetcher_.SetReadaheadState(
&(readahead_file_info->data_block_readahead_info));
if (index_iter_) {
index_iter_->SetReadaheadState(readahead_file_info);
}
}
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_;
private:
enum class IterDirection {
kForward,
@@ -187,7 +208,6 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
const ReadOptions& read_options_;
const InternalKeyComparator& icomp_;
UserComparatorWrapper user_comparator_;
PinnedIteratorsManager* pinned_iters_mgr_;
DataBlockIter block_iter_;
const SliceTransform* prefix_extractor_;

@@ -1523,8 +1523,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// Update the block details so that PrefetchBuffer can use the read
// pattern to determine if reads are sequential or not for
// prefetching. It should also take in account blocks read from cache.
prefetch_buffer->UpdateReadPattern(
handle.offset(), block_size(handle), ro.adaptive_readahead);
}
}
}

@@ -62,13 +62,12 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
return;
}
if (initial_auto_readahead_size_ > max_auto_readahead_size) {
initial_auto_readahead_size_ = max_auto_readahead_size;
}
if (rep->file->use_direct_io()) {
rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
max_auto_readahead_size,
&prefetch_buffer_, true);
return;
@@ -84,7 +83,7 @@
Status s = rep->file->Prefetch(handle.offset(),
block_size(handle) + readahead_size_);
if (s.IsNotSupported()) {
rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size_,
max_auto_readahead_size,
&prefetch_buffer_, true);
return;

@@ -19,31 +19,44 @@ class BlockPrefetcher {
bool is_for_compaction);
FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); }
void UpdateReadPattern(const uint64_t& offset, const size_t& len) {
prev_offset_ = offset;
prev_len_ = len;
}
bool IsBlockSequential(const uint64_t& offset) {
return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset));
}
void ResetValues() {
num_file_reads_ = 1;
readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize;
initial_auto_readahead_size_ = readahead_size_;
readahead_limit_ = 0;
return;
}
void SetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) {
num_file_reads_ = readahead_info->num_file_reads;
initial_auto_readahead_size_ = readahead_info->readahead_size;
TEST_SYNC_POINT_CALLBACK("BlockPrefetcher::SetReadaheadState",
&initial_auto_readahead_size_);
}
private:
// Readahead size used in compaction, its value is used only if
// lookup_context_.caller = kCompaction.
size_t compaction_readahead_size_;
// readahead_size_ is used if underlying FS supports prefetching.
size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize;
size_t readahead_limit_ = 0;
// initial_auto_readahead_size_ is used if RocksDB uses internal prefetch
// buffer.
uint64_t initial_auto_readahead_size_ =
BlockBasedTable::kInitAutoReadaheadSize;
int64_t num_file_reads_ = 0;
uint64_t prev_offset_ = 0;
size_t prev_len_ = 0;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
};

@@ -92,7 +92,6 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
block_prefetcher_.PrefetchIfNeeded(rep, partitioned_index_handle,
read_options_.readahead_size,
is_for_compaction);
Status s;
table_->NewDataBlockIterator<IndexBlockIter>(
read_options_, partitioned_index_handle, &block_iter_,

@@ -27,16 +27,17 @@ class PartitionedIndexIterator : public InternalIteratorBase<IndexValue> {
const InternalKeyComparator& icomp,
std::unique_ptr<InternalIteratorBase<IndexValue>>&& index_iter,
TableReaderCaller caller, size_t compaction_readahead_size = 0)
: index_iter_(std::move(index_iter)),
table_(table),
read_options_(read_options),
#ifndef NDEBUG
icomp_(icomp),
#endif
user_comparator_(icomp.user_comparator()),
block_iter_points_to_real_block_(false),
lookup_context_(caller),
block_prefetcher_(compaction_readahead_size) {
}
~PartitionedIndexIterator() override {}
@@ -113,6 +114,21 @@ class PartitionedIndexIterator : public InternalIteratorBase<IndexValue> {
}
}
void GetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
if (block_prefetcher_.prefetch_buffer() != nullptr &&
read_options_.adaptive_readahead) {
block_prefetcher_.prefetch_buffer()->GetReadaheadState(
&(readahead_file_info->index_block_readahead_info));
}
}
void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override {
block_prefetcher_.SetReadaheadState(
&(readahead_file_info->index_block_readahead_info));
}
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter_;
private:
friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
const BlockBasedTable* table_;
@@ -121,7 +137,6 @@ class PartitionedIndexIterator : public InternalIteratorBase<IndexValue> {
const InternalKeyComparator& icomp_;
#endif
UserComparatorWrapper user_comparator_;
IndexBlockIter block_iter_;
// True if block_iter_ is initialized and points to the same block

@@ -79,6 +79,7 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
ro.fill_cache = read_options.fill_cache;
ro.deadline = read_options.deadline;
ro.io_timeout = read_options.io_timeout;
ro.adaptive_readahead = read_options.adaptive_readahead;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(

@@ -7,7 +7,9 @@
#pragma once
#include <string>
#include "db/dbformat.h"
#include "file/readahead_file_info.h"
#include "rocksdb/comparator.h"
#include "rocksdb/iterator.h"
#include "rocksdb/status.h"
@@ -172,6 +174,18 @@ class InternalIteratorBase : public Cleanable {
return Status::NotSupported("");
}
// When iterator moves from one file to another file at same level, new file's
// readahead state (details of last block read) is updated with previous
// file's readahead state. This way internal readahead_size of Prefetch Buffer
// doesn't start from scratch and can fall back to 8KB with no prefetch if
// reads are not sequential.
//
// Default implementation is no-op and its implemented by iterators.
virtual void GetReadaheadState(ReadaheadFileInfo* /*readahead_file_info*/) {}
// Default implementation is no-op and its implemented by iterators.
virtual void SetReadaheadState(ReadaheadFileInfo* /*readahead_file_info*/) {}
protected:
void SeekForPrevImpl(const Slice& target, const Comparator* cmp) {
Seek(target);

@@ -154,6 +154,14 @@ class IteratorWrapperBase {
return iter_->user_key();
}
void UpdateReadaheadState(InternalIteratorBase<TValue>* old_iter) {
if (old_iter && iter_) {
ReadaheadFileInfo readahead_file_info;
old_iter->GetReadaheadState(&readahead_file_info);
iter_->SetReadaheadState(&readahead_file_info);
}
}
private:
void Update() {
valid_ = iter_->Valid();

@@ -1049,6 +1049,10 @@ DEFINE_bool(io_uring_enabled, true,
extern "C" bool RocksDbIOUringEnable() { return FLAGS_io_uring_enabled; }
#endif // ROCKSDB_LITE
DEFINE_bool(adaptive_readahead, false,
"carry forward internal auto readahead size from one file to next "
"file at each level during iteration");
static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
const char* ctype) {
assert(ctype);
@@ -5491,6 +5495,7 @@ class Benchmark {
options.timestamp = &ts;
}
options.adaptive_readahead = FLAGS_adaptive_readahead;
Iterator* iter = db->NewIterator(options);
int64_t i = 0;
int64_t bytes = 0;
@@ -5585,7 +5590,9 @@ class Benchmark {
}
void ReadReverse(ThreadState* thread, DB* db) {
ReadOptions options(FLAGS_verify_checksum, true);
options.adaptive_readahead = FLAGS_adaptive_readahead;
Iterator* iter = db->NewIterator(options);
int64_t i = 0;
int64_t bytes = 0;
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
@@ -6375,6 +6382,7 @@ class Benchmark {
options.prefix_same_as_start = FLAGS_prefix_same_as_start;
options.tailing = FLAGS_use_tailing_iterator;
options.readahead_size = FLAGS_readahead_size;
options.adaptive_readahead = FLAGS_adaptive_readahead;
std::unique_ptr<char[]> ts_guard;
Slice ts;
if (user_timestamp_size_ > 0) {
@@ -6671,6 +6679,7 @@ class Benchmark {
ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get());
read_options.timestamp = &ts;
}
read_options.adaptive_readahead = FLAGS_adaptive_readahead;
Iterator* iter = db_.db->NewIterator(read_options);
fprintf(stderr, "num reads to do %" PRIu64 "\n", reads_);
@@ -7271,6 +7280,7 @@ class Benchmark {
DB* db = SelectDB(thread);
ReadOptions read_opts(FLAGS_verify_checksum, true);
read_opts.adaptive_readahead = FLAGS_adaptive_readahead;
std::unique_ptr<char[]> ts_guard;
Slice ts;
if (user_timestamp_size_ > 0) {
