direct reads refactor

Summary:
direct IO reads refactoring
remove unnecessary classes and unified interfaces
tested with db_bench

need more change for options and ON/OFF for different files.
Since disabled is default, it should be fine now
Closes https://github.com/facebook/rocksdb/pull/1636

Differential Revision: D4307189

Pulled By: lightmark

fbshipit-source-id: 6991e22
main
Aaron Gao 8 years ago committed by Facebook Github Bot
parent d18dd2c41f
commit dc2584eea0
  1. 55
      include/rocksdb/env.h
  2. 10
      util/aligned_buffer.h
  3. 90
      util/env_posix.cc
  4. 36
      util/env_test.cc
  5. 97
      util/file_reader_writer.cc
  6. 19
      util/file_reader_writer.h
  7. 202
      util/io_posix.cc
  8. 38
      util/io_posix.h

@ -51,6 +51,7 @@ struct ThreadStatus;
using std::unique_ptr; using std::unique_ptr;
using std::shared_ptr; using std::shared_ptr;
const size_t kDefaultPageSize = 4 * 1024;
// Options while opening a file to read/write // Options while opening a file to read/write
struct EnvOptions { struct EnvOptions {
@ -429,12 +430,27 @@ class SequentialFile {
// REQUIRES: External synchronization // REQUIRES: External synchronization
virtual Status Skip(uint64_t n) = 0; virtual Status Skip(uint64_t n) = 0;
// Indicates the upper layers if the current SequentialFile implementation
// uses direct IO.
virtual bool UseDirectIO() const { return false; }
// Use the returned alignment value to allocate
// aligned buffer for Direct I/O
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
// Remove any kind of caching of data from the offset to offset+length // Remove any kind of caching of data from the offset to offset+length
// of this file. If the length is 0, then it refers to the end of file. // of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop. // If the system is not caching the file contents, then this is a noop.
virtual Status InvalidateCache(size_t offset, size_t length) { virtual Status InvalidateCache(size_t offset, size_t length) {
return Status::NotSupported("InvalidateCache not supported."); return Status::NotSupported("InvalidateCache not supported.");
} }
// Positioned Read for direct I/O
// If Direct I/O enabled, offset, n, and scratch should be properly aligned
virtual Status PositionedRead(uint64_t offset, size_t n, Slice* result,
char* scratch) {
return Status::NotSupported();
}
}; };
// A file abstraction for randomly reading the contents of a file. // A file abstraction for randomly reading the contents of a file.
@ -452,6 +468,7 @@ class RandomAccessFile {
// status. // status.
// //
// Safe for concurrent use by multiple threads. // Safe for concurrent use by multiple threads.
// If Direct I/O enabled, offset, n, and scratch should be aligned properly.
virtual Status Read(uint64_t offset, size_t n, Slice* result, virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const = 0; char* scratch) const = 0;
@ -489,6 +506,14 @@ class RandomAccessFile {
virtual void Hint(AccessPattern pattern) {} virtual void Hint(AccessPattern pattern) {}
// Indicates the upper layers if the current RandomAccessFile implementation
// uses direct IO.
virtual bool UseDirectIO() const { return false; }
// Use the returned alignment value to allocate
// aligned buffer for Direct I/O
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
// Remove any kind of caching of data from the offset to offset+length // Remove any kind of caching of data from the offset to offset+length
// of this file. If the length is 0, then it refers to the end of file. // of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop. // If the system is not caching the file contents, then this is a noop.
@ -509,19 +534,6 @@ class WritableFile {
} }
virtual ~WritableFile(); virtual ~WritableFile();
// Indicates if the class makes use of direct IO
// If true you must pass aligned buffer to Write()
virtual bool UseDirectIO() const { return false; }
const size_t c_DefaultPageSize = 4 * 1024;
// Use the returned alignment value to allocate
// aligned buffer for Write() when UseDirectIO()
// returns true
virtual size_t GetRequiredBufferAlignment() const {
return c_DefaultPageSize;
}
// Append data to the end of the file // Append data to the end of the file
// Note: A WriteabelFile object must support either Append or // Note: A WriteabelFile object must support either Append or
// PositionedAppend, so the users cannot mix the two. // PositionedAppend, so the users cannot mix the two.
@ -578,6 +590,13 @@ class WritableFile {
return false; return false;
} }
// Indicates the upper layers if the current WritableFile implementation
// uses direct IO.
virtual bool UseDirectIO() const { return false; }
// Use the returned alignment value to allocate
// aligned buffer for Direct I/O
virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
/* /*
* Change the priority in rate limiter if rate limiting is enabled. * Change the priority in rate limiter if rate limiting is enabled.
* If rate limiting is not enabled, this call has no effect. * If rate limiting is not enabled, this call has no effect.
@ -690,13 +709,9 @@ class RandomRWFile {
// If false you must pass aligned buffer to Write() // If false you must pass aligned buffer to Write()
virtual bool UseDirectIO() const { return false; } virtual bool UseDirectIO() const { return false; }
const size_t c_DefaultPageSize = 4 * 1024; // Use the returned alignment value to allocate
// aligned buffer for Direct I/O
// Use the returned alignment value to allocate aligned virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; }
// buffer for Write() when UseDirectIO() returns true
virtual size_t GetRequiredBufferAlignment() const {
return c_DefaultPageSize;
}
// Used by the file_reader_writer to decide if the ReadAhead wrapper // Used by the file_reader_writer to decide if the ReadAhead wrapper
// should simply forward the call and do not enact read_ahead buffering or locking. // should simply forward the call and do not enact read_ahead buffering or locking.

@ -58,6 +58,14 @@ public:
AlignedBuffer& operator=(const AlignedBuffer&) = delete; AlignedBuffer& operator=(const AlignedBuffer&) = delete;
static bool isAligned(const void* ptr, size_t alignment) {
return reinterpret_cast<uintptr_t>(ptr) % alignment == 0;
}
static bool isAligned(size_t n, size_t alignment) {
return n % alignment == 0;
}
size_t Alignment() const { size_t Alignment() const {
return alignment_; return alignment_;
} }
@ -74,6 +82,8 @@ public:
return bufstart_; return bufstart_;
} }
char* BufferStart() { return bufstart_; }
void Clear() { void Clear() {
cursize_ = 0; cursize_ = 0;
} }

@ -5,7 +5,7 @@
// //
// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors
#include <dirent.h> #include <dirent.h>
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
@ -149,42 +149,45 @@ class PosixEnv : public Env {
unique_ptr<SequentialFile>* result, unique_ptr<SequentialFile>* result,
const EnvOptions& options) override { const EnvOptions& options) override {
result->reset(); result->reset();
FILE* f = nullptr; int fd = -1;
do {
IOSTATS_TIMER_GUARD(open_nanos);
f = fopen(fname.c_str(), "r");
} while (f == nullptr && errno == EINTR);
if (f == nullptr) {
*result = nullptr;
return IOError(fname, errno);
} else if (options.use_direct_reads && !options.use_mmap_writes) {
fclose(f);
#ifdef OS_MACOSX
int flags = O_RDONLY; int flags = O_RDONLY;
#else FILE* file = nullptr;
int flags = O_RDONLY | O_DIRECT;
TEST_SYNC_POINT_CALLBACK("NewSequentialFile:O_DIRECT", &flags); if (options.use_direct_reads && !options.use_mmap_reads) {
#ifndef OS_MACOSX
flags |= O_DIRECT;
#endif #endif
int fd = open(fname.c_str(), flags, 0644); }
do {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), flags, 0644);
} while (fd < 0 && errno == EINTR);
if (fd < 0) { if (fd < 0) {
return IOError(fname, errno); return IOError(fname, errno);
} }
SetFD_CLOEXEC(fd, &options);
if (options.use_direct_reads && !options.use_mmap_reads) {
#ifdef OS_MACOSX #ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) { if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd); close(fd);
return IOError(fname, errno); return IOError(fname, errno);
} }
#endif #endif
std::unique_ptr<PosixDirectIOSequentialFile> file(
new PosixDirectIOSequentialFile(fname, fd));
*result = std::move(file);
return Status::OK();
} else { } else {
int fd = fileno(f); do {
SetFD_CLOEXEC(fd, &options); IOSTATS_TIMER_GUARD(open_nanos);
result->reset(new PosixSequentialFile(fname, f, options)); file = fdopen(fd, "r");
return Status::OK(); } while (file == nullptr && errno == EINTR);
if (file == nullptr) {
close(fd);
return IOError(fname, errno);
}
} }
result->reset(new PosixSequentialFile(fname, file, fd, options));
return Status::OK();
} }
virtual Status NewRandomAccessFile(const std::string& fname, virtual Status NewRandomAccessFile(const std::string& fname,
@ -193,14 +196,24 @@ class PosixEnv : public Env {
result->reset(); result->reset();
Status s; Status s;
int fd; int fd;
{ int flags = O_RDONLY;
if (options.use_direct_reads && !options.use_mmap_reads) {
#ifndef OS_MACOSX
flags |= O_DIRECT;
TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags);
#endif
}
do {
IOSTATS_TIMER_GUARD(open_nanos); IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), O_RDONLY); fd = open(fname.c_str(), flags, 0644);
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
return IOError(fname, errno);
} }
SetFD_CLOEXEC(fd, &options); SetFD_CLOEXEC(fd, &options);
if (fd < 0) {
s = IOError(fname, errno); if (options.use_mmap_reads && sizeof(void*) >= 8) {
} else if (options.use_mmap_reads && sizeof(void*) >= 8) {
// Use of mmap for random reads has been removed because it // Use of mmap for random reads has been removed because it
// kills performance when storage is fast. // kills performance when storage is fast.
// Use mmap when virtual address-space is plentiful. // Use mmap when virtual address-space is plentiful.
@ -216,30 +229,15 @@ class PosixEnv : public Env {
} }
} }
close(fd); close(fd);
} else if (options.use_direct_reads) {
close(fd);
#ifdef OS_MACOSX
int flags = O_RDONLY;
#else
int flags = O_RDONLY | O_DIRECT;
TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags);
#endif
fd = open(fname.c_str(), flags, 0644);
if (fd < 0) {
s = IOError(fname, errno);
} else { } else {
std::unique_ptr<PosixDirectIORandomAccessFile> file( if (options.use_direct_reads && !options.use_mmap_reads) {
new PosixDirectIORandomAccessFile(fname, fd));
*result = std::move(file);
s = Status::OK();
#ifdef OS_MACOSX #ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) { if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd); close(fd);
s = IOError(fname, errno); return IOError(fname, errno);
} }
#endif #endif
} }
} else {
result->reset(new PosixRandomAccessFile(fname, fd, options)); result->reset(new PosixRandomAccessFile(fname, fd, options));
} }
return s; return s;

@ -924,7 +924,7 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) {
std::string fname = test::TmpDir(env_) + "/" + "testfile"; std::string fname = test::TmpDir(env_) + "/" + "testfile";
const size_t kSectorSize = 512; const size_t kSectorSize = 512;
auto data = NewAligned(kSectorSize, 'A'); auto data = NewAligned(kSectorSize, 0);
Slice slice(data.get(), kSectorSize); Slice slice(data.get(), kSectorSize);
// Create file. // Create file.
@ -932,11 +932,7 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) {
unique_ptr<WritableFile> wfile; unique_ptr<WritableFile> wfile;
#if !defined(OS_MACOSX) && !defined(OS_WIN) #if !defined(OS_MACOSX) && !defined(OS_WIN)
if (soptions.use_direct_writes) { if (soptions.use_direct_writes) {
rocksdb::SyncPoint::GetInstance()->SetCallBack( soptions.use_direct_writes = false;
"NewWritableFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
} }
#endif #endif
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
@ -948,20 +944,16 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) {
// Random Read // Random Read
{ {
unique_ptr<RandomAccessFile> file; unique_ptr<RandomAccessFile> file;
char scratch[kSectorSize]; auto scratch = NewAligned(kSectorSize, 0);
Slice result; Slice result;
#if !defined(OS_MACOSX) && !defined(OS_WIN) #if !defined(OS_MACOSX) && !defined(OS_WIN)
if (soptions.use_direct_reads) { if (soptions.use_direct_reads) {
rocksdb::SyncPoint::GetInstance()->SetCallBack( soptions.use_direct_reads = false;
"NewRandomAccessFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
} }
#endif #endif
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
ASSERT_OK(file->Read(0, kSectorSize, &result, scratch)); ASSERT_OK(file->Read(0, kSectorSize, &result, scratch.get()));
ASSERT_EQ(memcmp(scratch, data.get(), kSectorSize), 0); ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0);
ASSERT_OK(file->InvalidateCache(0, 11)); ASSERT_OK(file->InvalidateCache(0, 11));
ASSERT_OK(file->InvalidateCache(0, 0)); ASSERT_OK(file->InvalidateCache(0, 0));
} }
@ -969,20 +961,20 @@ TEST_P(EnvPosixTestWithParam, InvalidateCache) {
// Sequential Read // Sequential Read
{ {
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
char scratch[kSectorSize]; auto scratch = NewAligned(kSectorSize, 0);
Slice result; Slice result;
#if !defined(OS_MACOSX) && !defined(OS_WIN) #if !defined(OS_MACOSX) && !defined(OS_WIN)
if (soptions.use_direct_reads) { if (soptions.use_direct_reads) {
rocksdb::SyncPoint::GetInstance()->SetCallBack( soptions.use_direct_reads = false;
"NewSequentialFile:O_DIRECT", [&](void* arg) {
int* val = static_cast<int*>(arg);
*val &= ~O_DIRECT;
});
} }
#endif #endif
ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions)); ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions));
ASSERT_OK(file->Read(kSectorSize, &result, scratch)); if (file->UseDirectIO()) {
ASSERT_EQ(memcmp(scratch, data.get(), kSectorSize), 0); ASSERT_OK(file->PositionedRead(0, kSectorSize, &result, scratch.get()));
} else {
ASSERT_OK(file->Read(kSectorSize, &result, scratch.get()));
}
ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0);
ASSERT_OK(file->InvalidateCache(0, 11)); ASSERT_OK(file->InvalidateCache(0, 11));
ASSERT_OK(file->InvalidateCache(0, 0)); ASSERT_OK(file->InvalidateCache(0, 0));
} }

@ -22,12 +22,64 @@
namespace rocksdb { namespace rocksdb {
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s = file_->Read(n, result, scratch); Status s;
if (UseDirectIO()) {
size_t offset = offset_.fetch_add(n);
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
size_t offset_advance = offset - aligned_offset;
size_t size = Roundup(offset + n, alignment) - aligned_offset;
size_t r = 0;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
s = file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart());
if (s.ok() && offset_advance < tmp.size()) {
buf.Size(tmp.size());
r = buf.Read(scratch, offset_advance,
std::min(tmp.size() - offset_advance, n));
}
*result = Slice(scratch, r);
} else {
s = file_->Read(n, result, scratch);
}
IOSTATS_ADD(bytes_read, result->size()); IOSTATS_ADD(bytes_read, result->size());
return s; return s;
} }
Status SequentialFileReader::Skip(uint64_t n) { return file_->Skip(n); } Status SequentialFileReader::DirectRead(size_t n, Slice* result,
char* scratch) {
size_t offset = offset_.fetch_add(n);
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
size_t offset_advance = offset - aligned_offset;
size_t size = Roundup(offset + n, alignment) - aligned_offset;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
Status s =
file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart());
if (s.ok()) {
buf.Size(tmp.size());
size_t r = buf.Read(scratch, offset_advance,
tmp.size() <= offset_advance
? 0
: std::min(tmp.size() - offset_advance, n));
*result = Slice(scratch, r);
}
return s;
}
Status SequentialFileReader::Skip(uint64_t n) {
if (UseDirectIO()) {
offset_ += n;
return Status::OK();
} else {
return file_->Skip(n);
}
}
Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const { char* scratch) const {
@ -37,7 +89,26 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
StopWatch sw(env_, stats_, hist_type_, StopWatch sw(env_, stats_, hist_type_,
(stats_ != nullptr) ? &elapsed : nullptr); (stats_ != nullptr) ? &elapsed : nullptr);
IOSTATS_TIMER_GUARD(read_nanos); IOSTATS_TIMER_GUARD(read_nanos);
if (UseDirectIO()) {
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
size_t offset_advance = offset - aligned_offset;
size_t size = Roundup(offset + n, alignment) - aligned_offset;
size_t r = 0;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
s = file_->Read(aligned_offset, size, &tmp, buf.BufferStart());
if (s.ok() && offset_advance < tmp.size()) {
buf.Size(tmp.size());
r = buf.Read(scratch, offset_advance,
std::min(tmp.size() - offset_advance, n));
}
*result = Slice(scratch, r);
} else {
s = file_->Read(offset, n, result, scratch); s = file_->Read(offset, n, result, scratch);
}
IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
} }
if (stats_ != nullptr && file_read_hist_ != nullptr) { if (stats_ != nullptr && file_read_hist_ != nullptr) {
@ -46,6 +117,28 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
return s; return s;
} }
Status RandomAccessFileReader::DirectRead(uint64_t offset, size_t n,
Slice* result, char* scratch) const {
size_t alignment = file_->GetRequiredBufferAlignment();
size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
size_t offset_advance = offset - aligned_offset;
size_t size = Roundup(offset + n, alignment) - aligned_offset;
AlignedBuffer buf;
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
Status s = file_->Read(aligned_offset, size, &tmp, buf.BufferStart());
if (s.ok()) {
buf.Size(tmp.size());
size_t r = buf.Read(scratch, offset_advance,
tmp.size() <= offset_advance
? 0
: std::min(tmp.size() - offset_advance, n));
*result = Slice(scratch, r);
}
return s;
}
Status WritableFileWriter::Append(const Slice& data) { Status WritableFileWriter::Append(const Slice& data) {
const char* src = data.data(); const char* src = data.data();
size_t left = data.size(); size_t left = data.size();

@ -7,10 +7,11 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once #pragma once
#include <atomic>
#include <string> #include <string>
#include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/aligned_buffer.h" #include "util/aligned_buffer.h"
#include "port/port.h"
namespace rocksdb { namespace rocksdb {
@ -23,10 +24,11 @@ std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
class SequentialFileReader { class SequentialFileReader {
private: private:
std::unique_ptr<SequentialFile> file_; std::unique_ptr<SequentialFile> file_;
std::atomic<size_t> offset_; // read offset
public: public:
explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file) explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file)
: file_(std::move(_file)) {} : file_(std::move(_file)), offset_(0) {}
SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o); *this = std::move(o);
@ -45,6 +47,11 @@ class SequentialFileReader {
Status Skip(uint64_t n); Status Skip(uint64_t n);
SequentialFile* file() { return file_.get(); } SequentialFile* file() { return file_.get(); }
bool UseDirectIO() const { return file_->UseDirectIO(); }
protected:
Status DirectRead(size_t n, Slice* result, char* scratch);
}; };
class RandomAccessFileReader { class RandomAccessFileReader {
@ -86,6 +93,12 @@ class RandomAccessFileReader {
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const; Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const;
RandomAccessFile* file() { return file_.get(); } RandomAccessFile* file() { return file_.get(); }
bool UseDirectIO() const { return file_->UseDirectIO(); }
protected:
Status DirectRead(uint64_t offset, size_t n, Slice* result,
char* scratch) const;
}; };
// Use posix write to write data to a file. // Use posix write to write data to a file.
@ -152,6 +165,8 @@ class WritableFileWriter {
WritableFile* writable_file() const { return writable_file_.get(); } WritableFile* writable_file() const { return writable_file_.get(); }
bool UseDirectIO() { return writable_file_->UseDirectIO(); }
private: private:
// Used when os buffering is OFF and we are writing // Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode // DMA such as in Direct I/O mode

@ -58,28 +58,6 @@ const size_t kPageSize = sysconf(_SC_PAGESIZE);
const size_t kPageSize = 4 * 1024; const size_t kPageSize = 4 * 1024;
#endif #endif
std::unique_ptr<void, void (&)(void*)> NewAligned(const size_t size) {
void* ptr = nullptr;
if (posix_memalign(&ptr, 4 * 1024, size) != 0) {
return std::unique_ptr<char, void (&)(void*)>(nullptr, free);
}
std::unique_ptr<void, void (&)(void*)> uptr(ptr, free);
return uptr;
}
size_t Upper(const size_t size, const size_t fac) {
if (size % fac == 0) {
return size;
}
return size + (fac - size % fac);
}
size_t Lower(const size_t size, const size_t fac) {
if (size % fac == 0) {
return size;
}
return size - (size % fac);
}
bool IsSectorAligned(const size_t off) { return off % kSectorSize == 0; } bool IsSectorAligned(const size_t off) { return off % kSectorSize == 0; }
@ -87,89 +65,32 @@ static bool IsPageAligned(const void* ptr) {
return uintptr_t(ptr) % (kPageSize) == 0; return uintptr_t(ptr) % (kPageSize) == 0;
} }
Status ReadAligned(int fd, Slice* data, const uint64_t offset,
const size_t size, char* scratch) {
assert(IsSectorAligned(offset));
assert(IsSectorAligned(size));
assert(IsPageAligned(scratch));
size_t bytes_read = 0;
ssize_t status = -1;
while (bytes_read < size) {
status =
pread(fd, scratch + bytes_read, size - bytes_read, offset + bytes_read);
if (status <= 0) {
if (errno == EINTR) {
continue;
}
break;
}
bytes_read += status;
if (status % static_cast<ssize_t>(kSectorSize) != 0) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
break;
}
}
*data = Slice(scratch, bytes_read);
return status < 0 ? Status::IOError(strerror(errno)) : Status::OK();
}
Status ReadUnaligned(int fd, Slice* data, const uint64_t offset,
const size_t size, char* scratch) {
assert(scratch);
assert(!IsSectorAligned(offset) || !IsSectorAligned(size) ||
!IsPageAligned(scratch));
const uint64_t aligned_off = Lower(offset, kSectorSize);
const size_t aligned_size = Upper(size + (offset - aligned_off), kSectorSize);
auto aligned_scratch = NewAligned(aligned_size);
assert(aligned_scratch);
if (!aligned_scratch) {
return Status::IOError("Unable to allocate");
}
assert(IsSectorAligned(aligned_off));
assert(IsSectorAligned(aligned_size));
assert(aligned_scratch);
assert(IsPageAligned(aligned_scratch.get()));
assert(offset + size <= aligned_off + aligned_size);
Slice scratch_slice;
Status s = ReadAligned(fd, &scratch_slice, aligned_off, aligned_size,
reinterpret_cast<char*>(aligned_scratch.get()));
// copy data upto min(size, what was read)
memcpy(scratch, reinterpret_cast<char*>(aligned_scratch.get()) +
(offset % kSectorSize),
std::min(size, scratch_slice.size()));
*data = Slice(scratch, std::min(size, scratch_slice.size()));
return s;
}
Status DirectIORead(int fd, Slice* result, size_t off, size_t n,
char* scratch) {
if (IsSectorAligned(off) && IsSectorAligned(n) && IsPageAligned(scratch)) {
return ReadAligned(fd, result, off, n, scratch);
} }
return ReadUnaligned(fd, result, off, n, scratch);
}
} // namespace
/* /*
* PosixSequentialFile * PosixSequentialFile
*/ */
PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* f, PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
const EnvOptions& options) int fd, const EnvOptions& options)
: filename_(fname), : filename_(fname),
file_(f), file_(file),
fd_(fileno(f)), fd_(fd),
use_direct_io_(options.use_direct_reads) {} use_direct_io_(options.use_direct_reads) {
assert(!options.use_direct_reads || !options.use_mmap_reads);
}
PosixSequentialFile::~PosixSequentialFile() { fclose(file_); } PosixSequentialFile::~PosixSequentialFile() {
if (!UseDirectIO()) {
assert(file_);
fclose(file_);
} else {
assert(fd_);
close(fd_);
}
}
Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) { Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) {
assert(result != nullptr && !UseDirectIO());
Status s; Status s;
size_t r = 0; size_t r = 0;
do { do {
@ -187,11 +108,41 @@ Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) {
s = IOError(filename_, errno); s = IOError(filename_, errno);
} }
} }
if (use_direct_io_) {
// we need to fadvise away the entire range of pages because // we need to fadvise away the entire range of pages because
// we do not want readahead pages to be cached. // we do not want readahead pages to be cached under buffered io
Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
return s;
}
Status PosixSequentialFile::PositionedRead(uint64_t offset, size_t n,
Slice* result, char* scratch) {
Status s;
ssize_t r = -1;
size_t left = n;
char* ptr = scratch;
assert(UseDirectIO());
while (left > 0) {
r = pread(fd_, ptr, left, static_cast<off_t>(offset));
if (r <= 0) {
if (r == -1 && errno == EINTR) {
continue;
}
break;
}
ptr += r;
offset += r;
left -= r;
if (r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
break;
}
}
if (r < 0) {
// An error: return a non-ok status
s = IOError(filename_, errno);
} }
*result = Slice(scratch, (r < 0) ? 0 : n - left);
return s; return s;
} }
@ -206,32 +157,15 @@ Status PosixSequentialFile::InvalidateCache(size_t offset, size_t length) {
#ifndef OS_LINUX #ifndef OS_LINUX
return Status::OK(); return Status::OK();
#else #else
if (!UseDirectIO()) {
// free OS pages // free OS pages
int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
if (ret == 0) { if (ret != 0) {
return Status::OK();
}
return IOError(filename_, errno); return IOError(filename_, errno);
#endif
}
/*
* PosixDirectIOSequentialFile
*/
Status PosixDirectIOSequentialFile::Read(size_t n, Slice* result,
char* scratch) {
const size_t off = off_.fetch_add(n);
return DirectIORead(fd_, result, off, n, scratch);
} }
Status PosixDirectIOSequentialFile::Skip(uint64_t n) {
off_ += n;
return Status::OK();
} }
Status PosixDirectIOSequentialFile::InvalidateCache(size_t /*offset*/,
size_t /*length*/) {
return Status::OK(); return Status::OK();
#endif
} }
/* /*
@ -295,6 +229,7 @@ size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd, PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd,
const EnvOptions& options) const EnvOptions& options)
: filename_(fname), fd_(fd), use_direct_io_(options.use_direct_reads) { : filename_(fname), fd_(fd), use_direct_io_(options.use_direct_reads) {
assert(!options.use_direct_reads || !options.use_mmap_reads);
assert(!options.use_mmap_reads || sizeof(void*) < 8); assert(!options.use_mmap_reads || sizeof(void*) < 8);
} }
@ -308,9 +243,8 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
char* ptr = scratch; char* ptr = scratch;
while (left > 0) { while (left > 0) {
r = pread(fd_, ptr, left, static_cast<off_t>(offset)); r = pread(fd_, ptr, left, static_cast<off_t>(offset));
if (r <= 0) { if (r <= 0) {
if (errno == EINTR) { if (r == -1 && errno == EINTR) {
continue; continue;
} }
break; break;
@ -318,19 +252,23 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
ptr += r; ptr += r;
offset += r; offset += r;
left -= r; left -= r;
if (UseDirectIO() &&
r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
break;
}
} }
*result = Slice(scratch, (r < 0) ? 0 : n - left);
if (r < 0) { if (r < 0) {
// An error: return a non-ok status // An error: return a non-ok status
s = IOError(filename_, errno); s = IOError(filename_, errno);
} }
if (!UseDirectIO()) {
if (use_direct_io_) {
// we need to fadvise away the entire range of pages because // we need to fadvise away the entire range of pages because
// we do not want readahead pages to be cached. // we do not want readahead pages to be cached.
Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
} }
*result = Slice(scratch, (r < 0) ? 0 : n - left);
return s; return s;
} }
@ -341,6 +279,9 @@ size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
#endif #endif
void PosixRandomAccessFile::Hint(AccessPattern pattern) { void PosixRandomAccessFile::Hint(AccessPattern pattern) {
if (UseDirectIO()) {
return;
}
switch (pattern) { switch (pattern) {
case NORMAL: case NORMAL:
Fadvise(fd_, 0, 0, POSIX_FADV_NORMAL); Fadvise(fd_, 0, 0, POSIX_FADV_NORMAL);
@ -364,6 +305,9 @@ void PosixRandomAccessFile::Hint(AccessPattern pattern) {
} }
Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) { Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
if (UseDirectIO()) {
return Status::OK();
}
#ifndef OS_LINUX #ifndef OS_LINUX
return Status::OK(); return Status::OK();
#else #else
@ -376,15 +320,6 @@ Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
#endif #endif
} }
/*
* PosixDirectIORandomAccessFile
*/
Status PosixDirectIORandomAccessFile::Read(uint64_t offset, size_t n,
Slice* result, char* scratch) const {
Status s = DirectIORead(fd_, result, offset, n, scratch);
return s;
}
/* /*
* PosixMmapReadableFile * PosixMmapReadableFile
* *
@ -467,7 +402,6 @@ Status PosixMmapFile::UnmapCurrentRegion() {
Status PosixMmapFile::MapNewRegion() { Status PosixMmapFile::MapNewRegion() {
#ifdef ROCKSDB_FALLOCATE_PRESENT #ifdef ROCKSDB_FALLOCATE_PRESENT
assert(base_ == nullptr); assert(base_ == nullptr);
TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds); TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds);
// we can't fallocate with FALLOC_FL_KEEP_SIZE here // we can't fallocate with FALLOC_FL_KEEP_SIZE here
if (allow_fallocate_) { if (allow_fallocate_) {

@ -44,30 +44,16 @@ class PosixSequentialFile : public SequentialFile {
bool use_direct_io_; bool use_direct_io_;
public: public:
PosixSequentialFile(const std::string& fname, FILE* f, PosixSequentialFile(const std::string& fname, FILE* file, int fd,
const EnvOptions& options); const EnvOptions& options);
virtual ~PosixSequentialFile(); virtual ~PosixSequentialFile();
virtual Status Read(size_t n, Slice* result, char* scratch) override; virtual Status Read(size_t n, Slice* result, char* scratch) override;
virtual Status PositionedRead(uint64_t offset, size_t n, Slice* result,
char* scratch) override;
virtual Status Skip(uint64_t n) override; virtual Status Skip(uint64_t n) override;
virtual Status InvalidateCache(size_t offset, size_t length) override; virtual Status InvalidateCache(size_t offset, size_t length) override;
}; virtual bool UseDirectIO() const override { return use_direct_io_; }
class PosixDirectIOSequentialFile : public SequentialFile {
public:
explicit PosixDirectIOSequentialFile(const std::string& filename, int fd)
: filename_(filename), fd_(fd) {}
virtual ~PosixDirectIOSequentialFile() {}
Status Read(size_t n, Slice* result, char* scratch) override;
Status Skip(uint64_t n) override;
Status InvalidateCache(size_t offset, size_t length) override;
private:
const std::string filename_;
int fd_ = -1;
std::atomic<size_t> off_{0}; // read offset
}; };
class PosixRandomAccessFile : public RandomAccessFile { class PosixRandomAccessFile : public RandomAccessFile {
@ -88,21 +74,7 @@ class PosixRandomAccessFile : public RandomAccessFile {
#endif #endif
virtual void Hint(AccessPattern pattern) override; virtual void Hint(AccessPattern pattern) override;
virtual Status InvalidateCache(size_t offset, size_t length) override; virtual Status InvalidateCache(size_t offset, size_t length) override;
}; virtual bool UseDirectIO() const override { return use_direct_io_; }
// Direct IO random access file direct IO implementation
class PosixDirectIORandomAccessFile : public PosixRandomAccessFile {
public:
explicit PosixDirectIORandomAccessFile(const std::string& filename, int fd)
: PosixRandomAccessFile(filename, fd, EnvOptions()) {}
virtual ~PosixDirectIORandomAccessFile() {}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
virtual void Hint(AccessPattern pattern) override {}
Status InvalidateCache(size_t offset, size_t length) override {
return Status::OK();
}
}; };
class PosixWritableFile : public WritableFile { class PosixWritableFile : public WritableFile {

Loading…
Cancel
Save