From c2485f2d81583488333a9a79ece86f0c0380c6a0 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Thu, 27 Aug 2020 18:15:11 -0700 Subject: [PATCH] Add buffer prefetch support for non directIO usecase (#7312) Summary: A new file interface `SupportPrefetch()` is added. When the user overrides it to `false`, an internal prefetch buffer will be used for readahead. Useful for non-directIO but FS doesn't have readahead support. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7312 Reviewed By: anand1976 Differential Revision: D23329847 Pulled By: jay-zhuang fbshipit-source-id: 71cd4ce6f4a820840294e4e6aec111ab76175527 --- CMakeLists.txt | 1 + HISTORY.md | 3 + Makefile | 3 + TARGETS | 7 + file/file_prefetch_buffer.cc | 1 + file/prefetch_test.cc | 186 ++++++++++++++++++ include/rocksdb/file_system.h | 4 +- src.mk | 1 + table/block_based/block_based_table_reader.cc | 25 +-- table/block_based/block_based_table_reader.h | 8 + table/block_based/block_prefetcher.cc | 86 ++++---- 11 files changed, 274 insertions(+), 51 deletions(-) create mode 100644 file/prefetch_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 46209c7d1..18b14afae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1092,6 +1092,7 @@ if(WITH_TESTS) env/io_posix_test.cc env/mock_env_test.cc file/delete_scheduler_test.cc + file/prefetch_test.cc file/random_access_file_reader_test.cc logging/auto_roll_logger_test.cc logging/env_logger_test.cc diff --git a/HISTORY.md b/HISTORY.md index dc20cad36..70afd8e44 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -20,6 +20,9 @@ * Expose kTypeDeleteWithTimestamp in EntryType and update GetEntryType() accordingly. * Added file_checksum and file_checksum_func_name to TableFileCreationInfo, which can pass the table file checksum information through the OnTableFileCreated callback during flush and compaction. +### Behavior Changes +* File abstraction `FSRandomAccessFile.Prefetch()` default return status is changed from `OK` to `NotSupported`. If the user inherited file doesn't implement prefetch, RocksDB will create internal prefetch buffer to improve read performance. + ### Others * Error in prefetching partitioned index blocks will not be swallowed. It will fail the query and return the IOError users. diff --git a/Makefile b/Makefile index 044b3b4fa..b4e9207e3 100644 --- a/Makefile +++ b/Makefile @@ -1825,6 +1825,9 @@ testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY) io_tracer_test: $(OBJ_DIR)/trace_replay/io_tracer_test.o $(OBJ_DIR)/trace_replay/io_tracer.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +prefetch_test: $(OBJ_DIR)/file/prefetch_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/TARGETS b/TARGETS index ab0ce85e4..caa65f752 100644 --- a/TARGETS +++ b/TARGETS @@ -1367,6 +1367,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "prefetch_test", + "file/prefetch_test.cc", + "serial", + [], + [], + ], [ "prefix_test", "db/prefix_test.cc", diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index bccc59fc0..8d9798d09 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -28,6 +28,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, if (!enable_ || reader == nullptr) { return Status::OK(); } + TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start"); size_t alignment = reader->file()->GetRequiredBufferAlignment(); size_t offset_ = static_cast(offset); uint64_t rounddown_offset = Rounddown(offset_, alignment); diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc new file mode 100644 index 000000000..ffe0367a4 --- /dev/null +++ b/file/prefetch_test.cc @@ -0,0 +1,186 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { + +class MockFS; + +class MockRandomAccessFile : public FSRandomAccessFileWrapper { + public: + MockRandomAccessFile(std::unique_ptr& file, + bool support_prefetch, std::atomic_int& prefetch_count) + : FSRandomAccessFileWrapper(file.get()), + file_(std::move(file)), + support_prefetch_(support_prefetch), + prefetch_count_(prefetch_count) {} + + IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options, + IODebugContext* dbg) override { + if (support_prefetch_) { + prefetch_count_.fetch_add(1); + return target()->Prefetch(offset, n, options, dbg); + } else { + return IOStatus::NotSupported(); + } + } + + private: + std::unique_ptr file_; + const bool support_prefetch_; + std::atomic_int& prefetch_count_; +}; + +class MockFS : public FileSystemWrapper { + public: + explicit MockFS(bool support_prefetch) + : FileSystemWrapper(FileSystem::Default()), + support_prefetch_(support_prefetch) {} + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& opts, + std::unique_ptr* result, + IODebugContext* dbg) override { + std::unique_ptr file; + IOStatus s; + s = target()->NewRandomAccessFile(fname, opts, &file, dbg); + result->reset( + new MockRandomAccessFile(file, support_prefetch_, prefetch_count_)); + return s; + } + + void ClearPrefetchCount() { prefetch_count_ = 0; } + + bool IsPrefetchCalled() { return prefetch_count_ > 0; } + + private: + const bool support_prefetch_; + std::atomic_int prefetch_count_{0}; +}; + +class PrefetchTest + : public DBTestBase, + public ::testing::WithParamInterface> { + public: + PrefetchTest() : DBTestBase("/prefetch_test", true) {} +}; + +std::string BuildKey(int num, std::string postfix = "") { + return "my_key_" + std::to_string(num) + postfix; +} + +TEST_P(PrefetchTest, Basic) { + // First param is if the mockFS support_prefetch or not + bool support_prefetch = std::get<0>(GetParam()); + + // Second param is if directIO is enabled or not + bool use_direct_io = std::get<1>(GetParam()); + + const int kNumKeys = 1100; + std::shared_ptr fs = std::make_shared(support_prefetch); + std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); + Options options = CurrentOptions(); + options.write_buffer_size = 1024; + options.create_if_missing = true; + options.compression = kNoCompression; + options.env = env.get(); + if (use_direct_io) { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + } + + int buff_prefetch_count = 0; + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Status s = TryReopen(options); + if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { + // If direct IO is not supported, skip the test + return; + } else { + ASSERT_OK(s); + } + + // create first key range + WriteBatch batch; + for (int i = 0; i < kNumKeys; i++) { + batch.Put(BuildKey(i), "value for range 1 key"); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + // create second key range + batch.Clear(); + for (int i = 0; i < kNumKeys; i++) { + batch.Put(BuildKey(i, "key2"), "value for range 2 key"); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + // delete second key range + batch.Clear(); + for (int i = 0; i < kNumKeys; i++) { + batch.Delete(BuildKey(i, "key2")); + } + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + // compact database + std::string start_key = BuildKey(0); + std::string end_key = BuildKey(kNumKeys - 1); + Slice least(start_key.data(), start_key.size()); + Slice greatest(end_key.data(), end_key.size()); + + // commenting out the line below causes the example to work correctly + db_->CompactRange(CompactRangeOptions(), &least, &greatest); + + if (support_prefetch && !use_direct_io) { + // If underline file system supports prefetch, and directIO is not enabled + // make sure prefetch() is called and FilePrefetchBuffer is not used. + ASSERT_TRUE(fs->IsPrefetchCalled()); + fs->ClearPrefetchCount(); + ASSERT_EQ(0, buff_prefetch_count); + } else { + // If underline file system doesn't support prefetch, or directIO is + // enabled, make sure prefetch() is not called and FilePrefetchBuffer is + // used. + ASSERT_FALSE(fs->IsPrefetchCalled()); + ASSERT_GT(buff_prefetch_count, 0); + buff_prefetch_count = 0; + } + + // count the keys + { + auto iter = std::unique_ptr(db_->NewIterator(ReadOptions())); + int num_keys = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + num_keys++; + } + } + + // Make sure prefetch is called only if file system support prefetch. + if (support_prefetch && !use_direct_io) { + ASSERT_TRUE(fs->IsPrefetchCalled()); + fs->ClearPrefetchCount(); + ASSERT_EQ(0, buff_prefetch_count); + } else { + ASSERT_FALSE(fs->IsPrefetchCalled()); + ASSERT_GT(buff_prefetch_count, 0); + buff_prefetch_count = 0; + } + Close(); +} + +INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest, + ::testing::Combine(::testing::Bool(), + ::testing::Bool())); + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 8292f0ee2..76009c2bb 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -633,10 +633,12 @@ class FSRandomAccessFile { IODebugContext* dbg) const = 0; // Readahead the file starting from offset by n bytes for caching. + // If it's not implemented (default: `NotSupported`), RocksDB will create + // internal prefetch buffer to improve read performance. virtual IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/, const IOOptions& /*options*/, IODebugContext* /*dbg*/) { - return IOStatus::OK(); + return IOStatus::NotSupported(); } // Read a bunch of blocks as described by reqs. The blocks can diff --git a/src.mk b/src.mk index 49d6132a3..a6c823b3b 100644 --- a/src.mk +++ b/src.mk @@ -424,6 +424,7 @@ TEST_MAIN_SOURCES = \ env/io_posix_test.cc \ env/mock_env_test.cc \ file/delete_scheduler_test.cc \ + file/prefetch_test.cc \ file/random_access_file_reader_test.cc \ logging/auto_roll_logger_test.cc \ logging/env_logger_test.cc \ diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 686db90f7..0aa5cf725 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -747,22 +747,23 @@ Status BlockBasedTable::PrefetchTail( } TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen", &tail_prefetch_size); - Status s; - // TODO should not have this special logic in the future. + + // Try file system prefetch if (!file->use_direct_io() && !force_direct_prefetch) { - prefetch_buffer->reset(new FilePrefetchBuffer( - nullptr, 0, 0, false /* enable */, true /* track_min_offset */)); - s = file->Prefetch(prefetch_off, prefetch_len); - } else { - prefetch_buffer->reset(new FilePrefetchBuffer( - nullptr, 0, 0, true /* enable */, true /* track_min_offset */)); - IOOptions opts; - s = PrepareIOFromReadOptions(ro, file->env(), opts); - if (s.ok()) { - s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len); + if (!file->Prefetch(prefetch_off, prefetch_len).IsNotSupported()) { + prefetch_buffer->reset( + new FilePrefetchBuffer(nullptr, 0, 0, false, true)); + return Status::OK(); } } + // Use `FilePrefetchBuffer` + prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true)); + IOOptions opts; + Status s = PrepareIOFromReadOptions(ro, file->env(), opts); + if (s.ok()) { + s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len); + } return s; } diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 20a1317cd..9ee28dc98 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -633,5 +633,13 @@ struct BlockBasedTable::Rep { max_readahead_size, !ioptions.allow_mmap_reads /* enable */)); } + + void CreateFilePrefetchBufferIfNotExists( + size_t readahead_size, size_t max_readahead_size, + std::unique_ptr* fpb) const { + if (!(*fpb)) { + CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb); + } + } }; } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_prefetcher.cc b/table/block_based/block_prefetcher.cc index 087a96864..aa3fc3610 100644 --- a/table/block_based/block_prefetcher.cc +++ b/table/block_based/block_prefetcher.cc @@ -13,44 +13,54 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep, const BlockHandle& handle, size_t readahead_size, bool is_for_compaction) { - if (!is_for_compaction) { - if (readahead_size == 0) { - // Implicit auto readahead - num_file_reads_++; - if (num_file_reads_ > - BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) { - if (!rep->file->use_direct_io() && - (handle.offset() + static_cast(block_size(handle)) > - readahead_limit_)) { - // Buffered I/O - // Discarding the return status of Prefetch calls intentionally, as - // we can fallback to reading from disk if Prefetch fails. - rep->file->Prefetch(handle.offset(), readahead_size_); - readahead_limit_ = - static_cast(handle.offset() + readahead_size_); - // Keep exponentially increasing readahead size until - // kMaxAutoReadaheadSize. - readahead_size_ = std::min(BlockBasedTable::kMaxAutoReadaheadSize, - readahead_size_ * 2); - } else if (rep->file->use_direct_io() && !prefetch_buffer_) { - // Direct I/O - // Let FilePrefetchBuffer take care of the readahead. - rep->CreateFilePrefetchBuffer(BlockBasedTable::kInitAutoReadaheadSize, - BlockBasedTable::kMaxAutoReadaheadSize, - &prefetch_buffer_); - } - } - } else if (!prefetch_buffer_) { - // Explicit user requested readahead - // The actual condition is: - // if (readahead_size != 0 && !prefetch_buffer_) - rep->CreateFilePrefetchBuffer(readahead_size, readahead_size, - &prefetch_buffer_); - } - } else if (!prefetch_buffer_) { - rep->CreateFilePrefetchBuffer(compaction_readahead_size_, - compaction_readahead_size_, - &prefetch_buffer_); + if (is_for_compaction) { + rep->CreateFilePrefetchBufferIfNotExists(compaction_readahead_size_, + compaction_readahead_size_, + &prefetch_buffer_); + return; } + + // Explicit user requested readahead + if (readahead_size > 0) { + rep->CreateFilePrefetchBufferIfNotExists(readahead_size, readahead_size, + &prefetch_buffer_); + return; + } + + // Implicit auto readahead, which will be enabled if the number of reads + // reached `kMinNumFileReadsToStartAutoReadahead` (default: 2). + num_file_reads_++; + if (num_file_reads_ <= + BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) { + return; + } + + if (rep->file->use_direct_io()) { + rep->CreateFilePrefetchBufferIfNotExists( + BlockBasedTable::kInitAutoReadaheadSize, + BlockBasedTable::kMaxAutoReadaheadSize, &prefetch_buffer_); + return; + } + + if (handle.offset() + static_cast(block_size(handle)) <= + readahead_limit_) { + return; + } + + // If prefetch is not supported, fall back to use internal prefetch buffer. + // Discarding other return status of Prefetch calls intentionally, as + // we can fallback to reading from disk if Prefetch fails. + Status s = rep->file->Prefetch(handle.offset(), readahead_size_); + if (s.IsNotSupported()) { + rep->CreateFilePrefetchBufferIfNotExists( + BlockBasedTable::kInitAutoReadaheadSize, + BlockBasedTable::kMaxAutoReadaheadSize, &prefetch_buffer_); + return; + } + readahead_limit_ = static_cast(handle.offset() + readahead_size_); + // Keep exponentially increasing readahead size until + // kMaxAutoReadaheadSize. + readahead_size_ = + std::min(BlockBasedTable::kMaxAutoReadaheadSize, readahead_size_ * 2); } } // namespace ROCKSDB_NAMESPACE