Add new option num_file_reads_for_auto_readahead in BlockBasedTableOptions (#10556)

Summary:
RocksDB does auto-readahead for iterators on noticing more
than two reads for a table file if user doesn't provide readahead_size and reads are sequential.
A new option num_file_reads_for_auto_readahead is added which can be
configured and indicates after how many sequential reads prefetching should
start.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10556

Test Plan: Existing and new unit test

Reviewed By: anand1976

Differential Revision: D38947147

Pulled By: akankshamahajan15

fbshipit-source-id: c9eeab495f84a8df7f701c42f04894e46440ad97
main
Akanksha Mahajan 2 years ago committed by Facebook GitHub Bot
parent 5fbcc8c54d
commit 4cd16d65ae
  1. 3
      HISTORY.md
  2. 21
      file/file_prefetch_buffer.h
  3. 109
      file/prefetch_test.cc
  4. 20
      include/rocksdb/table.h
  5. 3
      options/options_settable_test.cc
  6. 9
      table/block_based/block_based_table_factory.cc
  7. 24
      table/block_based/block_based_table_reader.h
  8. 21
      table/block_based/block_prefetcher.cc
  9. 6
      table/block_based/partitioned_filter_block.cc
  10. 6
      table/block_based/partitioned_index_reader.cc

@ -19,6 +19,9 @@
### Behavior Change
* Right now, when the option migration tool (OptionChangeMigration()) migrates to FIFO compaction, it compacts all the data into one single SST file and moves it to L0. This might create a problem for some users: the giant file may be soon deleted to satisfy max_table_files_size, and might cause the DB to be almost empty. We change the behavior so that the files are cut to be smaller, but these files might not follow the data insertion order. With the change, after the migration, migrated data might not be dropped by insertion order by FIFO compaction.
### New Features
* RocksDB does internal auto prefetching if it notices 2 sequential reads if readahead_size is not specified. New option `num_file_reads_for_auto_readahead` is added in BlockBasedTableOptions which indicates after how many sequential reads internal auto prefetching should start (default is 2).
## 7.6.0 (08/19/2022)
### New Features
* Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, prepopulate warm/hot blobs which are already in memory into blob cache at the time of flush. On a flush, the blob that is in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this blob back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in case of the remote file system since it involves network traffic and higher latencies.

@ -37,8 +37,6 @@ struct BufferInfo {
// FilePrefetchBuffer is a smart buffer to store and read data from a file.
class FilePrefetchBuffer {
public:
static const int kMinNumFileReadsToStartAutoReadahead = 2;
// Constructor.
//
// All arguments are optional.
@ -66,8 +64,10 @@ class FilePrefetchBuffer {
FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0,
bool enable = true, bool track_min_offset = false,
bool implicit_auto_readahead = false,
uint64_t num_file_reads = 0, FileSystem* fs = nullptr,
SystemClock* clock = nullptr, Statistics* stats = nullptr)
uint64_t num_file_reads = 0,
uint64_t num_file_reads_for_auto_readahead = 0,
FileSystem* fs = nullptr, SystemClock* clock = nullptr,
Statistics* stats = nullptr)
: curr_(0),
readahead_size_(readahead_size),
initial_auto_readahead_size_(readahead_size),
@ -78,6 +78,7 @@ class FilePrefetchBuffer {
implicit_auto_readahead_(implicit_auto_readahead),
prev_offset_(0),
prev_len_(0),
num_file_reads_for_auto_readahead_(num_file_reads_for_auto_readahead),
num_file_reads_(num_file_reads),
io_handle_(nullptr),
del_fn_(nullptr),
@ -86,7 +87,7 @@ class FilePrefetchBuffer {
fs_(fs),
clock_(clock),
stats_(stats) {
assert((num_file_reads_ >= kMinNumFileReadsToStartAutoReadahead + 1) ||
assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) ||
(num_file_reads_ == 0));
// If async_io_ is enabled, data is asynchronously filled in second buffer
// while curr_ is being consumed. If data is overlapping in two buffers,
@ -234,12 +235,12 @@ class FilePrefetchBuffer {
// - few/no bytes are in buffer and,
// - block is sequential with the previous read and,
// - num_file_reads_ + 1 (including this read) >
// kMinNumFileReadsToStartAutoReadahead
// num_file_reads_for_auto_readahead_
if (implicit_auto_readahead_ && readahead_size_ > 0) {
if ((offset + size >
bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) &&
IsBlockSequential(offset) &&
(num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) {
(num_file_reads_ + 1 > num_file_reads_for_auto_readahead_)) {
readahead_size_ =
std::max(initial_auto_readahead_size_,
(readahead_size_ >= value ? readahead_size_ - value : 0));
@ -307,7 +308,7 @@ class FilePrefetchBuffer {
if (async_request_submitted_) {
return true;
}
if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) {
if (num_file_reads_ <= num_file_reads_for_auto_readahead_) {
UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/);
return false;
}
@ -337,7 +338,9 @@ class FilePrefetchBuffer {
bool implicit_auto_readahead_;
uint64_t prev_offset_;
size_t prev_len_;
// num_file_reads_ is only used when implicit_auto_readahead_ is set.
// num_file_reads_ and num_file_reads_for_auto_readahead_ is only used when
// implicit_auto_readahead_ is set.
uint64_t num_file_reads_for_auto_readahead_;
uint64_t num_file_reads_;
// io_handle_ is allocated and used by underlying file system in case of

@ -463,6 +463,115 @@ TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
// Verifies that setting num_file_reads_for_auto_readahead = 0 makes implicit
// auto-readahead kick in on the very first read of a table file (instead of
// the default behavior of waiting for 2 sequential reads), by counting how
// many prefetches a sequence of mostly-sequential Seeks triggers.
TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
  // First param is if the mockFS support_prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);
  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());
  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  // The option under test: start prefetching from the first read.
  table_options.num_file_reads_for_auto_readahead = 0;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  // Count every prefetch issued by FilePrefetchBuffer.
  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();
  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }
  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));
  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());
  // Compact into a single sorted run so reads below are served from table
  // files (not memtables) and the prefetch counters are meaningful.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
  Close();
  // Bug fix: the original discarded this Status; a failed reopen would make
  // the rest of the test dereference an unopened DB.
  ASSERT_OK(TryReopen(options));
  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    /*
     * Reseek keys from sequential Data Blocks within same partitioned
     * index. It will prefetch the data block at the first seek since
     * num_file_reads_for_auto_readahead = 0. Data Block size is nearly 4076 so
     * readahead will fetch 8 * 1024 data more initially (2 more data blocks).
     */
    iter->Seek(BuildKey(0));  // Prefetch data + index block since
                              // num_file_reads_for_auto_readahead = 0.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1000));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1004));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1008));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1011));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));  // In buffer
    ASSERT_TRUE(iter->Valid());
    // Missed 2 blocks but they are already in buffer so no reset.
    iter->Seek(BuildKey(103));  // Already in buffer.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1033));  // Prefetch Data.
    ASSERT_TRUE(iter->Valid());
    // 4 prefetches total: index + 3 data prefetches noted above.
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 4);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 4);
      buff_prefetch_count = 0;
    }
  }
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
#endif // !ROCKSDB_LITE
TEST_P(PrefetchTest, PrefetchWhenReseek) {

@ -641,6 +641,26 @@ struct BlockBasedTableOptions {
//
// Default: 8 KB (8 * 1024).
size_t initial_auto_readahead_size = 8 * 1024;
// RocksDB does auto-readahead for iterators on noticing more than two reads
// for a table file if user doesn't provide readahead_size and reads are
// sequential.
// num_file_reads_for_auto_readahead indicates after how many
// sequential reads internal auto prefetching should start.
//
// For example, if value is 2 then after reading 2 sequential data blocks on
// third data block prefetching will start.
// If set 0, it will start prefetching from the first read.
//
// This parameter can be changed dynamically by
// DB::SetOptions({{"block_based_table_factory",
// "{num_file_reads_for_auto_readahead=0;}"}}));
//
// Changing the value dynamically will only affect files opened after the
// change.
//
// Default: 2
uint64_t num_file_reads_for_auto_readahead = 2;
};
// Table Properties that are specific to block-based table properties.

@ -198,7 +198,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
"block_align=true;"
"max_auto_readahead_size=0;"
"prepopulate_block_cache=kDisable;"
"initial_auto_readahead_size=0",
"initial_auto_readahead_size=0;"
"num_file_reads_for_auto_readahead=0",
new_bbto));
ASSERT_EQ(unset_bytes_base,

@ -416,6 +416,11 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct BlockBasedTableOptions, initial_auto_readahead_size),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"num_file_reads_for_auto_readahead",
{offsetof(struct BlockBasedTableOptions,
num_file_reads_for_auto_readahead),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
#endif // ROCKSDB_LITE
};
@ -894,6 +899,10 @@ std::string BlockBasedTableFactory::GetPrintableOptions() const {
" initial_auto_readahead_size: %" ROCKSDB_PRIszt "\n",
table_options_.initial_auto_readahead_size);
ret.append(buffer);
snprintf(buffer, kBufferSize,
" num_file_reads_for_auto_readahead: %" PRIu64 "\n",
table_options_.num_file_reads_for_auto_readahead);
ret.append(buffer);
return ret;
}

@ -70,9 +70,6 @@ class BlockBasedTable : public TableReader {
static const std::string kFullFilterBlockPrefix;
static const std::string kPartitionedFilterBlockPrefix;
// All the below fields control iterator readahead
static const int kMinNumFileReadsToStartAutoReadahead = 2;
// 1-byte compression type + 32-bit checksum
static constexpr size_t kBlockTrailerSize = 5;
@ -659,25 +656,28 @@ struct BlockBasedTable::Rep {
uint64_t sst_number_for_tracing() const {
return file ? TableFileNameToNumber(file->file_name()) : UINT64_MAX;
}
void CreateFilePrefetchBuffer(size_t readahead_size,
size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb,
bool implicit_auto_readahead,
uint64_t num_file_reads) const {
void CreateFilePrefetchBuffer(
size_t readahead_size, size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
uint64_t num_file_reads,
uint64_t num_file_reads_for_auto_readahead) const {
fpb->reset(new FilePrefetchBuffer(
readahead_size, max_readahead_size,
!ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */,
implicit_auto_readahead, num_file_reads, ioptions.fs.get(),
ioptions.clock, ioptions.stats));
implicit_auto_readahead, num_file_reads,
num_file_reads_for_auto_readahead, ioptions.fs.get(), ioptions.clock,
ioptions.stats));
}
void CreateFilePrefetchBufferIfNotExists(
size_t readahead_size, size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb, bool implicit_auto_readahead,
uint64_t num_file_reads) const {
uint64_t num_file_reads,
uint64_t num_file_reads_for_auto_readahead) const {
if (!(*fpb)) {
CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb,
implicit_auto_readahead, num_file_reads);
implicit_auto_readahead, num_file_reads,
num_file_reads_for_auto_readahead);
}
}

@ -23,7 +23,7 @@ void BlockPrefetcher::PrefetchIfNeeded(
rep->CreateFilePrefetchBufferIfNotExists(
compaction_readahead_size_, compaction_readahead_size_,
&prefetch_buffer_, /*implicit_auto_readahead=*/false,
/*num_file_reads=*/0);
/*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0);
return;
}
@ -31,7 +31,8 @@ void BlockPrefetcher::PrefetchIfNeeded(
if (readahead_size > 0) {
rep->CreateFilePrefetchBufferIfNotExists(
readahead_size, readahead_size, &prefetch_buffer_,
/*implicit_auto_readahead=*/false, /*num_file_reads=*/0);
/*implicit_auto_readahead=*/false, /*num_file_reads=*/0,
/*num_file_reads_for_auto_readahead=*/0);
return;
}
@ -50,7 +51,8 @@ void BlockPrefetcher::PrefetchIfNeeded(
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true,
/*num_file_reads=*/0);
/*num_file_reads=*/0,
rep->table_options.num_file_reads_for_auto_readahead);
return;
}
@ -72,11 +74,10 @@ void BlockPrefetcher::PrefetchIfNeeded(
UpdateReadPattern(offset, len);
// Implicit auto readahead, which will be enabled if the number of reads
// reached `kMinNumFileReadsToStartAutoReadahead` (default: 2) and scans are
// sequential.
// reached `table_options.num_file_reads_for_auto_readahead` (default: 2) and
// scans are sequential.
num_file_reads_++;
if (num_file_reads_ <=
BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) {
if (num_file_reads_ <= rep->table_options.num_file_reads_for_auto_readahead) {
return;
}
@ -87,7 +88,8 @@ void BlockPrefetcher::PrefetchIfNeeded(
if (rep->file->use_direct_io()) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
&prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_,
rep->table_options.num_file_reads_for_auto_readahead);
return;
}
@ -105,7 +107,8 @@ void BlockPrefetcher::PrefetchIfNeeded(
if (s.IsNotSupported()) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_);
&prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_,
rep->table_options.num_file_reads_for_auto_readahead);
return;
}

@ -493,9 +493,9 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
handle.offset() + handle.size() + BlockBasedTable::kBlockTrailerSize;
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
false /* Implicit autoreadahead */,
0 /*num_reads_*/);
rep->CreateFilePrefetchBuffer(
0, 0, &prefetch_buffer, false /* Implicit autoreadahead */,
0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/);
IOOptions opts;
s = rep->file->PrepareIOOptions(ro, opts);

@ -156,9 +156,9 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
handle.offset() + BlockBasedTable::BlockSizeWithTrailer(handle);
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer,
false /*Implicit auto readahead*/,
0 /*num_reads_*/);
rep->CreateFilePrefetchBuffer(
0, 0, &prefetch_buffer, false /*Implicit auto readahead*/,
0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/);
IOOptions opts;
{
Status s = rep->file->PrepareIOOptions(ro, opts);

Loading…
Cancel
Save