Add an interface to provide support for the underlying FS to pass its own buffer during reads (#11324)

Summary:
1. Public API change: Replace the `use_async_io` API in file_system with a `SupportedOps` API that the underlying FileSystem uses to indicate to upper layers which of the operations introduced in `enum FSSupportedOps` it supports. Right now the operations are `async_io` and whether the FS will provide its own buffer during reads. The API was changed so that one call can be extended to cover various FileSystem operations, rather than creating a separate API for each operation.

2. Provide support for the underlying FS to pass its own buffer during reads (async and sync) instead of using the RocksDB-provided `scratch` buffer in `FSReadRequest`. Currently only MultiRead supports it; it will later be extended to other reads as well (point lookup, scan, etc.). More details on how to enable it are in file_system.h; a minimal sketch of the new API surface follows below.
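For illustration only, here is a minimal sketch of how a third-party FileSystem might advertise these capabilities. The class name `MyFileSystem` is hypothetical; `FSSupportedOps`, `SupportedOps()`, and `FileSystemWrapper` come from this change:

```cpp
#include "rocksdb/file_system.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical FileSystem that supports both operations in FSSupportedOps.
class MyFileSystem : public FileSystemWrapper {
 public:
  explicit MyFileSystem(const std::shared_ptr<FileSystem>& target)
      : FileSystemWrapper(target) {}
  static const char* kClassName() { return "MyFileSystem"; }
  const char* Name() const override { return kClassName(); }

  void SupportedOps(int64_t& supported_ops) override {
    supported_ops = 0;
    // This FS implements ReadAsync/Poll/AbortIO.
    supported_ops |= (1 << FSSupportedOps::kAsyncIO);
    // This FS fills FSReadRequest::fs_scratch with its own buffer during
    // MultiRead instead of copying into the RocksDB-provided scratch.
    supported_ops |= (1 << FSSupportedOps::kFSBuffer);
  }
};
```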

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11324

Test Plan: Tested locally

Reviewed By: anand1976

Differential Revision: D44465322

Pulled By: akankshamahajan15

fbshipit-source-id: 9ec9e08f839b5cc815e75d5dade6cd549998d0ec
Author: akankshamahajan, committed by Facebook GitHub Bot
Parent: fb5748decf
Commit: fbd2f563bb
Files changed (18), with changed-line counts:

1. db/arena_wrapped_db_iter.cc (4)
2. db/blob/blob_file_reader.cc (4)
3. db/db_tailing_iter_test.cc (133)
4. db/forward_iterator.cc (5)
5. db/version_set.cc (8)
6. env/env.cc (3)
7. env/env_test.cc (2)
8. env/fs_posix.cc (10)
9. file/file_util.h (10)
10. file/prefetch_test.cc (56)
11. file/random_access_file_reader.cc (6)
12. file/random_access_file_reader_test.cc (14)
13. include/rocksdb/file_system.h (79)
14. port/win/env_win.h (2)
15. table/block_based/block_based_table_reader.h (2)
16. table/block_based/block_based_table_reader_sync_and_async.h (46)
17. unreleased_history/new_features/fs_provided_buffer.md (1)
18. unreleased_history/public_api_changes/change_async_io_api.md (1)

db/arena_wrapped_db_iter.cc
@@ -47,7 +47,9 @@ void ArenaWrappedDBIter::Init(
   read_options_ = read_options;
   allow_refresh_ = allow_refresh;
   memtable_range_tombstone_iter_ = nullptr;
-  if (!env->GetFileSystem()->use_async_io()) {
+  if (!CheckFSFeatureSupport(env->GetFileSystem().get(),
+                             FSSupportedOps::kAsyncIO)) {
     read_options_.async_io = false;
   }
 }

db/blob/blob_file_reader.cc
@@ -439,11 +439,11 @@ void BlobFileReader::MultiGetBlob(
     assert(req->offset >= adjustment);
     adjustments.push_back(adjustment);
-    FSReadRequest read_req = {};
+    FSReadRequest read_req;
     read_req.offset = req->offset - adjustment;
     read_req.len = req->len + adjustment;
-    read_reqs.emplace_back(read_req);
     total_len += read_req.len;
+    read_reqs.emplace_back(std::move(read_req));
   }
   RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len);

db/db_tailing_iter_test.cc
@@ -15,6 +15,11 @@
 #include "db/forward_iterator.h"
 #include "port/stack_trace.h"
+namespace {
+static bool enable_io_uring = true;
+extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; }
+}  // namespace
 namespace ROCKSDB_NAMESPACE {
 class DBTestTailingIterator : public DBTestBase,
@@ -50,12 +55,22 @@ TEST_P(DBTestTailingIterator, TailingIteratorSingle) {
 }
 TEST_P(DBTestTailingIterator, TailingIteratorKeepAdding) {
-  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::unique_ptr<Env> env(
+      new CompositeEnvWrapper(env_, FileSystem::Default()));
+  Options options = CurrentOptions();
+  options.env = env.get();
+  CreateAndReopenWithCF({"pikachu"}, options);
   ReadOptions read_options;
   read_options.tailing = true;
   if (GetParam()) {
     read_options.async_io = true;
   }
+  {
   std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
   ASSERT_OK(iter->status());
   std::string value(1024, 'a');
@@ -73,17 +88,29 @@ TEST_P(DBTestTailingIterator, TailingIteratorKeepAdding) {
     ASSERT_EQ(iter->key().compare(key), 0);
   }
   }
+  Close();
+}
 TEST_P(DBTestTailingIterator, TailingIteratorSeekToNext) {
-  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::unique_ptr<Env> env(
+      new CompositeEnvWrapper(env_, FileSystem::Default()));
+  Options options = CurrentOptions();
+  options.env = env.get();
+  CreateAndReopenWithCF({"pikachu"}, options);
   ReadOptions read_options;
   read_options.tailing = true;
   if (GetParam()) {
     read_options.async_io = true;
   }
+  {
   std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
   ASSERT_OK(iter->status());
-  std::unique_ptr<Iterator> itern(db_->NewIterator(read_options, handles_[1]));
+  std::unique_ptr<Iterator> itern(
+      db_->NewIterator(read_options, handles_[1]));
   ASSERT_OK(itern->status());
   std::string value(1024, 'a');
@@ -134,10 +161,19 @@ TEST_P(DBTestTailingIterator, TailingIteratorSeekToNext) {
     ASSERT_EQ(iter->key().compare(key), 0);
   }
   }
+  Close();
+}
 TEST_P(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
   const uint64_t k150KB = 150 * 1024;
+  std::unique_ptr<Env> env(
+      new CompositeEnvWrapper(env_, FileSystem::Default()));
   Options options;
+  options.env = env.get();
   options.write_buffer_size = k150KB;
   options.max_write_buffer_number = 3;
   options.min_write_buffer_number_to_merge = 2;
@@ -278,13 +314,21 @@ TEST_P(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
 }
 TEST_P(DBTestTailingIterator, TailingIteratorDeletes) {
-  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::unique_ptr<Env> env(
+      new CompositeEnvWrapper(env_, FileSystem::Default()));
+  Options options = CurrentOptions();
+  options.env = env.get();
+  CreateAndReopenWithCF({"pikachu"}, options);
   ReadOptions read_options;
   read_options.tailing = true;
   if (GetParam()) {
     read_options.async_io = true;
   }
+  {
   std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
   ASSERT_OK(iter->status());
@@ -320,14 +364,23 @@ TEST_P(DBTestTailingIterator, TailingIteratorDeletes) {
     ASSERT_EQ(count, num_records);
   }
+  Close();
+}
 TEST_P(DBTestTailingIterator, TailingIteratorPrefixSeek) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
   ReadOptions read_options;
   read_options.tailing = true;
   if (GetParam()) {
     read_options.async_io = true;
   }
+  std::unique_ptr<Env> env(
+      new CompositeEnvWrapper(env_, FileSystem::Default()));
   Options options = CurrentOptions();
+  options.env = env.get();
   options.create_if_missing = true;
   options.disable_auto_compactions = true;
   options.prefix_extractor.reset(NewFixedPrefixTransform(2));
@@ -336,6 +389,7 @@ TEST_P(DBTestTailingIterator, TailingIteratorPrefixSeek) {
   DestroyAndReopen(options);
   CreateAndReopenWithCF({"pikachu"}, options);
+  {
   std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
   ASSERT_OK(iter->status());
   ASSERT_OK(Put(1, "0101", "test"));
@@ -355,9 +409,19 @@ TEST_P(DBTestTailingIterator, TailingIteratorPrefixSeek) {
   iter->Next();
   ASSERT_TRUE(!iter->Valid());
   }
+  Close();
+}
 TEST_P(DBTestTailingIterator, TailingIteratorIncomplete) {
-  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::unique_ptr<Env> env(
+      new CompositeEnvWrapper(env_, FileSystem::Default()));
+  Options options = CurrentOptions();
+  options.env = env.get();
+  CreateAndReopenWithCF({"pikachu"}, options);
   ReadOptions read_options;
   read_options.tailing = true;
   if (GetParam()) {
@@ -370,6 +434,7 @@ TEST_P(DBTestTailingIterator, TailingIteratorIncomplete) {
   ASSERT_OK(db_->Put(WriteOptions(), key, value));
+  {
   std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
   ASSERT_OK(iter->status());
   iter->SeekToFirst();
@@ -381,9 +446,18 @@ TEST_P(DBTestTailingIterator, TailingIteratorIncomplete) {
   // should still be true after compaction
   ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
   }
+  Close();
+}
 TEST_P(DBTestTailingIterator, TailingIteratorSeekToSame) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::unique_ptr<Env> env(
+      new CompositeEnvWrapper(env_, FileSystem::Default()));
   Options options = CurrentOptions();
+  options.env = env.get();
   options.compaction_style = kCompactionStyleUniversal;
   options.write_buffer_size = 1000;
   CreateAndReopenWithCF({"pikachu"}, options);
@@ -403,6 +477,7 @@ TEST_P(DBTestTailingIterator, TailingIteratorSeekToSame) {
     ASSERT_OK(db_->Put(WriteOptions(), key, value));
   }
+  {
   std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
   ASSERT_OK(iter->status());
   // Seek to 00001. We expect to find 00002.
@@ -419,12 +494,22 @@ TEST_P(DBTestTailingIterator, TailingIteratorSeekToSame) {
   ASSERT_TRUE(iter->Valid());
   ASSERT_EQ(found, iter->key().ToString());
   }
+  Close();
+}
 // Sets iterate_upper_bound and verifies that ForwardIterator doesn't call
 // Seek() on immutable iterators when target key is >= prev_key and all
 // iterators, including the memtable iterator, are over the upper bound.
 TEST_P(DBTestTailingIterator, TailingIteratorUpperBound) {
-  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::unique_ptr<Env> env(
+      new CompositeEnvWrapper(env_, FileSystem::Default()));
+  Options options = CurrentOptions();
+  options.env = env.get();
+  CreateAndReopenWithCF({"pikachu"}, options);
   const Slice upper_bound("20", 3);
   ReadOptions read_options;
@@ -441,7 +526,16 @@ TEST_P(DBTestTailingIterator, TailingIteratorUpperBound) {
   // Add another key to the memtable.
   ASSERT_OK(Put(1, "21", "21"));
-  std::unique_ptr<Iterator> it(db_->NewIterator(read_options, handles_[1]));
+  {
+  bool read_async_called = false;
+  SyncPoint::GetInstance()->SetCallBack(
+      "UpdateResults::io_uring_result",
+      [&](void* /*arg*/) { read_async_called = true; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  auto it =
+      std::unique_ptr<Iterator>(db_->NewIterator(read_options, handles_[1]));
   ASSERT_OK(it->status());
   it->Seek("12");
   ASSERT_TRUE(it->Valid());
@@ -461,15 +555,23 @@ TEST_P(DBTestTailingIterator, TailingIteratorUpperBound) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
   it->Seek("13");
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "UpdateResults::io_uring_result",
+      [&](void* /*arg*/) { read_async_called = true; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
   ASSERT_FALSE(it->Valid());
   ASSERT_OK(it->status());
-  if (GetParam()) {
+  if (GetParam() && read_async_called) {
     ASSERT_EQ(1, immutable_seeks);
   } else {
     ASSERT_EQ(0, immutable_seeks);
   }
   }
+  Close();
+}
 TEST_P(DBTestTailingIterator, TailingIteratorGap) {
   // level 1: [20, 25] [35, 40]
@@ -480,7 +582,15 @@ TEST_P(DBTestTailingIterator, TailingIteratorGap) {
   // the largest key of index n file and the smallest key of index n+1 file
   // if both file fit in that gap. In this example, 25 < key < 35
   // https://github.com/facebook/rocksdb/issues/1372
-  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::unique_ptr<Env> env(
+      new CompositeEnvWrapper(env_, FileSystem::Default()));
+  Options options = CurrentOptions();
+  options.env = env.get();
+  CreateAndReopenWithCF({"pikachu"}, options);
   ReadOptions read_options;
   read_options.tailing = true;
@@ -512,6 +622,7 @@ TEST_P(DBTestTailingIterator, TailingIteratorGap) {
   ColumnFamilyMetaData meta;
   db_->GetColumnFamilyMetaData(handles_[1], &meta);
+  {
   std::unique_ptr<Iterator> it(db_->NewIterator(read_options, handles_[1]));
   it->Seek("30");
   ASSERT_TRUE(it->Valid());
@@ -527,6 +638,8 @@ TEST_P(DBTestTailingIterator, TailingIteratorGap) {
   ASSERT_OK(it->status());
   }
+  Close();
+}
 TEST_P(DBTestTailingIterator, SeekWithUpperBoundBug) {
   ReadOptions read_options;

db/forward_iterator.cc
@@ -241,10 +241,10 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
   if (sv_) {
     RebuildIterators(false);
   }
-  if (!cfd_->ioptions()->env->GetFileSystem()->use_async_io()) {
+  if (!CheckFSFeatureSupport(cfd_->ioptions()->env->GetFileSystem().get(),
+                             FSSupportedOps::kAsyncIO)) {
     read_options_.async_io = false;
   }
   // immutable_status_ is a local aggregation of the
   // status of the immutable Iterators.
   // We have to PermitUncheckedError in case it is never
@@ -1067,4 +1067,3 @@ void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
 }
 }  // namespace ROCKSDB_NAMESPACE
-

db/version_set.cc
@@ -39,6 +39,7 @@
 #include "db/version_builder.h"
 #include "db/version_edit.h"
 #include "db/version_edit_handler.h"
+#include "file/file_util.h"
 #include "table/compaction_merging_iterator.h"
 #if USE_COROUTINES
@@ -2190,7 +2191,12 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
           MaxFileSizeForL0MetaPin(mutable_cf_options_)),
       version_number_(version_number),
       io_tracer_(io_tracer),
-      use_async_io_(env_->GetFileSystem()->use_async_io()) {}
+      use_async_io_(false) {
+  if (CheckFSFeatureSupport(env_->GetFileSystem().get(),
+                            FSSupportedOps::kAsyncIO)) {
+    use_async_io_ = true;
+  }
+}
 Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
                         const Slice& blob_index_slice,

env/env.cc (vendored)
@@ -159,8 +159,7 @@ class LegacyRandomAccessFileWrapper : public FSRandomAccessFile {
       req.len = fs_reqs[i].len;
       req.scratch = fs_reqs[i].scratch;
       req.status = Status::OK();
-      reqs.emplace_back(req);
+      reqs.emplace_back(std::move(req));
     }
     status = target_->MultiRead(reqs.data(), num_reqs);
     for (size_t i = 0; i < num_reqs; ++i) {

env/env_test.cc (vendored)
@@ -3538,7 +3538,7 @@ IOStatus ReadAsyncRandomAccessFile::ReadAsync(
     }
   };
-  fs_.workers.emplace_back(submit_request, req);
+  fs_.workers.emplace_back(submit_request, std::move(req));
   return IOStatus::OK();
 }

env/fs_posix.cc (vendored)
@@ -1183,11 +1183,13 @@ class PosixFileSystem : public FileSystem {
 #endif
   }
-  bool use_async_io() override {
+  void SupportedOps(int64_t& supported_ops) override {
+    supported_ops = 0;
 #if defined(ROCKSDB_IOURING_PRESENT)
-    return IsIOUringEnabled();
-#else
-    return false;
+    if (IsIOUringEnabled()) {
+      // Underlying FS supports async_io
+      supported_ops |= (1 << FSSupportedOps::kAsyncIO);
+    }
 #endif
   }

file/file_util.h
@@ -88,4 +88,14 @@ inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro,
 // Test method to delete the input directory and all of its contents.
 // This method is destructive and is meant for use only in tests!!!
 Status DestroyDir(Env* env, const std::string& dir);
+
+inline bool CheckFSFeatureSupport(FileSystem* fs, FSSupportedOps feat) {
+  int64_t supported_ops = 0;
+  fs->SupportedOps(supported_ops);
+  if (supported_ops & (1ULL << feat)) {
+    return true;
+  }
+  return false;
+}
 }  // namespace ROCKSDB_NAMESPACE
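For reference, a caller-side gate built on this helper would look roughly like the sketch below; `fs` stands in for any `FileSystem*`, e.g. obtained from `env->GetFileSystem().get()`:

```cpp
// Sketch: gating read behavior on what the underlying FS advertises.
if (!CheckFSFeatureSupport(fs, FSSupportedOps::kAsyncIO)) {
  read_options.async_io = false;  // fall back to synchronous reads
}
bool use_fs_scratch = CheckFSFeatureSupport(fs, FSSupportedOps::kFSBuffer);
```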

file/prefetch_test.cc
@@ -1304,10 +1304,14 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) {
 // This test verifies the functionality of ReadOptions.adaptive_readahead when
 // async_io is enabled.
 TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
   const int kNumKeys = 1000;
   // Set options
   std::shared_ptr<MockFS> fs =
-      std::make_shared<MockFS>(env_->GetFileSystem(), false);
+      std::make_shared<MockFS>(FileSystem::Default(), false);
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
   bool use_direct_io = std::get<0>(GetParam());
@@ -1341,16 +1345,26 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
   }
   MoveFilesToLevel(2);
   int buff_async_prefetch_count = 0;
+  int buff_prefetch_count = 0;
   int readahead_carry_over_count = 0;
   int num_sst_files = NumTableFilesAtLevel(2);
   size_t current_readahead_size = 0;
+  bool read_async_called = false;
   // Test - Iterate over the keys sequentially.
   {
+    SyncPoint::GetInstance()->SetCallBack(
+        "FilePrefetchBuffer::Prefetch:Start",
+        [&](void*) { buff_prefetch_count++; });
     SyncPoint::GetInstance()->SetCallBack(
         "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
         [&](void*) { buff_async_prefetch_count++; });
+    SyncPoint::GetInstance()->SetCallBack(
+        "UpdateResults::io_uring_result",
+        [&](void* /*arg*/) { read_async_called = true; });
     // The callback checks, since reads are sequential, readahead_size doesn't
     // start from 8KB when iterator moves to next file and its called
     // num_sst_files-1 times (excluding for first file).
@@ -1393,15 +1407,18 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) {
     } else {
       ASSERT_EQ(readahead_carry_over_count, 0);
     }
-    ASSERT_GT(buff_async_prefetch_count, 0);
     // Check stats to make sure async prefetch is done.
     {
      HistogramData async_read_bytes;
      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
-     if (ro.async_io) {
+     // Not all platforms support iouring. In that case, ReadAsync in posix
+     // won't submit async requests.
+     if (read_async_called) {
+       ASSERT_GT(buff_async_prefetch_count, 0);
        ASSERT_GT(async_read_bytes.count, 0);
      } else {
+       ASSERT_GT(buff_prefetch_count, 0);
        ASSERT_EQ(async_read_bytes.count, 0);
      }
    }
@@ -1434,6 +1451,7 @@ TEST_P(PrefetchTest, DBIterAsyncIONoIOUring) {
   Status s = TryReopen(options);
   if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
     // If direct IO is not supported, skip the test
+    enable_io_uring = true;
     return;
   } else {
     ASSERT_OK(s);
@@ -1515,7 +1533,8 @@ class PrefetchTest1 : public DBTestBase,
  public:
   PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
-  void SetGenericOptions(Env* env, bool use_direct_io, Options& options) {
+  virtual void SetGenericOptions(Env* env, bool use_direct_io,
+                                 Options& options) {
     options = CurrentOptions();
     options.write_buffer_size = 1024;
     options.create_if_missing = true;
@@ -1769,10 +1788,14 @@ TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
 // This test verifies the basic functionality of seek parallelization for
 // async_io.
 TEST_P(PrefetchTest1, SeekParallelizationTest) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment");
+    return;
+  }
   const int kNumKeys = 2000;
   // Set options
-  std::shared_ptr<MockFS> fs =
-      std::make_shared<MockFS>(env_->GetFileSystem(), false);
+  std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
+      FileSystem::Default(), /*support_prefetch=*/false);
   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
   Options options;
@@ -1805,11 +1828,20 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) {
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));
   int buff_prefetch_count = 0;
+  int buff_prefetch_async_count = 0;
   SyncPoint::GetInstance()->SetCallBack(
       "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
+      [&](void*) { buff_prefetch_async_count++; });
+  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
       [&](void*) { buff_prefetch_count++; });
+  bool read_async_called = false;
+  SyncPoint::GetInstance()->SetCallBack(
+      "UpdateResults::io_uring_result",
+      [&](void* /*arg*/) { read_async_called = true; });
   SyncPoint::GetInstance()->EnableProcessing();
   ReadOptions ro;
   ro.adaptive_readahead = true;
@@ -1843,17 +1875,17 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) {
   iter->Next();
   ASSERT_TRUE(iter->Valid());
-  ASSERT_EQ(buff_prefetch_count, 2);
-  // Check stats to make sure async prefetch is done.
-  {
   HistogramData async_read_bytes;
   options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
+  // not all platforms support io_uring. In that case it'll fallback to normal
+  // prefetching without async_io.
+  if (read_async_called) {
+    ASSERT_EQ(buff_prefetch_async_count, 2);
     ASSERT_GT(async_read_bytes.count, 0);
     ASSERT_GT(get_perf_context()->number_async_seek, 0);
+  } else {
+    ASSERT_EQ(buff_prefetch_count, 1);
   }
-  buff_prefetch_count = 0;
   }
   Close();
 }

file/random_access_file_reader.cc
@@ -314,14 +314,14 @@ IOStatus RandomAccessFileReader::MultiRead(
     // Align and merge the read requests.
     size_t alignment = file_->GetRequiredBufferAlignment();
     for (size_t i = 0; i < num_reqs; i++) {
-      const auto& r = Align(read_reqs[i], alignment);
+      FSReadRequest r = Align(read_reqs[i], alignment);
       if (i == 0) {
         // head
-        aligned_reqs.push_back(r);
+        aligned_reqs.push_back(std::move(r));
       } else if (!TryMerge(&aligned_reqs.back(), r)) {
         // head + n
-        aligned_reqs.push_back(r);
+        aligned_reqs.push_back(std::move(r));
       } else {
         // unused

file/random_access_file_reader_test.cc
@@ -95,7 +95,18 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
       "RandomAccessFileReader::MultiRead:AlignedReqs", [&](void* reqs) {
         // Copy reqs, since it's allocated on stack inside MultiRead, which will
         // be deallocated after MultiRead returns.
-        aligned_reqs = *reinterpret_cast<std::vector<FSReadRequest>*>(reqs);
+        size_t i = 0;
+        aligned_reqs.resize(
+            (*reinterpret_cast<std::vector<FSReadRequest>*>(reqs)).size());
+        for (auto& req :
+             (*reinterpret_cast<std::vector<FSReadRequest>*>(reqs))) {
+          aligned_reqs[i].offset = req.offset;
+          aligned_reqs[i].len = req.len;
+          aligned_reqs[i].result = req.result;
+          aligned_reqs[i].status = req.status;
+          aligned_reqs[i].scratch = req.scratch;
+          i++;
+        }
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@@ -283,7 +294,6 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
 }
-
 TEST(FSReadRequest, Align) {
   FSReadRequest r;
   r.offset = 2000;

include/rocksdb/file_system.h
@@ -79,6 +79,10 @@ enum class IOType : uint8_t {
   kInvalid,
 };
+// Enum representing various operations supported by the underlying
+// FileSystem. These need to be set in the SupportedOps API for RocksDB to
+// use them.
+enum FSSupportedOps { kAsyncIO, kFSBuffer };
 // Per-request options that can be passed down to the FileSystem
 // implementation. These are hints and are not necessarily guaranteed to be
 // honored. More hints can be added here in the future to indicate things like
@@ -657,7 +661,6 @@ class FileSystem : public Customizable {
                              const IOOptions& options, bool* is_dir,
                              IODebugContext* /*dgb*/) = 0;
-  // EXPERIMENTAL
   // Poll for completion of read IO requests. The Poll() method should call the
   // callback functions to indicate completion of read requests.
   // Underlying FS is required to support Poll API. Poll implementation should
@@ -665,28 +668,33 @@
   // after the callback has been called.
   // If Poll returns partial results for any reads, it's the caller's
   // responsibility to call Read or ReadAsync in order to get the remaining
   // bytes.
+  //
+  // Default implementation is to return IOStatus::OK.
   virtual IOStatus Poll(std::vector<void*>& /*io_handles*/,
                         size_t /*min_completions*/) {
     return IOStatus::OK();
   }
-  // EXPERIMENTAL
   // Abort the read IO requests submitted asynchronously. Underlying FS is
   // required to support AbortIO API. AbortIO implementation should ensure that
   // all the read requests related to io_handles are aborted and
   // it shouldn't call the callback for these io_handles.
+  //
+  // Default implementation is to return IOStatus::OK.
   virtual IOStatus AbortIO(std::vector<void*>& /*io_handles*/) {
     return IOStatus::OK();
   }
-  // Indicates to upper layers whether the FileSystem supports/uses async IO
-  // or not
-  virtual bool use_async_io() { return true; }
+  // Indicates to upper layers which FileSystem operations mentioned in
+  // FSSupportedOps are supported by the underlying FileSystem. Each bit in
+  // the supported_ops argument represents the corresponding FSSupportedOps
+  // operation.
+  // For example: if async_io is supported by the underlying FileSystem, then
+  // supported_ops will have the corresponding bit
+  // (i.e. FSSupportedOps::kAsyncIO) set to 1.
+  //
+  // By default, the async_io operation is set, and an FS should override this
+  // API and set all the operations it supports, as provided in FSSupportedOps
+  // (including async_io).
+  virtual void SupportedOps(int64_t& supported_ops) {
+    supported_ops = 0;
+    supported_ops |= (1 << FSSupportedOps::kAsyncIO);
+  }
   // If you're adding methods here, remember to add them to EnvWrapper too.
@@ -791,6 +799,42 @@ struct FSReadRequest {
   // Output parameter set by underlying FileSystem that represents status of
   // read request.
   IOStatus status;
+
+  // fs_scratch is a data buffer allocated and provided by the underlying
+  // FileSystem to RocksDB during reads, for when the FS wants to provide its
+  // own buffer with data instead of using the RocksDB-provided
+  // FSReadRequest::scratch.
+  //
+  // FileSystem needs to provide a buffer and a custom delete function. The
+  // fs_scratch buffer stays alive until the data is used by RocksDB, and is
+  // then released by RocksDB using the custom delete function provided in
+  // the unique_ptr fs_scratch.
+  //
+  // Optimization benefits:
+  // This is helpful in cases where the underlying FileSystem has to do an
+  // additional copy of data into the RocksDB-provided buffer, which can
+  // consume CPU cycles. It can be optimized by avoiding the copy to the
+  // RocksDB buffer and directly using the FS-provided buffer.
+  //
+  // How to enable:
+  // In order to enable this option, FS needs to override SupportedOps() and
+  // set FSSupportedOps::kFSBuffer in SupportedOps() as:
+  // {
+  //    supported_ops |= (1 << FSSupportedOps::kFSBuffer);
+  // }
+  //
+  // Work in progress:
+  // Right now it's only enabled for MultiReads (both sync and async) with
+  // non-direct io.
+  // If RocksDB provides its own buffer (scratch) during reads, that's a
+  // signal for the FS to use the RocksDB buffer.
+  // If FSSupportedOps::kFSBuffer is enabled and scratch == nullptr,
+  // then the FS has to provide its own buffer in fs_scratch.
+  //
+  // NOTE:
+  // - FSReadRequest::result should point to fs_scratch.
+  // - This is needed only if FSSupportedOps::kFSBuffer support is provided by
+  //   the underlying FS.
+  std::unique_ptr<void, std::function<void(void*)>> fs_scratch;
 };
@@ -885,7 +929,6 @@ class FSRandomAccessFile {
     return IOStatus::NotSupported("InvalidateCache not supported.");
   }
-  // EXPERIMENTAL
   // This API reads the requested data in FSReadRequest asynchronously. This is
   // an asynchronous call, i.e. it should return after submitting the request.
   //
@@ -907,6 +950,16 @@
   // request, and the result and status fields are output parameters set by the
   // underlying FileSystem. The data should always be read into the scratch
   // field.
   //
+  // How to enable:
+  // In order to enable ReadAsync, FS needs to override SupportedOps() and set
+  // FSSupportedOps::kAsyncIO in SupportedOps() as:
+  // {
+  //    supported_ops |= (1 << FSSupportedOps::kAsyncIO);
+  // }
+  //
+  // Note: If the FS supports the ReadAsync API, it should also override the
+  // Poll and AbortIO APIs.
+  //
   // Default implementation is to read the data synchronously.
   virtual IOStatus ReadAsync(
       FSReadRequest& req, const IOOptions& opts,
@@ -1550,7 +1603,9 @@ class FileSystemWrapper : public FileSystem {
     return target_->AbortIO(io_handles);
   }
-  virtual bool use_async_io() override { return target_->use_async_io(); }
+  virtual void SupportedOps(int64_t& supported_ops) override {
+    return target_->SupportedOps(supported_ops);
+  }
  protected:
   std::shared_ptr<FileSystem> target_;
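To make the fs_scratch hand-off concrete, here is a hedged, FS-side sketch of a MultiRead that provides its own buffer when RocksDB passes scratch == nullptr. `MyRandomAccessFile` and its `ReadFromStorage` helper are hypothetical; `FSReadRequest::fs_scratch`, `result`, and the kFSBuffer contract come from this change:

```cpp
// Hypothetical FS-side sketch of the kFSBuffer contract in MultiRead.
// (Other required overrides such as Read() are omitted for brevity.)
class MyRandomAccessFile : public FSRandomAccessFile {
 public:
  IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
                     const IOOptions& opts, IODebugContext* dbg) override {
    for (size_t i = 0; i < num_reqs; ++i) {
      FSReadRequest& req = reqs[i];
      if (req.scratch == nullptr) {
        // scratch == nullptr signals that this FS should provide the buffer
        // (it advertised FSSupportedOps::kFSBuffer in SupportedOps()).
        char* buf = new char[req.len];  // or a pooled/registered buffer
        size_t bytes_read = ReadFromStorage(req.offset, req.len, buf);
        // Hand ownership to RocksDB along with a matching deleter.
        req.fs_scratch = std::unique_ptr<void, std::function<void(void*)>>(
            buf, [](void* p) { delete[] static_cast<char*>(p); });
        // Per the contract, result must point into the FS-provided buffer.
        req.result = Slice(buf, bytes_read);
      } else {
        // Fall back to reading into the RocksDB-provided scratch buffer.
        size_t bytes_read =
            ReadFromStorage(req.offset, req.len, req.scratch);
        req.result = Slice(req.scratch, bytes_read);
      }
      req.status = IOStatus::OK();
    }
    return IOStatus::OK();
  }

 private:
  // Hypothetical helper standing in for this FS's actual read path.
  size_t ReadFromStorage(uint64_t offset, size_t len, char* buf);
};
```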

port/win/env_win.h
@@ -227,7 +227,7 @@ class WinFileSystem : public FileSystem {
       const FileOptions& file_options) const override;
   FileOptions OptimizeForManifestWrite(
       const FileOptions& file_options) const override;
-  bool use_async_io() override { return false; }
+  void SupportedOps(int64_t& supported_ops) override { supported_ops = 0; }
 protected:
   static uint64_t FileTimeToUnixTime(const FILETIME& ftTime);

table/block_based/block_based_table_reader.h
@@ -377,7 +377,7 @@ class BlockBasedTable : public TableReader {
       const MultiGetRange* batch,
       const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
       Status* statuses, CachableEntry<Block_kData>* results, char* scratch,
-      const UncompressionDict& uncompression_dict);
+      const UncompressionDict& uncompression_dict, bool use_fs_scratch);
   // Get the iterator from the index reader.
   //

table/block_based/block_based_table_reader_sync_and_async.h
@@ -33,7 +33,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
 (const ReadOptions& options, const MultiGetRange* batch,
  const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
  Status* statuses, CachableEntry<Block_kData>* results, char* scratch,
- const UncompressionDict& uncompression_dict) const {
+ const UncompressionDict& uncompression_dict, bool use_fs_scratch) const {
   RandomAccessFileReader* file = rep_->file.get();
   const Footer& footer = rep_->footer;
   const ImmutableOptions& ioptions = rep_->ioptions;
@@ -88,7 +88,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
       // We don't combine block reads here in direct IO mode, because when doing
       // direct IO read, the block requests will be realigned and merged when
       // necessary.
-      if (use_shared_buffer && !file->use_direct_io() &&
+      if ((use_shared_buffer || use_fs_scratch) && !file->use_direct_io() &&
           prev_end == handle.offset()) {
         req_offset_for_block.emplace_back(prev_len);
         prev_len += BlockSizeWithTrailer(handle);
@@ -99,7 +99,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
         FSReadRequest req;
         req.offset = prev_offset;
         req.len = prev_len;
-        if (file->use_direct_io()) {
+        if (file->use_direct_io() || use_fs_scratch) {
           req.scratch = nullptr;
         } else if (use_shared_buffer) {
           req.scratch = scratch + buf_offset;
@@ -107,10 +107,10 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
         } else {
           req.scratch = new char[req.len];
         }
-        read_reqs.emplace_back(req);
+        read_reqs.emplace_back(std::move(req));
       }
-      // Step 2, remeber the previous block info
+      // Step 2, remember the previous block info
       prev_offset = handle.offset();
       prev_len = BlockSizeWithTrailer(handle);
       req_offset_for_block.emplace_back(0);
@@ -125,14 +125,14 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
     FSReadRequest req;
     req.offset = prev_offset;
     req.len = prev_len;
-    if (file->use_direct_io()) {
+    if (file->use_direct_io() || use_fs_scratch) {
      req.scratch = nullptr;
    } else if (use_shared_buffer) {
      req.scratch = scratch + buf_offset;
    } else {
      req.scratch = new char[req.len];
    }
-    read_reqs.emplace_back(req);
+    read_reqs.emplace_back(std::move(req));
   }
   AlignedBuf direct_io_buf;
@@ -192,7 +192,10 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
       BlockContents serialized_block;
       if (s.ok()) {
-        if (!use_shared_buffer) {
+        if (use_fs_scratch) {
+          serialized_block =
+              BlockContents(Slice(req.result.data() + req_offset, handle.size()));
+        } else if (!use_shared_buffer) {
           // We allocated a buffer for this block. Give ownership of it to
           // BlockContents so it can free the memory
           assert(req.result.data() == req.scratch);
@@ -243,7 +246,8 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
       // heap buffer or there is no cache at all.
       CompressionType compression_type =
           GetBlockCompressionType(serialized_block);
-      if (use_shared_buffer && compression_type == kNoCompression) {
+      if ((use_fs_scratch || use_shared_buffer) &&
+          compression_type == kNoCompression) {
         Slice serialized =
             Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle));
         serialized_block = BlockContents(
@@ -304,6 +308,17 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
     }
     statuses[idx_in_batch] = s;
   }
+  if (use_fs_scratch) {
+    // Free the allocated scratch buffer by fs here as read requests might have
+    // been combined into one.
+    for (FSReadRequest& req : read_reqs) {
+      if (req.fs_scratch != nullptr) {
+        req.fs_scratch.reset();
+        req.fs_scratch = nullptr;
+      }
+    }
+  }
 }
 using MultiGetRange = MultiGetContext::Range;
@@ -512,11 +527,20 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
     if (total_len) {
       char* scratch = nullptr;
+      bool use_fs_scratch = false;
       const UncompressionDict& dict = uncompression_dict.GetValue()
                                           ? *uncompression_dict.GetValue()
                                           : UncompressionDict::GetEmptyDict();
       assert(uncompression_dict_inited || !rep_->uncompression_dict_reader);
       assert(uncompression_dict_status.ok());
+      if (!rep_->file->use_direct_io()) {
+        if (CheckFSFeatureSupport(rep_->ioptions.fs.get(),
+                                  FSSupportedOps::kFSBuffer)) {
+          use_fs_scratch = true;
+        }
+      }
       // If using direct IO, then scratch is not used, so keep it nullptr.
       // If the blocks need to be uncompressed and we don't need the
       // compressed blocks, then we can use a contiguous block of
@@ -527,7 +551,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
       // 2. If blocks are uncompressed, alloc heap bufs
       // 3. If blocks are compressed and no compressed block cache, use
       //    stack buf
-      if (!rep_->file->use_direct_io() &&
+      if (!use_fs_scratch && !rep_->file->use_direct_io() &&
           rep_->blocks_maybe_compressed) {
         if (total_len <= kMultiGetReadStackBufSize) {
           scratch = stack_buf;
@@ -538,7 +562,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
       }
       CO_AWAIT(RetrieveMultipleBlocks)
       (read_options, &data_block_range, &block_handles, &statuses[0],
-       &results[0], scratch, dict);
+       &results[0], scratch, dict, use_fs_scratch);
       if (get_context) {
         ++(get_context->get_context_stats_.num_sst_read);
       }

unreleased_history/new_features/fs_provided_buffer.md
@@ -0,0 +1 @@
+Add FSReadRequest::fs_scratch, a data buffer allocated and provided by the underlying FileSystem to RocksDB during reads, for when the FS wants to provide its own buffer with data instead of using the RocksDB-provided FSReadRequest::scratch. This can help CPU optimization by avoiding a copy from the file system's buffer to the RocksDB buffer. More details on how to use/enable it are in file_system.h. Right now it is supported only for MultiReads (async + sync) with non-direct IO.

unreleased_history/public_api_changes/change_async_io_api.md
@@ -0,0 +1 @@
+Changed the FileSystem::use_async_io() API to the SupportedOps API in order to extend it to various operations supported by the underlying FileSystem. Right now it contains FSSupportedOps::kAsyncIO and FSSupportedOps::kFSBuffer. More details about FSSupportedOps are in file_system.h.