Refactor to support file_reader_writer on Windows.

Summary. A change https://reviews.facebook.net/differential/diff/224721/
  Has attempted to move common functionality out of platform dependent
  code to a new facility called file_reader_writer.
  This includes:
  - perf counters
  - Buffering
  - RateLimiting

  However, the change did not attempt to refactor Windows code.
  To mitigate, we introduce new quering interfaces such as UseOSBuffer(),
  GetRequiredBufferAlignment() and ReaderWriterForward()
  for pure forwarding where required.
  Introduce WritableFile got a new method Truncate(). This is to communicate
  to the file as to how much data it has on close.
   - When space is pre-allocated on Linux it is filled with zeros implicitly,
    no such thing exist on Windows so we must truncate file on close.
   - When operating in unbuffered mode the last page is filled with zeros but we still want to truncate.

   Previously, Close() would take care of it but now buffer management is shifted to the wrappers and the file has
   no idea about the file true size.

   This means that Close() on the wrapper level must always include
   Truncate() as well as wrapper __dtor should call Close() and
   against double Close().
   Move buffered/unbuffered write logic to the wrapper.
   Utilize Aligned buffer class.
   Adjust tests and implement Truncate() where necessary.
   Come up with reasonable defaults for new virtual interfaces.
   Forward calls for RandomAccessReadAhead class to avoid double
   buffering and locking (double locking in unbuffered mode on WIndows).
main
Dmitri Smirnov 9 years ago
parent af7cdbf647
commit 30e82d5c41
  1. 1
      db/db_bench.cc
  2. 1
      db/fault_injection_test.cc
  3. 32
      include/rocksdb/env.h
  4. 444
      port/win/env_win.cc
  5. 154
      util/aligned_buffer.h
  6. 5
      util/db_test_util.h
  7. 12
      util/env_posix.cc
  8. 1
      util/env_test.cc
  9. 302
      util/file_reader_writer.cc
  10. 96
      util/file_reader_writer.h
  11. 3
      util/file_reader_writer_test.cc
  12. 4
      util/memenv.cc
  13. 4
      util/mock_env.cc
  14. 4
      util/testutil.h

@ -898,6 +898,7 @@ class ReportFileOpEnv : public EnvWrapper {
return rv;
}
Status Truncate(uint64_t size) override { return target_->Truncate(size); }
Status Close() override { return target_->Close(); }
Status Flush() override { return target_->Flush(); }
Status Sync() override { return target_->Sync(); }

@ -141,6 +141,7 @@ class TestWritableFile : public WritableFile {
FaultInjectionTestEnv* env);
virtual ~TestWritableFile();
virtual Status Append(const Slice& data) override;
virtual Status Truncate(uint64_t size) override { return target_->Truncate(size); }
virtual Status Close() override;
virtual Status Flush() override;
virtual Status Sync() override;

@ -393,6 +393,12 @@ class RandomAccessFile {
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const = 0;
// Used by the file_reader_writer to decide if the ReadAhead wrapper
// should simply forward the call and do not enact buffering or locking.
virtual bool ReaderWriterForward() const {
return false;
}
// Tries to get an unique ID for this file that will be the same each time
// the file is opened (and will stay the same while the file is open).
// Furthermore, it tries to make this ID at most "max_size" bytes. If such an
@ -413,7 +419,6 @@ class RandomAccessFile {
// compatibility.
};
enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };
virtual void Hint(AccessPattern pattern) {}
@ -438,7 +443,31 @@ class WritableFile {
}
virtual ~WritableFile();
// Indicates if the class makes use of unbuffered I/O
virtual bool UseOSBuffer() const {
return true;
}
// This is needed when you want to allocate
// AlignedBuffer for use with file I/O classes
// Used for unbuffered file I/O when UseOSBuffer() returns false
virtual size_t GetRequiredBufferAlignment() const {
return 4 * 1024;
}
virtual Status Append(const Slice& data) = 0;
// Positional write for unbuffered access default forward
// to simple append as most of the tests are buffered by default
virtual Status Append(const Slice& /* data */, uint64_t /* offset */) {
return Status::NotSupported();
}
// Truncate is necessary to trim the file to the correct size
// before closing. It is not always possible to keep track of the file
// size due to whole pages writes. The behavior is undefined if called
// with other writes to follow.
virtual Status Truncate(uint64_t size) = 0;
virtual Status Close() = 0;
virtual Status Flush() = 0;
virtual Status Sync() = 0; // sync data
@ -839,6 +868,7 @@ class WritableFileWrapper : public WritableFile {
explicit WritableFileWrapper(WritableFile* t) : target_(t) { }
Status Append(const Slice& data) override { return target_->Append(data); }
Status Truncate(uint64_t size) override { return target_->Truncate(size); }
Status Close() override { return target_->Close(); }
Status Flush() override { return target_->Flush(); }
Status Sync() override { return target_->Sync(); }

@ -30,6 +30,7 @@
#include "util/iostats_context_imp.h"
#include "util/rate_limiter.h"
#include "util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/thread_status_updater.h"
#include "util/thread_status_util.h"
@ -161,15 +162,6 @@ inline int fsync(HANDLE hFile) {
return 0;
}
inline size_t TruncateToPageBoundary(size_t page_size, size_t s) {
s -= (s & (page_size - 1));
assert((s % page_size) == 0);
return s;
}
// Roundup x to a multiple of y
inline size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }
// SetFileInformationByHandle() is capable of fast pre-allocates.
// However, this does not change the file end position unless the file is
// truncated and the pre-allocated space is not considered filled with zeros.
@ -492,7 +484,6 @@ class WinMmapFile : public WritableFile {
size_t n = std::min(left, avail);
memcpy(dst_, src, n);
IOSTATS_ADD(bytes_written, n);
dst_ += n;
src += n;
left -= n;
@ -502,6 +493,12 @@ class WinMmapFile : public WritableFile {
return Status::OK();
}
// Means Close() will properly take care of truncate
// and it does not need any additional information
virtual Status Truncate(uint64_t size) override {
return Status::OK();
}
virtual Status Close() override {
Status s;
@ -612,94 +609,6 @@ class WinMmapFile : public WritableFile {
}
};
// This class is to manage an aligned user
// allocated buffer for unbuffered I/O purposes
// though it does not make a difference if you need a buffer.
class AlignedBuffer {
const size_t alignment_;
std::unique_ptr<char[]> buf_;
size_t capacity_;
size_t cursize_;
char* bufstart_;
public:
explicit AlignedBuffer(size_t alignment)
: alignment_(alignment), capacity_(0), cursize_(0), bufstart_(nullptr) {
assert(alignment > 0);
assert((alignment & (alignment - 1)) == 0);
}
size_t GetAlignment() const { return alignment_; }
size_t GetCapacity() const { return capacity_; }
size_t GetCurrentSize() const { return cursize_; }
const char* GetBufferStart() const { return bufstart_; }
void Clear() { cursize_ = 0; }
// Allocates a new buffer and sets bufstart_ to the aligned first byte
void AllocateNewBuffer(size_t requestedCapacity) {
size_t size = Roundup(requestedCapacity, alignment_);
buf_.reset(new char[size + alignment_]);
char* p = buf_.get();
bufstart_ = reinterpret_cast<char*>(
(reinterpret_cast<uintptr_t>(p) + (alignment_ - 1)) &
~static_cast<uintptr_t>(alignment_ - 1));
capacity_ = size;
cursize_ = 0;
}
// Used for write
// Returns the number of bytes appended
size_t Append(const char* src, size_t append_size) {
size_t buffer_remaining = capacity_ - cursize_;
size_t to_copy = std::min(append_size, buffer_remaining);
if (to_copy > 0) {
memcpy(bufstart_ + cursize_, src, to_copy);
cursize_ += to_copy;
}
return to_copy;
}
size_t Read(char* dest, size_t offset, size_t read_size) const {
assert(offset < cursize_);
size_t to_read = std::min(cursize_ - offset, read_size);
if (to_read > 0) {
memcpy(dest, bufstart_ + offset, to_read);
}
return to_read;
}
/// Pad to alignment
void PadToAlignmentWith(int padding) {
size_t total_size = Roundup(cursize_, alignment_);
size_t pad_size = total_size - cursize_;
if (pad_size > 0) {
assert((pad_size + cursize_) <= capacity_);
memset(bufstart_ + cursize_, padding, pad_size);
cursize_ += pad_size;
}
}
// After a partial flush move the tail to the beginning of the buffer
void RefitTail(size_t tail_offset, size_t tail_size) {
if (tail_size > 0) {
memmove(bufstart_, bufstart_ + tail_offset, tail_size);
}
cursize_ = tail_size;
}
// Returns place to start writing
char* GetDestination() { return bufstart_ + cursize_; }
void SetSize(size_t cursize) { cursize_ = cursize; }
};
class WinSequentialFile : public SequentialFile {
private:
const std::string filename_;
@ -734,7 +643,7 @@ class WinSequentialFile : public SequentialFile {
// Windows ReadFile API accepts a DWORD.
// While it is possible to read in a loop if n is > UINT_MAX
// it is a highly unlikely case.
if (n > UINT_MAX) {
if (n > UINT_MAX) {
return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER);
}
@ -747,8 +656,6 @@ class WinSequentialFile : public SequentialFile {
return IOErrorFromWindowsError(filename_, GetLastError());
}
IOSTATS_ADD(bytes_read, r);
*result = Slice(scratch, r);
return s;
@ -791,12 +698,13 @@ class WinRandomAccessFile : public RandomAccessFile {
: filename_(fname),
hFile_(hFile),
use_os_buffer_(options.use_os_buffer),
buffer_(alignment),
buffer_(),
buffered_start_(0) {
assert(!options.use_mmap_reads);
// Unbuffered access, use internal buffer for reads
if (!use_os_buffer_) {
buffer_.SetAlignment(alignment);
// Random read, no need in a big buffer
// We read things in database blocks which are likely to be similar to
// the alignment we use.
@ -854,11 +762,8 @@ class WinRandomAccessFile : public RandomAccessFile {
}
SSIZE_T read = 0;
{
IOSTATS_TIMER_GUARD(read_nanos);
read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread,
start_page_start);
}
read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread,
start_page_start);
if (read > 0) {
buffer_.SetSize(read);
@ -884,7 +789,6 @@ class WinRandomAccessFile : public RandomAccessFile {
}
}
IOSTATS_ADD_IF_POSITIVE(bytes_read, n - left);
*result = Slice(scratch, (r < 0) ? 0 : n - left);
if (r < 0) {
@ -893,6 +797,10 @@ class WinRandomAccessFile : public RandomAccessFile {
return s;
}
virtual bool ReaderWriterForward() const override {
return true;
}
virtual void Hint(AccessPattern pattern) override {}
virtual Status InvalidateCache(size_t offset, size_t length) override {
@ -915,33 +823,23 @@ class WinRandomAccessFile : public RandomAccessFile {
class WinWritableFile : public WritableFile {
private:
const std::string filename_;
HANDLE hFile_;
AlignedBuffer buffer_;
uint64_t filesize_; // How much data is actually written disk
uint64_t reservedsize_; // how far we have reserved space
bool pending_sync_;
RateLimiter* rate_limiter_;
const bool use_os_buffer_; // Used to indicate unbuffered access, the file
// must be opened as unbuffered if false
HANDLE hFile_;
const bool use_os_buffer_; // Used to indicate unbuffered access, the file
const uint64_t alignment_;
// must be opened as unbuffered if false
uint64_t filesize_; // How much data is actually written disk
uint64_t reservedsize_; // how far we have reserved space
public:
WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment,
size_t capacity, const EnvOptions& options)
: filename_(fname),
hFile_(hFile),
buffer_(alignment),
use_os_buffer_(options.use_os_buffer),
alignment_(alignment),
filesize_(0),
reservedsize_(0),
pending_sync_(false),
rate_limiter_(options.rate_limiter),
use_os_buffer_(options.use_os_buffer) {
reservedsize_(0) {
assert(!options.use_mmap_writes);
buffer_.AllocateNewBuffer(capacity);
}
~WinWritableFile() {
@ -950,106 +848,84 @@ class WinWritableFile : public WritableFile {
}
}
virtual Status Append(const Slice& data) override {
const char* src = data.data();
// Indicates if the class makes use of unbuffered I/O
virtual bool UseOSBuffer() const override {
return use_os_buffer_;
}
assert(data.size() < INT_MAX);
virtual size_t GetRequiredBufferAlignment() const override {
return alignment_;
}
size_t left = data.size();
Status s;
pending_sync_ = true;
virtual Status Append(const Slice& data) override {
// This would call Alloc() if we are out of blocks
PrepareWrite(GetFileSize(), left);
// Used for buffered access ONLY
assert(use_os_buffer_);
assert(data.size() < std::numeric_limits<int>::max());
// Flush only when I/O is buffered
if (use_os_buffer_ &&
(buffer_.GetCapacity() - buffer_.GetCurrentSize()) < left) {
if (buffer_.GetCurrentSize() > 0) {
s = Flush();
if (!s.ok()) {
return s;
}
}
Status s;
if (buffer_.GetCapacity() < c_OneMB) {
size_t desiredCapacity = buffer_.GetCapacity() * 2;
desiredCapacity = std::min(desiredCapacity, c_OneMB);
buffer_.AllocateNewBuffer(desiredCapacity);
}
DWORD bytesWritten = 0;
if (!WriteFile(hFile_, data.data(),
data.size(), &bytesWritten, NULL)) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to WriteFile: " + filename_,
lastError);
} else {
assert(size_t(bytesWritten) == data.size());
filesize_ += data.size();
}
// We always use the internal buffer for the unbuffered I/O
// or we simply use it for its original purpose to accumulate many small
// chunks
if (!use_os_buffer_ || (buffer_.GetCapacity() >= left)) {
while (left > 0) {
size_t appended = buffer_.Append(src, left);
left -= appended;
src += appended;
return s;
}
if (left > 0) {
s = Flush();
if (!s.ok()) {
break;
}
virtual Status Append(const Slice& data, uint64_t offset) override {
Status s;
size_t cursize = buffer_.GetCurrentSize();
size_t capacity = buffer_.GetCapacity();
SSIZE_T ret = pwrite(hFile_, data.data(),
data.size(), offset);
// We double the buffer here because
// Flush calls do not keep up with the incoming bytes
// This is the only place when buffer is changed with unbuffered I/O
if (cursize == 0 && capacity < c_OneMB) {
size_t desiredCapacity = capacity * 2;
desiredCapacity = std::min(desiredCapacity, c_OneMB);
buffer_.AllocateNewBuffer(desiredCapacity);
}
}
}
// Error break
if (ret < 0) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to pwrite for: " + filename_, lastError);
} else {
// Writing directly to file bypassing what is in the buffer
assert(buffer_.GetCurrentSize() == 0);
// Use rate limiter for normal I/O very large request if available
s = WriteBuffered(src, left);
// With positional write it is not clear at all
// if this actually extends the filesize
assert(size_t(ret) == data.size());
filesize_ += data.size();
}
return s;
}
// Need to implement this so the file is truncated correctly
// when buffered and unbuffered mode
virtual Status Truncate(uint64_t size) override {
Status s = ftruncate(filename_, hFile_, size);
if (s.ok()) {
filesize_ = size;
}
return s;
}
virtual Status Close() override {
Status s;
// If there is any data in the cache not written we need to deal with it
const size_t cursize = buffer_.GetCurrentSize();
const uint64_t final_size = filesize_ + cursize;
if (cursize > 0) {
// If OS buffering is on, we just flush the remainder, otherwise need
if (!use_os_buffer_) {
s = WriteUnbuffered();
} else {
s = WriteBuffered(buffer_.GetBufferStart(), cursize);
}
}
Status s;
if (s.ok()) {
s = ftruncate(filename_, hFile_, final_size);
}
assert(INVALID_HANDLE_VALUE != hFile_);
// Sync data if buffer was flushed
if (s.ok() && (cursize > 0) && fsync(hFile_) < 0) {
if (fsync(hFile_) < 0) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("fsync failed at Close() for: " + filename_,
lastError);
lastError);
}
if (FALSE == ::CloseHandle(hFile_)) {
if (s.ok()) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_,
lastError);
}
auto lastError = GetLastError();
s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_,
lastError);
}
hFile_ = INVALID_HANDLE_VALUE;
@ -1057,36 +933,18 @@ class WinWritableFile : public WritableFile {
}
// write out the cached data to the OS cache
// This is now taken care of the WritableFileWriter
virtual Status Flush() override {
Status status;
if (buffer_.GetCurrentSize() > 0) {
if (!use_os_buffer_) {
status = WriteUnbuffered();
} else {
status =
WriteBuffered(buffer_.GetBufferStart(), buffer_.GetCurrentSize());
if (status.ok()) {
buffer_.SetSize(0);
}
}
}
return status;
return Status::OK();
}
virtual Status Sync() override {
Status s = Flush();
if (!s.ok()) {
return s;
}
Status s;
// Calls flush buffers
if (pending_sync_ && fsync(hFile_) < 0) {
if (fsync(hFile_) < 0) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("fsync failed at Sync() for: " + filename_,
lastError);
} else {
pending_sync_ = false;
}
return s;
}
@ -1094,7 +952,12 @@ class WinWritableFile : public WritableFile {
virtual Status Fsync() override { return Sync(); }
virtual uint64_t GetFileSize() override {
return filesize_ + buffer_.GetCurrentSize();
// Double accounting now here with WritableFileWriter
// and this size will be wrong when unbuffered access is used
// but tests implement their own writable files and do not use WritableFileWrapper
// so we need to squeeze a square peg through
// a round hole here.
return filesize_;
}
virtual Status Allocate(off_t offset, off_t len) override {
@ -1104,7 +967,7 @@ class WinWritableFile : public WritableFile {
// Make sure that we reserve an aligned amount of space
// since the reservation block size is driven outside so we want
// to check if we are ok with reservation here
size_t spaceToReserve = Roundup(offset + len, buffer_.GetAlignment());
size_t spaceToReserve = Roundup(offset + len, alignment_);
// Nothing to do
if (spaceToReserve <= reservedsize_) {
return status;
@ -1117,133 +980,6 @@ class WinWritableFile : public WritableFile {
}
return status;
}
private:
// This method writes to disk the specified data and makes use of the rate
// limiter
// if available
Status WriteBuffered(const char* data, size_t size) {
Status s;
assert(use_os_buffer_);
const char* src = data;
size_t left = size;
size_t actually_written = 0;
while (left > 0) {
size_t bytes_allowed = RequestToken(left, false);
DWORD bytesWritten = 0;
if (!WriteFile(hFile_, src, bytes_allowed, &bytesWritten, NULL)) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to write buffered via rate_limiter: " + filename_,
lastError);
break;
} else {
actually_written += bytesWritten;
src += bytesWritten;
left -= bytesWritten;
}
}
IOSTATS_ADD(bytes_written, actually_written);
filesize_ += actually_written;
return s;
}
// This flushes the accumulated data in the buffer. We pad data with zeros if
// necessary to the whole page.
// However, during automatic flushes padding would not be necessary.
// We always use RateLimiter if available. We move (Refit) any buffer bytes
// that are left over the
// whole number of pages to be written again on the next flush because we can
// only write on aligned
// offsets.
Status WriteUnbuffered() {
Status s;
assert(!use_os_buffer_);
size_t alignment = buffer_.GetAlignment();
assert((filesize_ % alignment) == 0);
// Calculate whole page final file advance if all writes succeed
size_t file_advance =
TruncateToPageBoundary(alignment, buffer_.GetCurrentSize());
// Calculate the leftover tail, we write it here padded with zeros BUT we
// will write
// it again in the future either on Close() OR when the current whole page
// fills out
size_t leftover_tail = buffer_.GetCurrentSize() - file_advance;
// Round up and pad
buffer_.PadToAlignmentWith(0);
const char* src = buffer_.GetBufferStart();
size_t left = buffer_.GetCurrentSize();
uint64_t file_offset = filesize_;
size_t actually_written = 0;
while (left > 0) {
// Request how much is allowed. If this is less than one alignment we may
// be blocking a lot on every write
// because we can not write less than one alignment (page) unit thus check
// the configuration.
size_t bytes_allowed = RequestToken(left, true);
SSIZE_T ret = pwrite(hFile_, buffer_.GetBufferStart() + actually_written,
bytes_allowed, file_offset);
// Error break
if (ret < 0) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError(
"Failed to pwrite for unbuffered: " + filename_, lastError);
buffer_.SetSize(file_advance + leftover_tail);
break;
}
actually_written += ret;
file_offset += ret;
left -= ret;
}
IOSTATS_ADD(bytes_written, actually_written);
if (s.ok()) {
// Move the tail to the beginning of the buffer
// This never happens during normal Append but rather during
// explicit call to Flush()/Sync() or Close()
buffer_.RefitTail(file_advance, leftover_tail);
// This is where we start writing next time which may or not be
// the actual file size on disk. They match if the buffer size
// is a multiple of whole pages otherwise filesize_ is leftover_tail
// behind
filesize_ += file_advance;
}
return s;
}
// This truncates the request to a single burst bytes
// and then goes through the request to make sure we are
// satisfied in the order of the I/O priority
size_t RequestToken(size_t bytes, bool align) const {
if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) {
bytes = std::min(
bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
if (align) {
// Here we may actually require more than burst and block
// but we can not write less than one page at a time on unbuffered
// thus we may want not to use ratelimiter s
size_t alignment = buffer_.GetAlignment();
bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
}
rate_limiter_->Request(bytes, io_priority_);
}
return bytes;
}
};
class WinDirectory : public Directory {
@ -2092,7 +1828,7 @@ class WinEnv : public Env {
ThreadPool* thread_pool_;
size_t thread_id_; // Thread count in the thread.
explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id)
: thread_pool_(thread_pool), thread_id_(thread_id) {}
};

@ -0,0 +1,154 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <algorithm>
#include "port/port.h"
namespace rocksdb {
inline size_t TruncateToPageBoundary(size_t page_size, size_t s) {
s -= (s & (page_size - 1));
assert((s % page_size) == 0);
return s;
}
inline size_t Roundup(size_t x, size_t y) {
return ((x + y - 1) / y) * y;
}
// This class is to manage an aligned user
// allocated buffer for unbuffered I/O purposes
// though it does not make a difference if you need a buffer.
class AlignedBuffer {
size_t alignment_;
std::unique_ptr<char[]> buf_;
size_t capacity_;
size_t cursize_;
char* bufstart_;
public:
AlignedBuffer()
: alignment_(),
capacity_(0),
cursize_(0),
bufstart_(nullptr) {
}
AlignedBuffer(AlignedBuffer&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
AlignedBuffer& operator=(AlignedBuffer&& o) ROCKSDB_NOEXCEPT {
alignment_ = std::move(o.alignment_);
buf_ = std::move(o.buf_);
capacity_ = std::move(o.capacity_);
cursize_ = std::move(o.cursize_);
bufstart_ = std::move(o.bufstart_);
return *this;
}
AlignedBuffer(const AlignedBuffer&) = delete;
AlignedBuffer& operator=(const AlignedBuffer&) = delete;
size_t GetAlignment() const {
return alignment_;
}
size_t GetCapacity() const {
return capacity_;
}
size_t GetCurrentSize() const {
return cursize_;
}
const char* GetBufferStart() const {
return bufstart_;
}
void Clear() {
cursize_ = 0;
}
void SetAlignment(size_t alignment) {
assert(alignment > 0);
assert((alignment & (alignment - 1)) == 0);
alignment_ = alignment;
}
// Allocates a new buffer and sets bufstart_ to the aligned first byte
void AllocateNewBuffer(size_t requestedCapacity) {
assert(alignment_ > 0);
assert((alignment_ & (alignment_ - 1)) == 0);
size_t size = Roundup(requestedCapacity, alignment_);
buf_.reset(new char[size + alignment_]);
char* p = buf_.get();
bufstart_ = reinterpret_cast<char*>(
(reinterpret_cast<uintptr_t>(p)+(alignment_ - 1)) &
~static_cast<uintptr_t>(alignment_ - 1));
capacity_ = size;
cursize_ = 0;
}
// Used for write
// Returns the number of bytes appended
size_t Append(const char* src, size_t append_size) {
size_t buffer_remaining = capacity_ - cursize_;
size_t to_copy = std::min(append_size, buffer_remaining);
if (to_copy > 0) {
memcpy(bufstart_ + cursize_, src, to_copy);
cursize_ += to_copy;
}
return to_copy;
}
size_t Read(char* dest, size_t offset, size_t read_size) const {
assert(offset < cursize_);
size_t to_read = std::min(cursize_ - offset, read_size);
if (to_read > 0) {
memcpy(dest, bufstart_ + offset, to_read);
}
return to_read;
}
/// Pad to alignment
void PadToAlignmentWith(int padding) {
size_t total_size = Roundup(cursize_, alignment_);
size_t pad_size = total_size - cursize_;
if (pad_size > 0) {
assert((pad_size + cursize_) <= capacity_);
memset(bufstart_ + cursize_, padding, pad_size);
cursize_ += pad_size;
}
}
// After a partial flush move the tail to the beginning of the buffer
void RefitTail(size_t tail_offset, size_t tail_size) {
if (tail_size > 0) {
memmove(bufstart_, bufstart_ + tail_offset, tail_size);
}
cursize_ = tail_size;
}
// Returns place to start writing
char* GetDestination() {
return bufstart_ + cursize_;
}
void SetSize(size_t cursize) {
cursize_ = cursize;
}
};
}

@ -148,6 +148,9 @@ class SpecialEnv : public EnvWrapper {
return base_->Append(data);
}
}
Status Truncate(uint64_t size) override {
return base_->Truncate(size);
}
Status Close() override {
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
@ -185,6 +188,7 @@ class SpecialEnv : public EnvWrapper {
return base_->Append(data);
}
}
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
Status Close() override { return base_->Close(); }
Status Flush() override { return base_->Flush(); }
Status Sync() override {
@ -225,6 +229,7 @@ class SpecialEnv : public EnvWrapper {
#endif
return s;
}
Status Truncate(uint64_t size) override { return base_->Truncate(size); }
Status Close() override { return base_->Close(); }
Status Flush() override { return base_->Flush(); }
Status Sync() override {

@ -494,6 +494,12 @@ class PosixMmapFile : public WritableFile {
return Status::OK();
}
// Means Close() will properly take care of truncate
// and it does not need any additional information
virtual Status Truncate(uint64_t size) override {
return Status::OK();
}
virtual Status Close() override {
Status s;
size_t unused = limit_ - dst_;
@ -624,6 +630,12 @@ class PosixWritableFile : public WritableFile {
return Status::OK();
}
// Means Close() will properly take care of truncate
// and it does not need any additional information
virtual Status Truncate(uint64_t size) override {
return Status::OK();
}
virtual Status Close() override {
Status s;

@ -989,6 +989,7 @@ TEST_F(EnvPosixTest, WritableFileWrapper) {
}
Status Append(const Slice& data) override { inc(1); return Status::OK(); }
Status Truncate(uint64_t size) override { return Status::OK(); }
Status Close() override { inc(2); return Status::OK(); }
Status Flush() override { inc(3); return Status::OK(); }
Status Sync() override { inc(4); return Status::OK(); }

@ -59,81 +59,116 @@ Status WritableFileWriter::Append(const Slice& data) {
TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left);
}
// if there is no space in the cache, then flush
if (cursize_ + left > capacity_) {
s = Flush();
if (!s.ok()) {
return s;
// Flush only when I/O is buffered
if (use_os_buffer_ &&
(buf_.GetCapacity() - buf_.GetCurrentSize()) < left) {
if (buf_.GetCurrentSize() > 0) {
s = Flush();
if (!s.ok()) {
return s;
}
}
// Increase the buffer size, but capped at 1MB
if (capacity_ < (1 << 20)) {
capacity_ *= 2;
buf_.reset(new char[capacity_]);
if (buf_.GetCapacity() < (1 << 20)) {
size_t desiredCapacity = buf_.GetCapacity() * 2;
desiredCapacity = std::min(desiredCapacity, size_t(1 << 20));
buf_.AllocateNewBuffer(desiredCapacity);
}
assert(cursize_ == 0);
assert(buf_.GetCurrentSize() == 0);
}
// if the write fits into the cache, then write to cache
// otherwise do a write() syscall to write to OS buffers.
if (cursize_ + left <= capacity_) {
memcpy(buf_.get() + cursize_, src, left);
cursize_ += left;
} else {
while (left != 0) {
size_t size = RequestToken(left);
{
IOSTATS_TIMER_GUARD(write_nanos);
s = writable_file_->Append(Slice(src, size));
// We never write directly to disk with unbuffered I/O on.
// or we simply use it for its original purpose to accumulate many small
// chunks
if (!use_os_buffer_ || (buf_.GetCapacity() >= left)) {
while (left > 0) {
size_t appended = buf_.Append(src, left);
left -= appended;
src += appended;
if (left > 0) {
s = Flush();
if (!s.ok()) {
return s;
break;
}
}
IOSTATS_ADD(bytes_written, size);
TEST_KILL_RANDOM(rocksdb_kill_odds);
left -= size;
src += size;
// We double the buffer here because
// Flush calls do not keep up with the incoming bytes
// This is the only place when buffer is changed with unbuffered I/O
if (buf_.GetCapacity() < (1 << 20)) {
size_t desiredCapacity = buf_.GetCapacity() * 2;
desiredCapacity = std::min(desiredCapacity, size_t(1 << 20));
buf_.AllocateNewBuffer(desiredCapacity);
}
}
}
} else {
// Writing directly to file bypassing the buffer
assert(buf_.GetCurrentSize() == 0);
s = WriteBuffered(src, left);
}
TEST_KILL_RANDOM(rocksdb_kill_odds);
filesize_ += data.size();
return Status::OK();
}
Status WritableFileWriter::Close() {
// Do not quit immediately on failure the file MUST be closed
Status s;
s = Flush(); // flush cache to OS
if (!s.ok()) {
// Possible to close it twice now as we MUST close
// in __dtor, simply flushing is not enough
// Windows when pre-allocating does not fill with zeros
// also with unbuffered access we also set the end of data.
if (!writable_file_) {
return s;
}
s = Flush(); // flush cache to OS
// In unbuffered mode we write whole pages so
// we need to let the file know where data ends.
Status interim = writable_file_->Truncate(filesize_);
if (!interim.ok() && s.ok()) {
s = interim;
}
TEST_KILL_RANDOM(rocksdb_kill_odds);
return writable_file_->Close();
interim = writable_file_->Close();
if (!interim.ok() && s.ok()) {
s = interim;
}
writable_file_.reset();
return s;
}
// write out the cached data to the OS cache
Status WritableFileWriter::Flush() {
Status s;
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
size_t left = cursize_;
char* src = buf_.get();
while (left != 0) {
size_t size = RequestToken(left);
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
Status s = writable_file_->Append(Slice(src, size));
if (!s.ok()) {
return s;
}
if (buf_.GetCurrentSize() > 0) {
if (use_os_buffer_) {
s = WriteBuffered(buf_.GetBufferStart(), buf_.GetCurrentSize());
} else {
s = WriteUnbuffered();
}
if (!s.ok()) {
return s;
}
IOSTATS_ADD(bytes_written, size);
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
left -= size;
src += size;
}
cursize_ = 0;
writable_file_->Flush();
s = writable_file_->Flush();
if (!s.ok()) {
return s;
}
// sync OS cache to disk for every bytes_per_sync_
// TODO: give log file and sst file different options (log
@ -147,21 +182,21 @@ Status WritableFileWriter::Flush() {
// Xfs does neighbor page flushing outside of the specified ranges. We
// need to make sure sync range is far from the write offset.
if (!direct_io_ && bytes_per_sync_) {
uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced.
uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB.
const uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced.
const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB.
if (filesize_ > kBytesNotSyncRange) {
uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
assert(offset_sync_to >= last_sync_size_);
if (offset_sync_to > 0 &&
offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
last_sync_size_ = offset_sync_to;
}
}
}
return Status::OK();
return s;
}
Status WritableFileWriter::Sync(bool use_fsync) {
@ -214,27 +249,140 @@ Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) {
return writable_file_->RangeSync(offset, nbytes);
}
size_t WritableFileWriter::RequestToken(size_t bytes) {
size_t WritableFileWriter::RequestToken(size_t bytes, bool align) {
Env::IOPriority io_priority;
if (rate_limiter_&&(io_priority = writable_file_->GetIOPriority()) <
if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) <
Env::IO_TOTAL) {
bytes = std::min(bytes,
static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
bytes = std::min(
bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
if (align) {
// Here we may actually require more than burst and block
// but we can not write less than one page at a time on unbuffered
// thus we may want not to use ratelimiter s
size_t alignment = buf_.GetAlignment();
bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
}
rate_limiter_->Request(bytes, io_priority);
}
return bytes;
}
// This method writes to disk the specified data and makes use of the rate
// limiter if available
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
Status s;
assert(use_os_buffer_);
const char* src = data;
size_t left = size;
while (left > 0) {
size_t allowed = RequestToken(left, false);
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
s = writable_file_->Append(Slice(src, allowed));
if (!s.ok()) {
return s;
}
}
IOSTATS_ADD(bytes_written, allowed);
TEST_KILL_RANDOM(rocksdb_kill_odds);
left -= allowed;
src += allowed;
}
buf_.SetSize(0);
return s;
}
// This flushes the accumulated data in the buffer. We pad data with zeros if
// necessary to the whole page.
// However, during automatic flushes padding would not be necessary.
// We always use RateLimiter if available. We move (Refit) any buffer bytes
// that are left over the
// whole number of pages to be written again on the next flush because we can
// only write on aligned
// offsets.
Status WritableFileWriter::WriteUnbuffered() {
Status s;
assert(!use_os_buffer_);
const size_t alignment = buf_.GetAlignment();
assert((next_write_offset_ % alignment) == 0);
// Calculate whole page final file advance if all writes succeed
size_t file_advance =
TruncateToPageBoundary(alignment, buf_.GetCurrentSize());
// Calculate the leftover tail, we write it here padded with zeros BUT we
// will write
// it again in the future either on Close() OR when the current whole page
// fills out
size_t leftover_tail = buf_.GetCurrentSize() - file_advance;
// Round up and pad
buf_.PadToAlignmentWith(0);
const char* src = buf_.GetBufferStart();
uint64_t write_offset = next_write_offset_;
size_t left = buf_.GetCurrentSize();
while (left > 0) {
// Check how much is allowed
size_t size = RequestToken(left, true);
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
// Unbuffered writes must be positional
s = writable_file_->Append(Slice(src, size), write_offset);
if (!s.ok()) {
buf_.SetSize(file_advance + leftover_tail);
return s;
}
}
IOSTATS_ADD(bytes_written, size);
left -= size;
src += size;
write_offset += size;
assert((next_write_offset_ % alignment) == 0);
}
if (s.ok()) {
// Move the tail to the beginning of the buffer
// This never happens during normal Append but rather during
// explicit call to Flush()/Sync() or Close()
buf_.RefitTail(file_advance, leftover_tail);
// This is where we start writing next time which may or not be
// the actual file size on disk. They match if the buffer size
// is a multiple of whole pages otherwise filesize_ is leftover_tail
// behind
next_write_offset_ += file_advance;
}
return s;
}
namespace {
class ReadaheadRandomAccessFile : public RandomAccessFile {
public:
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> file,
size_t readahead_size)
: file_(std::move(file)),
readahead_size_(readahead_size),
buffer_(new char[readahead_size_]),
buffer_offset_(0),
buffer_len_(0) {}
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
size_t readahead_size)
: file_(std::move(file)),
readahead_size_(readahead_size),
forward_calls_(file_->ReaderWriterForward()),
buffer_(new char[readahead_size_]),
buffer_offset_(0),
buffer_len_(0) {}
ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete;
ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete;
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
@ -242,14 +390,22 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
return file_->Read(offset, n, result, scratch);
}
// On Windows in unbuffered mode this will lead to double buffering
// and double locking so we avoid that.
// In normal mode Windows caches so much data from disk that we do
// not need readahead.
if (forward_calls_) {
return file_->Read(offset, n, result, scratch);
}
std::unique_lock<std::mutex> lk(lock_);
size_t copied = 0;
// if offset between [buffer_offset_, buffer_offset_ + buffer_len>
if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
uint64_t offset_in_buffer = offset - buffer_offset_;
copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer,
static_cast<uint64_t>(n));
copied = std::min(static_cast<uint64_t>(buffer_len_)-offset_in_buffer,
static_cast<uint64_t>(n));
memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
if (copied == n) {
// fully cached
@ -259,7 +415,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
}
Slice readahead_result;
Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
buffer_.get());
buffer_.get());
if (!s.ok()) {
return s;
}
@ -290,20 +446,20 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
private:
std::unique_ptr<RandomAccessFile> file_;
size_t readahead_size_;
size_t readahead_size_;
const bool forward_calls_;
mutable std::mutex lock_;
mutable std::mutex lock_;
mutable std::unique_ptr<char[]> buffer_;
mutable uint64_t buffer_offset_;
mutable size_t buffer_len_;
mutable uint64_t buffer_offset_;
mutable size_t buffer_len_;
};
} // namespace
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<RandomAccessFile> file, size_t readahead_size) {
std::unique_ptr<ReadaheadRandomAccessFile> wrapped_file(
new ReadaheadRandomAccessFile(std::move(file), readahead_size));
return std::move(wrapped_file);
std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) {
return std::make_unique<ReadaheadRandomAccessFile> (
std::move(file), readahead_size);
}
} // namespace rocksdb

@ -8,6 +8,8 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/env.h"
#include "util/aligned_buffer.h"
#include "port/port.h"
namespace rocksdb {
@ -15,7 +17,7 @@ class Statistics;
class HistogramImpl;
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
std::unique_ptr<RandomAccessFile> file, size_t readahead_size);
std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size);
class SequentialFileReader {
private:
@ -24,6 +26,19 @@ class SequentialFileReader {
public:
explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file)
: file_(std::move(_file)) {}
SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
file_ = std::move(o.file_);
return *this;
}
SequentialFileReader(SequentialFileReader&) = delete;
SequentialFileReader& operator=(SequentialFileReader&) = delete;
Status Read(size_t n, Slice* result, char* scratch);
Status Skip(uint64_t n);
@ -34,10 +49,10 @@ class SequentialFileReader {
class RandomAccessFileReader : public RandomAccessFile {
private:
std::unique_ptr<RandomAccessFile> file_;
Env* env_;
Statistics* stats_;
uint32_t hist_type_;
HistogramImpl* file_read_hist_;
Env* env_;
Statistics* stats_;
uint32_t hist_type_;
HistogramImpl* file_read_hist_;
public:
explicit RandomAccessFileReader(std::unique_ptr<RandomAccessFile>&& raf,
@ -51,6 +66,22 @@ class RandomAccessFileReader : public RandomAccessFile {
hist_type_(hist_type),
file_read_hist_(file_read_hist) {}
RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
RandomAccessFileReader& operator=(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT{
file_ = std::move(o.file_);
env_ = std::move(o.env_);
stats_ = std::move(o.stats_);
hist_type_ = std::move(o.hist_type_);
file_read_hist_ = std::move(o.file_read_hist_);
return *this;
}
RandomAccessFileReader(const RandomAccessFileReader&) = delete;
RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const;
RandomAccessFile* file() { return file_.get(); }
@ -60,33 +91,47 @@ class RandomAccessFileReader : public RandomAccessFile {
class WritableFileWriter {
private:
std::unique_ptr<WritableFile> writable_file_;
size_t cursize_; // current size of cached data in buf_
size_t capacity_; // max size of buf_
unique_ptr<char[]> buf_; // a buffer to cache writes
uint64_t filesize_;
bool pending_sync_;
bool pending_fsync_;
bool direct_io_;
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
AlignedBuffer buf_;
// Actually written data size can be used for truncate
// not counting padding data
uint64_t filesize_;
// This is necessary when we use unbuffered access
// and writes must happen on aligned offsets
// so we need to go back and write that page again
uint64_t next_write_offset_;
bool pending_sync_;
bool pending_fsync_;
const bool direct_io_;
const bool use_os_buffer_;
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
public:
explicit WritableFileWriter(std::unique_ptr<WritableFile>&& file,
const EnvOptions& options)
WritableFileWriter(std::unique_ptr<WritableFile>&& file,
const EnvOptions& options)
: writable_file_(std::move(file)),
cursize_(0),
capacity_(65536),
buf_(new char[capacity_]),
buf_(),
filesize_(0),
next_write_offset_(0),
pending_sync_(false),
pending_fsync_(false),
direct_io_(writable_file_->UseDirectIO()),
use_os_buffer_(writable_file_->UseOSBuffer()),
last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter) {}
rate_limiter_(options.rate_limiter) {
buf_.SetAlignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(65536);
}
WritableFileWriter(const WritableFileWriter&) = delete;
WritableFileWriter& operator=(const WritableFileWriter&) = delete;
~WritableFileWriter() { Close(); }
~WritableFileWriter() { Flush(); }
Status Append(const Slice& data);
Status Flush();
@ -109,8 +154,13 @@ class WritableFileWriter {
WritableFile* writable_file() const { return writable_file_.get(); }
private:
// Used when os buffering is OFF and we are writing
// DMA such as in Windows unbuffered mode
Status WriteUnbuffered();
// Normal write
Status WriteBuffered(const char* data, size_t size);
Status RangeSync(off_t offset, off_t nbytes);
size_t RequestToken(size_t bytes);
size_t RequestToken(size_t bytes, bool align);
Status SyncInternal(bool use_fsync);
};
} // namespace rocksdb

@ -24,6 +24,9 @@ TEST_F(WritableFileWriterTest, RangeSync) {
size_ += data.size();
return Status::OK();
}
virtual Status Truncate(uint64_t size) override {
return Status::OK();
}
Status Close() override {
EXPECT_GE(size_, last_synced_ + kMb);
EXPECT_LT(size_, last_synced_ + 2 * kMb);

@ -232,7 +232,9 @@ class WritableFileImpl : public WritableFile {
virtual Status Append(const Slice& data) override {
return file_->Append(data);
}
virtual Status Truncate(uint64_t size) override {
return Status::OK();
}
virtual Status Close() override { return Status::OK(); }
virtual Status Flush() override { return Status::OK(); }
virtual Status Sync() override { return Status::OK(); }

@ -250,7 +250,9 @@ class MockWritableFile : public WritableFile {
}
return Status::OK();
}
virtual Status Truncate(uint64_t size) override {
return Status::OK();
}
virtual Status Close() override { return file_->Fsync(); }
virtual Status Flush() override { return Status::OK(); }

@ -184,6 +184,10 @@ class StringSink: public WritableFile {
const std::string& contents() const { return contents_; }
virtual Status Truncate(uint64_t size) override {
contents_.resize(size);
return Status::OK();
}
virtual Status Close() override { return Status::OK(); }
virtual Status Flush() override {
if (reader_contents_ != nullptr) {

Loading…
Cancel
Save