From e3a82bb9345220c85894aaa67ccf95bd9eadd4a2 Mon Sep 17 00:00:00 2001 From: sdong Date: Sat, 7 Dec 2019 20:54:27 -0800 Subject: [PATCH] PosixRandomAccessFile::MultiRead() to use I/O uring if supported (#5881) Summary: Right now, PosixRandomAccessFile::MultiRead() executes read requests serially. In this PR, it leverages the I/O uring library to run them in parallel, even when page cache is enabled. This function will fall back to serialized reads if the kernel version doesn't support it. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5881 Test Plan: Run the unit test on a kernel version supporting it and make sure all tests pass, and run the unit test on a kernel version not supporting it and see it pass. Before merging, will also run the stress test and make sure it passes. Differential Revision: D17742266 fbshipit-source-id: e05699c925ac04fdb42379456a4e23e4ebcb803a --- TARGETS | 3 + buckifier/targets_cfg.py | 3 + build_tools/build_detect_platform | 13 +++ build_tools/dependencies_platform007.sh | 1 + build_tools/fbcode_config_platform007.sh | 17 +++- build_tools/update_dependencies.sh | 1 + env/env_posix.cc | 29 +++++- env/io_posix.cc | 107 ++++++++++++++++++++++- env/io_posix.h | 37 +++++++- 9 files changed, 202 insertions(+), 9 deletions(-) diff --git a/TARGETS b/TARGETS index ab1f24cd7..2546b99aa 100644 --- a/TARGETS +++ b/TARGETS @@ -26,6 +26,7 @@ ROCKSDB_EXTERNAL_DEPS = [ ("lz4", None, "lz4"), ("zstd", None), ("tbb", None), + ("liburing", None, "uring"), ("googletest", None, "gtest"), ] @@ -46,6 +47,7 @@ ROCKSDB_OS_PREPROCESSOR_FLAGS = [ "-DROCKSDB_PTHREAD_ADAPTIVE_MUTEX", "-DROCKSDB_RANGESYNC_PRESENT", "-DROCKSDB_SCHED_GETCPU_PRESENT", + "-DROCKSDB_IOURING_PRESENT", "-DHAVE_SSE42", "-DNUMA", ], @@ -70,6 +72,7 @@ ROCKSDB_PREPROCESSOR_FLAGS = [ "-DZSTD_STATIC_LINKING_ONLY", "-DGFLAGS=gflags", "-DTBB", + "-DLIBURING", # Added missing flags from output of build_detect_platform "-DROCKSDB_BACKTRACE", diff --git a/buckifier/targets_cfg.py b/buckifier/targets_cfg.py index
0ecd6fdda..81d3babfd 100644 --- a/buckifier/targets_cfg.py +++ b/buckifier/targets_cfg.py @@ -32,6 +32,7 @@ ROCKSDB_EXTERNAL_DEPS = [ ("lz4", None, "lz4"), ("zstd", None), ("tbb", None), + ("liburing", None, "uring"), ("googletest", None, "gtest"), ] @@ -52,6 +53,7 @@ ROCKSDB_OS_PREPROCESSOR_FLAGS = [ "-DROCKSDB_PTHREAD_ADAPTIVE_MUTEX", "-DROCKSDB_RANGESYNC_PRESENT", "-DROCKSDB_SCHED_GETCPU_PRESENT", + "-DROCKSDB_IOURING_PRESENT", "-DHAVE_SSE42", "-DNUMA", ], @@ -76,6 +78,7 @@ ROCKSDB_PREPROCESSOR_FLAGS = [ "-DZSTD_STATIC_LINKING_ONLY", "-DGFLAGS=gflags", "-DTBB", + "-DLIBURING", # Added missing flags from output of build_detect_platform "-DROCKSDB_BACKTRACE", diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index 45fdbe258..36a013f6f 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -150,6 +150,19 @@ case "$TARGET_OS" in PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -latomic" fi PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lpthread -lrt" + # check for liburing + $CXX $CFLAGS -x c++ - -luring -o /dev/null 2>/dev/null < + int main() { + struct io_uring ring; + io_uring_queue_init(1, &ring, 0); + return 0; + } +EOF + if [ "$?" 
= 0 ]; then + PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -luring" + COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_IOURING_PRESENT" + fi if test -z "$USE_FOLLY_DISTRIBUTED_MUTEX"; then USE_FOLLY_DISTRIBUTED_MUTEX=1 fi diff --git a/build_tools/dependencies_platform007.sh b/build_tools/dependencies_platform007.sh index 1de8e785a..e95f2979e 100644 --- a/build_tools/dependencies_platform007.sh +++ b/build_tools/dependencies_platform007.sh @@ -13,6 +13,7 @@ JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b86 NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9 TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d +LIBURING_BASE=/mnt/gvfs/third-party2/liburing/79427253fd0d42677255aacfe6d13bfe63f752eb/20190828/platform007/ca4da3d KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614 VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d diff --git a/build_tools/fbcode_config_platform007.sh b/build_tools/fbcode_config_platform007.sh index 51edf134f..60b51800a 100644 --- a/build_tools/fbcode_config_platform007.sh +++ b/build_tools/fbcode_config_platform007.sh @@ -86,6 +86,15 @@ else fi CFLAGS+=" -DTBB" +# location of LIBURING +LIBURING_INCLUDE=" -isystem $LIBURING_BASE/include/" +if test -z $PIC_BUILD; then + LIBURING_LIBS="$LIBURING_BASE/lib/liburing.a" +else + LIBURING_LIBS="$LIBURING_BASE/lib/liburing_pic.a" +fi +CFLAGS+=" -DLIBURING" + test "$USE_SSE" || USE_SSE=1 export USE_SSE test "$PORTABLE" || PORTABLE=1 @@ -94,7 +103,7 @@ export PORTABLE BINUTILS="$BINUTILS_BASE/bin" AR="$BINUTILS/ar" 
-DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE" +DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE $LIBURING_INCLUDE" STDLIBS="-L $GCC_BASE/lib64" @@ -135,10 +144,10 @@ else fi CFLAGS+=" $DEPS_INCLUDE" -CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42" +CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42 -DROCKSDB_IOURING_PRESENT" CXXFLAGS+=" $CFLAGS" -EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS" +EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS $LIBURING_LIBS" EXEC_LDFLAGS+=" -B$BINUTILS/gold" EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so" EXEC_LDFLAGS+=" $LIBUNWIND" @@ -148,7 +157,7 @@ EXEC_LDFLAGS+=" -ldl" PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++" -EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS" +EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS $LIBURING_LIBS" VALGRIND_VER="$VALGRIND_BASE/bin/" diff --git a/build_tools/update_dependencies.sh b/build_tools/update_dependencies.sh index dbc95a6e5..99099a406 100755 --- a/build_tools/update_dependencies.sh +++ b/build_tools/update_dependencies.sh @@ -92,6 +92,7 @@ get_lib_base jemalloc LATEST platform007 get_lib_base numa LATEST platform007 get_lib_base libunwind LATEST platform007 get_lib_base tbb LATEST platform007 +get_lib_base liburing LATEST platform007 
get_lib_base kernel-headers fb platform007 get_lib_base binutils LATEST centos7-native diff --git a/env/env_posix.cc b/env/env_posix.cc index 83e209bf1..0723e1832 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -16,6 +16,9 @@ #if defined(OS_LINUX) #include #endif +#if defined(ROCKSDB_IOURING_PRESENT) +#include +#endif #include #include #include @@ -32,6 +35,9 @@ #include #include #include +#if defined(ROCKSDB_IOURING_PRESENT) +#include +#endif #include #include // Get nano time includes @@ -286,7 +292,12 @@ class PosixEnv : public Env { } #endif } - result->reset(new PosixRandomAccessFile(fname, fd, options)); + result->reset(new PosixRandomAccessFile(fname, fd, options +#if defined(ROCKSDB_IOURING_PRESENT) + , + thread_local_io_urings_.get() +#endif + )); } return s; } @@ -1105,6 +1116,11 @@ class PosixEnv : public Env { #endif } +#if defined(ROCKSDB_IOURING_PRESENT) + // io_uring instance + std::unique_ptr thread_local_io_urings_; +#endif + size_t page_size_; std::vector thread_pools_; @@ -1129,6 +1145,17 @@ PosixEnv::PosixEnv() thread_pools_[pool_id].SetHostEnv(this); } thread_status_updater_ = CreateThreadStatusUpdater(); + +#if defined(ROCKSDB_IOURING_PRESENT) + // Test whether IOUring is supported, and if it does, create a managing + // object for thread local point so that in the future thread-local + // io_uring can be created. 
+ struct io_uring* new_io_uring = CreateIOUring(); + if (new_io_uring != nullptr) { + thread_local_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring)); + delete new_io_uring; + } +#endif } void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, diff --git a/env/io_posix.cc b/env/io_posix.cc index 3572d7cc9..01cfc2aae 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -34,6 +34,7 @@ #include "port/port.h" #include "rocksdb/slice.h" #include "test_util/sync_point.h" +#include "util/autovector.h" #include "util/coding.h" #include "util/string_util.h" @@ -409,12 +410,22 @@ size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { * * pread() based random-access */ -PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd, - const EnvOptions& options) +PosixRandomAccessFile::PosixRandomAccessFile( + const std::string& fname, int fd, const EnvOptions& options +#if defined(ROCKSDB_IOURING_PRESENT) + , + ThreadLocalPtr* thread_local_io_urings +#endif + ) : filename_(fname), fd_(fd), use_direct_io_(options.use_direct_reads), - logical_sector_size_(GetLogicalBufferSize(fd_)) { + logical_sector_size_(GetLogicalBufferSize(fd_)) +#if defined(ROCKSDB_IOURING_PRESENT) + , + thread_local_io_urings_(thread_local_io_urings) +#endif +{ assert(!options.use_direct_reads || !options.use_mmap_reads); assert(!options.use_mmap_reads || sizeof(void*) < 8); } @@ -460,6 +471,96 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, return s; } +Status PosixRandomAccessFile::MultiRead(ReadRequest* reqs, size_t num_reqs) { +#if defined(ROCKSDB_IOURING_PRESENT) + size_t reqs_off; + ssize_t ret __attribute__((__unused__)); + + struct io_uring* iu = nullptr; + if (thread_local_io_urings_) { + iu = static_cast(thread_local_io_urings_->Get()); + if (iu == nullptr) { + iu = CreateIOUring(); + if (iu != nullptr) { + thread_local_io_urings_->Reset(iu); + } + } + } + + // Init failed, platform doesn't support 
io_uring. Fall back to + // serialized reads + if (iu == nullptr) { + return RandomAccessFile::MultiRead(reqs, num_reqs); + } + + struct WrappedReadRequest { + ReadRequest* req; + struct iovec iov; + explicit WrappedReadRequest(ReadRequest* r) : req(r) {} + }; + + autovector req_wraps; + + for (size_t i = 0; i < num_reqs; i++) { + req_wraps.emplace_back(&reqs[i]); + } + + reqs_off = 0; + while (num_reqs) { + size_t this_reqs = num_reqs; + + // If requests exceed depth, split it into batches + if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth; + + for (size_t i = 0; i < this_reqs; i++) { + size_t index = i + reqs_off; + struct io_uring_sqe* sqe; + + sqe = io_uring_get_sqe(iu); + req_wraps[index].iov.iov_base = reqs[index].scratch; + req_wraps[index].iov.iov_len = reqs[index].len; + reqs[index].result = reqs[index].scratch; + io_uring_prep_readv(sqe, fd_, &req_wraps[index].iov, 1, + reqs[index].offset); + io_uring_sqe_set_data(sqe, &req_wraps[index]); + } + + ret = io_uring_submit_and_wait(iu, static_cast(this_reqs)); + if (static_cast(ret) != this_reqs) { + fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs); + } + assert(static_cast(ret) == this_reqs); + + for (size_t i = 0; i < this_reqs; i++) { + struct io_uring_cqe* cqe; + WrappedReadRequest* req_wrap; + + // We could use the peek variant here, but this seems safer in terms + // of our initial wait not reaping all completions + ret = io_uring_wait_cqe(iu, &cqe); + assert(!ret); + req_wrap = static_cast(io_uring_cqe_get_data(cqe)); + ReadRequest* req = req_wrap->req; + if (static_cast(cqe->res) == req_wrap->iov.iov_len) { + req->result = Slice(req->scratch, cqe->res); + req->status = Status::OK(); + } else if (cqe->res >= 0) { + req->result = Slice(req->scratch, req_wrap->iov.iov_len - cqe->res); + } else { + req->result = Slice(req->scratch, 0); + req->status = IOError("Req failed", filename_, cqe->res); + } + io_uring_cqe_seen(iu, cqe); + } + num_reqs -= this_reqs; + reqs_off += 
this_reqs; + } + return Status::OK(); +#else + return RandomAccessFile::MultiRead(reqs, num_reqs); +#endif +} + Status PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n) { Status s; if (!use_direct_io()) { diff --git a/env/io_posix.h b/env/io_posix.h index 815be8022..76d21ebe8 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -8,10 +8,15 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once #include +#if defined(ROCKSDB_IOURING_PRESENT) +#include +#include +#endif #include #include #include #include "rocksdb/env.h" +#include "util/thread_local.h" // For non linux platform, the following macros are used only as place // holder. @@ -79,21 +84,51 @@ class PosixSequentialFile : public SequentialFile { } }; +#if defined(ROCKSDB_IOURING_PRESENT) +// io_uring instance queue depth +const unsigned int kIoUringDepth = 256; + +inline void DeleteIOUring(void* p) { + struct io_uring* iu = static_cast(p); + delete iu; +} + +inline struct io_uring* CreateIOUring() { + struct io_uring* new_io_uring = new struct io_uring; + int ret = io_uring_queue_init(kIoUringDepth, new_io_uring, 0); + if (ret) { + delete new_io_uring; + new_io_uring = nullptr; + } + return new_io_uring; +} +#endif // defined(ROCKSDB_IOURING_PRESENT) + class PosixRandomAccessFile : public RandomAccessFile { protected: std::string filename_; int fd_; bool use_direct_io_; size_t logical_sector_size_; +#if defined(ROCKSDB_IOURING_PRESENT) + ThreadLocalPtr* thread_local_io_urings_; +#endif public: PosixRandomAccessFile(const std::string& fname, int fd, - const EnvOptions& options); + const EnvOptions& options +#if defined(ROCKSDB_IOURING_PRESENT) + , + ThreadLocalPtr* thread_local_io_urings +#endif + ); virtual ~PosixRandomAccessFile(); virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override; + virtual Status MultiRead(ReadRequest* reqs, size_t num_reqs) override; + virtual Status Prefetch(uint64_t offset, size_t n) override; #if 
defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)