Add buffer prefetch support for non directIO usecase (#7312)

Summary:
A new file interface `SupportPrefetch()` is added. When the user overrides it to `false`, an internal prefetch buffer will be used for readahead. Useful for non-directIO but FS doesn't have readahead support.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7312

Reviewed By: anand1976

Differential Revision: D23329847

Pulled By: jay-zhuang

fbshipit-source-id: 71cd4ce6f4a820840294e4e6aec111ab76175527
main
Jay Zhuang 4 years ago committed by Facebook GitHub Bot
parent 5043960623
commit c2485f2d81
  1. 1
      CMakeLists.txt
  2. 3
      HISTORY.md
  3. 3
      Makefile
  4. 7
      TARGETS
  5. 1
      file/file_prefetch_buffer.cc
  6. 186
      file/prefetch_test.cc
  7. 4
      include/rocksdb/file_system.h
  8. 1
      src.mk
  9. 25
      table/block_based/block_based_table_reader.cc
  10. 8
      table/block_based/block_based_table_reader.h
  11. 86
      table/block_based/block_prefetcher.cc

@ -1092,6 +1092,7 @@ if(WITH_TESTS)
env/io_posix_test.cc
env/mock_env_test.cc
file/delete_scheduler_test.cc
file/prefetch_test.cc
file/random_access_file_reader_test.cc
logging/auto_roll_logger_test.cc
logging/env_logger_test.cc

@ -20,6 +20,9 @@
* Expose kTypeDeleteWithTimestamp in EntryType and update GetEntryType() accordingly.
* Added file_checksum and file_checksum_func_name to TableFileCreationInfo, which can pass the table file checksum information through the OnTableFileCreated callback during flush and compaction.
### Behavior Changes
* File abstraction `FSRandomAccessFile.Prefetch()` default return status is changed from `OK` to `NotSupported`. If the user inherited file doesn't implement prefetch, RocksDB will create internal prefetch buffer to improve read performance.
### Others
* Error in prefetching partitioned index blocks will not be swallowed. It will fail the query and return the IOError users.

@ -1825,6 +1825,9 @@ testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY)
io_tracer_test: $(OBJ_DIR)/trace_replay/io_tracer_test.o $(OBJ_DIR)/trace_replay/io_tracer.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
prefetch_test: $(OBJ_DIR)/file/prefetch_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
#-------------------------------------------------
# make install related stuff
INSTALL_PATH ?= /usr/local

@ -1367,6 +1367,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"prefetch_test",
"file/prefetch_test.cc",
"serial",
[],
[],
],
[
"prefix_test",
"db/prefix_test.cc",

@ -28,6 +28,7 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
if (!enable_ || reader == nullptr) {
return Status::OK();
}
TEST_SYNC_POINT("FilePrefetchBuffer::Prefetch:Start");
size_t alignment = reader->file()->GetRequiredBufferAlignment();
size_t offset_ = static_cast<size_t>(offset);
uint64_t rounddown_offset = Rounddown(offset_, alignment);

@ -0,0 +1,186 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_test_util.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE {
class MockFS;
class MockRandomAccessFile : public FSRandomAccessFileWrapper {
public:
MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file,
bool support_prefetch, std::atomic_int& prefetch_count)
: FSRandomAccessFileWrapper(file.get()),
file_(std::move(file)),
support_prefetch_(support_prefetch),
prefetch_count_(prefetch_count) {}
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) override {
if (support_prefetch_) {
prefetch_count_.fetch_add(1);
return target()->Prefetch(offset, n, options, dbg);
} else {
return IOStatus::NotSupported();
}
}
private:
std::unique_ptr<FSRandomAccessFile> file_;
const bool support_prefetch_;
std::atomic_int& prefetch_count_;
};
class MockFS : public FileSystemWrapper {
public:
explicit MockFS(bool support_prefetch)
: FileSystemWrapper(FileSystem::Default()),
support_prefetch_(support_prefetch) {}
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
std::unique_ptr<FSRandomAccessFile> file;
IOStatus s;
s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
result->reset(
new MockRandomAccessFile(file, support_prefetch_, prefetch_count_));
return s;
}
void ClearPrefetchCount() { prefetch_count_ = 0; }
bool IsPrefetchCalled() { return prefetch_count_ > 0; }
private:
const bool support_prefetch_;
std::atomic_int prefetch_count_{0};
};
class PrefetchTest
: public DBTestBase,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public:
PrefetchTest() : DBTestBase("/prefetch_test", true) {}
};
std::string BuildKey(int num, std::string postfix = "") {
return "my_key_" + std::to_string(num) + postfix;
}
TEST_P(PrefetchTest, Basic) {
// First param is if the mockFS support_prefetch or not
bool support_prefetch = std::get<0>(GetParam());
// Second param is if directIO is enabled or not
bool use_direct_io = std::get<1>(GetParam());
const int kNumKeys = 1100;
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(support_prefetch);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.env = env.get();
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
}
int buff_prefetch_count = 0;
SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
[&](void*) { buff_prefetch_count++; });
SyncPoint::GetInstance()->EnableProcessing();
Status s = TryReopen(options);
if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
// If direct IO is not supported, skip the test
return;
} else {
ASSERT_OK(s);
}
// create first key range
WriteBatch batch;
for (int i = 0; i < kNumKeys; i++) {
batch.Put(BuildKey(i), "value for range 1 key");
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
// create second key range
batch.Clear();
for (int i = 0; i < kNumKeys; i++) {
batch.Put(BuildKey(i, "key2"), "value for range 2 key");
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
// delete second key range
batch.Clear();
for (int i = 0; i < kNumKeys; i++) {
batch.Delete(BuildKey(i, "key2"));
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
// compact database
std::string start_key = BuildKey(0);
std::string end_key = BuildKey(kNumKeys - 1);
Slice least(start_key.data(), start_key.size());
Slice greatest(end_key.data(), end_key.size());
// commenting out the line below causes the example to work correctly
db_->CompactRange(CompactRangeOptions(), &least, &greatest);
if (support_prefetch && !use_direct_io) {
// If underline file system supports prefetch, and directIO is not enabled
// make sure prefetch() is called and FilePrefetchBuffer is not used.
ASSERT_TRUE(fs->IsPrefetchCalled());
fs->ClearPrefetchCount();
ASSERT_EQ(0, buff_prefetch_count);
} else {
// If underline file system doesn't support prefetch, or directIO is
// enabled, make sure prefetch() is not called and FilePrefetchBuffer is
// used.
ASSERT_FALSE(fs->IsPrefetchCalled());
ASSERT_GT(buff_prefetch_count, 0);
buff_prefetch_count = 0;
}
// count the keys
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
num_keys++;
}
}
// Make sure prefetch is called only if file system support prefetch.
if (support_prefetch && !use_direct_io) {
ASSERT_TRUE(fs->IsPrefetchCalled());
fs->ClearPrefetchCount();
ASSERT_EQ(0, buff_prefetch_count);
} else {
ASSERT_FALSE(fs->IsPrefetchCalled());
ASSERT_GT(buff_prefetch_count, 0);
buff_prefetch_count = 0;
}
Close();
}
INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -633,10 +633,12 @@ class FSRandomAccessFile {
IODebugContext* dbg) const = 0;
// Readahead the file starting from offset by n bytes for caching.
// If it's not implemented (default: `NotSupported`), RocksDB will create
// internal prefetch buffer to improve read performance.
virtual IOStatus Prefetch(uint64_t /*offset*/, size_t /*n*/,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) {
return IOStatus::OK();
return IOStatus::NotSupported();
}
// Read a bunch of blocks as described by reqs. The blocks can

@ -424,6 +424,7 @@ TEST_MAIN_SOURCES = \
env/io_posix_test.cc \
env/mock_env_test.cc \
file/delete_scheduler_test.cc \
file/prefetch_test.cc \
file/random_access_file_reader_test.cc \
logging/auto_roll_logger_test.cc \
logging/env_logger_test.cc \

@ -747,22 +747,23 @@ Status BlockBasedTable::PrefetchTail(
}
TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen",
&tail_prefetch_size);
Status s;
// TODO should not have this special logic in the future.
// Try file system prefetch
if (!file->use_direct_io() && !force_direct_prefetch) {
prefetch_buffer->reset(new FilePrefetchBuffer(
nullptr, 0, 0, false /* enable */, true /* track_min_offset */));
s = file->Prefetch(prefetch_off, prefetch_len);
} else {
prefetch_buffer->reset(new FilePrefetchBuffer(
nullptr, 0, 0, true /* enable */, true /* track_min_offset */));
IOOptions opts;
s = PrepareIOFromReadOptions(ro, file->env(), opts);
if (s.ok()) {
s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len);
if (!file->Prefetch(prefetch_off, prefetch_len).IsNotSupported()) {
prefetch_buffer->reset(
new FilePrefetchBuffer(nullptr, 0, 0, false, true));
return Status::OK();
}
}
// Use `FilePrefetchBuffer`
prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true));
IOOptions opts;
Status s = PrepareIOFromReadOptions(ro, file->env(), opts);
if (s.ok()) {
s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len);
}
return s;
}

@ -633,5 +633,13 @@ struct BlockBasedTable::Rep {
max_readahead_size,
!ioptions.allow_mmap_reads /* enable */));
}
void CreateFilePrefetchBufferIfNotExists(
size_t readahead_size, size_t max_readahead_size,
std::unique_ptr<FilePrefetchBuffer>* fpb) const {
if (!(*fpb)) {
CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb);
}
}
};
} // namespace ROCKSDB_NAMESPACE

@ -13,44 +13,54 @@ void BlockPrefetcher::PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
const BlockHandle& handle,
size_t readahead_size,
bool is_for_compaction) {
if (!is_for_compaction) {
if (readahead_size == 0) {
// Implicit auto readahead
num_file_reads_++;
if (num_file_reads_ >
BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) {
if (!rep->file->use_direct_io() &&
(handle.offset() + static_cast<size_t>(block_size(handle)) >
readahead_limit_)) {
// Buffered I/O
// Discarding the return status of Prefetch calls intentionally, as
// we can fallback to reading from disk if Prefetch fails.
rep->file->Prefetch(handle.offset(), readahead_size_);
readahead_limit_ =
static_cast<size_t>(handle.offset() + readahead_size_);
// Keep exponentially increasing readahead size until
// kMaxAutoReadaheadSize.
readahead_size_ = std::min(BlockBasedTable::kMaxAutoReadaheadSize,
readahead_size_ * 2);
} else if (rep->file->use_direct_io() && !prefetch_buffer_) {
// Direct I/O
// Let FilePrefetchBuffer take care of the readahead.
rep->CreateFilePrefetchBuffer(BlockBasedTable::kInitAutoReadaheadSize,
BlockBasedTable::kMaxAutoReadaheadSize,
&prefetch_buffer_);
}
}
} else if (!prefetch_buffer_) {
// Explicit user requested readahead
// The actual condition is:
// if (readahead_size != 0 && !prefetch_buffer_)
rep->CreateFilePrefetchBuffer(readahead_size, readahead_size,
&prefetch_buffer_);
}
} else if (!prefetch_buffer_) {
rep->CreateFilePrefetchBuffer(compaction_readahead_size_,
compaction_readahead_size_,
&prefetch_buffer_);
if (is_for_compaction) {
rep->CreateFilePrefetchBufferIfNotExists(compaction_readahead_size_,
compaction_readahead_size_,
&prefetch_buffer_);
return;
}
// Explicit user requested readahead
if (readahead_size > 0) {
rep->CreateFilePrefetchBufferIfNotExists(readahead_size, readahead_size,
&prefetch_buffer_);
return;
}
// Implicit auto readahead, which will be enabled if the number of reads
// reached `kMinNumFileReadsToStartAutoReadahead` (default: 2).
num_file_reads_++;
if (num_file_reads_ <=
BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) {
return;
}
if (rep->file->use_direct_io()) {
rep->CreateFilePrefetchBufferIfNotExists(
BlockBasedTable::kInitAutoReadaheadSize,
BlockBasedTable::kMaxAutoReadaheadSize, &prefetch_buffer_);
return;
}
if (handle.offset() + static_cast<size_t>(block_size(handle)) <=
readahead_limit_) {
return;
}
// If prefetch is not supported, fall back to use internal prefetch buffer.
// Discarding other return status of Prefetch calls intentionally, as
// we can fallback to reading from disk if Prefetch fails.
Status s = rep->file->Prefetch(handle.offset(), readahead_size_);
if (s.IsNotSupported()) {
rep->CreateFilePrefetchBufferIfNotExists(
BlockBasedTable::kInitAutoReadaheadSize,
BlockBasedTable::kMaxAutoReadaheadSize, &prefetch_buffer_);
return;
}
readahead_limit_ = static_cast<size_t>(handle.offset() + readahead_size_);
// Keep exponentially increasing readahead size until
// kMaxAutoReadaheadSize.
readahead_size_ =
std::min(BlockBasedTable::kMaxAutoReadaheadSize, readahead_size_ * 2);
}
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save