From fbd2f563bb409ea990b4c3852e5aca8f2fbc5162 Mon Sep 17 00:00:00 2001 From: akankshamahajan Date: Fri, 23 Jun 2023 11:48:49 -0700 Subject: [PATCH] Add an interface to provide support for underlying FS to pass their own buffer during reads (#11324) Summary: 1. Public API change: Replace `use_async_io` API in file_system with `SupportedOps` API, which is used by the underlying FileSystem to indicate to upper layers whether the FileSystem supports the different operations introduced in `enum FSSupportedOps`. Right now the operations are `async_io` and whether the FS will provide its own buffer during reads or not. The API is changed to extend it to various FileSystem operations in one API rather than creating a separate API for each operation. 2. Provide support for the underlying FS to pass its own buffer during reads (async and sync read) instead of using the RocksDB-provided `scratch` (buffer) in `FSReadRequest`. Currently only MultiRead supports it, and later it will be extended to other reads as well (point lookup, scan, etc.). 
More details in how to enable in file_system.h Pull Request resolved: https://github.com/facebook/rocksdb/pull/11324 Test Plan: Tested locally Reviewed By: anand1976 Differential Revision: D44465322 Pulled By: akankshamahajan15 fbshipit-source-id: 9ec9e08f839b5cc815e75d5dade6cd549998d0ec --- db/arena_wrapped_db_iter.cc | 4 +- db/blob/blob_file_reader.cc | 4 +- db/db_tailing_iter_test.cc | 445 +++++++++++------- db/forward_iterator.cc | 5 +- db/version_set.cc | 8 +- env/env.cc | 3 +- env/env_test.cc | 2 +- env/fs_posix.cc | 10 +- file/file_util.h | 10 + file/prefetch_test.cc | 62 ++- file/random_access_file_reader.cc | 6 +- file/random_access_file_reader_test.cc | 22 +- include/rocksdb/file_system.h | 79 +++- port/win/env_win.h | 2 +- table/block_based/block_based_table_reader.h | 2 +- .../block_based_table_reader_sync_and_async.h | 46 +- .../new_features/fs_provided_buffer.md | 1 + .../public_api_changes/change_async_io_api.md | 1 + 18 files changed, 483 insertions(+), 229 deletions(-) create mode 100644 unreleased_history/new_features/fs_provided_buffer.md create mode 100644 unreleased_history/public_api_changes/change_async_io_api.md diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc index 2dbfff79a..b101fbbc7 100644 --- a/db/arena_wrapped_db_iter.cc +++ b/db/arena_wrapped_db_iter.cc @@ -47,7 +47,9 @@ void ArenaWrappedDBIter::Init( read_options_ = read_options; allow_refresh_ = allow_refresh; memtable_range_tombstone_iter_ = nullptr; - if (!env->GetFileSystem()->use_async_io()) { + + if (!CheckFSFeatureSupport(env->GetFileSystem().get(), + FSSupportedOps::kAsyncIO)) { read_options_.async_io = false; } } diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc index 7abd1ce6b..6df7f3aee 100644 --- a/db/blob/blob_file_reader.cc +++ b/db/blob/blob_file_reader.cc @@ -439,11 +439,11 @@ void BlobFileReader::MultiGetBlob( assert(req->offset >= adjustment); adjustments.push_back(adjustment); - FSReadRequest read_req = {}; + FSReadRequest 
read_req; read_req.offset = req->offset - adjustment; read_req.len = req->len + adjustment; - read_reqs.emplace_back(read_req); total_len += read_req.len; + read_reqs.emplace_back(std::move(read_req)); } RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len); diff --git a/db/db_tailing_iter_test.cc b/db/db_tailing_iter_test.cc index 964e06eb3..d3debed7e 100644 --- a/db/db_tailing_iter_test.cc +++ b/db/db_tailing_iter_test.cc @@ -15,6 +15,11 @@ #include "db/forward_iterator.h" #include "port/stack_trace.h" +namespace { +static bool enable_io_uring = true; +extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; } +} // namespace + namespace ROCKSDB_NAMESPACE { class DBTestTailingIterator : public DBTestBase, @@ -50,94 +55,125 @@ TEST_P(DBTestTailingIterator, TailingIteratorSingle) { } TEST_P(DBTestTailingIterator, TailingIteratorKeepAdding) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } + std::unique_ptr env( + new CompositeEnvWrapper(env_, FileSystem::Default())); + Options options = CurrentOptions(); + options.env = env.get(); + CreateAndReopenWithCF({"pikachu"}, options); ReadOptions read_options; read_options.tailing = true; if (GetParam()) { read_options.async_io = true; } - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - ASSERT_OK(iter->status()); - std::string value(1024, 'a'); - const int num_records = 10000; - for (int i = 0; i < num_records; ++i) { - char buf[32]; - snprintf(buf, sizeof(buf), "%016d", i); + { + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(iter->status()); + std::string value(1024, 'a'); - Slice key(buf, 16); - ASSERT_OK(Put(1, key, value)); + const int num_records = 10000; + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "%016d", i); - iter->Seek(key); - ASSERT_TRUE(iter->Valid()); - 
ASSERT_EQ(iter->key().compare(key), 0); + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + + iter->Seek(key); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } } + Close(); } TEST_P(DBTestTailingIterator, TailingIteratorSeekToNext) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } + std::unique_ptr env( + new CompositeEnvWrapper(env_, FileSystem::Default())); + Options options = CurrentOptions(); + options.env = env.get(); + CreateAndReopenWithCF({"pikachu"}, options); ReadOptions read_options; read_options.tailing = true; if (GetParam()) { read_options.async_io = true; } - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - ASSERT_OK(iter->status()); - std::unique_ptr itern(db_->NewIterator(read_options, handles_[1])); - ASSERT_OK(itern->status()); - std::string value(1024, 'a'); - - const int num_records = 1000; - for (int i = 1; i < num_records; ++i) { - char buf1[32]; - char buf2[32]; - snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); - - Slice key(buf1, 20); - ASSERT_OK(Put(1, key, value)); + { + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(iter->status()); + std::unique_ptr itern( + db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(itern->status()); + std::string value(1024, 'a'); + + const int num_records = 1000; + for (int i = 1; i < num_records; ++i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } - if (i % 100 == 99) { - ASSERT_OK(Flush(1)); + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + if (i == 1) { + itern->SeekToFirst(); + } else { + itern->Next(); + } 
+ ASSERT_TRUE(itern->Valid()); + ASSERT_EQ(itern->key().compare(key), 0); } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + for (int i = 2 * num_records; i > 0; --i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } - snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); - Slice target(buf2, 20); - iter->Seek(target); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().compare(key), 0); - if (i == 1) { - itern->SeekToFirst(); - } else { - itern->Next(); + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); } - ASSERT_TRUE(itern->Valid()); - ASSERT_EQ(itern->key().compare(key), 0); - } - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - for (int i = 2 * num_records; i > 0; --i) { - char buf1[32]; - char buf2[32]; - snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); - - Slice key(buf1, 20); - ASSERT_OK(Put(1, key, value)); - - if (i % 100 == 99) { - ASSERT_OK(Flush(1)); - } - - snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); - Slice target(buf2, 20); - iter->Seek(target); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().compare(key), 0); } + Close(); } TEST_P(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } const uint64_t k150KB = 150 * 1024; + std::unique_ptr env( + new CompositeEnvWrapper(env_, FileSystem::Default())); Options options; + options.env = env.get(); options.write_buffer_size = k150KB; options.max_write_buffer_number = 3; options.min_write_buffer_number_to_merge = 2; @@ 
-278,56 +314,73 @@ TEST_P(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { } TEST_P(DBTestTailingIterator, TailingIteratorDeletes) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } + std::unique_ptr env( + new CompositeEnvWrapper(env_, FileSystem::Default())); + Options options = CurrentOptions(); + options.env = env.get(); + CreateAndReopenWithCF({"pikachu"}, options); ReadOptions read_options; read_options.tailing = true; if (GetParam()) { read_options.async_io = true; } + { + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(iter->status()); - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - ASSERT_OK(iter->status()); - - // write a single record, read it using the iterator, then delete it - ASSERT_OK(Put(1, "0test", "test")); - iter->SeekToFirst(); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().ToString(), "0test"); - ASSERT_OK(Delete(1, "0test")); + // write a single record, read it using the iterator, then delete it + ASSERT_OK(Put(1, "0test", "test")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0test"); + ASSERT_OK(Delete(1, "0test")); - // write many more records - const int num_records = 10000; - std::string value(1024, 'A'); + // write many more records + const int num_records = 10000; + std::string value(1024, 'A'); - for (int i = 0; i < num_records; ++i) { - char buf[32]; - snprintf(buf, sizeof(buf), "1%015d", i); + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "1%015d", i); - Slice key(buf, 16); - ASSERT_OK(Put(1, key, value)); - } + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + } - // force a flush to make sure that no records are read from memtable - ASSERT_OK(Flush(1)); + // force a flush to make sure that no records are read from memtable + 
ASSERT_OK(Flush(1)); - // skip "0test" - iter->Next(); + // skip "0test" + iter->Next(); - // make sure we can read all new records using the existing iterator - int count = 0; - for (; iter->Valid(); iter->Next(), ++count) - ; + // make sure we can read all new records using the existing iterator + int count = 0; + for (; iter->Valid(); iter->Next(), ++count) + ; - ASSERT_EQ(count, num_records); + ASSERT_EQ(count, num_records); + } + Close(); } TEST_P(DBTestTailingIterator, TailingIteratorPrefixSeek) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } ReadOptions read_options; read_options.tailing = true; if (GetParam()) { read_options.async_io = true; } + std::unique_ptr env( + new CompositeEnvWrapper(env_, FileSystem::Default())); Options options = CurrentOptions(); + options.env = env.get(); options.create_if_missing = true; options.disable_auto_compactions = true; options.prefix_extractor.reset(NewFixedPrefixTransform(2)); @@ -336,28 +389,39 @@ TEST_P(DBTestTailingIterator, TailingIteratorPrefixSeek) { DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); - std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); - ASSERT_OK(iter->status()); - ASSERT_OK(Put(1, "0101", "test")); + { + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(iter->status()); + ASSERT_OK(Put(1, "0101", "test")); - ASSERT_OK(Flush(1)); + ASSERT_OK(Flush(1)); - ASSERT_OK(Put(1, "0202", "test")); + ASSERT_OK(Put(1, "0202", "test")); - // Seek(0102) shouldn't find any records since 0202 has a different prefix - iter->Seek("0102"); - ASSERT_TRUE(!iter->Valid()); + // Seek(0102) shouldn't find any records since 0202 has a different prefix + iter->Seek("0102"); + ASSERT_TRUE(!iter->Valid()); - iter->Seek("0202"); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().ToString(), "0202"); + iter->Seek("0202"); + ASSERT_TRUE(iter->Valid()); + 
ASSERT_EQ(iter->key().ToString(), "0202"); - iter->Next(); - ASSERT_TRUE(!iter->Valid()); + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + } + Close(); } TEST_P(DBTestTailingIterator, TailingIteratorIncomplete) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } + std::unique_ptr env( + new CompositeEnvWrapper(env_, FileSystem::Default())); + Options options = CurrentOptions(); + options.env = env.get(); + CreateAndReopenWithCF({"pikachu"}, options); ReadOptions read_options; read_options.tailing = true; if (GetParam()) { @@ -370,20 +434,30 @@ TEST_P(DBTestTailingIterator, TailingIteratorIncomplete) { ASSERT_OK(db_->Put(WriteOptions(), key, value)); - std::unique_ptr iter(db_->NewIterator(read_options)); - ASSERT_OK(iter->status()); - iter->SeekToFirst(); - // we either see the entry or it's not in cache - ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); - - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - iter->SeekToFirst(); - // should still be true after compaction - ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); + { + std::unique_ptr iter(db_->NewIterator(read_options)); + ASSERT_OK(iter->status()); + iter->SeekToFirst(); + // we either see the entry or it's not in cache + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + iter->SeekToFirst(); + // should still be true after compaction + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); + } + Close(); } TEST_P(DBTestTailingIterator, TailingIteratorSeekToSame) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } + std::unique_ptr env( + new CompositeEnvWrapper(env_, FileSystem::Default())); Options options = CurrentOptions(); + options.env = env.get(); 
options.compaction_style = kCompactionStyleUniversal; options.write_buffer_size = 1000; CreateAndReopenWithCF({"pikachu"}, options); @@ -403,28 +477,39 @@ TEST_P(DBTestTailingIterator, TailingIteratorSeekToSame) { ASSERT_OK(db_->Put(WriteOptions(), key, value)); } - std::unique_ptr iter(db_->NewIterator(read_options)); - ASSERT_OK(iter->status()); - // Seek to 00001. We expect to find 00002. - std::string start_key = "00001"; - iter->Seek(start_key); - ASSERT_TRUE(iter->Valid()); + { + std::unique_ptr iter(db_->NewIterator(read_options)); + ASSERT_OK(iter->status()); + // Seek to 00001. We expect to find 00002. + std::string start_key = "00001"; + iter->Seek(start_key); + ASSERT_TRUE(iter->Valid()); - std::string found = iter->key().ToString(); - ASSERT_EQ("00002", found); + std::string found = iter->key().ToString(); + ASSERT_EQ("00002", found); - // Now seek to the same key. The iterator should remain in the same - // position. - iter->Seek(found); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(found, iter->key().ToString()); + // Now seek to the same key. The iterator should remain in the same + // position. + iter->Seek(found); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(found, iter->key().ToString()); + } + Close(); } // Sets iterate_upper_bound and verifies that ForwardIterator doesn't call // Seek() on immutable iterators when target key is >= prev_key and all // iterators, including the memtable iterator, are over the upper bound. 
TEST_P(DBTestTailingIterator, TailingIteratorUpperBound) { - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } + std::unique_ptr env( + new CompositeEnvWrapper(env_, FileSystem::Default())); + Options options = CurrentOptions(); + options.env = env.get(); + CreateAndReopenWithCF({"pikachu"}, options); const Slice upper_bound("20", 3); ReadOptions read_options; @@ -441,34 +526,51 @@ TEST_P(DBTestTailingIterator, TailingIteratorUpperBound) { // Add another key to the memtable. ASSERT_OK(Put(1, "21", "21")); - std::unique_ptr it(db_->NewIterator(read_options, handles_[1])); - ASSERT_OK(it->status()); - it->Seek("12"); - ASSERT_TRUE(it->Valid()); - ASSERT_EQ("12", it->key().ToString()); - - it->Next(); - // Not valid since "21" is over the upper bound. - ASSERT_FALSE(it->Valid()); - ASSERT_OK(it->status()); - // This keeps track of the number of times NeedToSeekImmutable() was true. - int immutable_seeks = 0; - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "ForwardIterator::SeekInternal:Immutable", - [&](void* /*arg*/) { ++immutable_seeks; }); - - // Seek to 13. This should not require any immutable seeks. 
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - it->Seek("13"); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); - - ASSERT_FALSE(it->Valid()); - ASSERT_OK(it->status()); - if (GetParam()) { - ASSERT_EQ(1, immutable_seeks); - } else { - ASSERT_EQ(0, immutable_seeks); + { + bool read_async_called = false; + + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + auto it = + std::unique_ptr(db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(it->status()); + it->Seek("12"); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("12", it->key().ToString()); + + it->Next(); + // Not valid since "21" is over the upper bound. + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + // This keeps track of the number of times NeedToSeekImmutable() was true. + int immutable_seeks = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "ForwardIterator::SeekInternal:Immutable", + [&](void* /*arg*/) { ++immutable_seeks; }); + + // Seek to 13. This should not require any immutable seeks. 
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + it->Seek("13"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_FALSE(it->Valid()); + ASSERT_OK(it->status()); + if (GetParam() && read_async_called) { + ASSERT_EQ(1, immutable_seeks); + } else { + ASSERT_EQ(0, immutable_seeks); + } } + Close(); } TEST_P(DBTestTailingIterator, TailingIteratorGap) { @@ -480,7 +582,15 @@ TEST_P(DBTestTailingIterator, TailingIteratorGap) { // the largest key of index n file and the smallest key of index n+1 file // if both file fit in that gap. In this example, 25 < key < 35 // https://github.com/facebook/rocksdb/issues/1372 - CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } + std::unique_ptr env( + new CompositeEnvWrapper(env_, FileSystem::Default())); + Options options = CurrentOptions(); + options.env = env.get(); + CreateAndReopenWithCF({"pikachu"}, options); ReadOptions read_options; read_options.tailing = true; @@ -512,20 +622,23 @@ TEST_P(DBTestTailingIterator, TailingIteratorGap) { ColumnFamilyMetaData meta; db_->GetColumnFamilyMetaData(handles_[1], &meta); - std::unique_ptr it(db_->NewIterator(read_options, handles_[1])); - it->Seek("30"); - ASSERT_TRUE(it->Valid()); - ASSERT_EQ("30", it->key().ToString()); + { + std::unique_ptr it(db_->NewIterator(read_options, handles_[1])); + it->Seek("30"); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("30", it->key().ToString()); - it->Next(); - ASSERT_TRUE(it->Valid()); - ASSERT_EQ("35", it->key().ToString()); + it->Next(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("35", it->key().ToString()); - 
it->Next(); - ASSERT_TRUE(it->Valid()); - ASSERT_EQ("40", it->key().ToString()); + it->Next(); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("40", it->key().ToString()); - ASSERT_OK(it->status()); + ASSERT_OK(it->status()); + } + Close(); } TEST_P(DBTestTailingIterator, SeekWithUpperBoundBug) { diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 75a7c599b..c7691560e 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -241,10 +241,10 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, if (sv_) { RebuildIterators(false); } - if (!cfd_->ioptions()->env->GetFileSystem()->use_async_io()) { + if (!CheckFSFeatureSupport(cfd_->ioptions()->env->GetFileSystem().get(), + FSSupportedOps::kAsyncIO)) { read_options_.async_io = false; } - // immutable_status_ is a local aggregation of the // status of the immutable Iterators. // We have to PermitUncheckedError in case it is never @@ -1067,4 +1067,3 @@ void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) { } } // namespace ROCKSDB_NAMESPACE - diff --git a/db/version_set.cc b/db/version_set.cc index 8ce2b8277..8086b01e6 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -39,6 +39,7 @@ #include "db/version_builder.h" #include "db/version_edit.h" #include "db/version_edit_handler.h" +#include "file/file_util.h" #include "table/compaction_merging_iterator.h" #if USE_COROUTINES @@ -2190,7 +2191,12 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, MaxFileSizeForL0MetaPin(mutable_cf_options_)), version_number_(version_number), io_tracer_(io_tracer), - use_async_io_(env_->GetFileSystem()->use_async_io()) {} + use_async_io_(false) { + if (CheckFSFeatureSupport(env_->GetFileSystem().get(), + FSSupportedOps::kAsyncIO)) { + use_async_io_ = true; + } +} Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, const Slice& blob_index_slice, diff --git a/env/env.cc b/env/env.cc index 2137738c7..937be43c0 100644 --- 
a/env/env.cc +++ b/env/env.cc @@ -159,8 +159,7 @@ class LegacyRandomAccessFileWrapper : public FSRandomAccessFile { req.len = fs_reqs[i].len; req.scratch = fs_reqs[i].scratch; req.status = Status::OK(); - - reqs.emplace_back(req); + reqs.emplace_back(std::move(req)); } status = target_->MultiRead(reqs.data(), num_reqs); for (size_t i = 0; i < num_reqs; ++i) { diff --git a/env/env_test.cc b/env/env_test.cc index 1f2077a45..4462b95b8 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -3538,7 +3538,7 @@ IOStatus ReadAsyncRandomAccessFile::ReadAsync( } }; - fs_.workers.emplace_back(submit_request, req); + fs_.workers.emplace_back(submit_request, std::move(req)); return IOStatus::OK(); } diff --git a/env/fs_posix.cc b/env/fs_posix.cc index c2e8cfaf0..dd2f74935 100644 --- a/env/fs_posix.cc +++ b/env/fs_posix.cc @@ -1183,11 +1183,13 @@ class PosixFileSystem : public FileSystem { #endif } - bool use_async_io() override { + void SupportedOps(int64_t& supported_ops) override { + supported_ops = 0; #if defined(ROCKSDB_IOURING_PRESENT) - return IsIOUringEnabled(); -#else - return false; + if (IsIOUringEnabled()) { + // Underlying FS supports async_io + supported_ops |= (1 << FSSupportedOps::kAsyncIO); + } #endif } diff --git a/file/file_util.h b/file/file_util.h index e279cfba0..8b59731eb 100644 --- a/file/file_util.h +++ b/file/file_util.h @@ -88,4 +88,14 @@ inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, // Test method to delete the input directory and all of its contents. // This method is destructive and is meant for use only in tests!!! 
Status DestroyDir(Env* env, const std::string& dir); + +inline bool CheckFSFeatureSupport(FileSystem* fs, FSSupportedOps feat) { + int64_t supported_ops = 0; + fs->SupportedOps(supported_ops); + if (supported_ops & (1ULL << feat)) { + return true; + } + return false; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/file/prefetch_test.cc b/file/prefetch_test.cc index 23bb85bb3..319f4ddef 100644 --- a/file/prefetch_test.cc +++ b/file/prefetch_test.cc @@ -1304,10 +1304,14 @@ TEST_P(PrefetchTest, DBIterLevelReadAhead) { // This test verifies the functionality of ReadOptions.adaptive_readahead when // async_io is enabled. TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } const int kNumKeys = 1000; // Set options std::shared_ptr fs = - std::make_shared(env_->GetFileSystem(), false); + std::make_shared(FileSystem::Default(), false); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); bool use_direct_io = std::get<0>(GetParam()); @@ -1341,16 +1345,26 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) { } MoveFilesToLevel(2); int buff_async_prefetch_count = 0; + int buff_prefetch_count = 0; int readahead_carry_over_count = 0; int num_sst_files = NumTableFilesAtLevel(2); size_t current_readahead_size = 0; + bool read_async_called = false; // Test - Iterate over the keys sequentially. 
{ + SyncPoint::GetInstance()->SetCallBack( + "FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + SyncPoint::GetInstance()->SetCallBack( "FilePrefetchBuffer::PrefetchAsyncInternal:Start", [&](void*) { buff_async_prefetch_count++; }); + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); + // The callback checks, since reads are sequential, readahead_size doesn't // start from 8KB when iterator moves to next file and its called // num_sst_files-1 times (excluding for first file). @@ -1393,15 +1407,18 @@ TEST_P(PrefetchTest, DBIterLevelReadAheadWithAsyncIO) { } else { ASSERT_EQ(readahead_carry_over_count, 0); } - ASSERT_GT(buff_async_prefetch_count, 0); // Check stats to make sure async prefetch is done. { HistogramData async_read_bytes; options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); - if (ro.async_io) { + // Not all platforms support iouring. In that case, ReadAsync in posix + // won't submit async requests. 
+ if (read_async_called) { + ASSERT_GT(buff_async_prefetch_count, 0); ASSERT_GT(async_read_bytes.count, 0); } else { + ASSERT_GT(buff_prefetch_count, 0); ASSERT_EQ(async_read_bytes.count, 0); } } @@ -1434,6 +1451,7 @@ TEST_P(PrefetchTest, DBIterAsyncIONoIOUring) { Status s = TryReopen(options); if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { // If direct IO is not supported, skip the test + enable_io_uring = true; return; } else { ASSERT_OK(s); @@ -1515,7 +1533,8 @@ class PrefetchTest1 : public DBTestBase, public: PrefetchTest1() : DBTestBase("prefetch_test1", true) {} - void SetGenericOptions(Env* env, bool use_direct_io, Options& options) { + virtual void SetGenericOptions(Env* env, bool use_direct_io, + Options& options) { options = CurrentOptions(); options.write_buffer_size = 1024; options.create_if_missing = true; @@ -1769,10 +1788,14 @@ TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) { // This test verifies the basic functionality of seek parallelization for // async_io. 
TEST_P(PrefetchTest1, SeekParallelizationTest) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_BYPASS("Test requires non-mem or non-encrypted environment"); + return; + } const int kNumKeys = 2000; // Set options - std::shared_ptr fs = - std::make_shared(env_->GetFileSystem(), false); + std::shared_ptr fs = std::make_shared( + FileSystem::Default(), /*support_prefetch=*/false); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); Options options; @@ -1805,10 +1828,19 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) { ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); int buff_prefetch_count = 0; + int buff_prefetch_async_count = 0; SyncPoint::GetInstance()->SetCallBack( "FilePrefetchBuffer::PrefetchAsyncInternal:Start", - [&](void*) { buff_prefetch_count++; }); + [&](void*) { buff_prefetch_async_count++; }); + + SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start", + [&](void*) { buff_prefetch_count++; }); + + bool read_async_called = false; + SyncPoint::GetInstance()->SetCallBack( + "UpdateResults::io_uring_result", + [&](void* /*arg*/) { read_async_called = true; }); SyncPoint::GetInstance()->EnableProcessing(); ReadOptions ro; @@ -1843,17 +1875,17 @@ TEST_P(PrefetchTest1, SeekParallelizationTest) { iter->Next(); ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(buff_prefetch_count, 2); - - // Check stats to make sure async prefetch is done. - { - HistogramData async_read_bytes; - options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + HistogramData async_read_bytes; + options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); + // not all platforms support io_uring. In that case it'll fallback to normal + // prefetching without async_io. 
+ if (read_async_called) { + ASSERT_EQ(buff_prefetch_async_count, 2); ASSERT_GT(async_read_bytes.count, 0); ASSERT_GT(get_perf_context()->number_async_seek, 0); + } else { + ASSERT_EQ(buff_prefetch_count, 1); } - - buff_prefetch_count = 0; } Close(); } diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 308cd3e5b..d3fb37f78 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -314,14 +314,14 @@ IOStatus RandomAccessFileReader::MultiRead( // Align and merge the read requests. size_t alignment = file_->GetRequiredBufferAlignment(); for (size_t i = 0; i < num_reqs; i++) { - const auto& r = Align(read_reqs[i], alignment); + FSReadRequest r = Align(read_reqs[i], alignment); if (i == 0) { // head - aligned_reqs.push_back(r); + aligned_reqs.push_back(std::move(r)); } else if (!TryMerge(&aligned_reqs.back(), r)) { // head + n - aligned_reqs.push_back(r); + aligned_reqs.push_back(std::move(r)); } else { // unused diff --git a/file/random_access_file_reader_test.cc b/file/random_access_file_reader_test.cc index 22e950c78..82ddcfff9 100644 --- a/file/random_access_file_reader_test.cc +++ b/file/random_access_file_reader_test.cc @@ -95,7 +95,18 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { "RandomAccessFileReader::MultiRead:AlignedReqs", [&](void* reqs) { // Copy reqs, since it's allocated on stack inside MultiRead, which will // be deallocated after MultiRead returns. 
- aligned_reqs = *reinterpret_cast*>(reqs); + size_t i = 0; + aligned_reqs.resize( + (*reinterpret_cast*>(reqs)).size()); + for (auto& req : + (*reinterpret_cast*>(reqs))) { + aligned_reqs[i].offset = req.offset; + aligned_reqs[i].len = req.len; + aligned_reqs[i].result = req.result; + aligned_reqs[i].status = req.status; + aligned_reqs[i].scratch = req.scratch; + i++; + } }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -136,7 +147,7 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r1)); AlignedBuf aligned_buf; ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf, - Env::IO_TOTAL /* rate_limiter_priority */)); + Env::IO_TOTAL /*rate_limiter_priority*/)); AssertResult(content, reqs); @@ -181,7 +192,7 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r2)); AlignedBuf aligned_buf; ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf, - Env::IO_TOTAL /* rate_limiter_priority */)); + Env::IO_TOTAL /*rate_limiter_priority*/)); AssertResult(content, reqs); @@ -226,7 +237,7 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r2)); AlignedBuf aligned_buf; ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf, - Env::IO_TOTAL /* rate_limiter_priority */)); + Env::IO_TOTAL /*rate_limiter_priority*/)); AssertResult(content, reqs); @@ -263,7 +274,7 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r1)); AlignedBuf aligned_buf; ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf, - Env::IO_TOTAL /* rate_limiter_priority */)); + Env::IO_TOTAL /*rate_limiter_priority*/)); AssertResult(content, reqs); @@ -283,7 +294,6 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); } - TEST(FSReadRequest, Align) { FSReadRequest r; r.offset = 2000; diff --git 
a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index cf57c8fc9..f8e321417 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -79,6 +79,10 @@ enum class IOType : uint8_t { kInvalid, }; +// enum representing various operations supported by underlying FileSystem. +// These need to be set in SupportedOps API for RocksDB to use them. +enum FSSupportedOps { kAsyncIO, kFSBuffer }; + // Per-request options that can be passed down to the FileSystem // implementation. These are hints and are not necessarily guaranteed to be // honored. More hints can be added here in the future to indicate things like @@ -657,7 +661,6 @@ class FileSystem : public Customizable { const IOOptions& options, bool* is_dir, IODebugContext* /*dgb*/) = 0; - // EXPERIMENTAL // Poll for completion of read IO requests. The Poll() method should call the // callback functions to indicate completion of read requests. // Underlying FS is required to support Poll API. Poll implementation should @@ -665,28 +668,33 @@ class FileSystem : public Customizable { // after the callback has been called. // If Poll returns partial results for any reads, its caller reponsibility to // call Read or ReadAsync in order to get the remaining bytes. - // - // Default implementation is to return IOStatus::OK. - virtual IOStatus Poll(std::vector& /*io_handles*/, size_t /*min_completions*/) { return IOStatus::OK(); } - // EXPERIMENTAL // Abort the read IO requests submitted asynchronously. Underlying FS is // required to support AbortIO API. AbortIO implementation should ensure that // the all the read requests related to io_handles should be aborted and // it shouldn't call the callback for these io_handles. - // - // Default implementation is to return IOStatus::OK. 
virtual IOStatus AbortIO(std::vector& /*io_handles*/) { return IOStatus::OK(); } - // Indicates to upper layers whether the FileSystem supports/uses async IO - // or not - virtual bool use_async_io() { return true; } + // Indicates to upper layers which FileSystem operations mentioned in + // FSSupportedOps are supported by underlying FileSystem. Each bit in + // supported_ops argument represents the corresponding FSSupportedOps operation. + // For example: + // If async_io is supported by the underlying FileSystem, then supported_ops + // will have corresponding bit (i.e. FSSupportedOps::kAsyncIO) set to 1. + // + // By default, async_io operation is set and FS should override this API and + // set all the operations they support provided in FSSupportedOps (including + // async_io). + virtual void SupportedOps(int64_t& supported_ops) { + supported_ops = 0; + supported_ops |= (1 << FSSupportedOps::kAsyncIO); + } // If you're adding methods here, remember to add them to EnvWrapper too. @@ -791,6 +799,42 @@ struct FSReadRequest { // Output parameter set by underlying FileSystem that represents status of // read request. IOStatus status; + + // fs_scratch is a data buffer allocated and provided by underlying FileSystem + // to RocksDB during reads, when FS wants to provide its own buffer with data + // instead of using RocksDB provided FSReadRequest::scratch. + // + // FileSystem needs to provide a buffer and custom delete function. The + // lifecycle of fs_scratch extends until the data is used by RocksDB. The buffer + // should be released by RocksDB using custom delete function provided in + // unique_ptr fs_scratch. + // + // Optimization benefits: + // This is helpful in cases where underlying FileSystem has to do additional + // copy of data to RocksDB provided buffer which can consume CPU cycles. It + // can be optimized by avoiding copying to RocksDB buffer and directly using + // FS provided buffer. 
+ // + // How to enable: + // In order to enable this option, FS needs to override SupportedOps() API and + // set FSSupportedOps::kFSBuffer in SupportedOps() as: + // { + // supported_ops |= (1 << FSSupportedOps::kFSBuffer); + // } + // + // Work in progress: + // Right now it's only enabled for MultiReads (sync and async + // both) with non direct io. + // If RocksDB provides its own buffer (scratch) during reads, that's a + // signal for FS to use RocksDB buffer. + // If FSSupportedOps::kFSBuffer is enabled and scratch == nullptr, + // then FS has to provide its own buffer in fs_scratch. + // + // NOTE: + // - FSReadRequest::result should point to fs_scratch. + // - This is needed only if FSSupportedOps::kFSBuffer support is provided by + // underlying FS. + std::unique_ptr> fs_scratch; }; // A file abstraction for randomly reading the contents of a file. @@ -885,7 +929,6 @@ class FSRandomAccessFile { return IOStatus::NotSupported("InvalidateCache not supported."); } - // EXPERIMENTAL // This API reads the requested data in FSReadRequest asynchronously. This is // a asynchronous call, i.e it should return after submitting the request. // @@ -907,6 +950,16 @@ // request and result and status fields are output parameter set by underlying // FileSystem. The data should always be read into scratch field. // + // How to enable: + // In order to enable ReadAsync, FS needs to override SupportedOps() API and + // set FSSupportedOps::kAsyncIO in SupportedOps() as: + // { + // supported_ops |= (1 << FSSupportedOps::kAsyncIO); + // } + // + // Note: If FS supports ReadAsync API, it should also override Poll and + // AbortIO API. + // + // Default implementation is to read the data synchronously. 
virtual IOStatus ReadAsync( FSReadRequest& req, const IOOptions& opts, @@ -1550,7 +1603,9 @@ class FileSystemWrapper : public FileSystem { return target_->AbortIO(io_handles); } - virtual bool use_async_io() override { return target_->use_async_io(); } + virtual void SupportedOps(int64_t& supported_ops) override { + return target_->SupportedOps(supported_ops); + } protected: std::shared_ptr target_; diff --git a/port/win/env_win.h b/port/win/env_win.h index cf04ec2fe..b6482ba92 100644 --- a/port/win/env_win.h +++ b/port/win/env_win.h @@ -227,7 +227,7 @@ class WinFileSystem : public FileSystem { const FileOptions& file_options) const override; FileOptions OptimizeForManifestWrite( const FileOptions& file_options) const override; - bool use_async_io() override { return false; } + void SupportedOps(int64_t& supported_ops) override { supported_ops = 0; } protected: static uint64_t FileTimeToUnixTime(const FILETIME& ftTime); diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 1dc02960b..d0686a4bc 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -377,7 +377,7 @@ class BlockBasedTable : public TableReader { const MultiGetRange* batch, const autovector* handles, Status* statuses, CachableEntry* results, char* scratch, - const UncompressionDict& uncompression_dict); + const UncompressionDict& uncompression_dict, bool use_fs_scratch); // Get the iterator from the index reader. 
// diff --git a/table/block_based/block_based_table_reader_sync_and_async.h b/table/block_based/block_based_table_reader_sync_and_async.h index 0cb251afa..eb4bf5bed 100644 --- a/table/block_based/block_based_table_reader_sync_and_async.h +++ b/table/block_based/block_based_table_reader_sync_and_async.h @@ -33,7 +33,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) (const ReadOptions& options, const MultiGetRange* batch, const autovector* handles, Status* statuses, CachableEntry* results, char* scratch, - const UncompressionDict& uncompression_dict) const { + const UncompressionDict& uncompression_dict, bool use_fs_scratch) const { RandomAccessFileReader* file = rep_->file.get(); const Footer& footer = rep_->footer; const ImmutableOptions& ioptions = rep_->ioptions; @@ -88,7 +88,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) // We don't combine block reads here in direct IO mode, because when doing // direct IO read, the block requests will be realigned and merged when // necessary. 
- if (use_shared_buffer && !file->use_direct_io() && + if ((use_shared_buffer || use_fs_scratch) && !file->use_direct_io() && prev_end == handle.offset()) { req_offset_for_block.emplace_back(prev_len); prev_len += BlockSizeWithTrailer(handle); @@ -99,7 +99,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) FSReadRequest req; req.offset = prev_offset; req.len = prev_len; - if (file->use_direct_io()) { + if (file->use_direct_io() || use_fs_scratch) { req.scratch = nullptr; } else if (use_shared_buffer) { req.scratch = scratch + buf_offset; @@ -107,10 +107,10 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) } else { req.scratch = new char[req.len]; } - read_reqs.emplace_back(req); + read_reqs.emplace_back(std::move(req)); } - // Step 2, remeber the previous block info + // Step 2, remember the previous block info prev_offset = handle.offset(); prev_len = BlockSizeWithTrailer(handle); req_offset_for_block.emplace_back(0); @@ -125,14 +125,14 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) FSReadRequest req; req.offset = prev_offset; req.len = prev_len; - if (file->use_direct_io()) { + if (file->use_direct_io() || use_fs_scratch) { req.scratch = nullptr; } else if (use_shared_buffer) { req.scratch = scratch + buf_offset; } else { req.scratch = new char[req.len]; } - read_reqs.emplace_back(req); + read_reqs.emplace_back(std::move(req)); } AlignedBuf direct_io_buf; @@ -192,7 +192,10 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) BlockContents serialized_block; if (s.ok()) { - if (!use_shared_buffer) { + if (use_fs_scratch) { + serialized_block = + BlockContents(Slice(req.result.data() + req_offset, handle.size())); + } else if (!use_shared_buffer) { // We allocated a buffer for this block. 
Give ownership of it to // BlockContents so it can free the memory assert(req.result.data() == req.scratch); @@ -243,7 +246,8 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) // heap buffer or there is no cache at all. CompressionType compression_type = GetBlockCompressionType(serialized_block); - if (use_shared_buffer && compression_type == kNoCompression) { + if ((use_fs_scratch || use_shared_buffer) && + compression_type == kNoCompression) { Slice serialized = Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle)); serialized_block = BlockContents( @@ -304,6 +308,17 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) } statuses[idx_in_batch] = s; } + + if (use_fs_scratch) { + // Free the allocated scratch buffer by fs here as read requests might have + // been combined into one. + for (FSReadRequest& req : read_reqs) { + if (req.fs_scratch != nullptr) { + req.fs_scratch.reset(); + req.fs_scratch = nullptr; + } + } + } } using MultiGetRange = MultiGetContext::Range; @@ -512,11 +527,20 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet) if (total_len) { char* scratch = nullptr; + bool use_fs_scratch = false; const UncompressionDict& dict = uncompression_dict.GetValue() ? *uncompression_dict.GetValue() : UncompressionDict::GetEmptyDict(); assert(uncompression_dict_inited || !rep_->uncompression_dict_reader); assert(uncompression_dict_status.ok()); + + if (!rep_->file->use_direct_io()) { + if (CheckFSFeatureSupport(rep_->ioptions.fs.get(), + FSSupportedOps::kFSBuffer)) { + use_fs_scratch = true; + } + } + // If using direct IO, then scratch is not used, so keep it nullptr. // If the blocks need to be uncompressed and we don't need the // compressed blocks, then we can use a contiguous block of @@ -527,7 +551,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet) // 2. If blocks are uncompressed, alloc heap bufs // 3. 
If blocks are compressed and no compressed block cache, use // stack buf - if (!rep_->file->use_direct_io() && + if (!use_fs_scratch && !rep_->file->use_direct_io() && rep_->blocks_maybe_compressed) { if (total_len <= kMultiGetReadStackBufSize) { scratch = stack_buf; @@ -538,7 +562,7 @@ } CO_AWAIT(RetrieveMultipleBlocks) (read_options, &data_block_range, &block_handles, &statuses[0], - &results[0], scratch, dict); + &results[0], scratch, dict, use_fs_scratch); if (get_context) { ++(get_context->get_context_stats_.num_sst_read); } diff --git a/unreleased_history/new_features/fs_provided_buffer.md b/unreleased_history/new_features/fs_provided_buffer.md new file mode 100644 index 000000000..8979f5c4e --- /dev/null +++ b/unreleased_history/new_features/fs_provided_buffer.md @@ -0,0 +1 @@ +Add FSReadRequest::fs_scratch which is a data buffer allocated and provided by underlying FileSystem to RocksDB during reads, when FS wants to provide its own buffer with data instead of using RocksDB provided FSReadRequest::scratch. This can help in CPU optimization by avoiding copy from file system's buffer to RocksDB buffer. More details on how to use/enable it in file_system.h. Right now it's supported only for MultiReads (async + sync) with non-direct IO. diff --git a/unreleased_history/public_api_changes/change_async_io_api.md b/unreleased_history/public_api_changes/change_async_io_api.md new file mode 100644 index 000000000..88f16367d --- /dev/null +++ b/unreleased_history/public_api_changes/change_async_io_api.md @@ -0,0 +1 @@ +Change the FileSystem::use_async_io() API to SupportedOps API in order to extend it to various operations supported by underlying FileSystem. Right now it contains FSSupportedOps::kAsyncIO and FSSupportedOps::kFSBuffer. More details about FSSupportedOps in file_system.h