From 4cd16d65ae93a4361e6a9d6da0aca06755d2ea9a Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Thu, 1 Sep 2022 11:56:00 -0700 Subject: [PATCH] Add new option num_file_reads_for_auto_readahead in BlockBasedTableOptions (#10556) Summary: RocksDB does auto-readahead for iterators on noticing more than two reads for a table file if user doesn't provide readahead_size and reads are sequential. A new option num_file_reads_for_auto_readahead is added which can be configured and indicates after how many sequential reads prefetching should start. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10556 Test Plan: Existing and new unit test Reviewed By: anand1976 Differential Revision: D38947147 Pulled By: akankshamahajan15 fbshipit-source-id: c9eeab495f84a8df7f701c42f04894e46440ad97 --- HISTORY.md | 3 + file/file_prefetch_buffer.h | 21 ++-- file/prefetch_test.cc | 109 ++++++++++++++++++ include/rocksdb/table.h | 20 ++++ options/options_settable_test.cc | 3 +- .../block_based/block_based_table_factory.cc | 9 ++ table/block_based/block_based_table_reader.h | 24 ++-- table/block_based/block_prefetcher.cc | 21 ++-- table/block_based/partitioned_filter_block.cc | 6 +- table/block_based/partitioned_index_reader.cc | 6 +- 10 files changed, 185 insertions(+), 37 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 5732d6dd6..5b69bfb7f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,6 +19,9 @@ ### Behavior Change * Right now, when the option migration tool (OptionChangeMigration()) migrates to FIFO compaction, it compacts all the data into one single SST file and move to L0. This might create a problem for some users: the giant file may be soon deleted to satisfy max_table_files_size, and might cayse the DB to be almost empty. We change the behavior so that the files are cut to be smaller, but these files might not follow the data insertion order. With the change, after the migration, migrated data might not be dropped by insertion order by FIFO compaction. 
+### New Features +* RocksDB does internal auto prefetching if it notices 2 sequential reads when readahead_size is not specified. New option `num_file_reads_for_auto_readahead` is added in BlockBasedTableOptions which indicates after how many sequential reads internal auto prefetching should start (default is 2). + ## 7.6.0 (08/19/2022) ### New Features * Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, prepopulate warm/hot blobs which are already in memory into blob cache at the time of flush. On a flush, the blob that is in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this blob back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in case of the remote file system since it involves network traffic and higher latencies. diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index d142a74b5..9c70d4895 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -37,8 +37,6 @@ struct BufferInfo { // FilePrefetchBuffer is a smart buffer to store and read data from a file. class FilePrefetchBuffer { public: - static const int kMinNumFileReadsToStartAutoReadahead = 2; - // Constructor. // // All arguments are optional. 
@@ -66,8 +64,10 @@ class FilePrefetchBuffer { FilePrefetchBuffer(size_t readahead_size = 0, size_t max_readahead_size = 0, bool enable = true, bool track_min_offset = false, bool implicit_auto_readahead = false, - uint64_t num_file_reads = 0, FileSystem* fs = nullptr, - SystemClock* clock = nullptr, Statistics* stats = nullptr) + uint64_t num_file_reads = 0, + uint64_t num_file_reads_for_auto_readahead = 0, + FileSystem* fs = nullptr, SystemClock* clock = nullptr, + Statistics* stats = nullptr) : curr_(0), readahead_size_(readahead_size), initial_auto_readahead_size_(readahead_size), @@ -78,6 +78,7 @@ class FilePrefetchBuffer { implicit_auto_readahead_(implicit_auto_readahead), prev_offset_(0), prev_len_(0), + num_file_reads_for_auto_readahead_(num_file_reads_for_auto_readahead), num_file_reads_(num_file_reads), io_handle_(nullptr), del_fn_(nullptr), @@ -86,7 +87,7 @@ class FilePrefetchBuffer { fs_(fs), clock_(clock), stats_(stats) { - assert((num_file_reads_ >= kMinNumFileReadsToStartAutoReadahead + 1) || + assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) || (num_file_reads_ == 0)); // If async_io_ is enabled, data is asynchronously filled in second buffer // while curr_ is being consumed. If data is overlapping in two buffers, @@ -234,12 +235,12 @@ class FilePrefetchBuffer { // - few/no bytes are in buffer and, // - block is sequential with the previous read and, // - num_file_reads_ + 1 (including this read) > - // kMinNumFileReadsToStartAutoReadahead + // num_file_reads_for_auto_readahead_ if (implicit_auto_readahead_ && readahead_size_ > 0) { if ((offset + size > bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) && IsBlockSequential(offset) && - (num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) { + (num_file_reads_ + 1 > num_file_reads_for_auto_readahead_)) { readahead_size_ = std::max(initial_auto_readahead_size_, (readahead_size_ >= value ? 
readahead_size_ - value : 0)); @@ -307,7 +308,7 @@ class FilePrefetchBuffer { if (async_request_submitted_) { return true; } - if (num_file_reads_ <= kMinNumFileReadsToStartAutoReadahead) { + if (num_file_reads_ <= num_file_reads_for_auto_readahead_) { UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); return false; } @@ -337,7 +338,9 @@ class FilePrefetchBuffer { bool implicit_auto_readahead_; uint64_t prev_offset_; size_t prev_len_; - // num_file_reads_ is only used when implicit_auto_readahead_ is set. + // num_file_reads_ and num_file_reads_for_auto_readahead_ is only used when + // implicit_auto_readahead_ is set. + uint64_t num_file_reads_for_auto_readahead_; uint64_t num_file_reads_; // io_handle_ is allocated and used by underlying file system in case of diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index d4d996fa7..24c439cb7 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -463,6 +463,115 @@ TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) { SyncPoint::GetInstance()->ClearAllCallBacks(); Close(); } + +TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) { + // First param is if the mockFS support_prefetch or not + bool support_prefetch = + std::get<0>(GetParam()) && + test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + + const int kNumKeys = 2000; + std::shared_ptr fs = + std::make_shared(env_->GetFileSystem(), support_prefetch); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + + // Second param is if directIO is enabled or not + bool use_direct_io = std::get<1>(GetParam()); + + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + + BlockBasedTableOptions table_options; + table_options.no_block_cache = true; + table_options.cache_index_and_filter_blocks = false; + table_options.metadata_block_size = 1024; + table_options.index_type = + 
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + table_options.num_file_reads_for_auto_readahead = 0; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + int buff_prefetch_count = 0; + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + WriteBatch batch; + Random rnd(309); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); + + Close(); + TryReopen(options); + + fs->ClearPrefetchCount(); + buff_prefetch_count = 0; + + { + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + /* + * Reseek keys from sequential Data Blocks within same partitioned + * index. It will prefetch the data block at the first seek since + * num_file_reads_for_auto_readahead = 0. Data Block size is nearly 4076 so + * readahead will fetch 8 * 1024 data more initially (2 more data blocks). + */ + iter->Seek(BuildKey(0)); // Prefetch data + index block since + // num_file_reads_for_auto_readahead = 0. 
+ ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1000)); // In buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1004)); // In buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1008)); // Prefetch Data + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1011)); // In buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1015)); // In buffer + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1019)); // In buffer + ASSERT_TRUE(iter->Valid()); + // Missed 2 blocks but they are already in buffer so no reset. + iter->Seek(BuildKey(103)); // Already in buffer. + ASSERT_TRUE(iter->Valid()); + iter->Seek(BuildKey(1033)); // Prefetch Data. + ASSERT_TRUE(iter->Valid()); + if (support_prefetch && !use_direct_io) { + ASSERT_EQ(fs->GetPrefetchCount(), 4); + fs->ClearPrefetchCount(); + } else { + ASSERT_EQ(buff_prefetch_count, 4); + buff_prefetch_count = 0; + } + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Close(); +} #endif // !ROCKSDB_LITE TEST_P(PrefetchTest, PrefetchWhenReseek) { diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index 73c8f7914..ab6632045 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -641,6 +641,26 @@ struct BlockBasedTableOptions { // // Default: 8 KB (8 * 1024). size_t initial_auto_readahead_size = 8 * 1024; + + // RocksDB does auto-readahead for iterators on noticing more than two reads + // for a table file if user doesn't provide readahead_size and reads are + // sequential. + // num_file_reads_for_auto_readahead indicates after how many + // sequential reads internal auto prefetching should start. + // + // For example, if value is 2 then after reading 2 sequential data blocks on + // third data block prefetching will start. + // If set 0, it will start prefetching from the first read. 
+ // + // This parameter can be changed dynamically by + // DB::SetOptions({{"block_based_table_factory", + // "{num_file_reads_for_auto_readahead=0;}"}})); + // + // Changing the value dynamically will only affect files opened after the + // change. + // + // Default: 2 + uint64_t num_file_reads_for_auto_readahead = 2; }; // Table Properties that are specific to block-based table properties. diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 49b80e17e..08e86e7fd 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -198,7 +198,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) { "block_align=true;" "max_auto_readahead_size=0;" "prepopulate_block_cache=kDisable;" - "initial_auto_readahead_size=0", + "initial_auto_readahead_size=0;" + "num_file_reads_for_auto_readahead=0", new_bbto)); ASSERT_EQ(unset_bytes_base, diff --git a/table/block_based/block_based_table_factory.cc b/table/block_based/block_based_table_factory.cc index 0192605af..405a93841 100644 --- a/table/block_based/block_based_table_factory.cc +++ b/table/block_based/block_based_table_factory.cc @@ -416,6 +416,11 @@ static std::unordered_map {offsetof(struct BlockBasedTableOptions, initial_auto_readahead_size), OptionType::kSizeT, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"num_file_reads_for_auto_readahead", + {offsetof(struct BlockBasedTableOptions, + num_file_reads_for_auto_readahead), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, #endif // ROCKSDB_LITE }; @@ -894,6 +899,10 @@ std::string BlockBasedTableFactory::GetPrintableOptions() const { " initial_auto_readahead_size: %" ROCKSDB_PRIszt "\n", table_options_.initial_auto_readahead_size); ret.append(buffer); + snprintf(buffer, kBufferSize, + " num_file_reads_for_auto_readahead: %" PRIu64 "\n", + table_options_.num_file_reads_for_auto_readahead); + ret.append(buffer); return ret; } diff --git 
a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index a1e4e56fd..6bac865b0 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -70,9 +70,6 @@ class BlockBasedTable : public TableReader { static const std::string kFullFilterBlockPrefix; static const std::string kPartitionedFilterBlockPrefix; - // All the below fields control iterator readahead - static const int kMinNumFileReadsToStartAutoReadahead = 2; - // 1-byte compression type + 32-bit checksum static constexpr size_t kBlockTrailerSize = 5; @@ -659,25 +656,28 @@ struct BlockBasedTable::Rep { uint64_t sst_number_for_tracing() const { return file ? TableFileNameToNumber(file->file_name()) : UINT64_MAX; } - void CreateFilePrefetchBuffer(size_t readahead_size, - size_t max_readahead_size, - std::unique_ptr* fpb, - bool implicit_auto_readahead, - uint64_t num_file_reads) const { + void CreateFilePrefetchBuffer( + size_t readahead_size, size_t max_readahead_size, + std::unique_ptr* fpb, bool implicit_auto_readahead, + uint64_t num_file_reads, + uint64_t num_file_reads_for_auto_readahead) const { fpb->reset(new FilePrefetchBuffer( readahead_size, max_readahead_size, !ioptions.allow_mmap_reads /* enable */, false /* track_min_offset */, - implicit_auto_readahead, num_file_reads, ioptions.fs.get(), - ioptions.clock, ioptions.stats)); + implicit_auto_readahead, num_file_reads, + num_file_reads_for_auto_readahead, ioptions.fs.get(), ioptions.clock, + ioptions.stats)); } void CreateFilePrefetchBufferIfNotExists( size_t readahead_size, size_t max_readahead_size, std::unique_ptr* fpb, bool implicit_auto_readahead, - uint64_t num_file_reads) const { + uint64_t num_file_reads, + uint64_t num_file_reads_for_auto_readahead) const { if (!(*fpb)) { CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb, - implicit_auto_readahead, num_file_reads); + implicit_auto_readahead, num_file_reads, + 
num_file_reads_for_auto_readahead); } } diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc index e84872754..5cd2cc2b5 100644 --- a/table/block_based/block_prefetcher.cc +++ b/table/block_based/block_prefetcher.cc @@ -23,7 +23,7 @@ void BlockPrefetcher::PrefetchIfNeeded( rep->CreateFilePrefetchBufferIfNotExists( compaction_readahead_size_, compaction_readahead_size_, &prefetch_buffer_, /*implicit_auto_readahead=*/false, - /*num_file_reads=*/0); + /*num_file_reads=*/0, /*num_file_reads_for_auto_readahead=*/0); return; } @@ -31,7 +31,8 @@ void BlockPrefetcher::PrefetchIfNeeded( if (readahead_size > 0) { rep->CreateFilePrefetchBufferIfNotExists( readahead_size, readahead_size, &prefetch_buffer_, - /*implicit_auto_readahead=*/false, /*num_file_reads=*/0); + /*implicit_auto_readahead=*/false, /*num_file_reads=*/0, + /*num_file_reads_for_auto_readahead=*/0); return; } @@ -50,7 +51,8 @@ void BlockPrefetcher::PrefetchIfNeeded( rep->CreateFilePrefetchBufferIfNotExists( initial_auto_readahead_size_, max_auto_readahead_size, &prefetch_buffer_, /*implicit_auto_readahead=*/true, - /*num_file_reads=*/0); + /*num_file_reads=*/0, + rep->table_options.num_file_reads_for_auto_readahead); return; } @@ -72,11 +74,10 @@ void BlockPrefetcher::PrefetchIfNeeded( UpdateReadPattern(offset, len); // Implicit auto readahead, which will be enabled if the number of reads - // reached `kMinNumFileReadsToStartAutoReadahead` (default: 2) and scans are - // sequential. + // reached `table_options.num_file_reads_for_auto_readahead` (default: 2) and + // scans are sequential. 
num_file_reads_++; - if (num_file_reads_ <= - BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) { + if (num_file_reads_ <= rep->table_options.num_file_reads_for_auto_readahead) { return; } @@ -87,7 +88,8 @@ void BlockPrefetcher::PrefetchIfNeeded( if (rep->file->use_direct_io()) { rep->CreateFilePrefetchBufferIfNotExists( initial_auto_readahead_size_, max_auto_readahead_size, - &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_); + &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_, + rep->table_options.num_file_reads_for_auto_readahead); return; } @@ -105,7 +107,8 @@ void BlockPrefetcher::PrefetchIfNeeded( if (s.IsNotSupported()) { rep->CreateFilePrefetchBufferIfNotExists( initial_auto_readahead_size_, max_auto_readahead_size, - &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_); + &prefetch_buffer_, /*implicit_auto_readahead=*/true, num_file_reads_, + rep->table_options.num_file_reads_for_auto_readahead); return; } diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index 6e4e9dabd..af30925b7 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -493,9 +493,9 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro, handle.offset() + handle.size() + BlockBasedTable::kBlockTrailerSize; uint64_t prefetch_len = last_off - prefetch_off; std::unique_ptr prefetch_buffer; - rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer, - false /* Implicit autoreadahead */, - 0 /*num_reads_*/); + rep->CreateFilePrefetchBuffer( + 0, 0, &prefetch_buffer, false /* Implicit autoreadahead */, + 0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/); IOOptions opts; s = rep->file->PrepareIOOptions(ro, opts); diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc index 53d892097..017ea4a3a 100644 --- 
a/table/block_based/partitioned_index_reader.cc +++ b/table/block_based/partitioned_index_reader.cc @@ -156,9 +156,9 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro, handle.offset() + BlockBasedTable::BlockSizeWithTrailer(handle); uint64_t prefetch_len = last_off - prefetch_off; std::unique_ptr prefetch_buffer; - rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer, - false /*Implicit auto readahead*/, - 0 /*num_reads_*/); + rep->CreateFilePrefetchBuffer( + 0, 0, &prefetch_buffer, false /*Implicit auto readahead*/, + 0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/); IOOptions opts; { Status s = rep->file->PrepareIOOptions(ro, opts);