From dc2584eea0d9965f97d38d4d462a7b57750144d3 Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Wed, 11 Jan 2017 16:42:07 -0800 Subject: [PATCH] direct reads refactor Summary: direct IO reads refactoring remove unnecessary classes and unified interfaces tested with db_bench need more change for options and ON/OFF for different files. Since disabled is default, it should be fine now Closes https://github.com/facebook/rocksdb/pull/1636 Differential Revision: D4307189 Pulled By: lightmark fbshipit-source-id: 6991e22 --- include/rocksdb/env.h | 55 ++++++---- util/aligned_buffer.h | 10 ++ util/env_posix.cc | 96 ++++++++--------- util/env_test.cc | 36 +++---- util/file_reader_writer.cc | 99 ++++++++++++++++- util/file_reader_writer.h | 19 +++- util/io_posix.cc | 214 +++++++++++++------------------------ util/io_posix.h | 38 +------ 8 files changed, 298 insertions(+), 269 deletions(-) diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 3e2458628..1dfd10c0a 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -51,6 +51,7 @@ struct ThreadStatus; using std::unique_ptr; using std::shared_ptr; +const size_t kDefaultPageSize = 4 * 1024; // Options while opening a file to read/write struct EnvOptions { @@ -429,12 +430,27 @@ class SequentialFile { // REQUIRES: External synchronization virtual Status Skip(uint64_t n) = 0; + // Indicates the upper layers if the current SequentialFile implementation + // uses direct IO. + virtual bool UseDirectIO() const { return false; } + + // Use the returned alignment value to allocate + // aligned buffer for Direct I/O + virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; } + // Remove any kind of caching of data from the offset to offset+length // of this file. If the length is 0, then it refers to the end of file. // If the system is not caching the file contents, then this is a noop. virtual Status InvalidateCache(size_t offset, size_t length) { return Status::NotSupported("InvalidateCache not supported."); } + + // Positioned Read for direct I/O + // If Direct I/O enabled, offset, n, and scratch should be properly aligned + virtual Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) { + return Status::NotSupported(); + } }; // A file abstraction for randomly reading the contents of a file. @@ -452,6 +468,7 @@ class RandomAccessFile { // status. // // Safe for concurrent use by multiple threads. + // If Direct I/O enabled, offset, n, and scratch should be aligned properly. virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const = 0; @@ -489,6 +506,14 @@ class RandomAccessFile { virtual void Hint(AccessPattern pattern) {} + // Indicates the upper layers if the current RandomAccessFile implementation + // uses direct IO. + virtual bool UseDirectIO() const { return false; } + + // Use the returned alignment value to allocate + // aligned buffer for Direct I/O + virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; } + // Remove any kind of caching of data from the offset to offset+length // of this file. If the length is 0, then it refers to the end of file. // If the system is not caching the file contents, then this is a noop. @@ -509,19 +534,6 @@ class WritableFile { } virtual ~WritableFile(); - // Indicates if the class makes use of direct IO - // If true you must pass aligned buffer to Write() - virtual bool UseDirectIO() const { return false; } - - const size_t c_DefaultPageSize = 4 * 1024; - - // Use the returned alignment value to allocate - // aligned buffer for Write() when UseDirectIO() - // returns true - virtual size_t GetRequiredBufferAlignment() const { - return c_DefaultPageSize; - } - // Append data to the end of the file // Note: A WriteabelFile object must support either Append or // PositionedAppend, so the users cannot mix the two. @@ -578,6 +590,13 @@ class WritableFile { return false; } + // Indicates the upper layers if the current WritableFile implementation + // uses direct IO. + virtual bool UseDirectIO() const { return false; } + + // Use the returned alignment value to allocate + // aligned buffer for Direct I/O + virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; } /* * Change the priority in rate limiter if rate limiting is enabled. * If rate limiting is not enabled, this call has no effect. @@ -690,13 +709,9 @@ class RandomRWFile { // If false you must pass aligned buffer to Write() virtual bool UseDirectIO() const { return false; } - const size_t c_DefaultPageSize = 4 * 1024; - - // Use the returned alignment value to allocate aligned - // buffer for Write() when UseDirectIO() returns true - virtual size_t GetRequiredBufferAlignment() const { - return c_DefaultPageSize; - } + // Use the returned alignment value to allocate + // aligned buffer for Direct I/O + virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; } // Used by the file_reader_writer to decide if the ReadAhead wrapper // should simply forward the call and do not enact read_ahead buffering or locking. diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h index 14e5d1234..3eeec5bad 100644 --- a/util/aligned_buffer.h +++ b/util/aligned_buffer.h @@ -58,6 +58,14 @@ public: AlignedBuffer& operator=(const AlignedBuffer&) = delete; + static bool isAligned(const void* ptr, size_t alignment) { + return reinterpret_cast(ptr) % alignment == 0; + } + + static bool isAligned(size_t n, size_t alignment) { + return n % alignment == 0; + } + size_t Alignment() const { return alignment_; } @@ -74,6 +82,8 @@ public: return bufstart_; } + char* BufferStart() { return bufstart_; } + void Clear() { cursize_ = 0; } diff --git a/util/env_posix.cc b/util/env_posix.cc index 3f7a13f1c..6a87a2deb 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -5,7 +5,7 @@ // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. +// found in the LICENSE file. See the AUTHORS file for names of contributors #include #include #include @@ -149,42 +149,45 @@ class PosixEnv : public Env { unique_ptr* result, const EnvOptions& options) override { result->reset(); - FILE* f = nullptr; + int fd = -1; + int flags = O_RDONLY; + FILE* file = nullptr; + + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifndef OS_MACOSX + flags |= O_DIRECT; +#endif + } + do { IOSTATS_TIMER_GUARD(open_nanos); - f = fopen(fname.c_str(), "r"); - } while (f == nullptr && errno == EINTR); - if (f == nullptr) { - *result = nullptr; + fd = open(fname.c_str(), flags, 0644); + } while (fd < 0 && errno == EINTR); + if (fd < 0) { return IOError(fname, errno); - } else if (options.use_direct_reads && !options.use_mmap_writes) { - fclose(f); -#ifdef OS_MACOSX - int flags = O_RDONLY; -#else - int flags = O_RDONLY | O_DIRECT; - TEST_SYNC_POINT_CALLBACK("NewSequentialFile:O_DIRECT", &flags); -#endif - int fd = open(fname.c_str(), flags, 0644); - if (fd < 0) { - return IOError(fname, errno); - } + } + + SetFD_CLOEXEC(fd, &options); + + if (options.use_direct_reads && !options.use_mmap_reads) { #ifdef OS_MACOSX if (fcntl(fd, F_NOCACHE, 1) == -1) { close(fd); return IOError(fname, errno); } #endif - std::unique_ptr file( - new PosixDirectIOSequentialFile(fname, fd)); - *result = std::move(file); - return Status::OK(); } else { - int fd = fileno(f); - SetFD_CLOEXEC(fd, &options); - result->reset(new PosixSequentialFile(fname, f, options)); - return Status::OK(); + do { + IOSTATS_TIMER_GUARD(open_nanos); + file = fdopen(fd, "r"); + } while (file == nullptr && errno == EINTR); + if (file == nullptr) { + close(fd); + return IOError(fname, errno); + } } + result->reset(new PosixSequentialFile(fname, file, fd, options)); + return Status::OK(); } virtual Status NewRandomAccessFile(const std::string& fname, @@ -193,14 +196,24 @@ class PosixEnv : public Env { result->reset(); Status s; int fd; - { + int flags = O_RDONLY; + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifndef OS_MACOSX + flags |= O_DIRECT; + TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags); +#endif + } + + do { IOSTATS_TIMER_GUARD(open_nanos); - fd = open(fname.c_str(), O_RDONLY); + fd = open(fname.c_str(), flags, 0644); + } while (fd < 0 && errno == EINTR); + if (fd < 0) { + return IOError(fname, errno); } SetFD_CLOEXEC(fd, &options); - if (fd < 0) { - s = IOError(fname, errno); - } else if (options.use_mmap_reads && sizeof(void*) >= 8) { + + if (options.use_mmap_reads && sizeof(void*) >= 8) { // Use of mmap for random reads has been removed because it // kills performance when storage is fast. // Use mmap when virtual address-space is plentiful. @@ -216,30 +229,15 @@ class PosixEnv : public Env { } } close(fd); - } else if (options.use_direct_reads) { - close(fd); -#ifdef OS_MACOSX - int flags = O_RDONLY; -#else - int flags = O_RDONLY | O_DIRECT; - TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags); -#endif - fd = open(fname.c_str(), flags, 0644); - if (fd < 0) { - s = IOError(fname, errno); - } else { - std::unique_ptr file( - new PosixDirectIORandomAccessFile(fname, fd)); - *result = std::move(file); - s = Status::OK(); + } else { + if (options.use_direct_reads && !options.use_mmap_reads) { #ifdef OS_MACOSX if (fcntl(fd, F_NOCACHE, 1) == -1) { close(fd); - s = IOError(fname, errno); + return IOError(fname, errno); } #endif } - } else { result->reset(new PosixRandomAccessFile(fname, fd, options)); } return s; diff --git a/util/env_test.cc b/util/env_test.cc index 8a6f07bd8..6749d3b88 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -924,7 +924,7 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) { std::string fname = test::TmpDir(env_) + "/" + "testfile"; const size_t kSectorSize = 512; - auto data = NewAligned(kSectorSize, 'A'); + auto data = NewAligned(kSectorSize, 0); Slice slice(data.get(), kSectorSize); // Create file. @@ -932,11 +932,7 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) { unique_ptr wfile; #if !defined(OS_MACOSX) && !defined(OS_WIN) if (soptions.use_direct_writes) { - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "NewWritableFile:O_DIRECT", [&](void* arg) { - int* val = static_cast(arg); - *val &= ~O_DIRECT; - }); + soptions.use_direct_writes = false; } #endif ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); @@ -948,20 +944,16 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) { // Random Read { unique_ptr file; - char scratch[kSectorSize]; + auto scratch = NewAligned(kSectorSize, 0); Slice result; #if !defined(OS_MACOSX) && !defined(OS_WIN) if (soptions.use_direct_reads) { - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "NewRandomAccessFile:O_DIRECT", [&](void* arg) { - int* val = static_cast(arg); - *val &= ~O_DIRECT; - }); + soptions.use_direct_reads = false; } #endif ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); - ASSERT_OK(file->Read(0, kSectorSize, &result, scratch)); - ASSERT_EQ(memcmp(scratch, data.get(), kSectorSize), 0); + ASSERT_OK(file->Read(0, kSectorSize, &result, scratch.get())); + ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0); ASSERT_OK(file->InvalidateCache(0, 11)); ASSERT_OK(file->InvalidateCache(0, 0)); } @@ -969,20 +961,20 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) { // Sequential Read { unique_ptr file; - char scratch[kSectorSize]; + auto scratch = NewAligned(kSectorSize, 0); Slice result; #if !defined(OS_MACOSX) && !defined(OS_WIN) if (soptions.use_direct_reads) { - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "NewSequentialFile:O_DIRECT", [&](void* arg) { - int* val = static_cast(arg); - *val &= ~O_DIRECT; - }); + soptions.use_direct_reads = false; } #endif ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions)); - ASSERT_OK(file->Read(kSectorSize, &result, scratch)); - ASSERT_EQ(memcmp(scratch, data.get(), kSectorSize), 0); + if (file->UseDirectIO()) { + ASSERT_OK(file->PositionedRead(0, kSectorSize, &result, scratch.get())); + } else { + ASSERT_OK(file->Read(kSectorSize, &result, scratch.get())); + } + ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0); ASSERT_OK(file->InvalidateCache(0, 11)); ASSERT_OK(file->InvalidateCache(0, 0)); } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 34b3aab50..55d408e7a 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -22,12 +22,64 @@ namespace rocksdb { Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { - Status s = file_->Read(n, result, scratch); + Status s; + if (UseDirectIO()) { + size_t offset = offset_.fetch_add(n); + size_t alignment = file_->GetRequiredBufferAlignment(); + size_t aligned_offset = TruncateToPageBoundary(alignment, offset); + size_t offset_advance = offset - aligned_offset; + size_t size = Roundup(offset + n, alignment) - aligned_offset; + size_t r = 0; + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(size); + Slice tmp; + s = file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart()); + if (s.ok() && offset_advance < tmp.size()) { + buf.Size(tmp.size()); + r = buf.Read(scratch, offset_advance, + std::min(tmp.size() - offset_advance, n)); + } + *result = Slice(scratch, r); + } else { + s = file_->Read(n, result, scratch); + } IOSTATS_ADD(bytes_read, result->size()); return s; } -Status SequentialFileReader::Skip(uint64_t n) { return file_->Skip(n); } +Status SequentialFileReader::DirectRead(size_t n, Slice* result, + char* scratch) { + size_t offset = offset_.fetch_add(n); + size_t alignment = file_->GetRequiredBufferAlignment(); + size_t aligned_offset = TruncateToPageBoundary(alignment, offset); + size_t offset_advance = offset - aligned_offset; + size_t size = Roundup(offset + n, alignment) - aligned_offset; + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(size); + Slice tmp; + Status s = + file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart()); + if (s.ok()) { + buf.Size(tmp.size()); + size_t r = buf.Read(scratch, offset_advance, + tmp.size() <= offset_advance + ? 0 + : std::min(tmp.size() - offset_advance, n)); + *result = Slice(scratch, r); + } + return s; +} + +Status SequentialFileReader::Skip(uint64_t n) { + if (UseDirectIO()) { + offset_ += n; + return Status::OK(); + } else { + return file_->Skip(n); + } +} Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, char* scratch) const { @@ -37,7 +89,26 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, StopWatch sw(env_, stats_, hist_type_, (stats_ != nullptr) ? &elapsed : nullptr); IOSTATS_TIMER_GUARD(read_nanos); - s = file_->Read(offset, n, result, scratch); + if (UseDirectIO()) { + size_t alignment = file_->GetRequiredBufferAlignment(); + size_t aligned_offset = TruncateToPageBoundary(alignment, offset); + size_t offset_advance = offset - aligned_offset; + size_t size = Roundup(offset + n, alignment) - aligned_offset; + size_t r = 0; + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(size); + Slice tmp; + s = file_->Read(aligned_offset, size, &tmp, buf.BufferStart()); + if (s.ok() && offset_advance < tmp.size()) { + buf.Size(tmp.size()); + r = buf.Read(scratch, offset_advance, + std::min(tmp.size() - offset_advance, n)); + } + *result = Slice(scratch, r); + } else { + s = file_->Read(offset, n, result, scratch); + } IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); } if (stats_ != nullptr && file_read_hist_ != nullptr) { @@ -46,6 +117,28 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, return s; } +Status RandomAccessFileReader::DirectRead(uint64_t offset, size_t n, + Slice* result, char* scratch) const { + size_t alignment = file_->GetRequiredBufferAlignment(); + size_t aligned_offset = TruncateToPageBoundary(alignment, offset); + size_t offset_advance = offset - aligned_offset; + size_t size = Roundup(offset + n, alignment) - aligned_offset; + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(size); + Slice tmp; + Status s = file_->Read(aligned_offset, size, &tmp, buf.BufferStart()); + if (s.ok()) { + buf.Size(tmp.size()); + size_t r = buf.Read(scratch, offset_advance, + tmp.size() <= offset_advance + ? 0 + : std::min(tmp.size() - offset_advance, n)); + *result = Slice(scratch, r); + } + return s; +} + Status WritableFileWriter::Append(const Slice& data) { const char* src = data.data(); size_t left = data.size(); diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 6f01159db..c38f386a7 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -7,10 +7,11 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include +#include "port/port.h" #include "rocksdb/env.h" #include "util/aligned_buffer.h" -#include "port/port.h" namespace rocksdb { @@ -23,10 +24,11 @@ std::unique_ptr NewReadaheadRandomAccessFile( class SequentialFileReader { private: std::unique_ptr file_; + std::atomic offset_; // read offset public: explicit SequentialFileReader(std::unique_ptr&& _file) - : file_(std::move(_file)) {} + : file_(std::move(_file)), offset_(0) {} SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { *this = std::move(o); @@ -45,6 +47,11 @@ class SequentialFileReader { Status Skip(uint64_t n); SequentialFile* file() { return file_.get(); } + + bool UseDirectIO() const { return file_->UseDirectIO(); } + + protected: + Status DirectRead(size_t n, Slice* result, char* scratch); }; class RandomAccessFileReader { @@ -86,6 +93,12 @@ class RandomAccessFileReader { Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const; RandomAccessFile* file() { return file_.get(); } + + bool UseDirectIO() const { return file_->UseDirectIO(); } + + protected: + Status DirectRead(uint64_t offset, size_t n, Slice* result, + char* scratch) const; }; // Use posix write to write data to a file. @@ -152,6 +165,8 @@ class WritableFileWriter { WritableFile* writable_file() const { return writable_file_.get(); } + bool UseDirectIO() { return writable_file_->UseDirectIO(); } + private: // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode diff --git a/util/io_posix.cc b/util/io_posix.cc index 299de8984..eb8287708 100644 --- a/util/io_posix.cc +++ b/util/io_posix.cc @@ -58,28 +58,6 @@ const size_t kPageSize = sysconf(_SC_PAGESIZE); const size_t kPageSize = 4 * 1024; #endif -std::unique_ptr NewAligned(const size_t size) { - void* ptr = nullptr; - if (posix_memalign(&ptr, 4 * 1024, size) != 0) { - return std::unique_ptr(nullptr, free); - } - std::unique_ptr uptr(ptr, free); - return uptr; -} - -size_t Upper(const size_t size, const size_t fac) { - if (size % fac == 0) { - return size; - } - return size + (fac - size % fac); -} - -size_t Lower(const size_t size, const size_t fac) { - if (size % fac == 0) { - return size; - } - return size - (size % fac); -} bool IsSectorAligned(const size_t off) { return off % kSectorSize == 0; } @@ -87,89 +65,32 @@ static bool IsPageAligned(const void* ptr) { return uintptr_t(ptr) % (kPageSize) == 0; } -Status ReadAligned(int fd, Slice* data, const uint64_t offset, - const size_t size, char* scratch) { - assert(IsSectorAligned(offset)); - assert(IsSectorAligned(size)); - assert(IsPageAligned(scratch)); - - size_t bytes_read = 0; - ssize_t status = -1; - while (bytes_read < size) { - status = - pread(fd, scratch + bytes_read, size - bytes_read, offset + bytes_read); - if (status <= 0) { - if (errno == EINTR) { - continue; - } - break; - } - bytes_read += status; - if (status % static_cast(kSectorSize) != 0) { - // Bytes reads don't fill sectors. Should only happen at the end - // of the file. - break; - } - } - - *data = Slice(scratch, bytes_read); - return status < 0 ? Status::IOError(strerror(errno)) : Status::OK(); } -Status ReadUnaligned(int fd, Slice* data, const uint64_t offset, - const size_t size, char* scratch) { - assert(scratch); - assert(!IsSectorAligned(offset) || !IsSectorAligned(size) || - !IsPageAligned(scratch)); - - const uint64_t aligned_off = Lower(offset, kSectorSize); - const size_t aligned_size = Upper(size + (offset - aligned_off), kSectorSize); - auto aligned_scratch = NewAligned(aligned_size); - assert(aligned_scratch); - if (!aligned_scratch) { - return Status::IOError("Unable to allocate"); - } - - assert(IsSectorAligned(aligned_off)); - assert(IsSectorAligned(aligned_size)); - assert(aligned_scratch); - assert(IsPageAligned(aligned_scratch.get())); - assert(offset + size <= aligned_off + aligned_size); - - Slice scratch_slice; - Status s = ReadAligned(fd, &scratch_slice, aligned_off, aligned_size, - reinterpret_cast(aligned_scratch.get())); - - // copy data upto min(size, what was read) - memcpy(scratch, reinterpret_cast(aligned_scratch.get()) + - (offset % kSectorSize), - std::min(size, scratch_slice.size())); - *data = Slice(scratch, std::min(size, scratch_slice.size())); - return s; -} - -Status DirectIORead(int fd, Slice* result, size_t off, size_t n, - char* scratch) { - if (IsSectorAligned(off) && IsSectorAligned(n) && IsPageAligned(scratch)) { - return ReadAligned(fd, result, off, n, scratch); - } - return ReadUnaligned(fd, result, off, n, scratch); -} -} // namespace - /* * PosixSequentialFile */ -PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* f, - const EnvOptions& options) +PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file, + int fd, const EnvOptions& options) : filename_(fname), - file_(f), - fd_(fileno(f)), - use_direct_io_(options.use_direct_reads) {} + file_(file), + fd_(fd), + use_direct_io_(options.use_direct_reads) { + assert(!options.use_direct_reads || !options.use_mmap_reads); +} -PosixSequentialFile::~PosixSequentialFile() { fclose(file_); } +PosixSequentialFile::~PosixSequentialFile() { + if (!UseDirectIO()) { + assert(file_); + fclose(file_); + } else { + assert(fd_); + close(fd_); + } +} Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) { + assert(result != nullptr && !UseDirectIO()); Status s; size_t r = 0; do { @@ -187,11 +108,41 @@ Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) { s = IOError(filename_, errno); } } - if (use_direct_io_) { - // we need to fadvise away the entire range of pages because - // we do not want readahead pages to be cached. - Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages + // we need to fadvise away the entire range of pages because + // we do not want readahead pages to be cached under buffered io + Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages + return s; +} + +Status PosixSequentialFile::PositionedRead(uint64_t offset, size_t n, + Slice* result, char* scratch) { + Status s; + ssize_t r = -1; + size_t left = n; + char* ptr = scratch; + assert(UseDirectIO()); + while (left > 0) { + r = pread(fd_, ptr, left, static_cast(offset)); + if (r <= 0) { + if (r == -1 && errno == EINTR) { + continue; + } + break; + } + ptr += r; + offset += r; + left -= r; + if (r % static_cast(GetRequiredBufferAlignment()) != 0) { + // Bytes reads don't fill sectors. Should only happen at the end + // of the file. + break; + } + } + if (r < 0) { + // An error: return a non-ok status + s = IOError(filename_, errno); } + *result = Slice(scratch, (r < 0) ? 0 : n - left); return s; } @@ -206,32 +157,15 @@ Status PosixSequentialFile::InvalidateCache(size_t offset, size_t length) { #ifndef OS_LINUX return Status::OK(); #else - // free OS pages - int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); - if (ret == 0) { - return Status::OK(); + if (!UseDirectIO()) { + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret != 0) { + return IOError(filename_, errno); + } } - return IOError(filename_, errno); -#endif -} - -/* - * PosixDirectIOSequentialFile - */ -Status PosixDirectIOSequentialFile::Read(size_t n, Slice* result, - char* scratch) { - const size_t off = off_.fetch_add(n); - return DirectIORead(fd_, result, off, n, scratch); -} - -Status PosixDirectIOSequentialFile::Skip(uint64_t n) { - off_ += n; - return Status::OK(); -} - -Status PosixDirectIOSequentialFile::InvalidateCache(size_t /*offset*/, - size_t /*length*/) { return Status::OK(); +#endif } /* @@ -295,6 +229,7 @@ size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd, const EnvOptions& options) : filename_(fname), fd_(fd), use_direct_io_(options.use_direct_reads) { + assert(!options.use_direct_reads || !options.use_mmap_reads); assert(!options.use_mmap_reads || sizeof(void*) < 8); } @@ -308,9 +243,8 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, char* ptr = scratch; while (left > 0) { r = pread(fd_, ptr, left, static_cast(offset)); - if (r <= 0) { - if (errno == EINTR) { + if (r == -1 && errno == EINTR) { continue; } break; @@ -318,19 +252,23 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, ptr += r; offset += r; left -= r; + if (UseDirectIO() && + r % static_cast(GetRequiredBufferAlignment()) != 0) { + // Bytes reads don't fill sectors. Should only happen at the end + // of the file. + break; + } } - - *result = Slice(scratch, (r < 0) ? 0 : n - left); if (r < 0) { // An error: return a non-ok status s = IOError(filename_, errno); } - - if (use_direct_io_) { + if (!UseDirectIO()) { // we need to fadvise away the entire range of pages because // we do not want readahead pages to be cached. Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages } + *result = Slice(scratch, (r < 0) ? 0 : n - left); return s; } @@ -341,6 +279,9 @@ size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const { #endif void PosixRandomAccessFile::Hint(AccessPattern pattern) { + if (UseDirectIO()) { + return; + } switch (pattern) { case NORMAL: Fadvise(fd_, 0, 0, POSIX_FADV_NORMAL); @@ -364,6 +305,9 @@ void PosixRandomAccessFile::Hint(AccessPattern pattern) { } Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) { + if (UseDirectIO()) { + return Status::OK(); + } #ifndef OS_LINUX return Status::OK(); #else @@ -376,15 +320,6 @@ Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) { #endif } -/* - * PosixDirectIORandomAccessFile - */ -Status PosixDirectIORandomAccessFile::Read(uint64_t offset, size_t n, - Slice* result, char* scratch) const { - Status s = DirectIORead(fd_, result, offset, n, scratch); - return s; -} - /* * PosixMmapReadableFile * @@ -467,7 +402,6 @@ Status PosixMmapFile::UnmapCurrentRegion() { Status PosixMmapFile::MapNewRegion() { #ifdef ROCKSDB_FALLOCATE_PRESENT assert(base_ == nullptr); - TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds); // we can't fallocate with FALLOC_FL_KEEP_SIZE here if (allow_fallocate_) { diff --git a/util/io_posix.h b/util/io_posix.h index 4f01e86b0..a6023440c 100644 --- a/util/io_posix.h +++ b/util/io_posix.h @@ -44,30 +44,16 @@ class PosixSequentialFile : public SequentialFile { bool use_direct_io_; public: - PosixSequentialFile(const std::string& fname, FILE* f, + PosixSequentialFile(const std::string& fname, FILE* file, int fd, const EnvOptions& options); virtual ~PosixSequentialFile(); virtual Status Read(size_t n, Slice* result, char* scratch) override; + virtual Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) override; virtual Status Skip(uint64_t n) override; virtual Status InvalidateCache(size_t offset, size_t length) override; -}; - -class PosixDirectIOSequentialFile : public SequentialFile { - public: - explicit PosixDirectIOSequentialFile(const std::string& filename, int fd) - : filename_(filename), fd_(fd) {} - - virtual ~PosixDirectIOSequentialFile() {} - - Status Read(size_t n, Slice* result, char* scratch) override; - Status Skip(uint64_t n) override; - Status InvalidateCache(size_t offset, size_t length) override; - - private: - const std::string filename_; - int fd_ = -1; - std::atomic off_{0}; // read offset + virtual bool UseDirectIO() const override { return use_direct_io_; } }; class PosixRandomAccessFile : public RandomAccessFile { @@ -88,21 +74,7 @@ class PosixRandomAccessFile : public RandomAccessFile { #endif virtual void Hint(AccessPattern pattern) override; virtual Status InvalidateCache(size_t offset, size_t length) override; -}; - -// Direct IO random access file direct IO implementation -class PosixDirectIORandomAccessFile : public PosixRandomAccessFile { - public: - explicit PosixDirectIORandomAccessFile(const std::string& filename, int fd) - : PosixRandomAccessFile(filename, fd, EnvOptions()) {} - virtual ~PosixDirectIORandomAccessFile() {} - - Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override; - virtual void Hint(AccessPattern pattern) override {} - Status InvalidateCache(size_t offset, size_t length) override { - return Status::OK(); - } + virtual bool UseDirectIO() const override { return use_direct_io_; } }; class PosixWritableFile : public WritableFile {