Provide support for direct_reads with async_io (#10197)

Summary:
Provide support for use_direct_reads with async_io.

TestPlan:
-  Updated unit tests
-  db_bench: Results in https://github.com/facebook/rocksdb/pull/10197#issuecomment-1159239420
- db_stress
```
export CRASH_TEST_EXT_ARGS=" --async_io=1 --use_direct_reads=1"
make crash_test -j
```
- Ran db_bench on previous RocksDB version before any async_io implementation (as there have many changes in different PRs in this area) https://github.com/facebook/rocksdb/pull/10197#issuecomment-1160781563.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10197

Reviewed By: anand1976

Differential Revision: D37255646

Pulled By: akankshamahajan15

fbshipit-source-id: fec61ae15bf4d625f79dea56e4f86e0e307ba920
main
Akanksha Mahajan 2 years ago committed by Facebook GitHub Bot
parent 177b2fa341
commit 2acbf386a3
  1. 1
      HISTORY.md
  2. 8
      env/fs_posix.cc
  3. 54
      env/io_posix.cc
  4. 55
      env/io_posix.h
  5. 18
      file/file_prefetch_buffer.cc
  6. 10
      file/file_prefetch_buffer.h
  7. 70
      file/prefetch_test.cc
  8. 99
      file/random_access_file_reader.cc
  9. 28
      file/random_access_file_reader.h
  10. 7
      table/block_based/block_based_table_iterator.cc
  11. 9
      table/block_based/block_prefetcher.cc
  12. 3
      table/block_based/block_prefetcher.h
  13. 6
      table/block_based/partitioned_index_iterator.cc
  14. 3
      table/block_fetcher.cc
  15. 2
      util/async_file_reader.cc

@ -3,6 +3,7 @@
### New Features
* Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`.
* Support backward iteration when `ReadOptions::iter_start_ts` is set.
* Provide support for ReadOptions.async_io with direct_io to improve Seek latency by using async IO to parallelize child iterator seek and doing asynchronous prefetching on sequential scans.
### Public API changes
* Add metadata related structs and functions in C API, including

8
env/fs_posix.cc vendored

@ -1102,15 +1102,21 @@ class PosixFileSystem : public FileSystem {
req.scratch = posix_handle->scratch;
req.offset = posix_handle->offset;
req.len = posix_handle->len;
size_t finished_len = 0;
size_t bytes_read = 0;
bool read_again = false;
UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
true /*async_read*/, finished_len, &req, bytes_read);
true /*async_read*/, posix_handle->use_direct_io,
posix_handle->alignment, finished_len, &req, bytes_read,
read_again);
posix_handle->is_finished = true;
io_uring_cqe_seen(iu, cqe);
posix_handle->cb(req, posix_handle->cb_arg);
(void)finished_len;
(void)bytes_read;
(void)read_again;
if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
break;

54
env/io_posix.cc vendored

@ -199,23 +199,6 @@ bool IsSyncFileRangeSupported(int fd) {
} // anonymous namespace
/*
* DirectIOHelper
*/
namespace {
bool IsSectorAligned(const size_t off, size_t sector_size) {
assert((sector_size & (sector_size - 1)) == 0);
return (off & (sector_size - 1)) == 0;
}
#ifndef NDEBUG
bool IsSectorAligned(const void* ptr, size_t sector_size) {
return uintptr_t(ptr) % sector_size == 0;
}
#endif
} // namespace
/*
* PosixSequentialFile
*/
@ -760,24 +743,14 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
FSReadRequest* req = req_wrap->req;
size_t bytes_read = 0;
bool read_again = false;
UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
false /*async_read*/, req_wrap->finished_len, req,
bytes_read);
false /*async_read*/, use_direct_io(),
GetRequiredBufferAlignment(), req_wrap->finished_len, req,
bytes_read, read_again);
int32_t res = cqe->res;
if (res >= 0) {
if (bytes_read == 0) {
/// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io() &&
!IsSectorAligned(req_wrap->finished_len,
GetRequiredBufferAlignment())) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, req_wrap->finished_len);
req->status = IOStatus::OK();
} else {
if (bytes_read == 0 && read_again) {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
@ -785,7 +758,6 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
}
} else if (bytes_read < req_wrap->iov.iov_len) {
incomplete_rq_list.push_back(req_wrap);
}
@ -910,19 +882,15 @@ IOStatus PosixRandomAccessFile::ReadAsync(
args = nullptr;
};
Posix_IOHandle* posix_handle = new Posix_IOHandle();
*io_handle = static_cast<void*>(posix_handle);
*del_fn = deletefn;
// Initialize Posix_IOHandle.
posix_handle->iu = iu;
Posix_IOHandle* posix_handle =
new Posix_IOHandle(iu, cb, cb_arg, req.offset, req.len, req.scratch,
use_direct_io(), GetRequiredBufferAlignment());
posix_handle->iov.iov_base = req.scratch;
posix_handle->iov.iov_len = req.len;
posix_handle->cb = cb;
posix_handle->cb_arg = cb_arg;
posix_handle->offset = req.offset;
posix_handle->len = req.len;
posix_handle->scratch = req.scratch;
*io_handle = static_cast<void*>(posix_handle);
*del_fn = deletefn;
// Step 3: io_uring_sqe_set_data
struct io_uring_sqe* sqe;

55
env/io_posix.h vendored

@ -52,8 +52,37 @@ class PosixHelper {
size_t* size);
};
/*
* DirectIOHelper
*/
inline bool IsSectorAligned(const size_t off, size_t sector_size) {
assert((sector_size & (sector_size - 1)) == 0);
return (off & (sector_size - 1)) == 0;
}
#ifndef NDEBUG
inline bool IsSectorAligned(const void* ptr, size_t sector_size) {
return uintptr_t(ptr) % sector_size == 0;
}
#endif
#if defined(ROCKSDB_IOURING_PRESENT)
struct Posix_IOHandle {
Posix_IOHandle(struct io_uring* _iu,
std::function<void(const FSReadRequest&, void*)> _cb,
void* _cb_arg, uint64_t _offset, size_t _len, char* _scratch,
bool _use_direct_io, size_t _alignment)
: iu(_iu),
cb(_cb),
cb_arg(_cb_arg),
offset(_offset),
len(_len),
scratch(_scratch),
use_direct_io(_use_direct_io),
alignment(_alignment),
is_finished(false),
req_count(0) {}
struct iovec iov;
struct io_uring* iu;
std::function<void(const FSReadRequest&, void*)> cb;
@ -61,15 +90,19 @@ struct Posix_IOHandle {
uint64_t offset;
size_t len;
char* scratch;
bool is_finished = false;
bool use_direct_io;
size_t alignment;
bool is_finished;
// req_count is used by AbortIO API to keep track of number of requests.
uint32_t req_count = 0;
uint32_t req_count;
};
inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
size_t len, size_t iov_len, bool async_read,
bool use_direct_io, uint32_t alignment,
size_t& finished_len, FSReadRequest* req,
size_t& bytes_read) {
size_t& bytes_read, bool& read_again) {
read_again = false;
if (cqe->res < 0) {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req failed", file_name, cqe->res);
@ -80,10 +113,24 @@ inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
req->result = Slice(req->scratch, req->len);
req->status = IOStatus::OK();
} else if (bytes_read == 0) {
/// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io && !IsSectorAligned(finished_len, alignment)) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, finished_len);
req->status = IOStatus::OK();
} else {
if (async_read) {
// No bytes read. It can means EOF.
// No bytes read. It can means EOF. In case of partial results, it's
// caller responsibility to call read/readasync again.
req->result = Slice(req->scratch, 0);
req->status = IOStatus::OK();
} else {
read_again = true;
}
}
} else if (bytes_read < iov_len) {
assert(bytes_read > 0);

@ -88,7 +88,7 @@ Status FilePrefetchBuffer::Read(const IOOptions& opts,
Slice result;
Status s = reader->Read(opts, rounddown_start + chunk_len, read_len, &result,
bufs_[index].buffer_.BufferStart() + chunk_len,
nullptr, rate_limiter_priority);
/*aligned_buf=*/nullptr, rate_limiter_priority);
#ifndef NDEBUG
if (result.size() < read_len) {
// Fake an IO error to force db_stress fault injection to ignore
@ -108,7 +108,6 @@ Status FilePrefetchBuffer::Read(const IOOptions& opts,
Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
RandomAccessFileReader* reader,
Env::IOPriority rate_limiter_priority,
uint64_t read_len, uint64_t chunk_len,
uint64_t rounddown_start, uint32_t index) {
// callback for async read request.
@ -120,8 +119,9 @@ Status FilePrefetchBuffer::ReadAsync(const IOOptions& opts,
req.offset = rounddown_start + chunk_len;
req.result = result;
req.scratch = bufs_[index].buffer_.BufferStart() + chunk_len;
Status s = reader->ReadAsync(req, opts, fp, nullptr /*cb_arg*/, &io_handle_,
&del_fn_, rate_limiter_priority);
Status s = reader->ReadAsync(req, opts, fp,
/*cb_arg=*/nullptr, &io_handle_, &del_fn_,
/*aligned_buf=*/nullptr);
req.status.PermitUncheckedError();
if (s.ok()) {
async_read_in_progress_ = true;
@ -373,8 +373,7 @@ Status FilePrefetchBuffer::PrefetchAsyncInternal(
bufs_[second].offset_ = rounddown_start2;
assert(roundup_len2 >= chunk_len2);
uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
ReadAsync(opts, reader, rate_limiter_priority, read_len2, chunk_len2,
rounddown_start2, second)
ReadAsync(opts, reader, read_len2, chunk_len2, rounddown_start2, second)
.PermitUncheckedError();
}
@ -544,7 +543,8 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
if (req.status.ok()) {
if (req.offset + req.result.size() <=
bufs_[index].offset_ + bufs_[index].buffer_.CurrentSize()) {
// All requested bytes are already in the buffer. So no need to update.
// All requested bytes are already in the buffer or no data is read
// because of EOF. So no need to update.
return;
}
if (req.offset < bufs_[index].offset_) {
@ -560,7 +560,6 @@ void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req,
Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Env::IOPriority rate_limiter_priority,
Slice* result) {
assert(reader != nullptr);
if (!enable_) {
@ -630,8 +629,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
s = ReadAsync(opts, reader, rate_limiter_priority, read_len, chunk_len,
rounddown_start, second);
s = ReadAsync(opts, reader, read_len, chunk_len, rounddown_start, second);
if (!s.ok()) {
return s;

@ -138,16 +138,13 @@ class FilePrefetchBuffer {
// reader : the file reader.
// offset : the file offset to start reading from.
// n : the number of bytes to read.
// rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to
// bypass.
// result : if data already exists in the buffer, result will
// be updated with the data.
//
// If data already exist in the buffer, it will return Status::OK, otherwise
// it will send asynchronous request and return Status::TryAgain.
Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t offset, size_t n,
Env::IOPriority rate_limiter_priority, Slice* result);
uint64_t offset, size_t n, Slice* result);
// Tries returning the data for a file read from this buffer if that data is
// in the buffer.
@ -246,9 +243,8 @@ class FilePrefetchBuffer {
uint64_t chunk_len, uint64_t rounddown_start, uint32_t index);
Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader,
Env::IOPriority rate_limiter_priority, uint64_t read_len,
uint64_t chunk_len, uint64_t rounddown_start,
uint32_t index);
uint64_t read_len, uint64_t chunk_len,
uint64_t rounddown_start, uint32_t index);
// Copy the data from src to third buffer.
void CopyDataToBuffer(uint32_t src, uint64_t& offset, size_t& length);

@ -864,33 +864,24 @@ TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
Close();
}
class PrefetchTest1
: public DBTestBase,
public ::testing::WithParamInterface<std::tuple<bool, bool>> {
public:
PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1,
::testing::Combine(::testing::Bool(),
::testing::Bool()));
#ifndef ROCKSDB_LITE
TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
TEST_P(PrefetchTest, DBIterLevelReadAhead) {
const int kNumKeys = 1000;
// Set options
std::shared_ptr<MockFS> fs =
std::make_shared<MockFS>(env_->GetFileSystem(), false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = std::get<0>(GetParam());
bool is_adaptive_readahead = std::get<1>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
options.compression = kNoCompression;
options.statistics = CreateDBStatistics();
options.env = env.get();
bool use_direct_io = std::get<0>(GetParam());
if (use_direct_io) {
options.use_direct_reads = true;
options.use_direct_io_for_flush_and_compaction = true;
@ -987,7 +978,7 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (ro.async_io && !use_direct_io) {
if (ro.async_io) {
ASSERT_GT(async_read_bytes.count, 0);
} else {
ASSERT_EQ(async_read_bytes.count, 0);
@ -1001,16 +992,16 @@ TEST_P(PrefetchTest1, DBIterLevelReadAhead) {
}
#endif //! ROCKSDB_LITE
class PrefetchTest2 : public DBTestBase,
class PrefetchTest1 : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
PrefetchTest2() : DBTestBase("prefetch_test2", true) {}
PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest2, PrefetchTest2, ::testing::Bool());
INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool());
#ifndef ROCKSDB_LITE
TEST_P(PrefetchTest2, NonSequentialReadsWithAdaptiveReadahead) {
TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
const int kNumKeys = 1000;
// Set options
std::shared_ptr<MockFS> fs =
@ -1103,7 +1094,7 @@ TEST_P(PrefetchTest2, NonSequentialReadsWithAdaptiveReadahead) {
}
#endif //! ROCKSDB_LITE
TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
const int kNumKeys = 2000;
// Set options
std::shared_ptr<MockFS> fs =
@ -1167,7 +1158,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
SyncPoint::GetInstance()->EnableProcessing();
ReadOptions ro;
ro.adaptive_readahead = true;
// ro.async_io = true;
{
/*
* Reseek keys from sequential Data Blocks within same partitioned
@ -1248,7 +1238,7 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
Close();
}
TEST_P(PrefetchTest2, SeekParallelizationTest) {
TEST_P(PrefetchTest1, SeekParallelizationTest) {
const int kNumKeys = 2000;
// Set options
std::shared_ptr<MockFS> fs =
@ -1341,13 +1331,9 @@ TEST_P(PrefetchTest2, SeekParallelizationTest) {
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
if (GetParam()) {
ASSERT_EQ(async_read_bytes.count, 0);
} else {
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
}
}
buff_prefetch_count = 0;
}
@ -1356,11 +1342,7 @@ TEST_P(PrefetchTest2, SeekParallelizationTest) {
extern "C" bool RocksDbIOUringEnable() { return true; }
class PrefetchTestWithPosix : public DBTestBase,
public ::testing::WithParamInterface<bool> {
public:
PrefetchTestWithPosix() : DBTestBase("prefetch_test_with_posix", true) {}
namespace {
#ifndef ROCKSDB_LITE
#ifdef GFLAGS
const int kMaxArgCount = 100;
@ -1387,13 +1369,10 @@ class PrefetchTestWithPosix : public DBTestBase,
}
#endif // GFLAGS
#endif // ROCKSDB_LITE
};
INSTANTIATE_TEST_CASE_P(PrefetchTestWithPosix, PrefetchTestWithPosix,
::testing::Bool());
} // namespace
// Tests the default implementation of ReadAsync API with PosixFileSystem.
TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
@ -1404,7 +1383,7 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
FileSystem::Default(), /*support_prefetch=*/false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = false;
bool use_direct_io = std::get<0>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
@ -1453,7 +1432,7 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
ro.adaptive_readahead = true;
ro.async_io = true;
if (GetParam()) {
if (std::get<1>(GetParam())) {
ro.readahead_size = 16 * 1024;
}
@ -1530,7 +1509,8 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
options.statistics->histogramData(ASYNC_READ_BYTES,
&async_read_bytes);
HistogramData prefetched_bytes_discarded;
options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED,
&prefetched_bytes_discarded);
@ -1553,11 +1533,11 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
}
#ifndef ROCKSDB_LITE
#ifdef GFLAGS
TEST_P(PrefetchTestWithPosix, TraceReadAsyncWithCallbackWrapper) {
TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
@ -1568,7 +1548,7 @@ TEST_P(PrefetchTestWithPosix, TraceReadAsyncWithCallbackWrapper) {
FileSystem::Default(), /*support_prefetch=*/false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
bool use_direct_io = false;
bool use_direct_io = std::get<0>(GetParam());
Options options = CurrentOptions();
options.write_buffer_size = 1024;
options.create_if_missing = true;
@ -1617,7 +1597,7 @@ TEST_P(PrefetchTestWithPosix, TraceReadAsyncWithCallbackWrapper) {
ro.adaptive_readahead = true;
ro.async_io = true;
if (GetParam()) {
if (std::get<1>(GetParam())) {
ro.readahead_size = 16 * 1024;
}
@ -1638,8 +1618,8 @@ TEST_P(PrefetchTestWithPosix, TraceReadAsyncWithCallbackWrapper) {
std::unique_ptr<TraceWriter> trace_writer;
std::string trace_file_path = dbname_ + "/io_trace_file";
ASSERT_OK(
NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer));
ASSERT_OK(NewFileTraceWriter(env_, EnvOptions(), trace_file_path,
&trace_writer));
ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer)));
ASSERT_OK(options.statistics->Reset());
@ -1678,7 +1658,7 @@ TEST_P(PrefetchTestWithPosix, TraceReadAsyncWithCallbackWrapper) {
SyncPoint::GetInstance()->ClearAllCallBacks();
Close();
}
}
#endif // GFLAGS
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

@ -446,29 +446,17 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
}
}
// TODO akanksha:
// 1. Handle use_direct_io case which currently calls Read API.
IOStatus RandomAccessFileReader::ReadAsync(
FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority) {
if (use_direct_io()) {
// For direct_io, it calls Read API.
req.status = Read(opts, req.offset, req.len, &(req.result), req.scratch,
nullptr /*dbg*/, rate_limiter_priority);
cb(req, cb_arg);
return IOStatus::OK();
}
void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf) {
IOStatus s;
// Create a callback and populate info.
auto read_async_callback =
std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2);
ReadAsyncInfo* read_async_info = new ReadAsyncInfo;
read_async_info->cb_ = cb;
read_async_info->cb_arg_ = cb_arg;
read_async_info->start_time_ = clock_->NowMicros();
ReadAsyncInfo* read_async_info =
new ReadAsyncInfo(cb, cb_arg, clock_->NowMicros());
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
@ -476,8 +464,38 @@ IOStatus RandomAccessFileReader::ReadAsync(
}
#endif
IOStatus s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
size_t alignment = file_->GetRequiredBufferAlignment();
bool is_aligned = (req.offset & (alignment - 1)) == 0 &&
(req.len & (alignment - 1)) == 0 &&
(uintptr_t(req.scratch) & (alignment - 1)) == 0;
read_async_info->is_aligned_ = is_aligned;
if (use_direct_io() && is_aligned == false) {
FSReadRequest aligned_req = Align(req, alignment);
// Allocate aligned buffer.
read_async_info->buf_.Alignment(alignment);
read_async_info->buf_.AllocateNewBuffer(aligned_req.len);
// Set rem fields in aligned FSReadRequest.
aligned_req.scratch = read_async_info->buf_.BufferStart();
// Set user provided fields to populate back in callback.
read_async_info->user_scratch_ = req.scratch;
read_async_info->user_aligned_buf_ = aligned_buf;
read_async_info->user_len_ = req.len;
read_async_info->user_offset_ = req.offset;
read_async_info->user_result_ = req.result;
assert(read_async_info->buf_.CurrentSize() == 0);
s = file_->ReadAsync(aligned_req, opts, read_async_callback,
read_async_info, io_handle, del_fn, nullptr /*dbg*/);
} else {
s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
io_handle, del_fn, nullptr /*dbg*/);
}
// Suppress false positive clang analyzer warnings.
// Memory is not released if file_->ReadAsync returns !s.ok(), because
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
@ -497,7 +515,54 @@ void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
assert(read_async_info);
assert(read_async_info->cb_);
if (use_direct_io() && read_async_info->is_aligned_ == false) {
// Create FSReadRequest with user provided fields.
FSReadRequest user_req;
user_req.scratch = read_async_info->user_scratch_;
user_req.offset = read_async_info->user_offset_;
user_req.len = read_async_info->user_len_;
// Update results in user_req.
user_req.result = req.result;
user_req.status = req.status;
read_async_info->buf_.Size(read_async_info->buf_.CurrentSize() +
req.result.size());
size_t offset_advance_len = static_cast<size_t>(
/*offset_passed_by_user=*/read_async_info->user_offset_ -
/*aligned_offset=*/req.offset);
size_t res_len = 0;
if (req.status.ok() &&
offset_advance_len < read_async_info->buf_.CurrentSize()) {
res_len =
std::min(read_async_info->buf_.CurrentSize() - offset_advance_len,
read_async_info->user_len_);
if (read_async_info->user_aligned_buf_ == nullptr) {
// Copy the data into user's scratch.
// Clang analyzer assumes that it will take use_direct_io() == false in
// ReadAsync and use_direct_io() == true in Callback which cannot be true.
#ifndef __clang_analyzer__
read_async_info->buf_.Read(user_req.scratch, offset_advance_len,
res_len);
#endif // __clang_analyzer__
} else {
// Set aligned_buf provided by user without additional copy.
user_req.scratch =
read_async_info->buf_.BufferStart() + offset_advance_len;
read_async_info->user_aligned_buf_->reset(
read_async_info->buf_.Release());
}
user_req.result = Slice(user_req.scratch, res_len);
} else {
// Either req.status is not ok or data was not read.
user_req.result = Slice();
}
read_async_info->cb_(user_req, read_async_info->cb_arg_);
} else {
read_async_info->cb_(req, read_async_info->cb_arg_);
}
// Update stats and notify listeners.
if (stats_ != nullptr && file_read_hist_ != nullptr) {

@ -93,12 +93,32 @@ class RandomAccessFileReader {
const bool is_last_level_;
struct ReadAsyncInfo {
ReadAsyncInfo(std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, uint64_t start_time)
: cb_(cb),
cb_arg_(cb_arg),
start_time_(start_time),
user_scratch_(nullptr),
user_aligned_buf_(nullptr),
user_offset_(0),
user_len_(0),
is_aligned_(false) {}
std::function<void(const FSReadRequest&, void*)> cb_;
void* cb_arg_;
uint64_t start_time_;
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint fs_start_ts_;
#endif
uint64_t start_time_;
std::function<void(const FSReadRequest&, void*)> cb_;
void* cb_arg_;
// Below fields stores the parameters passed by caller in case of direct_io.
char* user_scratch_;
AlignedBuf* user_aligned_buf_;
uint64_t user_offset_;
size_t user_len_;
Slice user_result_;
// Used in case of direct_io
AlignedBuffer buf_;
bool is_aligned_;
};
public:
@ -190,7 +210,7 @@ class RandomAccessFileReader {
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
Env::IOPriority rate_limiter_priority);
AlignedBuf* aligned_buf);
void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
};

@ -257,7 +257,7 @@ void BlockBasedTableIterator::InitDataBlock() {
// Enabled from the very first IO when ReadOptions.readahead_size is set.
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size, is_for_compaction,
/*async_io=*/false, read_options_.rate_limiter_priority);
/*no_sequential_checking=*/false, read_options_.rate_limiter_priority);
Status s;
table_->NewDataBlockIterator<DataBlockIter>(
read_options_, data_block_handle, &block_iter_, BlockType::kData,
@ -288,9 +288,12 @@ void BlockBasedTableIterator::AsyncInitDataBlock(bool is_first_pass) {
// Explicit user requested readahead:
// Enabled from the very first IO when ReadOptions.readahead_size is
// set.
// In case of async_io with Implicit readahead, block_prefetcher_ will
// always the create the prefetch buffer by setting no_sequential_checking
// = true.
block_prefetcher_.PrefetchIfNeeded(
rep, data_block_handle, read_options_.readahead_size,
is_for_compaction, read_options_.async_io,
is_for_compaction, /*no_sequential_checking=*/read_options_.async_io,
read_options_.rate_limiter_priority);
Status s;

@ -14,7 +14,8 @@
namespace ROCKSDB_NAMESPACE {
void BlockPrefetcher::PrefetchIfNeeded(
const BlockBasedTable::Rep* rep, const BlockHandle& handle,
const size_t readahead_size, bool is_for_compaction, const bool async_io,
const size_t readahead_size, bool is_for_compaction,
const bool no_sequential_checking,
const Env::IOPriority rate_limiter_priority) {
// num_file_reads is used by FilePrefetchBuffer only when
// implicit_auto_readahead is set.
@ -43,9 +44,9 @@ void BlockPrefetcher::PrefetchIfNeeded(
return;
}
// In case of async_io, always creates the PrefetchBuffer irrespective of
// num_file_reads_.
if (async_io) {
// In case of no_sequential_checking, it will skip the num_file_reads_ and
// will always creates the FilePrefetchBuffer.
if (no_sequential_checking) {
rep->CreateFilePrefetchBufferIfNotExists(
initial_auto_readahead_size_, max_auto_readahead_size,
&prefetch_buffer_, /*implicit_auto_readahead=*/true,

@ -20,7 +20,8 @@ class BlockPrefetcher {
void PrefetchIfNeeded(const BlockBasedTable::Rep* rep,
const BlockHandle& handle, size_t readahead_size,
bool is_for_compaction, bool async_io,
bool is_for_compaction,
const bool no_sequential_checking,
Env::IOPriority rate_limiter_priority);
FilePrefetchBuffer* prefetch_buffer() { return prefetch_buffer_.get(); }

@ -89,9 +89,9 @@ void PartitionedIndexIterator::InitPartitionedIndexBlock() {
// Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0.
// Explicit user requested readahead:
// Enabled from the very first IO when ReadOptions.readahead_size is set.
block_prefetcher_.PrefetchIfNeeded(rep, partitioned_index_handle,
read_options_.readahead_size,
is_for_compaction, /*async_io=*/false,
block_prefetcher_.PrefetchIfNeeded(
rep, partitioned_index_handle, read_options_.readahead_size,
is_for_compaction, /*no_sequential_checking=*/false,
read_options_.rate_limiter_priority);
Status s;
table_->NewDataBlockIterator<IndexBlockIter>(

@ -359,8 +359,7 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
return io_s;
}
io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
opts, file_, handle_.offset(), block_size_with_trailer_,
read_options_.rate_limiter_priority, &slice_));
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_));
if (io_s.IsTryAgain()) {
return io_s;
}

@ -28,7 +28,7 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
read_req->result = req.result;
},
&awaiter->read_reqs_[i], &awaiter->io_handle_[i],
&awaiter->del_fn_[i], Env::IOPriority::IO_TOTAL)
&awaiter->del_fn_[i], /*aligned_buf=*/nullptr)
.PermitUncheckedError();
}
return true;

Loading…
Cancel
Save