diff --git a/env/env_test.cc b/env/env_test.cc index 8be7b821f..c341230b0 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -3110,6 +3110,204 @@ TEST_F(EnvTest, CreateCompositeEnv) { } #endif // ROCKSDB_LITE +// Forward declaration +class ReadAsyncFS; + +struct MockIOHandle { + std::function cb; + void* cb_arg; + bool create_io_error; +}; + +// ReadAsyncFS and ReadAsyncRandomAccessFile mocks the FS doing asynchronous +// reads by creating threads that submit read requests and then calling Poll API +// to obtain those results. +class ReadAsyncRandomAccessFile : public FSRandomAccessFileOwnerWrapper { + public: + ReadAsyncRandomAccessFile(ReadAsyncFS& fs, + std::unique_ptr& file) + : FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {} + + IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, + std::function cb, + void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, + IODebugContext* dbg) override; + + private: + ReadAsyncFS& fs_; + std::unique_ptr file_; + int counter = 0; +}; + +class ReadAsyncFS : public FileSystemWrapper { + public: + explicit ReadAsyncFS(const std::shared_ptr& wrapped) + : FileSystemWrapper(wrapped) {} + + static const char* kClassName() { return "ReadAsyncFS"; } + const char* Name() const override { return kClassName(); } + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& opts, + std::unique_ptr* result, + IODebugContext* dbg) override { + std::unique_ptr file; + IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg); + EXPECT_OK(s); + result->reset(new ReadAsyncRandomAccessFile(*this, file)); + return s; + } + + IOStatus Poll(std::vector& io_handles, + size_t /*min_completions*/) override { + // Wait for the threads completion. + for (auto& t : workers) { + t.join(); + } + + for (size_t i = 0; i < io_handles.size(); i++) { + MockIOHandle* handle = static_cast(io_handles[i]); + if (handle->create_io_error) { + FSReadRequest req; + req.status = IOStatus::IOError(); + handle->cb(req, handle->cb_arg); + } + } + return IOStatus::OK(); + } + + std::vector workers; +}; + +IOStatus ReadAsyncRandomAccessFile::ReadAsync( + FSReadRequest& req, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) { + IOHandleDeleter deletefn = [](void* args) -> void { + delete (static_cast(args)); + args = nullptr; + }; + *del_fn = deletefn; + + // Allocate and populate io_handle. + MockIOHandle* mock_handle = new MockIOHandle(); + bool create_io_error = false; + if (counter % 2) { + create_io_error = true; + } + mock_handle->create_io_error = create_io_error; + mock_handle->cb = cb; + mock_handle->cb_arg = cb_arg; + *io_handle = static_cast(mock_handle); + counter++; + + // Submit read request asynchronously. + std::function submit_request = + [&opts, cb, cb_arg, io_handle, del_fn, dbg, create_io_error, + this](FSReadRequest _req) { + if (!create_io_error) { + target()->ReadAsync(_req, opts, cb, cb_arg, io_handle, del_fn, dbg); + } + }; + + fs_.workers.emplace_back(submit_request, req); + return IOStatus::OK(); +} + +class TestAsyncRead : public testing::Test { + public: + TestAsyncRead() { env_ = Env::Default(); } + Env* env_; +}; + +// Tests the default implementation of ReadAsync API. +TEST_F(TestAsyncRead, ReadAsync) { + EnvOptions soptions; + std::shared_ptr fs = + std::make_shared(env_->GetFileSystem()); + + std::string fname = test::PerThreadDBPath(env_, "testfile"); + + const size_t kSectorSize = 4096; + const size_t kNumSectors = 8; + + // 1. create & write to a file. + { + std::unique_ptr wfile; + ASSERT_OK( + fs->NewWritableFile(fname, FileOptions(), &wfile, nullptr /*dbg*/)); + + for (size_t i = 0; i < kNumSectors; ++i) { + auto data = NewAligned(kSectorSize * 8, static_cast(i + 1)); + Slice slice(data.get(), kSectorSize); + ASSERT_OK(wfile->Append(slice, IOOptions(), nullptr)); + } + ASSERT_OK(wfile->Close(IOOptions(), nullptr)); + } + // 2. Read file + { + std::unique_ptr file; + ASSERT_OK(fs->NewRandomAccessFile(fname, FileOptions(), &file, nullptr)); + + IOOptions opts; + std::vector io_handles(kNumSectors); + std::vector reqs(kNumSectors); + std::vector> data; + std::vector vals; + IOHandleDeleter del_fn; + uint64_t offset = 0; + + // Initialize read requests + for (size_t i = 0; i < kNumSectors; i++) { + reqs[i].offset = offset; + reqs[i].len = kSectorSize; + data.emplace_back(NewAligned(kSectorSize, 0)); + reqs[i].scratch = data.back().get(); + vals.push_back(i); + offset += kSectorSize; + } + + // callback function passed to async read. + std::function callback = + [&](const FSReadRequest& req, void* cb_arg) { + assert(cb_arg != nullptr); + size_t i = *(reinterpret_cast(cb_arg)); + reqs[i].offset = req.offset; + reqs[i].result = req.result; + reqs[i].status = req.status; + }; + + // Submit asynchronous read requests. + for (size_t i = 0; i < kNumSectors; i++) { + void* cb_arg = static_cast(&(vals[i])); + ASSERT_OK(file->ReadAsync(reqs[i], opts, callback, cb_arg, + &(io_handles[i]), &del_fn, nullptr)); + } + + // Poll for the submitted requests. + fs->Poll(io_handles, kNumSectors); + + // Check the status of read requests. + for (size_t i = 0; i < kNumSectors; i++) { + if (i % 2) { + ASSERT_EQ(reqs[i].status, IOStatus::IOError()); + } else { + auto buf = NewAligned(kSectorSize * 8, static_cast(i + 1)); + Slice expected_data(buf.get(), kSectorSize); + + ASSERT_EQ(reqs[i].offset, i * kSectorSize); + ASSERT_OK(reqs[i].status); + ASSERT_EQ(expected_data.ToString(), reqs[i].result.ToString()); + } + } + + // Delete io_handles. + for (size_t i = 0; i < io_handles.size(); i++) { + del_fn(io_handles[i]); + } + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index d8b0c6189..13c459ee8 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -221,10 +221,10 @@ struct IODebugContext { } }; -// IOHandle is used by underlying file system to store any information it needs -// during Async Read requests. +// A function pointer type for custom destruction of void pointer passed to +// ReadAsync API. RocksDB/caller is responsible for deleting the void pointer +// allocated by FS in ReadAsync API. using IOHandleDeleter = std::function; -using IOHandle = std::unique_ptr; // The FileSystem, FSSequentialFile, FSRandomAccessFile, FSWritableFile, // FSRandomRWFileclass, and FSDIrectory classes define the interface between @@ -647,13 +647,15 @@ class FileSystem : public Customizable { // EXPERIMENTAL // Poll for completion of read IO requests. The Poll() method should call the - // callback functions to indicate completion of read requests. If Poll is not - // supported it means callee should be informed of IO completions via the - // callback on another thread. + // callback functions to indicate completion of read requests. + // Underlying FS is required to support Poll API. Poll implementation should + // ensure that the callback gets called at IO completion, and return only + // after the callback has been called. + // // // Default implementation is to return IOStatus::OK. - virtual IOStatus Poll(std::vector& /*io_handles*/, + virtual IOStatus Poll(std::vector& /*io_handles*/, size_t /*min_completions*/) { return IOStatus::OK(); } @@ -865,9 +867,13 @@ class FSRandomAccessFile { // cb_arg should be used by the callback to track the original request // submitted. // - // This API should also populate IOHandle which should be used by + // This API should also populate io_handle which should be used by // underlying FileSystem to store the context in order to distinguish the read - // requests at their side. + // requests at their side and provide the custom deletion function in del_fn. + // RocksDB guarantees that the del_fn for io_handle will be called after + // receiving the callback. Furthermore, RocksDB guarantees that if it calls + // the Poll API for this io_handle, del_fn will be called after the Poll + // returns. RocksDB is responsible for managing the lifetime of io_handle. // // req contains the request offset and size passed as input parameter of read // request and result and status fields are output parameter set by underlying @@ -877,7 +883,7 @@ class FSRandomAccessFile { virtual IOStatus ReadAsync( FSReadRequest& req, const IOOptions& opts, std::function cb, void* cb_arg, - IOHandle* /*io_handle*/, IODebugContext* dbg) { + void** /*io_handle*/, IOHandleDeleter* /*del_fn*/, IODebugContext* dbg) { req.status = Read(req.offset, req.len, opts, &(req.result), req.scratch, dbg); cb(req, cb_arg); @@ -1470,7 +1476,7 @@ class FileSystemWrapper : public FileSystem { const std::string& header) const override; #endif // ROCKSDB_LITE - virtual IOStatus Poll(std::vector& io_handles, + virtual IOStatus Poll(std::vector& io_handles, size_t min_completions) override { return target_->Poll(io_handles, min_completions); } @@ -1557,9 +1563,9 @@ class FSRandomAccessFileWrapper : public FSRandomAccessFile { } IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, std::function cb, - void* cb_arg, IOHandle* io_handle, + void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) override { - return target()->ReadAsync(req, opts, cb, cb_arg, io_handle, dbg); + return target()->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, dbg); } Temperature GetTemperature() const override { return target_->GetTemperature();