Improve BlockPrefetcher to prefetch only for sequential scans (#7394)

Summary:
BlockPrefetcher is used by iterators to prefetch data when they anticipate
that more data will be used in the future, which is valid for forward sequential
scans. But BlockPrefetcher tracks only num_file_reads_ and not whether reads
are sequential. This presents a problem for MultiGet with a large number of
keys, which reseeks the index iterator and data block: FilePrefetchBuffer
can end up doing a large readahead for those reseeks because the readahead size
increases exponentially once readahead is enabled. The same issue exists with
BlockBasedTableIterator.

Track the previous read's offset and length in both BlockPrefetcher (which creates
the FilePrefetchBuffer) and FilePrefetchBuffer (which does the actual prefetching)
to determine whether reads are sequential, and prefetch only then.

Also update the last block read after a cache hit, so that reads served from the
cache are taken into account as well.
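
To make the mechanism concrete, here is an illustrative, standalone mirror of the read-pattern helpers this patch adds (the member names follow the diff below; the struct itself is only a sketch, not the real FilePrefetchBuffer):

```cpp
#include <cstddef>

// Illustrative mirror of the read-pattern tracking added to
// FilePrefetchBuffer and BlockPrefetcher in this patch; not the real class.
struct ReadPatternTracker {
  size_t prev_offset_ = 0;
  size_t prev_len_ = 0;

  // A read at `offset` counts as sequential only if it starts exactly where
  // the previous read ended (or if nothing has been read yet).
  bool IsBlockSequential(size_t offset) const {
    return prev_len_ == 0 || prev_offset_ + prev_len_ == offset;
  }

  // Recorded after every read, including reads served from the block cache.
  void UpdateReadPattern(size_t offset, size_t len) {
    prev_offset_ = offset;
    prev_len_ = len;
  }
};
```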

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7394

Test Plan: Add new unit test case

Reviewed By: anand1976

Differential Revision: D23737617

Pulled By: akankshamahajan15

fbshipit-source-id: 8e6917c25ed87b285ee495d1b68dc623d71205a3
Branch: main
Author: Akanksha Mahajan, committed by Facebook GitHub Bot
parent 0db4cde6e2
commit a0e0feca62
1. HISTORY.md (3 changed lines)
2. file/file_prefetch_buffer.cc (20 changed lines)
3. file/file_prefetch_buffer.h (35 changed lines)
4. file/prefetch_test.cc (363 changed lines)
5. table/block_based/block_based_table_reader.cc (7 changed lines)
6. table/block_based/block_based_table_reader.h (20 changed lines)
7. table/block_based/block_prefetcher.cc (57 changed lines)
8. table/block_based/block_prefetcher.h (18 changed lines)
9. table/block_based/partitioned_filter_block.cc (3 changed lines)
10. table/block_based/partitioned_index_reader.cc (3 changed lines)

@@ -12,6 +12,9 @@
### New Features
* Add new option allow_stall passed during instance creation of WriteBufferManager. When allow_stall is set, WriteBufferManager will stall all writers shared across multiple DBs and columns if memory usage goes beyond specified WriteBufferManager::buffer_size (soft limit). Stall will be cleared when memory is freed after flush and memory usage goes down below buffer_size.
### Performance Improvements
* BlockPrefetcher is used by iterators to prefetch data if they anticipate more data to be used in future. It is enabled implicitly by rocksdb. Added change to take the read pattern into account and prefetch only if reads are sequential. This disables prefetching for random reads in MultiGet and iterators, where readahead_size otherwise grows exponentially and issues large prefetches.
## 6.20.0 (04/16/2021)
### Behavior Changes
* `ColumnFamilyOptions::sample_for_compression` now takes effect for creation of all block-based tables. Previously it only took effect for block-based tables created by flush.
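
For context on tuning: implicit auto readahead needs no opt-in, and its upper bound is governed by the existing BlockBasedTableOptions::max_auto_readahead_size option that this change consults (setting it to 0 disables implicit prefetching). A configuration sketch, with an arbitrary 256KB cap chosen purely for illustration:

```cpp
#include "rocksdb/options.h"
#include "rocksdb/table.h"

// Sketch only: cap implicit auto readahead at 256KB (0 would disable it).
rocksdb::Options MakeOptionsWithCappedReadahead() {
  rocksdb::BlockBasedTableOptions table_options;
  table_options.max_auto_readahead_size = 256 * 1024;
  rocksdb::Options options;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  return options;
}
```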

@@ -131,6 +131,24 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
s = Prefetch(opts, file_reader_, offset, std::max(n, readahead_size_),
for_compaction);
} else {
if (implicit_auto_readahead_) {
// Prefetch only if this read is sequential otherwise reset
// readahead_size_ to initial value.
if (!IsBlockSequential(offset)) {
UpdateReadPattern(offset, n);
ResetValues();
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
num_file_reads_++;
if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
UpdateReadPattern(offset, n);
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
}
s = Prefetch(opts, file_reader_, offset, n + readahead_size_,
for_compaction);
}
@@ -148,7 +166,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
return false;
}
}
UpdateReadPattern(offset, n);
uint64_t offset_in_buffer = offset - buffer_offset_;
*result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
return true;
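
The numbers behind the exponential-growth concern mentioned in the summary: assuming the usual defaults of an 8KB initial auto readahead (BlockBasedTable::kInitAutoReadaheadSize) and a 256KB max_auto_readahead_size, a few prefetches already reach the cap, which is why resetting readahead_size_ back to initial_readahead_size_ on a non-sequential read matters. Illustration only:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

// Illustration only: how the implicit readahead size grows if reads keep
// looking sequential. 8KB initial size and 256KB cap are assumed defaults.
int main() {
  size_t readahead_size = 8 * 1024;
  const size_t max_auto_readahead_size = 256 * 1024;
  for (int prefetch = 1; prefetch <= 8; prefetch++) {
    std::printf("prefetch #%d uses %zu bytes of readahead\n", prefetch,
                readahead_size);
    readahead_size = std::min(max_auto_readahead_size, readahead_size * 2);
  }
  return 0;
}
```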

@@ -23,6 +23,7 @@ namespace ROCKSDB_NAMESPACE {
// FilePrefetchBuffer is a smart buffer to store and read data from a file.
class FilePrefetchBuffer {
public:
static const int kMinNumFileReadsToStartAutoReadahead = 2;
// Constructor.
//
// All arguments are optional.
@@ -38,6 +39,8 @@ class FilePrefetchBuffer {
// for the minimum offset if track_min_offset = true.
// track_min_offset : Track the minimum offset ever read and collect stats on
// it. Used for adaptable readahead of the file footer/metadata.
// implicit_auto_readahead : Readahead is enabled implicitly by rocksdb after
// doing sequential scans for two times.
//
// Automatic readahead is enabled for a file if file_reader, readahead_size,
// and max_readahead_size are passed in.
@@ -47,14 +50,20 @@ class FilePrefetchBuffer {
// `Prefetch` to load data into the buffer.
FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr,
size_t readahead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false,
bool implicit_auto_readahead = false)
: buffer_offset_(0),
file_reader_(file_reader),
readahead_size_(readahead_size),
max_readahead_size_(max_readahead_size),
initial_readahead_size_(readahead_size),
min_offset_read_(port::kMaxSizet),
enable_(enable),
track_min_offset_(track_min_offset),
implicit_auto_readahead_(implicit_auto_readahead),
prev_offset_(0),
prev_len_(0),
num_file_reads_(kMinNumFileReadsToStartAutoReadahead + 1) {}
// Load data into the buffer from a file.
// reader : the file reader.
@@ -81,12 +90,27 @@ class FilePrefetchBuffer {
// tracked if track_min_offset = true.
size_t min_offset_read() const { return min_offset_read_; }
void UpdateReadPattern(const size_t& offset, const size_t& len) {
prev_offset_ = offset;
prev_len_ = len;
}
bool IsBlockSequential(const size_t& offset) {
return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset));
}
void ResetValues() {
num_file_reads_ = 1;
readahead_size_ = initial_readahead_size_;
}
private:
AlignedBuffer buffer_;
uint64_t buffer_offset_;
RandomAccessFileReader* file_reader_;
size_t readahead_size_;
size_t max_readahead_size_;
size_t initial_readahead_size_;
// The minimum `offset` ever passed to TryReadFromCache().
size_t min_offset_read_;
// if false, TryReadFromCache() always return false, and we only take stats
@@ -95,5 +119,12 @@ class FilePrefetchBuffer {
// If true, track minimum `offset` ever passed to TryReadFromCache(), which
// can be fetched from min_offset_read().
bool track_min_offset_;
// implicit_auto_readahead is enabled by rocksdb internally after 2 sequential
// IOs.
bool implicit_auto_readahead_;
size_t prev_offset_;
size_t prev_len_;
int num_file_reads_;
};
} // namespace ROCKSDB_NAMESPACE
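
A sketch of how a caller would construct the buffer with the new flag (this mirrors what BlockBasedTable::Rep::CreateFilePrefetchBuffer does later in this diff; the helper function and its parameter names are hypothetical):

```cpp
#include <memory>

#include "file/file_prefetch_buffer.h"

namespace ROCKSDB_NAMESPACE {
// Sketch: build a FilePrefetchBuffer that only issues readahead implicitly,
// after sequential reads are detected. `reader` and the sizes are placeholders.
std::unique_ptr<FilePrefetchBuffer> MakeImplicitReadaheadBuffer(
    RandomAccessFileReader* reader, size_t initial_size, size_t max_size) {
  return std::unique_ptr<FilePrefetchBuffer>(new FilePrefetchBuffer(
      reader, initial_size, max_size, /*enable=*/true,
      /*track_min_offset=*/false, /*implicit_auto_readahead=*/true));
}
}  // namespace ROCKSDB_NAMESPACE
```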

@@ -57,6 +57,10 @@ class MockFS : public FileSystemWrapper {
bool IsPrefetchCalled() { return prefetch_count_ > 0; }
int GetPrefetchCount() {
return prefetch_count_.load(std::memory_order_relaxed);
}
private:
const bool support_prefetch_;
std::atomic_int prefetch_count_{0};
@@ -69,6 +73,10 @@ class PrefetchTest
PrefetchTest() : DBTestBase("/prefetch_test", true) {}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
std::string BuildKey(int num, std::string postfix = "") {
return "my_key_" + std::to_string(num) + postfix;
}
@@ -312,17 +320,354 @@ TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
#endif // !ROCKSDB_LITE
TEST_P(PrefetchTest, PrefetchWhenReseek) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
const int kNumKeys = 2000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
// Second param is if directIO is enabled or not
bool use_direct_io = std::get<1>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
fs->ClearPrefetchCount();
buff_prefetch_count = 0;
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
/*
* Reseek keys from sequential Data Blocks within same partitioned
* index. After 2 sequential reads it will prefetch the data block.
* Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data more
* initially (2 more data blocks).
*/
iter->Seek(BuildKey(0));
iter->Seek(BuildKey(1000));
iter->Seek(BuildKey(1004)); // Prefetch Data
iter->Seek(BuildKey(1008));
iter->Seek(BuildKey(1011));
iter->Seek(BuildKey(1015)); // Prefetch Data
iter->Seek(BuildKey(1019));
// Missed 2 blocks but they are already in buffer so no reset.
iter->Seek(BuildKey(103)); // Already in buffer.
iter->Seek(BuildKey(1033)); // Prefetch Data
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 3);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 3);
buff_prefetch_count = 0;
}
}
{
/*
* Reseek keys from non sequential data blocks within same partitioned
* index. buff_prefetch_count will be 0 in that case.
*/
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
iter->Seek(BuildKey(1008));
iter->Seek(BuildKey(1019));
iter->Seek(BuildKey(1033));
iter->Seek(BuildKey(1048));
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 0);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 0);
buff_prefetch_count = 0;
}
}
{
/*
* Reseek keys from Single Data Block.
*/
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
iter->Seek(BuildKey(1));
iter->Seek(BuildKey(10));
iter->Seek(BuildKey(100));
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 0);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 0);
buff_prefetch_count = 0;
}
}
{
/*
* Reseek keys from sequential data blocks to set implicit auto readahead
* and prefetch data but after that iterate over different (non sequential)
* data blocks which won't prefetch any data further. So buff_prefetch_count
* will be 1 for the first one.
*/
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
iter->Seek(BuildKey(1000));
iter->Seek(BuildKey(1004)); // This iteration will prefetch buffer
iter->Seek(BuildKey(1008));
iter->Seek(
BuildKey(996)); // Reseek won't prefetch any data and
// readahead_size will be initialized to 8*1024.
iter->Seek(BuildKey(992));
iter->Seek(BuildKey(989));
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 1);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 1);
buff_prefetch_count = 0;
}
// Read sequentially to confirm readahead_size is reset to initial value (2
// more data blocks)
iter->Seek(BuildKey(1011));
iter->Seek(BuildKey(1015));
iter->Seek(BuildKey(1019)); // Prefetch Data
iter->Seek(BuildKey(1022));
iter->Seek(BuildKey(1026));
iter->Seek(BuildKey(103)); // Prefetch Data
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 2);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 2);
buff_prefetch_count = 0;
}
}
{
/* Reseek keys from sequential partitioned index block. Since partitioned
* index fetches are sequential, buff_prefetch_count will be 1.
*/
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
iter->Seek(BuildKey(1167));
iter->Seek(BuildKey(1334)); // This iteration will prefetch buffer
iter->Seek(BuildKey(1499));
iter->Seek(BuildKey(1667));
iter->Seek(BuildKey(1847));
iter->Seek(BuildKey(1999));
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 1);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 1);
buff_prefetch_count = 0;
}
}
{
/*
* Reseek over different keys from different blocks. buff_prefetch_count is
* set to 0.
*/
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
int i = 0;
int j = 1000;
do {
iter->Seek(BuildKey(i));
if (!iter->Valid()) {
break;
}
i = i + 100;
iter->Seek(BuildKey(j));
j = j + 100;
} while (i < 1000 && j < kNumKeys && iter->Valid());
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 0);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 0);
buff_prefetch_count = 0;
}
}
{
/* Iterates sequentially over all keys. It will prefetch the buffer.*/
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
}
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 13);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 13);
buff_prefetch_count = 0;
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch =
std::get<0>(GetParam()) &&
test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
const int kNumKeys = 2000;
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
// Second param is if directIO is enabled or not
bool use_direct_io = std::get<1>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
BlockBasedTableOptions table_options;
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);  // 4MB
table_options.block_cache = cache;
table_options.cache_index_and_filter_blocks = false;
table_options.metadata_block_size = 1024;
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
WriteBatch batch;
Random rnd(309);
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
fs->ClearPrefetchCount();
buff_prefetch_count = 0;
{
/*
* Reseek keys from sequential Data Blocks within same partitioned
* index. After 2 sequential reads it will prefetch the data block.
* Data Block size is nearly 4076 so readahead will fetch 8 * 1024 data more
* initially (2 more data blocks).
*/
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
// Warm up the cache
iter->Seek(BuildKey(1011));
iter->Seek(BuildKey(1015));
iter->Seek(BuildKey(1019));
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 1);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 1);
buff_prefetch_count = 0;
}
}
{
// After caching, blocks will be read from cache (Sequential blocks)
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek(BuildKey(0));
iter->Seek(BuildKey(1000));
iter->Seek(BuildKey(1004)); // Prefetch data (not in cache).
// Missed one sequential block but the next one is already in the buffer so
// readahead will not be reset.
iter->Seek(BuildKey(1011));
// Prefetch data but blocks are in cache so no prefetch and reset.
iter->Seek(BuildKey(1015));
iter->Seek(BuildKey(1019));
iter->Seek(BuildKey(1022));
// Prefetch data with readahead_size = 4 blocks.
iter->Seek(BuildKey(1026));
iter->Seek(BuildKey(103));
iter->Seek(BuildKey(1033));
iter->Seek(BuildKey(1037));
if (support_prefetch && !use_direct_io) {
ASSERT_EQ(fs->GetPrefetchCount(), 3);
fs->ClearPrefetchCount();
} else {
ASSERT_EQ(buff_prefetch_count, 2);
buff_prefetch_count = 0;
}
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
} // namespace ROCKSDB_NAMESPACE

@@ -1484,6 +1484,13 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// TODO(haoyu): Differentiate cache hit on uncompressed block cache and
// compressed block cache.
is_cache_hit = true;
if (prefetch_buffer) {
// Update the block details so that PrefetchBuffer can use the read
// pattern to determine if reads are sequential or not for
// prefetching. It should also take in account blocks read from cache.
prefetch_buffer->UpdateReadPattern(handle.offset(),
block_size(handle));
}
}
}

@@ -624,19 +624,23 @@ struct BlockBasedTable::Rep {
uint64_t sst_number_for_tracing() const {
return file ? TableFileNameToNumber(file->file_name()) : UINT64_MAX;
}
void CreateFilePrefetchBuffer(size_t readahead_size,
size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb,
bool implicit_auto_readahead) const {
fpb->reset(new FilePrefetchBuffer(
file.get(), readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */, false /* track_min_offset*/,
implicit_auto_readahead));
}
void CreateFilePrefetchBufferIfNotExists(
size_t readahead_size, size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb,
bool implicit_auto_readahead) const {
if (!(*fpb)) {
CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb,
implicit_auto_readahead);
}
}
};

@@ -16,34 +16,53 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
if (is_for_compaction) {
rep->CreateFilePrefetchBufferIfNotExists(compaction_readahead_size_,
compaction_readahead_size_,
&prefetch_buffer_, false);
return;
}
// Explicit user requested readahead.
if (readahead_size > 0) {
rep->CreateFilePrefetchBufferIfNotExists(readahead_size, readahead_size,
&prefetch_buffer_, false);
return;
}
// Implicit readahead.
// If max_auto_readahead_size is set to be 0 by user, no data will be
// prefetched.
size_t max_auto_readahead_size = rep->table_options.max_auto_readahead_size;
if (max_auto_readahead_size == 0) {
return;
}
size_t len = static_cast<size_t>(block_size(handle));
size_t offset = handle.offset();
// If FS supports prefetching (readahead_limit_ will be non zero in that case)
// and current block exists in prefetch buffer then return.
if (offset + len <= readahead_limit_) {
UpdateReadPattern(offset, len);
return;
}
if (!IsBlockSequential(offset)) {
UpdateReadPattern(offset, len);
ResetValues();
return;
}
UpdateReadPattern(offset, len);
// Implicit auto readahead, which will be enabled if the number of reads
// reached `kMinNumFileReadsToStartAutoReadahead` (default: 2) and scans are
// sequential.
num_file_reads_++;
if (num_file_reads_ <=
BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) {
return;
}
size_t initial_auto_readahead_size = BlockBasedTable::kInitAutoReadaheadSize;
if (initial_auto_readahead_size > max_auto_readahead_size) {
initial_auto_readahead_size = max_auto_readahead_size;
}
@@ -51,12 +70,7 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
if (rep->file->use_direct_io()) {
rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size,
max_auto_readahead_size,
&prefetch_buffer_, true);
return;
}
@@ -67,15 +81,16 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
// If prefetch is not supported, fall back to use internal prefetch buffer.
// Discarding other return status of Prefetch calls intentionally, as
// we can fallback to reading from disk if Prefetch fails.
Status s = rep->file->Prefetch(handle.offset(),
block_size(handle) + readahead_size_);
if (s.IsNotSupported()) {
rep->CreateFilePrefetchBufferIfNotExists(initial_auto_readahead_size,
max_auto_readahead_size,
&prefetch_buffer_, true);
return;
}
readahead_limit_ = offset + len + readahead_size_;
// Keep exponentially increasing readahead size until
// max_auto_readahead_size.
readahead_size_ = std::min(max_auto_readahead_size, readahead_size_ * 2);
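
For readers skimming the hunk above, a condensed sketch (not the real method) of the order in which the implicit path of PrefetchIfNeeded now decides whether to prefetch; the helper function and enum are illustrative only:

```cpp
#include <cstddef>

// Sketch of the implicit-readahead decision order after this change
// (compaction and user-specified readahead are handled before this point).
enum class Action { kSkip, kServeFromExistingReadahead, kStartOrGrowReadahead };

Action DecideImplicitPrefetch(size_t offset, size_t len, size_t readahead_limit,
                              bool block_is_sequential, int num_file_reads,
                              size_t max_auto_readahead_size) {
  // A zero max_auto_readahead_size disables implicit prefetching entirely.
  if (max_auto_readahead_size == 0) return Action::kSkip;
  // Block already covered by a previous filesystem-level readahead.
  if (offset + len <= readahead_limit) {
    return Action::kServeFromExistingReadahead;
  }
  // Non-sequential access: readahead state is reset, nothing is prefetched.
  if (!block_is_sequential) return Action::kSkip;
  // Require kMinNumFileReadsToStartAutoReadahead (2) sequential reads first.
  if (num_file_reads <= 2) return Action::kSkip;
  return Action::kStartOrGrowReadahead;
}
```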

@@ -19,6 +19,22 @@ class BlockPrefetcher {
bool is_for_compaction);
FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); }
void UpdateReadPattern(const size_t& offset, const size_t& len) {
prev_offset_ = offset;
prev_len_ = len;
}
bool IsBlockSequential(const size_t& offset) {
return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset));
}
void ResetValues() {
num_file_reads_ = 1;
readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize;
readahead_limit_ = 0;
return;
}
private:
// Readahead size used in compaction, its value is used only if
// lookup_context_.caller = kCompaction.
@@ -27,6 +43,8 @@ class BlockPrefetcher {
size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize;
size_t readahead_limit_ = 0;
int64_t num_file_reads_ = 0;
size_t prev_offset_ = 0;
size_t prev_len_ = 0;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
};
} // namespace ROCKSDB_NAMESPACE

@@ -460,7 +460,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
false /* Implicit autoreadahead */);
IOOptions opts;
s = rep->file->PrepareIOOptions(ro, opts);

@@ -146,7 +146,8 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
uint64_t last_off = handle.offset() + block_size(handle);
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
false /*Implicit auto readahead*/);
IOOptions opts;
s = rep->file->PrepareIOOptions(ro, opts);
if (s.ok()) {
