From 0153e14569c30f225d7a08050acbf10c4d211d41 Mon Sep 17 00:00:00 2001 From: anand76 Date: Wed, 5 Jun 2019 09:38:23 -0700 Subject: [PATCH] Add a MultiRead() method to Env (#5311) Summary: Define the Env:: MultiRead() method to allow callers to request multiple block reads in one shot. The underlying Env implementation can parallelize it if it chooses to in order to reduce the overall IO latency. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5311 Differential Revision: D15502172 Pulled By: anand1976 fbshipit-source-id: 2b228269c2e11b5f54694d6b2bb3119c8a8ce2b9 --- env/env_test.cc | 53 +++++++++++++++++++++++++++++++++++++++++++ include/rocksdb/env.h | 39 +++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/env/env_test.cc b/env/env_test.cc index 30d5b5282..a2b6db5c4 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -1105,6 +1105,59 @@ TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDDeletes) { } } +TEST_P(EnvPosixTestWithParam, MultiRead) { + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; + std::string fname = test::PerThreadDBPath(env_, "testfile"); + + const size_t kSectorSize = 4096; + const size_t kNumSectors = 8; + + // Create file. + { + std::unique_ptr wfile; +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) + if (soptions.use_direct_writes) { + soptions.use_direct_writes = false; + } +#endif + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + for (size_t i = 0; i < kNumSectors; ++i) { + auto data = NewAligned(kSectorSize * 8, static_cast(i + 1)); + Slice slice(data.get(), kSectorSize); + ASSERT_OK(wfile->Append(slice)); + } + ASSERT_OK(wfile->Close()); + } + + // Random Read + { + std::unique_ptr file; + std::vector reqs(3); + std::vector> data; + uint64_t offset = 0; + for (size_t i = 0; i < reqs.size(); ++i) { + reqs[i].offset = offset; + offset += 2 * kSectorSize; + reqs[i].len = kSectorSize; + data.emplace_back(NewAligned(kSectorSize, 0)); + reqs[i].scratch = data.back().get(); + } +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) + if (soptions.use_direct_reads) { + soptions.use_direct_reads = false; + } +#endif + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + ASSERT_OK(file->MultiRead(reqs.data(), reqs.size())); + for (size_t i = 0; i < reqs.size(); ++i) { + auto buf = NewAligned(kSectorSize * 8, static_cast(i*2 + 1)); + ASSERT_OK(reqs[i].status); + ASSERT_EQ(memcmp(reqs[i].scratch, buf.get(), kSectorSize), 0); + } + } +} + // Only works in linux platforms #ifdef OS_WIN TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) { diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index a8fe2fb78..0a055cea0 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -583,6 +583,26 @@ class SequentialFile { // SequentialFileWrapper too. }; +// A read IO request structure for use in MultiRead +struct ReadRequest { + // File offset in bytes + uint64_t offset; + + // Length to read in bytes + size_t len; + + // A buffer that MultiRead() can optionally place data in. It can + // ignore this and allocate its own buffer + char* scratch; + + // Output parameter set by MultiRead() to point to the data buffer, and + // the number of valid bytes + Slice result; + + // Status of read + Status status; +}; + // A file abstraction for randomly reading the contents of a file. class RandomAccessFile { public: @@ -607,6 +627,22 @@ class RandomAccessFile { return Status::OK(); } + // Read a bunch of blocks as described by reqs. The blocks can + // optionally be read in parallel. This is a synchronous call, i.e it + // should return after all reads have completed. The reads will be + // non-overlapping. If the function return Status is not ok, status of + // individual requests will be ignored and return status will be assumed + // for all read requests. The function return status is only meant for any + // any errors that occur before even processing specific read requests + virtual Status MultiRead(ReadRequest* reqs, size_t num_reqs) { + assert(reqs != nullptr); + for (size_t i = 0; i < num_reqs; ++i) { + ReadRequest& req = reqs[i]; + req.status = Read(req.offset, req.len, &req.result, req.scratch); + } + return Status::OK(); + } + // Tries to get an unique ID for this file that will be the same each time // the file is opened (and will stay the same while the file is open). // Furthermore, it tries to make this ID at most "max_size" bytes. If such an @@ -1357,6 +1393,9 @@ class RandomAccessFileWrapper : public RandomAccessFile { char* scratch) const override { return target_->Read(offset, n, result, scratch); } + Status MultiRead(ReadRequest* reqs, size_t num_reqs) override { + return target_->MultiRead(reqs, num_reqs); + } Status Prefetch(uint64_t offset, size_t n) override { return target_->Prefetch(offset, n); }