diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 053caacd0..23bb85bb3 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -25,10 +25,12 @@ class MockFS; class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper { public: MockRandomAccessFile(std::unique_ptr& file, - bool support_prefetch, std::atomic_int& prefetch_count) + bool support_prefetch, std::atomic_int& prefetch_count, + bool small_buffer_alignment = false) : FSRandomAccessFileOwnerWrapper(std::move(file)), support_prefetch_(support_prefetch), - prefetch_count_(prefetch_count) {} + prefetch_count_(prefetch_count), + small_buffer_alignment_(small_buffer_alignment) {} IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options, IODebugContext* dbg) override { @@ -40,16 +42,25 @@ class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper { } } + size_t GetRequiredBufferAlignment() const override { + return small_buffer_alignment_ + ? 1 + : FSRandomAccessFileOwnerWrapper::GetRequiredBufferAlignment(); + } + private: const bool support_prefetch_; std::atomic_int& prefetch_count_; + const bool small_buffer_alignment_; }; class MockFS : public FileSystemWrapper { public: explicit MockFS(const std::shared_ptr& wrapped, - bool support_prefetch) - : FileSystemWrapper(wrapped), support_prefetch_(support_prefetch) {} + bool support_prefetch, bool small_buffer_alignment = false) + : FileSystemWrapper(wrapped), + support_prefetch_(support_prefetch), + small_buffer_alignment_(small_buffer_alignment) {} static const char* kClassName() { return "MockFS"; } const char* Name() const override { return kClassName(); } @@ -61,8 +72,8 @@ class MockFS : public FileSystemWrapper { std::unique_ptr file; IOStatus s; s = target()->NewRandomAccessFile(fname, opts, &file, dbg); - result->reset( - new MockRandomAccessFile(file, support_prefetch_, prefetch_count_)); + result->reset(new MockRandomAccessFile( + file, support_prefetch_, prefetch_count_, small_buffer_alignment_)); return s; } @@ -76,6 +87,7 @@ class MockFS : public FileSystemWrapper { private: const bool support_prefetch_; + const bool small_buffer_alignment_; std::atomic_int prefetch_count_{0}; }; @@ -85,7 +97,8 @@ class PrefetchTest public: PrefetchTest() : DBTestBase("prefetch_test", true) {} - void SetGenericOptions(Env* env, bool use_direct_io, Options& options) { + virtual void SetGenericOptions(Env* env, bool use_direct_io, + Options& options) { options = CurrentOptions(); options.write_buffer_size = 1024; options.create_if_missing = true; @@ -236,30 +249,79 @@ TEST_P(PrefetchTest, Basic) { Close(); } -TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) { - const bool support_prefetch = - std::get<0>(GetParam()) && - test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); - // Second param is if directIO is enabled or not - const bool use_direct_io = std::get<1>(GetParam()); - const bool use_file_prefetch_buffer = !support_prefetch || use_direct_io; +class PrefetchTailTest : public PrefetchTest { + public: + bool SupportPrefetch() const { + return std::get<0>(GetParam()) && + test::IsPrefetchSupported(env_->GetFileSystem(), dbname_); + } - std::shared_ptr fs = - std::make_shared(env_->GetFileSystem(), support_prefetch); - std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + bool UseDirectIO() const { return std::get<1>(GetParam()); } + + bool UseFilePrefetchBuffer() const { + return !SupportPrefetch() || UseDirectIO(); + } + + Env* GetEnv(bool small_buffer_alignment = false) const { + std::shared_ptr fs = std::make_shared( + env_->GetFileSystem(), SupportPrefetch(), small_buffer_alignment); + + return new CompositeEnvWrapper(env_, fs); + } + + void SetGenericOptions(Env* env, bool use_direct_io, + Options& options) override { + PrefetchTest::SetGenericOptions(env, use_direct_io, options); + options.statistics = CreateDBStatistics(); + } + + void SetBlockBasedTableOptions( + BlockBasedTableOptions& table_options, bool partition_filters = true, + uint64_t metadata_block_size = + BlockBasedTableOptions().metadata_block_size, + bool use_small_cache = false) { + table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch; + table_options.partition_filters = partition_filters; + if (table_options.partition_filters) { + table_options.filter_policy.reset(NewBloomFilterPolicy(10, false)); + } + table_options.metadata_block_size = metadata_block_size; + + if (use_small_cache) { + LRUCacheOptions co; + co.capacity = 1; + std::shared_ptr cache = NewLRUCache(co); + table_options.block_cache = cache; + } + } + int64_t GetNumIndexPartition() const { + int64_t index_partition_counts = 0; + TablePropertiesCollection all_table_props; + assert(db_->GetPropertiesOfAllTables(&all_table_props).ok()); + for (const auto& name_and_table_props : all_table_props) { + const auto& table_props = name_and_table_props.second; + index_partition_counts += table_props->index_partitions; + } + return index_partition_counts; + } +}; + +INSTANTIATE_TEST_CASE_P(PrefetchTailTest, PrefetchTailTest, + ::testing::Combine(::testing::Bool(), + ::testing::Bool())); + +TEST_P(PrefetchTailTest, Basic) { + std::unique_ptr env(GetEnv()); Options options; - SetGenericOptions(env.get(), use_direct_io, options); - options.statistics = CreateDBStatistics(); + SetGenericOptions(env.get(), UseDirectIO(), options); BlockBasedTableOptions bbto; - bbto.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch; - bbto.partition_filters = true; - bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); + SetBlockBasedTableOptions(bbto); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); Status s = TryReopen(options); - if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + if (UseDirectIO() && (s.IsNotSupported() || s.IsInvalidArgument())) { // If direct IO is not supported, skip the test ROCKSDB_GTEST_BYPASS("Direct IO is not supported"); return; @@ -276,7 +338,7 @@ TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) { HistogramData post_flush_file_read; options.statistics->histogramData(FILE_READ_FLUSH_MICROS, &post_flush_file_read); - if (use_file_prefetch_buffer) { + if (UseFilePrefetchBuffer()) { // `PartitionedFilterBlockReader/PartitionIndexReader::CacheDependencies()` // should read from the prefetched tail in file prefetch buffer instead of // initiating extra SST reads. Therefore `BlockBasedTable::PrefetchTail()` @@ -300,7 +362,7 @@ TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) { HistogramData post_compaction_file_read; options.statistics->histogramData(FILE_READ_COMPACTION_MICROS, &post_compaction_file_read); - if (use_file_prefetch_buffer) { + if (UseFilePrefetchBuffer()) { // `PartitionedFilterBlockReader/PartitionIndexReader::CacheDependencies()` // should read from the prefetched tail in file prefetch buffer instead of // initiating extra SST reads. @@ -323,6 +385,84 @@ TEST_P(PrefetchTest, BlockBasedTableTailPrefetch) { Close(); } +TEST_P(PrefetchTailTest, UpgradeToTailSizeInManifest) { + if (!UseFilePrefetchBuffer()) { + ROCKSDB_GTEST_BYPASS( + "Upgrade to tail size in manifest is only relevant when RocksDB file " + "prefetch buffer is used."); + } + if (UseDirectIO()) { + ROCKSDB_GTEST_BYPASS( + "To simplify testing logics with setting file's buffer alignment to be " + "1, direct IO is required to be disabled."); + } + + std::unique_ptr env(GetEnv(true /* small_buffer_alignment */)); + Options options; + SetGenericOptions(env.get(), false /* use_direct_io*/, options); + options.max_open_files = -1; + options.write_buffer_size = 1024 * 1024; + + BlockBasedTableOptions table_options; + SetBlockBasedTableOptions(table_options, false /* partition_filters */, + 1 /* metadata_block_size*/, + true /* use_small_cache */); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + SyncPoint::GetInstance()->EnableProcessing(); + // To simulate a pre-upgrade DB where file tail size is not recorded in + // manifest + SyncPoint::GetInstance()->SetCallBack( + "FileMetaData::FileMetaData", [&](void* arg) { + FileMetaData* meta = static_cast(arg); + meta->tail_size = 0; + }); + + ASSERT_OK(TryReopen(options)); + for (int i = 0; i < 10000; ++i) { + ASSERT_OK(Put("k" + std::to_string(i), "v")); + } + ASSERT_OK(Flush()); + + SyncPoint::GetInstance()->ClearAllCallBacks(); + + // To simulate a DB undergoing the upgrade where tail size to prefetch is + // inferred to be a small number for files with no tail size recorded in + // manifest. + // "1" is chosen to be such number so that with `small_buffer_alignment == + // true` and `use_small_cache == true`, it would have caused one file read per + // index partition during db open if the upgrade is done wrong. + SyncPoint::GetInstance()->SetCallBack( + "BlockBasedTable::Open::TailPrefetchLen", [&](void* arg) { + std::pair* prefetch_off_len_pair = + static_cast*>(arg); + size_t* prefetch_off = prefetch_off_len_pair->first; + size_t* tail_size = prefetch_off_len_pair->second; + const size_t file_size = *prefetch_off + *tail_size; + + *tail_size = 1; + *prefetch_off = file_size - (*tail_size); + }); + + ASSERT_OK(TryReopen(options)); + + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + + HistogramData db_open_file_read; + options.statistics->histogramData(FILE_READ_DB_OPEN_MICROS, + &db_open_file_read); + + int64_t num_index_partition = GetNumIndexPartition(); + // If the upgrade is done right, db open will prefetch all the index + // partitions at once, instead of doing one read per partition. + // That is, together with `metadata_block_size == 1`, there will be more index + // partitions than number of non index partitions reads. + ASSERT_LT(db_open_file_read.count, num_index_partition); + + Close(); +} + // This test verifies BlockBasedTableOptions.max_auto_readahead_size is // configured dynamically. TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) { diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 329436975..35c6f46f6 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -851,8 +851,13 @@ Status BlockBasedTable::PrefetchTail( prefetch_off = static_cast(file_size - tail_prefetch_size); prefetch_len = tail_prefetch_size; } + +#ifndef NDEBUG + std::pair prefetch_off_len_pair = {&prefetch_off, + &prefetch_len}; TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen", - &tail_prefetch_size); + &prefetch_off_len_pair); +#endif // NDEBUG // Try file system prefetch if (!file->use_direct_io() && !force_direct_prefetch) {