Posix API support for Async Read and Poll APIs (#9578)

Summary:
Provide support for Async Read and Poll in Posix file system using IOUring.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9578

Test Plan: In progress

Reviewed By: anand1976

Differential Revision: D34690256

Pulled By: akankshamahajan15

fbshipit-source-id: 291cbd1380a3cb904b726c34c0560d1b2ce44a2e
main
Akanksha Mahajan 3 years ago committed by Facebook GitHub Bot
parent 7bed6595f3
commit 8465cccde2
  1. 11
      env/env_test.cc
  2. 70
      env/fs_posix.cc
  3. 134
      env/io_posix.cc
  4. 56
      env/io_posix.h
  5. 3
      include/rocksdb/file_system.h

11
env/env_test.cc vendored

@ -1256,7 +1256,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
// Random Read // Random Read
Random rnd(301 + attempt); Random rnd(301 + attempt);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"PosixRandomAccessFile::MultiRead:io_uring_result", [&](void* arg) { "UpdateResults:io_uring_result", [&](void* arg) {
if (attempt > 0) { if (attempt > 0) {
// No failure in the first attempt. // No failure in the first attempt.
size_t& bytes_read = *static_cast<size_t*>(arg); size_t& bytes_read = *static_cast<size_t*>(arg);
@ -1326,7 +1326,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
const int num_reads = rnd.Uniform(512) + 1; const int num_reads = rnd.Uniform(512) + 1;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"PosixRandomAccessFile::MultiRead:io_uring_result", [&](void* arg) { "UpdateResults:io_uring_result", [&](void* arg) {
if (attempt > 5) { if (attempt > 5) {
// Improve partial result rates in second half of the run to // Improve partial result rates in second half of the run to
// cover the case of repeated partial results. // cover the case of repeated partial results.
@ -3203,10 +3203,11 @@ IOStatus ReadAsyncRandomAccessFile::ReadAsync(
// Submit read request asynchronously. // Submit read request asynchronously.
std::function<void(FSReadRequest)> submit_request = std::function<void(FSReadRequest)> submit_request =
[&opts, cb, cb_arg, io_handle, del_fn, dbg, create_io_error, [&opts, cb, cb_arg, dbg, create_io_error, this](FSReadRequest _req) {
this](FSReadRequest _req) {
if (!create_io_error) { if (!create_io_error) {
target()->ReadAsync(_req, opts, cb, cb_arg, io_handle, del_fn, dbg); _req.status = target()->Read(_req.offset, _req.len, opts,
&(_req.result), _req.scratch, dbg);
cb(_req, cb_arg);
} }
}; };

70
env/fs_posix.cc vendored

@ -1043,6 +1043,76 @@ class PosixFileSystem : public FileSystem {
} }
#endif // ROCKSDB_IOURING_PRESENT #endif // ROCKSDB_IOURING_PRESENT
// EXPERIMENTAL
//
// TODO akankshamahajan: Update Poll API to take into account min_completions
// and returns if number of handles in io_handles (any order) completed is
// equal to atleast min_completions.
virtual IOStatus Poll(std::vector<void*>& io_handles,
size_t /*min_completions*/) override {
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring_queue_init.
struct io_uring* iu = nullptr;
if (thread_local_io_urings_) {
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
}
// Init failed, platform doesn't support io_uring.
if (iu == nullptr) {
return IOStatus::NotSupported("Poll");
}
for (size_t i = 0; i < io_handles.size(); i++) {
// The request has been completed in earlier runs.
if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
continue;
}
// Loop until IO for io_handles[i] is completed.
while (true) {
// io_uring_wait_cqe.
struct io_uring_cqe* cqe = nullptr;
ssize_t ret = io_uring_wait_cqe(iu, &cqe);
if (ret) {
// abort as it shouldn't be in indeterminate state and there is no
// good way currently to handle this error.
abort();
}
// Step 3: Populate the request.
assert(cqe != nullptr);
Posix_IOHandle* posix_handle =
static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
assert(posix_handle->iu == iu);
if (posix_handle->iu != iu) {
return IOStatus::IOError("");
}
// Reset cqe data to catch any stray reuse of it
static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
FSReadRequest req;
req.scratch = posix_handle->scratch;
req.offset = posix_handle->offset;
req.len = posix_handle->len;
size_t finished_len = 0;
UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
true /*async_read*/, finished_len, &req);
posix_handle->is_finished = true;
io_uring_cqe_seen(iu, cqe);
posix_handle->cb(req, posix_handle->cb_arg);
(void)finished_len;
if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
break;
}
}
}
return IOStatus::OK();
#else
(void)io_handles;
return IOStatus::NotSupported("Poll");
#endif
}
#if defined(ROCKSDB_IOURING_PRESENT) #if defined(ROCKSDB_IOURING_PRESENT)
// io_uring instance // io_uring instance
std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_; std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;

134
env/io_posix.cc vendored

@ -744,47 +744,31 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
wrap_cache.erase(wrap_check); wrap_cache.erase(wrap_check);
FSReadRequest* req = req_wrap->req; FSReadRequest* req = req_wrap->req;
if (cqe->res < 0) { UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
req->result = Slice(req->scratch, 0); false /*async_read*/, req_wrap->finished_len, req);
req->status = IOError("Req failed", filename_, cqe->res); int32_t res = cqe->res;
} else { if (res == 0) {
size_t bytes_read = static_cast<size_t>(cqe->res); /// cqe->res == 0 can means EOF, or can mean partial results. See
TEST_SYNC_POINT_CALLBACK( // comment
"PosixRandomAccessFile::MultiRead:io_uring_result", &bytes_read); // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
if (bytes_read == req_wrap->iov.iov_len) { // Fall back to pread in this case.
req->result = Slice(req->scratch, req->len); if (use_direct_io() && !IsSectorAligned(req_wrap->finished_len,
GetRequiredBufferAlignment())) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, req_wrap->finished_len);
req->status = IOStatus::OK(); req->status = IOStatus::OK();
} else if (bytes_read == 0) {
// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io() &&
!IsSectorAligned(req_wrap->finished_len,
GetRequiredBufferAlignment())) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, req_wrap->finished_len);
req->status = IOStatus::OK();
} else {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
}
} else if (bytes_read < req_wrap->iov.iov_len) {
assert(bytes_read > 0);
assert(bytes_read + req_wrap->finished_len < req->len);
req_wrap->finished_len += bytes_read;
incomplete_rq_list.push_back(req_wrap);
} else { } else {
req->result = Slice(req->scratch, 0); Slice tmp_slice;
req->status = IOError("Req returned more bytes than requested", req->status =
filename_, cqe->res); Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
} }
} else if (res > 0 && res < static_cast<int32_t>(req_wrap->iov.iov_len)) {
incomplete_rq_list.push_back(req_wrap);
} }
io_uring_cqe_seen(iu, cqe); io_uring_cqe_seen(iu, cqe);
} }
@ -872,6 +856,80 @@ IOStatus PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
#endif #endif
} }
IOStatus PosixRandomAccessFile::ReadAsync(
FSReadRequest& req, const IOOptions& /*opts*/,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
if (use_direct_io()) {
assert(IsSectorAligned(req.offset, GetRequiredBufferAlignment()));
assert(IsSectorAligned(req.len, GetRequiredBufferAlignment()));
assert(IsSectorAligned(req.scratch, GetRequiredBufferAlignment()));
}
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring_queue_init.
struct io_uring* iu = nullptr;
if (thread_local_io_urings_) {
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
if (iu == nullptr) {
iu = CreateIOUring();
if (iu != nullptr) {
thread_local_io_urings_->Reset(iu);
}
}
}
// Init failed, platform doesn't support io_uring.
if (iu == nullptr) {
return IOStatus::NotSupported("ReadAsync");
}
// Allocate io_handle.
IOHandleDeleter deletefn = [](void* args) -> void {
delete (static_cast<Posix_IOHandle*>(args));
args = nullptr;
};
Posix_IOHandle* posix_handle = new Posix_IOHandle();
*io_handle = static_cast<void*>(posix_handle);
*del_fn = deletefn;
// Initialize Posix_IOHandle.
posix_handle->iu = iu;
posix_handle->iov.iov_base = posix_handle->scratch;
posix_handle->iov.iov_len = posix_handle->len;
posix_handle->cb = cb;
posix_handle->cb_arg = cb_arg;
posix_handle->offset = req.offset;
posix_handle->len = req.len;
posix_handle->scratch = req.scratch;
// Step 3: io_uring_sqe_set_data
struct io_uring_sqe* sqe;
sqe = io_uring_get_sqe(iu);
io_uring_prep_readv(sqe, fd_, &posix_handle->iov, 1, posix_handle->offset);
io_uring_sqe_set_data(sqe, posix_handle);
// Step 4: io_uring_submit
ssize_t ret = io_uring_submit(iu);
if (ret < 0) {
fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
return IOStatus::IOError("io_uring_submit() requested but returned " +
ToString(ret));
}
return IOStatus::OK();
#else
(void)req;
(void)cb;
(void)cb_arg;
(void)io_handle;
(void)del_fn;
return IOStatus::NotSupported("ReadAsync");
#endif
}
/* /*
* PosixMmapReadableFile * PosixMmapReadableFile
* *

56
env/io_posix.h vendored

@ -13,14 +13,17 @@
#include <sys/uio.h> #include <sys/uio.h>
#endif #endif
#include <unistd.h> #include <unistd.h>
#include <atomic> #include <atomic>
#include <functional> #include <functional>
#include <map> #include <map>
#include <string> #include <string>
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/file_system.h" #include "rocksdb/file_system.h"
#include "rocksdb/io_status.h" #include "rocksdb/io_status.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/thread_local.h" #include "util/thread_local.h"
@ -49,6 +52,54 @@ class PosixHelper {
size_t* size); size_t* size);
}; };
#if defined(ROCKSDB_IOURING_PRESENT)
struct Posix_IOHandle {
struct iovec iov;
struct io_uring* iu;
std::function<void(const FSReadRequest&, void*)> cb;
void* cb_arg;
uint64_t offset;
size_t len;
char* scratch;
bool is_finished = false;
};
inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
size_t len, size_t iov_len, bool async_read,
size_t& finished_len, FSReadRequest* req) {
if (cqe->res < 0) {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req failed", file_name, cqe->res);
} else {
size_t bytes_read = static_cast<size_t>(cqe->res);
TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read);
if (bytes_read == iov_len) {
req->result = Slice(req->scratch, req->len);
req->status = IOStatus::OK();
} else if (bytes_read == 0) {
if (async_read) {
// No bytes read. It can means EOF.
req->result = Slice(req->scratch, 0);
req->status = IOStatus::OK();
}
} else if (bytes_read < iov_len) {
assert(bytes_read > 0);
if (async_read) {
req->result = Slice(req->scratch, bytes_read);
req->status = IOStatus::OK();
} else {
assert(bytes_read + finished_len < len);
finished_len += bytes_read;
}
} else {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req returned more bytes than requested", file_name,
cqe->res);
}
}
}
#endif
#ifdef OS_LINUX #ifdef OS_LINUX
// Files under a specific directory have the same logical block size. // Files under a specific directory have the same logical block size.
// This class caches the logical block size for the specified directories to // This class caches the logical block size for the specified directories to
@ -210,6 +261,11 @@ class PosixRandomAccessFile : public FSRandomAccessFile {
virtual size_t GetRequiredBufferAlignment() const override { virtual size_t GetRequiredBufferAlignment() const override {
return logical_sector_size_; return logical_sector_size_;
} }
// EXPERIMENTAL
virtual IOStatus ReadAsync(
FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) override;
}; };
class PosixWritableFile : public FSWritableFile { class PosixWritableFile : public FSWritableFile {

@ -651,7 +651,8 @@ class FileSystem : public Customizable {
// Underlying FS is required to support Poll API. Poll implementation should // Underlying FS is required to support Poll API. Poll implementation should
// ensure that the callback gets called at IO completion, and return only // ensure that the callback gets called at IO completion, and return only
// after the callback has been called. // after the callback has been called.
// // If Poll returns partial results for any reads, its caller reponsibility to
// call Read or ReadAsync in order to get the remaining bytes.
// //
// Default implementation is to return IOStatus::OK. // Default implementation is to return IOStatus::OK.

Loading…
Cancel
Save