From 04b2c16f9b81b69e599b5270fa48cb5b47dd0fe9 Mon Sep 17 00:00:00 2001 From: Akanksha Mahajan Date: Tue, 30 Nov 2021 22:52:14 -0800 Subject: [PATCH] Fix bug in rocksdb internal automatic prefetching (#9234) Summary: After introducing adaptive_readahead, the original flow got broken. Readahead size was set to 0 because of which rocksdb wasn't able to do automatic prefetching which it enables after seeing sequential reads. This PR fixes it. ---------------------------------------------------------------------------------------------------- Before this patch: ./db_bench -use_existing_db=true -db=/tmp/prefix_scan -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags RocksDB: version 6.27 Date: Tue Nov 30 11:56:50 2021 CPU: 24 * Intel Core Processor (Broadwell) CPUCache: 16384 KB Keys: 32 bytes each (+ 0 bytes user-defined timestamp) Values: 512 bytes each (256 bytes after compression) Entries: 5000000 Prefix: 0 bytes Keys per prefix: 0 RawSize: 2594.0 MB (estimated) FileSize: 1373.3 MB (estimated) Write rate: 0 bytes/second Read rate: 0 ops/second Compression: Snappy Compression sampling rate: 0 Memtablerep: SkipListFactory Perf Level: 1 WARNING: Assertions are enabled; benchmarks unnecessarily slow ------------------------------------------------ DB path: [/tmp/prefix_scan] seekrandom : 5356367.174 micros/op 0 ops/sec; 29.4 MB/s (23 of 23 found) ---------------------------------------------------------------------------------------------------- After the patch: ./db_bench -use_existing_db=true -db=/tmp/prefix_scan -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line 
flags RocksDB: version 6.27 Date: Tue Nov 30 14:38:33 2021 CPU: 24 * Intel Core Processor (Broadwell) CPUCache: 16384 KB Keys: 32 bytes each (+ 0 bytes user-defined timestamp) Values: 512 bytes each (256 bytes after compression) Entries: 5000000 Prefix: 0 bytes Keys per prefix: 0 RawSize: 2594.0 MB (estimated) FileSize: 1373.3 MB (estimated) Write rate: 0 bytes/second Read rate: 0 ops/second Compression: Snappy Compression sampling rate: 0 Memtablerep: SkipListFactory Perf Level: 1 WARNING: Assertions are enabled; benchmarks unnecessarily slow ------------------------------------------------ DB path: [/tmp/prefix_scan] seekrandom : 456504.277 micros/op 2 ops/sec; 359.8 MB/s (264 of 264 found) Pull Request resolved: https://github.com/facebook/rocksdb/pull/9234 Test Plan: Ran ./db_bench -db=/data/mysql/rocksdb/prefix_scan -benchmarks="fillseq" -key_size=32 -value_size=512 -num=5000000 -use_direct_io_for_flush_and_compaction=true -target_file_size_base=16777216 and then ./db_bench -use_existing_db=true -db=/data/mysql/rocksdb/prefix_scan -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 and compared the results. 
Reviewed By: anand1976 Differential Revision: D32743965 Pulled By: akankshamahajan15 fbshipit-source-id: b950fba68c91963b7deb5c20acdf471bc60251f5 --- file/file_prefetch_buffer.cc | 4 +- file/file_prefetch_buffer.h | 11 +++-- file/prefetch_test.cc | 40 ++++++++++++++----- .../block_based/block_based_table_iterator.h | 10 +++-- table/block_based/block_prefetcher.h | 7 +++- .../block_based/partitioned_index_iterator.h | 6 ++- 6 files changed, 55 insertions(+), 23 deletions(-) diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index a3d6e0727..719e6e7d2 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -123,6 +123,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, // If readahead is enabled: prefetch the remaining bytes + readahead bytes // and satisfy the request. // If readahead is not enabled: return false. + TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache", + &readahead_size_); if (offset + n > buffer_offset_ + buffer_.CurrentSize()) { if (readahead_size_ > 0) { assert(reader != nullptr); @@ -161,8 +163,6 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, #endif return false; } - TEST_SYNC_POINT_CALLBACK("FilePrefetchBuffer::TryReadFromCache", - &readahead_size_); readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); } else { return false; diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index e91ee41ce..6607a34de 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -30,6 +30,8 @@ class RandomAccessFileReader; class FilePrefetchBuffer { public: static const int kMinNumFileReadsToStartAutoReadahead = 2; + static const size_t kInitAutoReadaheadSize = 8 * 1024; + // Constructor. // // All arguments are optional. 
@@ -57,7 +59,6 @@ class FilePrefetchBuffer { : buffer_offset_(0), readahead_size_(readahead_size), max_readahead_size_(max_readahead_size), - initial_readahead_size_(readahead_size), min_offset_read_(port::kMaxSizet), enable_(enable), track_min_offset_(track_min_offset), @@ -95,6 +96,7 @@ class FilePrefetchBuffer { // tracked if track_min_offset = true. size_t min_offset_read() const { return min_offset_read_; } + // Called in case of implicit auto prefetching. void UpdateReadPattern(const uint64_t& offset, const size_t& len, bool is_adaptive_readahead = false) { if (is_adaptive_readahead) { @@ -111,9 +113,10 @@ class FilePrefetchBuffer { return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset)); } + // Called in case of implicit auto prefetching. void ResetValues() { num_file_reads_ = 1; - readahead_size_ = initial_readahead_size_; + readahead_size_ = kInitAutoReadaheadSize; } void GetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) { @@ -136,8 +139,9 @@ class FilePrefetchBuffer { if ((offset + size > buffer_offset_ + buffer_.CurrentSize()) && IsBlockSequential(offset) && (num_file_reads_ + 1 > kMinNumFileReadsToStartAutoReadahead)) { + size_t initial_auto_readahead_size = kInitAutoReadaheadSize; readahead_size_ = - std::max(initial_readahead_size_, + std::max(initial_auto_readahead_size, (readahead_size_ >= value ? readahead_size_ - value : 0)); } } @@ -150,7 +154,6 @@ class FilePrefetchBuffer { // FilePrefetchBuffer object won't be created from Iterator flow if // max_readahead_size_ = 0. size_t max_readahead_size_; - size_t initial_readahead_size_; // The minimum `offset` ever passed to TryReadFromCache(). 
size_t min_offset_read_; // if false, TryReadFromCache() always return false, and we only take stats diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 6c2769422..28f1a1b94 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -670,13 +670,16 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) { Close(); } -class PrefetchTest1 : public DBTestBase, - public ::testing::WithParamInterface { +class PrefetchTest1 + : public DBTestBase, + public ::testing::WithParamInterface> { public: PrefetchTest1() : DBTestBase("prefetch_test1", true) {} }; -INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool()); +INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, + ::testing::Combine(::testing::Bool(), + ::testing::Bool())); #ifndef ROCKSDB_LITE TEST_P(PrefetchTest1, DBIterLevelReadAhead) { @@ -686,12 +689,13 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { std::make_shared(env_->GetFileSystem(), false); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + bool is_adaptive_readahead = std::get<1>(GetParam()); Options options = CurrentOptions(); options.write_buffer_size = 1024; options.create_if_missing = true; options.compression = kNoCompression; options.env = env.get(); - if (GetParam()) { + if (std::get<0>(GetParam())) { options.use_direct_reads = true; options.use_direct_io_for_flush_and_compaction = true; } @@ -704,7 +708,8 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Status s = TryReopen(options); - if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) { + if (std::get<0>(GetParam()) && + (s.IsNotSupported() || s.IsInvalidArgument())) { // If direct IO is not supported, skip the test return; } else { @@ -748,12 +753,15 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { SyncPoint::GetInstance()->SetCallBack( "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) { current_readahead_size = *reinterpret_cast(arg); + 
ASSERT_GT(current_readahead_size, 0); }); SyncPoint::GetInstance()->EnableProcessing(); ReadOptions ro; - ro.adaptive_readahead = true; + if (is_adaptive_readahead) { + ro.adaptive_readahead = true; + } auto iter = std::unique_ptr(db_->NewIterator(ro)); int num_keys = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -763,14 +771,28 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) { ASSERT_GT(buff_prefetch_count, 0); buff_prefetch_count = 0; // For index and data blocks. - ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1)); + if (is_adaptive_readahead) { + ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1)); + } else { + ASSERT_EQ(readahead_carry_over_count, 0); + } SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } Close(); } +#endif //! ROCKSDB_LITE -TEST_P(PrefetchTest1, NonSequentialReads) { +class PrefetchTest2 : public DBTestBase, + public ::testing::WithParamInterface { + public: + PrefetchTest2() : DBTestBase("prefetch_test2", true) {} +}; + +INSTANTIATE_TEST_CASE_P(PrefetchTest2, PrefetchTest2, ::testing::Bool()); + +#ifndef ROCKSDB_LITE +TEST_P(PrefetchTest2, NonSequentialReads) { const int kNumKeys = 1000; // Set options std::shared_ptr fs = @@ -856,7 +878,7 @@ TEST_P(PrefetchTest1, NonSequentialReads) { } #endif //! 
ROCKSDB_LITE -TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) { +TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) { const int kNumKeys = 2000; // Set options std::shared_ptr fs = diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index e9172907a..6228f73ef 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -161,10 +161,12 @@ class BlockBasedTableIterator : public InternalIteratorBase { } void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override { - block_prefetcher_.SetReadaheadState( - &(readahead_file_info->data_block_readahead_info)); - if (index_iter_) { - index_iter_->SetReadaheadState(readahead_file_info); + if (read_options_.adaptive_readahead) { + block_prefetcher_.SetReadaheadState( + &(readahead_file_info->data_block_readahead_info)); + if (index_iter_) { + index_iter_->SetReadaheadState(readahead_file_info); + } } } diff --git a/table/block_based/block_prefetcher.h b/table/block_based/block_prefetcher.h index 35c5eceb5..74100c380 100644 --- a/table/block_based/block_prefetcher.h +++ b/table/block_based/block_prefetcher.h @@ -30,8 +30,11 @@ class BlockPrefetcher { void ResetValues() { num_file_reads_ = 1; - readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; - initial_auto_readahead_size_ = readahead_size_; + // Since initial_auto_readahead_size_ can be different from + // kInitAutoReadaheadSize in case of adaptive_readahead, so fallback the + // readahead_size_ to kInitAutoReadaheadSize in case of reset. 
+ initial_auto_readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; + readahead_size_ = initial_auto_readahead_size_; readahead_limit_ = 0; return; } diff --git a/table/block_based/partitioned_index_iterator.h b/table/block_based/partitioned_index_iterator.h index 40ad8bb23..a2cbefe80 100644 --- a/table/block_based/partitioned_index_iterator.h +++ b/table/block_based/partitioned_index_iterator.h @@ -123,8 +123,10 @@ class PartitionedIndexIterator : public InternalIteratorBase { } void SetReadaheadState(ReadaheadFileInfo* readahead_file_info) override { - block_prefetcher_.SetReadaheadState( - &(readahead_file_info->index_block_readahead_info)); + if (read_options_.adaptive_readahead) { + block_prefetcher_.SetReadaheadState( + &(readahead_file_info->index_block_readahead_info)); + } } std::unique_ptr> index_iter_;