From 30e82d5c411d8d57c24fa98f955ab14f10e2a3d2 Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Fri, 11 Sep 2015 09:57:02 -0700 Subject: [PATCH 1/2] Refactor to support file_reader_writer on Windows. Summary: The change https://reviews.facebook.net/differential/diff/224721/ attempted to move common functionality out of platform-dependent code into a new facility called file_reader_writer. This includes: - perf counters - Buffering - RateLimiting However, that change did not refactor the Windows code. To mitigate, we introduce new querying interfaces such as UseOSBuffer(), GetRequiredBufferAlignment() and ReaderWriterForward() for pure forwarding where required. WritableFile gains a new method Truncate(). This communicates to the file how much data it actually holds on close. - When space is pre-allocated on Linux it is implicitly filled with zeros; no such thing exists on Windows, so we must truncate the file on close. - When operating in unbuffered mode the last page is filled with zeros, but we still want to truncate. Previously, Close() would take care of this, but now buffer management is shifted to the wrappers and the file has no idea of its true size. This means that Close() at the wrapper level must always include Truncate(), and the wrapper __dtor should call Close() and guard against double Close(). Move buffered/unbuffered write logic to the wrapper. Utilize the AlignedBuffer class. Adjust tests and implement Truncate() where necessary. Come up with reasonable defaults for the new virtual interfaces. Forward calls in the ReadaheadRandomAccessFile class to avoid double buffering and locking (double locking in unbuffered mode on Windows). --- db/db_bench.cc | 1 + db/fault_injection_test.cc | 1 + include/rocksdb/env.h | 32 ++- port/win/env_win.cc | 444 +++++++------------------------- util/aligned_buffer.h | 154 +++++++++++ util/db_test_util.h | 5 + util/env_posix.cc | 12 + util/env_test.cc | 1 + util/file_reader_writer.cc | 302 ++++++++++++++++------ util/file_reader_writer.h | 96 +++++-- util/file_reader_writer_test.cc | 3 + util/memenv.cc | 4 +- util/mock_env.cc | 4 +- util/testutil.h | 4 + 14 files changed, 610 insertions(+), 453 deletions(-) create mode 100644 util/aligned_buffer.h diff --git a/db/db_bench.cc b/db/db_bench.cc index ca05b2b20..a98e7c7d3 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -898,6 +898,7 @@ class ReportFileOpEnv : public EnvWrapper { return rv; } + Status Truncate(uint64_t size) override { return target_->Truncate(size); } Status Close() override { return target_->Close(); } Status Flush() override { return target_->Flush(); } Status Sync() override { return target_->Sync(); } diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 87028e381..7ce18c8e7 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -141,6 +141,7 @@ class TestWritableFile : public WritableFile { FaultInjectionTestEnv* env); virtual ~TestWritableFile(); virtual Status Append(const Slice& data) override; + virtual Status Truncate(uint64_t size) override { return target_->Truncate(size); } virtual Status Close() override; virtual Status Flush() override; virtual Status Sync() override; diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 093292f66..b346effba 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -393,6 +393,12 @@ class RandomAccessFile { virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const = 0; + // Used by the file_reader_writer to decide if the ReadAhead wrapper + //
should simply forward the call rather than enact buffering or locking. + virtual bool ReaderWriterForward() const { + return false; + } + // Tries to get an unique ID for this file that will be the same each time // the file is opened (and will stay the same while the file is open). // Furthermore, it tries to make this ID at most "max_size" bytes. If such an @@ -413,7 +419,6 @@ // compatibility. }; - enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED }; virtual void Hint(AccessPattern pattern) {} @@ -438,7 +443,31 @@ class WritableFile { } virtual ~WritableFile(); + // Indicates if the class makes use of OS buffering + virtual bool UseOSBuffer() const { + return true; + } + + // This is needed when you want to allocate an + // AlignedBuffer for use with file I/O classes + // Used for unbuffered file I/O when UseOSBuffer() returns false + virtual size_t GetRequiredBufferAlignment() const { + return 4 * 1024; + } + virtual Status Append(const Slice& data) = 0; + + // Positional write for unbuffered access; the default returns + // NotSupported, as most files use buffered access by default + virtual Status Append(const Slice& /* data */, uint64_t /* offset */) { + return Status::NotSupported(); + } + + // Truncate is necessary to trim the file to the correct size + // before closing. It is not always possible to keep track of the file + // size due to whole-page writes. The behavior is undefined if called + // with other writes to follow. + virtual Status Truncate(uint64_t size) = 0; virtual Status Close() = 0; virtual Status Flush() = 0; virtual Status Sync() = 0; // sync data @@ -839,6 +868,7 @@ class WritableFileWrapper : public WritableFile { explicit WritableFileWrapper(WritableFile* t) : target_(t) { } Status Append(const Slice& data) override { return target_->Append(data); } + Status Truncate(uint64_t size) override { return target_->Truncate(size); } Status Close() override { return target_->Close(); } Status Flush() override { return target_->Flush(); } Status Sync() override { return target_->Sync(); } diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 70c68b8fb..c6d0cb3ca 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -30,6 +30,7 @@ #include "util/iostats_context_imp.h" #include "util/rate_limiter.h" #include "util/sync_point.h" +#include "util/aligned_buffer.h" #include "util/thread_status_updater.h" #include "util/thread_status_util.h" @@ -161,15 +162,6 @@ inline int fsync(HANDLE hFile) { return 0; } -inline size_t TruncateToPageBoundary(size_t page_size, size_t s) { - s -= (s & (page_size - 1)); - assert((s % page_size) == 0); - return s; -} - -// Roundup x to a multiple of y -inline size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; } - // SetFileInformationByHandle() is capable of fast pre-allocates. // However, this does not change the file end position unless the file is // truncated and the pre-allocated space is not considered filled with zeros.
@@ -492,7 +484,6 @@ class WinMmapFile : public WritableFile { size_t n = std::min(left, avail); memcpy(dst_, src, n); - IOSTATS_ADD(bytes_written, n); dst_ += n; src += n; left -= n; @@ -502,6 +493,12 @@ return Status::OK(); } + // Close() will properly take care of truncation + // and does not need any additional information + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } + virtual Status Close() override { Status s; @@ -612,94 +609,6 @@ } }; -// This class is to manage an aligned user -// allocated buffer for unbuffered I/O purposes -// though it does not make a difference if you need a buffer. -class AlignedBuffer { - const size_t alignment_; - std::unique_ptr<char[]> buf_; - size_t capacity_; - size_t cursize_; - char* bufstart_; - - public: - explicit AlignedBuffer(size_t alignment) - : alignment_(alignment), capacity_(0), cursize_(0), bufstart_(nullptr) { - assert(alignment > 0); - assert((alignment & (alignment - 1)) == 0); - } - - size_t GetAlignment() const { return alignment_; } - - size_t GetCapacity() const { return capacity_; } - - size_t GetCurrentSize() const { return cursize_; } - - const char* GetBufferStart() const { return bufstart_; } - - void Clear() { cursize_ = 0; } - - // Allocates a new buffer and sets bufstart_ to the aligned first byte - void AllocateNewBuffer(size_t requestedCapacity) { - size_t size = Roundup(requestedCapacity, alignment_); - buf_.reset(new char[size + alignment_]); - - char* p = buf_.get(); - bufstart_ = reinterpret_cast<char*>( - (reinterpret_cast<uintptr_t>(p) + (alignment_ - 1)) & - ~static_cast<uintptr_t>(alignment_ - 1)); - capacity_ = size; - cursize_ = 0; - } - - // Used for write - // Returns the number of bytes appended - size_t Append(const char* src, size_t append_size) { - size_t buffer_remaining = capacity_ - cursize_; - size_t to_copy = std::min(append_size, buffer_remaining); - - if (to_copy > 0) { - memcpy(bufstart_ + cursize_, src, to_copy); - cursize_ += to_copy; - } - return to_copy; - } - - size_t Read(char* dest, size_t offset, size_t read_size) const { - assert(offset < cursize_); - size_t to_read = std::min(cursize_ - offset, read_size); - if (to_read > 0) { - memcpy(dest, bufstart_ + offset, to_read); - } - return to_read; - } - - /// Pad to alignment - void PadToAlignmentWith(int padding) { - size_t total_size = Roundup(cursize_, alignment_); - size_t pad_size = total_size - cursize_; - - if (pad_size > 0) { - assert((pad_size + cursize_) <= capacity_); - memset(bufstart_ + cursize_, padding, pad_size); - cursize_ += pad_size; - } - } - - // After a partial flush move the tail to the beginning of the buffer - void RefitTail(size_t tail_offset, size_t tail_size) { - if (tail_size > 0) { - memmove(bufstart_, bufstart_ + tail_offset, tail_size); - } - cursize_ = tail_size; - } - - // Returns place to start writing - char* GetDestination() { return bufstart_ + cursize_; } - - void SetSize(size_t cursize) { cursize_ = cursize; } -}; - class WinSequentialFile : public SequentialFile { private: const std::string filename_; @@ -734,7 +643,7 @@ // Windows ReadFile API accepts a DWORD. // While it is possible to read in a loop if n is > UINT_MAX // it is a highly unlikely case.
- if (n > UINT_MAX) { + if (n > UINT_MAX) { return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); } @@ -747,8 +656,6 @@ return IOErrorFromWindowsError(filename_, GetLastError()); } - IOSTATS_ADD(bytes_read, r); - *result = Slice(scratch, r); return s; @@ -791,12 +698,13 @@ class WinRandomAccessFile : public RandomAccessFile { : filename_(fname), hFile_(hFile), use_os_buffer_(options.use_os_buffer), - buffer_(alignment), + buffer_(), buffered_start_(0) { assert(!options.use_mmap_reads); // Unbuffered access, use internal buffer for reads if (!use_os_buffer_) { + buffer_.SetAlignment(alignment); // Random read, no need in a big buffer // We read things in database blocks which are likely to be similar to // the alignment we use. @@ -854,11 +762,8 @@ } SSIZE_T read = 0; - { - IOSTATS_TIMER_GUARD(read_nanos); - read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread, - start_page_start); - } + read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread, + start_page_start); if (read > 0) { buffer_.SetSize(read); @@ -884,7 +789,6 @@ } } - IOSTATS_ADD_IF_POSITIVE(bytes_read, n - left); *result = Slice(scratch, (r < 0) ? 0 : n - left); if (r < 0) { @@ -893,6 +797,10 @@ return s; } + virtual bool ReaderWriterForward() const override { + return true; + } + virtual void Hint(AccessPattern pattern) override {} virtual Status InvalidateCache(size_t offset, size_t length) override { @@ -915,33 +823,23 @@ class WinWritableFile : public WritableFile { private: const std::string filename_; - HANDLE hFile_; - AlignedBuffer buffer_; - - uint64_t filesize_; // How much data is actually written disk - uint64_t reservedsize_; // how far we have reserved space - - bool pending_sync_; - - RateLimiter* rate_limiter_; - - const bool use_os_buffer_; // Used to indicate unbuffered access, the file - // must be opened as unbuffered if false + HANDLE hFile_; + const bool use_os_buffer_; // Used to indicate unbuffered access; the file + // must be opened as unbuffered if false + const uint64_t alignment_; + uint64_t filesize_; // How much data is actually written to disk + uint64_t reservedsize_; // how far we have reserved space public: WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, size_t capacity, const EnvOptions& options) : filename_(fname), hFile_(hFile), - buffer_(alignment), + use_os_buffer_(options.use_os_buffer), + alignment_(alignment), filesize_(0), - reservedsize_(0), - pending_sync_(false), - rate_limiter_(options.rate_limiter), - use_os_buffer_(options.use_os_buffer) { + reservedsize_(0) { assert(!options.use_mmap_writes); - - buffer_.AllocateNewBuffer(capacity); } ~WinWritableFile() { @@ -950,106 +848,84 @@ } } - virtual Status Append(const Slice& data) override { - const char* src = data.data(); + // Indicates if the class makes use of OS buffering + virtual bool UseOSBuffer() const override { + return use_os_buffer_; + } - assert(data.size() < INT_MAX); + virtual size_t GetRequiredBufferAlignment() const override { + return alignment_; + } - size_t left = data.size(); - Status s; - pending_sync_ = true; + virtual Status Append(const Slice& data) override { - // This would call Alloc() if we are out of blocks - PrepareWrite(GetFileSize(),
left); + // Used for buffered access ONLY + assert(use_os_buffer_); + assert(data.size() < std::numeric_limits<int>::max()); - // Flush only when I/O is buffered - if (use_os_buffer_ && - (buffer_.GetCapacity() - buffer_.GetCurrentSize()) < left) { - if (buffer_.GetCurrentSize() > 0) { - s = Flush(); - if (!s.ok()) { - return s; - } - } + Status s; - if (buffer_.GetCapacity() < c_OneMB) { - size_t desiredCapacity = buffer_.GetCapacity() * 2; - desiredCapacity = std::min(desiredCapacity, c_OneMB); - buffer_.AllocateNewBuffer(desiredCapacity); - } + DWORD bytesWritten = 0; + if (!WriteFile(hFile_, data.data(), + data.size(), &bytesWritten, NULL)) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to WriteFile: " + filename_, + lastError); + } else { + assert(size_t(bytesWritten) == data.size()); + filesize_ += data.size(); } - // We always use the internal buffer for the unbuffered I/O - // or we simply use it for its original purpose to accumulate many small - // chunks - if (!use_os_buffer_ || (buffer_.GetCapacity() >= left)) { - while (left > 0) { - size_t appended = buffer_.Append(src, left); - left -= appended; - src += appended; + return s; + } - if (left > 0) { - s = Flush(); - if (!s.ok()) { - break; - } + virtual Status Append(const Slice& data, uint64_t offset) override { + Status s; - size_t cursize = buffer_.GetCurrentSize(); - size_t capacity = buffer_.GetCapacity(); + SSIZE_T ret = pwrite(hFile_, data.data(), + data.size(), offset); - // We double the buffer here because - // Flush calls do not keep up with the incoming bytes - // This is the only place when buffer is changed with unbuffered I/O - if (cursize == 0 && capacity < c_OneMB) { - size_t desiredCapacity = capacity * 2; - desiredCapacity = std::min(desiredCapacity, c_OneMB); - buffer_.AllocateNewBuffer(desiredCapacity); - } - } - } + // Error break + if (ret < 0) { + auto lastError = GetLastError(); + s = IOErrorFromWindowsError( + "Failed to pwrite for: " + filename_, lastError); } else { - // Writing directly to file bypassing what is in the buffer - assert(buffer_.GetCurrentSize() == 0); - // Use rate limiter for normal I/O very large request if available - s = WriteBuffered(src, left); + // With positional write it is not clear at all + // if this actually extends the filesize + assert(size_t(ret) == data.size()); + filesize_ += data.size(); } + return s; + } + // Need to implement this so the file is truncated correctly + // in both buffered and unbuffered mode + virtual Status Truncate(uint64_t size) override { + Status s = ftruncate(filename_, hFile_, size); + if (s.ok()) { + filesize_ = size; + } return s; } virtual Status Close() override { - Status s; - // If there is any data in the cache not written we need to deal with it - const size_t cursize = buffer_.GetCurrentSize(); - const uint64_t final_size = filesize_ + cursize; - - if (cursize > 0) { - // If OS buffering is on, we just flush the remainder, otherwise need - if (!use_os_buffer_) { - s = WriteUnbuffered(); - } else { - s = WriteBuffered(buffer_.GetBufferStart(), cursize); - } - } + Status s; - if (s.ok()) { - s = ftruncate(filename_, hFile_, final_size); - } + assert(INVALID_HANDLE_VALUE != hFile_); - // Sync data if buffer was flushed - if (s.ok() && (cursize > 0) && fsync(hFile_) < 0) { + if (fsync(hFile_) < 0) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("fsync failed at Close() for: " + filename_, - lastError); + lastError); } if (FALSE == ::CloseHandle(hFile_)) { - if (s.ok()) { - auto lastError =
GetLastError(); - s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_, - lastError); - } + auto lastError = GetLastError(); + s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_, + lastError); } hFile_ = INVALID_HANDLE_VALUE; @@ -1057,36 +933,18 @@ } // write out the cached data to the OS cache + // This is now taken care of by the WritableFileWriter virtual Status Flush() override { - Status status; - - if (buffer_.GetCurrentSize() > 0) { - if (!use_os_buffer_) { - status = WriteUnbuffered(); - } else { - status = - WriteBuffered(buffer_.GetBufferStart(), buffer_.GetCurrentSize()); - if (status.ok()) { - buffer_.SetSize(0); - } - } - } - return status; + return Status::OK(); } virtual Status Sync() override { - Status s = Flush(); - if (!s.ok()) { - return s; - } - + Status s; // Calls flush buffers - if (pending_sync_ && fsync(hFile_) < 0) { + if (fsync(hFile_) < 0) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("fsync failed at Sync() for: " + filename_, lastError); - } else { - pending_sync_ = false; } return s; } @@ -1094,7 +952,12 @@ virtual Status Fsync() override { return Sync(); } virtual uint64_t GetFileSize() override { - return filesize_ + buffer_.GetCurrentSize(); + // Double accounting now here with WritableFileWriter + // and this size will be wrong when unbuffered access is used + // but tests implement their own writable files and do not use WritableFileWrapper + // so we need to squeeze a square peg through + // a round hole here. + return filesize_; } virtual Status Allocate(off_t offset, off_t len) override { @@ -1104,7 +967,7 @@ // Make sure that we reserve an aligned amount of space // since the reservation block size is driven outside so we want // to check if we are ok with reservation here - size_t spaceToReserve = Roundup(offset + len, buffer_.GetAlignment()); + size_t spaceToReserve = Roundup(offset + len, alignment_); // Nothing to do if (spaceToReserve <= reservedsize_) { return status; @@ -1117,133 +980,6 @@ } return status; } - - private: - // This method writes to disk the specified data and makes use of the rate - // limiter - // if available - Status WriteBuffered(const char* data, size_t size) { - Status s; - assert(use_os_buffer_); - const char* src = data; - size_t left = size; - - size_t actually_written = 0; - - while (left > 0) { - size_t bytes_allowed = RequestToken(left, false); - - DWORD bytesWritten = 0; - if (!WriteFile(hFile_, src, bytes_allowed, &bytesWritten, NULL)) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to write buffered via rate_limiter: " + filename_, - lastError); - break; - } else { - actually_written += bytesWritten; - src += bytesWritten; - left -= bytesWritten; - } - } - - IOSTATS_ADD(bytes_written, actually_written); - filesize_ += actually_written; - - return s; - } - - // This flushes the accumulated data in the buffer. We pad data with zeros if - // necessary to the whole page. - // However, during automatic flushes padding would not be necessary. - // We always use RateLimiter if available. We move (Refit) any buffer bytes - // that are left over the - // whole number of pages to be written again on the next flush because we can - // only write on aligned - // offsets.
- Status WriteUnbuffered() { - Status s; - - assert(!use_os_buffer_); - size_t alignment = buffer_.GetAlignment(); - assert((filesize_ % alignment) == 0); - - // Calculate whole page final file advance if all writes succeed - size_t file_advance = - TruncateToPageBoundary(alignment, buffer_.GetCurrentSize()); - - // Calculate the leftover tail, we write it here padded with zeros BUT we - // will write - // it again in the future either on Close() OR when the current whole page - // fills out - size_t leftover_tail = buffer_.GetCurrentSize() - file_advance; - - // Round up and pad - buffer_.PadToAlignmentWith(0); - - const char* src = buffer_.GetBufferStart(); - size_t left = buffer_.GetCurrentSize(); - uint64_t file_offset = filesize_; - size_t actually_written = 0; - - while (left > 0) { - // Request how much is allowed. If this is less than one alignment we may - // be blocking a lot on every write - // because we can not write less than one alignment (page) unit thus check - // the configuration. - size_t bytes_allowed = RequestToken(left, true); - SSIZE_T ret = pwrite(hFile_, buffer_.GetBufferStart() + actually_written, - bytes_allowed, file_offset); - - // Error break - if (ret < 0) { - auto lastError = GetLastError(); - s = IOErrorFromWindowsError( - "Failed to pwrite for unbuffered: " + filename_, lastError); - buffer_.SetSize(file_advance + leftover_tail); - break; - } - actually_written += ret; - file_offset += ret; - left -= ret; - } - - IOSTATS_ADD(bytes_written, actually_written); - - if (s.ok()) { - // Move the tail to the beginning of the buffer - // This never happens during normal Append but rather during - // explicit call to Flush()/Sync() or Close() - buffer_.RefitTail(file_advance, leftover_tail); - // This is where we start writing next time which may or not be - // the actual file size on disk. They match if the buffer size - // is a multiple of whole pages otherwise filesize_ is leftover_tail - // behind - filesize_ += file_advance; - } - return s; - } - - // This truncates the request to a single burst bytes - // and then goes through the request to make sure we are - // satisfied in the order of the I/O priority - size_t RequestToken(size_t bytes, bool align) const { - if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) { - bytes = std::min( - bytes, static_cast<size_t>(rate_limiter_->GetSingleBurstBytes())); - - if (align) { - // Here we may actually require more than burst and block - // but we can not write less than one page at a time on unbuffered - // thus we may want not to use ratelimiter s - size_t alignment = buffer_.GetAlignment(); - bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); - } - - rate_limiter_->Request(bytes, io_priority_); - } - return bytes; - } }; class WinDirectory : public Directory { @@ -2092,7 +1828,7 @@ class WinEnv : public Env { ThreadPool* thread_pool_; size_t thread_id_; // Thread count in the thread. - explicit BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) + BGThreadMetadata(ThreadPool* thread_pool, size_t thread_id) : thread_pool_(thread_pool), thread_id_(thread_id) {} }; diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h new file mode 100644 index 000000000..7fa80926c --- /dev/null +++ b/util/aligned_buffer.h @@ -0,0 +1,154 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree.
An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include <memory> +#include "port/port.h" + +namespace rocksdb { + +inline size_t TruncateToPageBoundary(size_t page_size, size_t s) { + s -= (s & (page_size - 1)); + assert((s % page_size) == 0); + return s; +} + +inline size_t Roundup(size_t x, size_t y) { + return ((x + y - 1) / y) * y; +} + +// This class is to manage an aligned user +// allocated buffer for unbuffered I/O purposes +// though it does not make a difference if you need a buffer. +class AlignedBuffer { + size_t alignment_; + std::unique_ptr<char[]> buf_; + size_t capacity_; + size_t cursize_; + char* bufstart_; + +public: + AlignedBuffer() + : alignment_(), + capacity_(0), + cursize_(0), + bufstart_(nullptr) { + } + + AlignedBuffer(AlignedBuffer&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + AlignedBuffer& operator=(AlignedBuffer&& o) ROCKSDB_NOEXCEPT { + alignment_ = std::move(o.alignment_); + buf_ = std::move(o.buf_); + capacity_ = std::move(o.capacity_); + cursize_ = std::move(o.cursize_); + bufstart_ = std::move(o.bufstart_); + return *this; + } + + AlignedBuffer(const AlignedBuffer&) = delete; + + AlignedBuffer& operator=(const AlignedBuffer&) = delete; + + size_t GetAlignment() const { + return alignment_; + } + + size_t GetCapacity() const { + return capacity_; + } + + size_t GetCurrentSize() const { + return cursize_; + } + + const char* GetBufferStart() const { + return bufstart_; + } + + void Clear() { + cursize_ = 0; + } + + void SetAlignment(size_t alignment) { + assert(alignment > 0); + assert((alignment & (alignment - 1)) == 0); + alignment_ = alignment; + } + + // Allocates a new buffer and sets bufstart_ to the aligned first byte + void AllocateNewBuffer(size_t requestedCapacity) { + + assert(alignment_ > 0); + assert((alignment_ & (alignment_ - 1)) == 0); + + size_t size = Roundup(requestedCapacity, alignment_); + buf_.reset(new char[size + alignment_]); + + char* p = buf_.get(); + bufstart_ = reinterpret_cast<char*>( + (reinterpret_cast<uintptr_t>(p)+(alignment_ - 1)) & + ~static_cast<uintptr_t>(alignment_ - 1)); + capacity_ = size; + cursize_ = 0; + } + // Used for write + // Returns the number of bytes appended + size_t Append(const char* src, size_t append_size) { + size_t buffer_remaining = capacity_ - cursize_; + size_t to_copy = std::min(append_size, buffer_remaining); + + if (to_copy > 0) { + memcpy(bufstart_ + cursize_, src, to_copy); + cursize_ += to_copy; + } + return to_copy; + } + + size_t Read(char* dest, size_t offset, size_t read_size) const { + assert(offset < cursize_); + size_t to_read = std::min(cursize_ - offset, read_size); + if (to_read > 0) { + memcpy(dest, bufstart_ + offset, to_read); + } + return to_read; + } + + /// Pad to alignment + void PadToAlignmentWith(int padding) { + size_t total_size = Roundup(cursize_, alignment_); + size_t pad_size = total_size - cursize_; + + if (pad_size > 0) { + assert((pad_size + cursize_) <= capacity_); + memset(bufstart_ + cursize_, padding, pad_size); + cursize_ += pad_size; + } + } + + // After a partial flush move the tail to the beginning of the buffer + void RefitTail(size_t tail_offset, size_t tail_size) { + if (tail_size > 0) { + memmove(bufstart_, bufstart_ + tail_offset, tail_size); + } + cursize_ = tail_size; + } + + // Returns place to
start writing + char* GetDestination() { + return bufstart_ + cursize_; + } + + void SetSize(size_t cursize) { + cursize_ = cursize; + } +}; +} diff --git a/util/db_test_util.h b/util/db_test_util.h index bb1bfa7cb..dfb2b3e6d 100644 --- a/util/db_test_util.h +++ b/util/db_test_util.h @@ -148,6 +148,9 @@ class SpecialEnv : public EnvWrapper { return base_->Append(data); } } + Status Truncate(uint64_t size) override { + return base_->Truncate(size); + } Status Close() override { // SyncPoint is not supported in Released Windows Mode. #if !(defined NDEBUG) || !defined(OS_WIN) @@ -185,6 +188,7 @@ return base_->Append(data); } } + Status Truncate(uint64_t size) override { return base_->Truncate(size); } Status Close() override { return base_->Close(); } Status Flush() override { return base_->Flush(); } Status Sync() override { @@ -225,6 +229,7 @@ #endif return s; } + Status Truncate(uint64_t size) override { return base_->Truncate(size); } Status Close() override { return base_->Close(); } Status Flush() override { return base_->Flush(); } Status Sync() override { diff --git a/util/env_posix.cc b/util/env_posix.cc index af2ab8e5d..b856f1d70 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -494,6 +494,12 @@ class PosixMmapFile : public WritableFile { return Status::OK(); } + // Close() will properly take care of truncation + // and does not need any additional information + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } + virtual Status Close() override { Status s; size_t unused = limit_ - dst_; @@ -624,6 +630,12 @@ class PosixWritableFile : public WritableFile { return Status::OK(); } + // Close() will properly take care of truncation + // and does not need any additional information + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } + virtual Status Close() override { Status s; diff --git a/util/env_test.cc b/util/env_test.cc index 0e4feabfd..6fe8ed80b 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -989,6 +989,7 @@ TEST_F(EnvPosixTest, WritableFileWrapper) { } Status Append(const Slice& data) override { inc(1); return Status::OK(); } + Status Truncate(uint64_t size) override { return Status::OK(); } Status Close() override { inc(2); return Status::OK(); } Status Flush() override { inc(3); return Status::OK(); } Status Sync() override { inc(4); return Status::OK(); } diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 140969843..453bb7461 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -59,81 +59,116 @@ Status WritableFileWriter::Append(const Slice& data) { TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left); } - // if there is no space in the cache, then flush - if (cursize_ + left > capacity_) { - s = Flush(); - if (!s.ok()) { - return s; + + // Flush only when I/O is buffered + if (use_os_buffer_ && + (buf_.GetCapacity() - buf_.GetCurrentSize()) < left) { + if (buf_.GetCurrentSize() > 0) { + s = Flush(); + if (!s.ok()) { + return s; + } } - // Increase the buffer size, but capped at 1MB - if (capacity_ < (1 << 20)) { - capacity_ *= 2; - buf_.reset(new char[capacity_]); + + if (buf_.GetCapacity() < (1 << 20)) { + size_t desiredCapacity = buf_.GetCapacity() * 2; + desiredCapacity = std::min(desiredCapacity, size_t(1 << 20)); + buf_.AllocateNewBuffer(desiredCapacity); } - assert(cursize_ == 0); +
assert(buf_.GetCurrentSize() == 0); } - // if the write fits into the cache, then write to cache - // otherwise do a write() syscall to write to OS buffers. - if (cursize_ + left <= capacity_) { - memcpy(buf_.get() + cursize_, src, left); - cursize_ += left; - } else { - while (left != 0) { - size_t size = RequestToken(left); - { - IOSTATS_TIMER_GUARD(write_nanos); - s = writable_file_->Append(Slice(src, size)); + // We never write directly to disk with unbuffered I/O on. + // otherwise we simply use it for its original purpose to accumulate many small + // chunks + if (!use_os_buffer_ || (buf_.GetCapacity() >= left)) { + while (left > 0) { + size_t appended = buf_.Append(src, left); + left -= appended; + src += appended; + + if (left > 0) { + s = Flush(); if (!s.ok()) { - return s; + break; } - } - IOSTATS_ADD(bytes_written, size); - TEST_KILL_RANDOM(rocksdb_kill_odds); - left -= size; - src += size; + // We double the buffer here because + // Flush calls do not keep up with the incoming bytes + // This is the only place when buffer is changed with unbuffered I/O + if (buf_.GetCapacity() < (1 << 20)) { + size_t desiredCapacity = buf_.GetCapacity() * 2; + desiredCapacity = std::min(desiredCapacity, size_t(1 << 20)); + buf_.AllocateNewBuffer(desiredCapacity); + } + } } + } else { + // Writing directly to file bypassing the buffer + assert(buf_.GetCurrentSize() == 0); + s = WriteBuffered(src, left); } + TEST_KILL_RANDOM(rocksdb_kill_odds); filesize_ += data.size(); return Status::OK(); } Status WritableFileWriter::Close() { + + // Do not quit immediately on failure; the file MUST be closed Status s; - s = Flush(); // flush cache to OS - if (!s.ok()) { + + // It is possible to close twice now as we MUST close + // in the __dtor; simply flushing is not enough. + // Windows does not fill with zeros when pre-allocating, + // and with unbuffered access we also set the end of data. + if (!writable_file_) { return s; } + s = Flush(); // flush cache to OS + + // In unbuffered mode we write whole pages so + // we need to let the file know where data ends. + Status interim = writable_file_->Truncate(filesize_); + if (!interim.ok() && s.ok()) { + s = interim; + } + TEST_KILL_RANDOM(rocksdb_kill_odds); - return writable_file_->Close(); + interim = writable_file_->Close(); + if (!interim.ok() && s.ok()) { + s = interim; + } + + writable_file_.reset(); + + return s; } + // write out the cached data to the OS cache Status WritableFileWriter::Flush() { + Status s; TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); - size_t left = cursize_; - char* src = buf_.get(); - while (left != 0) { - size_t size = RequestToken(left); - { - IOSTATS_TIMER_GUARD(write_nanos); - TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); - Status s = writable_file_->Append(Slice(src, size)); - if (!s.ok()) { - return s; - } + + if (buf_.GetCurrentSize() > 0) { + if (use_os_buffer_) { + s = WriteBuffered(buf_.GetBufferStart(), buf_.GetCurrentSize()); + } else { + s = WriteUnbuffered(); + } + if (!s.ok()) { + return s; } - IOSTATS_ADD(bytes_written, size); - TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); - left -= size; - src += size; } - cursize_ = 0; - writable_file_->Flush(); + s = writable_file_->Flush(); + + if (!s.ok()) { + return s; + } // sync OS cache to disk for every bytes_per_sync_ // TODO: give log file and sst file different options (log @@ -147,21 +182,21 @@ // Xfs does neighbor page flushing outside of the specified ranges.
We // need to make sure sync range is far from the write offset. if (!direct_io_ && bytes_per_sync_) { - uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced. - uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. + const uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced. + const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. if (filesize_ > kBytesNotSyncRange) { uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange; offset_sync_to -= offset_sync_to % kBytesAlignWhenSync; assert(offset_sync_to >= last_sync_size_); if (offset_sync_to > 0 && offset_sync_to - last_sync_size_ >= bytes_per_sync_) { - RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); + s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); last_sync_size_ = offset_sync_to; } } } - return Status::OK(); + return s; } Status WritableFileWriter::Sync(bool use_fsync) { @@ -214,27 +249,140 @@ Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) { return writable_file_->RangeSync(offset, nbytes); } -size_t WritableFileWriter::RequestToken(size_t bytes) { +size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { Env::IOPriority io_priority; - if (rate_limiter_&&(io_priority = writable_file_->GetIOPriority()) < + if (rate_limiter_ && (io_priority = writable_file_->GetIOPriority()) < Env::IO_TOTAL) { - bytes = std::min(bytes, - static_cast(rate_limiter_->GetSingleBurstBytes())); + bytes = std::min( + bytes, static_cast(rate_limiter_->GetSingleBurstBytes())); + + if (align) { + // Here we may actually require more than burst and block + // but we can not write less than one page at a time on unbuffered + // thus we may want not to use ratelimiter s + size_t alignment = buf_.GetAlignment(); + bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); + } rate_limiter_->Request(bytes, io_priority); } return bytes; } +// This method writes to disk the specified data and makes use of the rate +// limiter if available +Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { + Status s; + assert(use_os_buffer_); + const char* src = data; + size_t left = size; + + while (left > 0) { + size_t allowed = RequestToken(left, false); + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + s = writable_file_->Append(Slice(src, allowed)); + if (!s.ok()) { + return s; + } + } + + IOSTATS_ADD(bytes_written, allowed); + TEST_KILL_RANDOM(rocksdb_kill_odds); + + left -= allowed; + src += allowed; + } + buf_.SetSize(0); + return s; +} + + +// This flushes the accumulated data in the buffer. We pad data with zeros if +// necessary to the whole page. +// However, during automatic flushes padding would not be necessary. +// We always use RateLimiter if available. We move (Refit) any buffer bytes +// that are left over the +// whole number of pages to be written again on the next flush because we can +// only write on aligned +// offsets. 
+Status WritableFileWriter::WriteUnbuffered() { + Status s; + + assert(!use_os_buffer_); + const size_t alignment = buf_.GetAlignment(); + assert((next_write_offset_ % alignment) == 0); + + // Calculate whole page final file advance if all writes succeed + size_t file_advance = + TruncateToPageBoundary(alignment, buf_.GetCurrentSize()); + + // Calculate the leftover tail, we write it here padded with zeros BUT we + // will write + // it again in the future either on Close() OR when the current whole page + // fills out + size_t leftover_tail = buf_.GetCurrentSize() - file_advance; + + // Round up and pad + buf_.PadToAlignmentWith(0); + + const char* src = buf_.GetBufferStart(); + uint64_t write_offset = next_write_offset_; + size_t left = buf_.GetCurrentSize(); + + while (left > 0) { + // Check how much is allowed + size_t size = RequestToken(left, true); + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + // Unbuffered writes must be positional + s = writable_file_->Append(Slice(src, size), write_offset); + if (!s.ok()) { + buf_.SetSize(file_advance + leftover_tail); + return s; + } + } + + IOSTATS_ADD(bytes_written, size); + left -= size; + src += size; + write_offset += size; + assert((next_write_offset_ % alignment) == 0); + } + + if (s.ok()) { + // Move the tail to the beginning of the buffer + // This never happens during normal Append but rather during + // explicit call to Flush()/Sync() or Close() + buf_.RefitTail(file_advance, leftover_tail); + // This is where we start writing next time which may or may not be + // the actual file size on disk. They match if the buffer size + // is a multiple of whole pages otherwise filesize_ is leftover_tail + // behind + next_write_offset_ += file_advance; + } + return s; +} + + namespace { class ReadaheadRandomAccessFile : public RandomAccessFile { public: - ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile> file, - size_t readahead_size) - : file_(std::move(file)), - readahead_size_(readahead_size), - buffer_(new char[readahead_size_]), - buffer_offset_(0), - buffer_len_(0) {} + ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file, + size_t readahead_size) + : file_(std::move(file)), + readahead_size_(readahead_size), + forward_calls_(file_->ReaderWriterForward()), + buffer_(new char[readahead_size_]), + buffer_offset_(0), + buffer_len_(0) {} + + ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete; + + ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete; virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { @@ -242,14 +390,22 @@ return file_->Read(offset, n, result, scratch); } + // On Windows in unbuffered mode this will lead to double buffering + // and double locking so we avoid that. + // In normal mode Windows caches so much data from disk that we do + // not need readahead.
+ if (forward_calls_) { + return file_->Read(offset, n, result, scratch); + } + std::unique_lock<std::mutex> lk(lock_); size_t copied = 0; // if offset between [buffer_offset_, buffer_offset_ + buffer_len> if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) { uint64_t offset_in_buffer = offset - buffer_offset_; - copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer, - static_cast<uint64_t>(n)); + copied = std::min(static_cast<uint64_t>(buffer_len_)-offset_in_buffer, + static_cast<uint64_t>(n)); memcpy(scratch, buffer_.get() + offset_in_buffer, copied); if (copied == n) { // fully cached @@ -259,7 +415,7 @@ } Slice readahead_result; Status s = file_->Read(offset + copied, readahead_size_, &readahead_result, - buffer_.get()); + buffer_.get()); if (!s.ok()) { return s; } @@ -290,20 +446,20 @@ private: std::unique_ptr<RandomAccessFile> file_; - size_t readahead_size_; + size_t readahead_size_; + const bool forward_calls_; - mutable std::mutex lock_; + mutable std::mutex lock_; mutable std::unique_ptr<char[]> buffer_; - mutable uint64_t buffer_offset_; - mutable size_t buffer_len_; + mutable uint64_t buffer_offset_; + mutable size_t buffer_len_; }; } // namespace std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile( - std::unique_ptr<RandomAccessFile> file, size_t readahead_size) { - std::unique_ptr<ReadaheadRandomAccessFile> wrapped_file( - new ReadaheadRandomAccessFile(std::move(file), readahead_size)); - return std::move(wrapped_file); + std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) { + return std::make_unique<ReadaheadRandomAccessFile>( + std::move(file), readahead_size); } } // namespace rocksdb diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 616d174a2..a6d52c881 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once #include "rocksdb/env.h" +#include "util/aligned_buffer.h" +#include "port/port.h" namespace rocksdb { @@ -15,7 +17,7 @@ class Statistics; class HistogramImpl; std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile( - std::unique_ptr<RandomAccessFile> file, size_t readahead_size); + std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size); class SequentialFileReader { private: std::unique_ptr<SequentialFile> file_; public: explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file) : file_(std::move(_file)) {} + + SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { + file_ = std::move(o.file_); + return *this; + } + + SequentialFileReader(SequentialFileReader&) = delete; + SequentialFileReader& operator=(SequentialFileReader&) = delete; + Status Read(size_t n, Slice* result, char* scratch); Status Skip(uint64_t n); @@ -34,10 +49,10 @@ class RandomAccessFileReader : public RandomAccessFile { private: std::unique_ptr<RandomAccessFile> file_; - Env* env_; - Statistics* stats_; - uint32_t hist_type_; - HistogramImpl* file_read_hist_; + Env* env_; + Statistics* stats_; + uint32_t hist_type_; + HistogramImpl* file_read_hist_; public: explicit RandomAccessFileReader(std::unique_ptr<RandomAccessFile>&& raf, Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0, HistogramImpl* file_read_hist = nullptr) : file_(std::move(raf)), env_(env), stats_(stats), hist_type_(hist_type), file_read_hist_(file_read_hist) {} + RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + RandomAccessFileReader& operator=(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT { + file_ = std::move(o.file_); + env_ = std::move(o.env_); + stats_ = std::move(o.stats_); + hist_type_ = std::move(o.hist_type_); + file_read_hist_ = std::move(o.file_read_hist_); + return *this; + } + + RandomAccessFileReader(const RandomAccessFileReader&) = delete; + RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; + Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const; RandomAccessFile* file() { return file_.get(); } @@ -60,33 +91,47 @@ class WritableFileWriter { private: std::unique_ptr<WritableFile> writable_file_; - size_t cursize_; // current size of cached data in buf_ - size_t capacity_; // max size of buf_ - unique_ptr<char[]> buf_; // a buffer to cache writes - uint64_t filesize_; - bool pending_sync_; - bool pending_fsync_; - bool direct_io_; - uint64_t last_sync_size_; - uint64_t bytes_per_sync_; - RateLimiter* rate_limiter_; + AlignedBuffer buf_; + // Actually written data size can be used for truncate + // not counting padding data + uint64_t filesize_; + // This is necessary when we use unbuffered access + // and writes must happen on aligned offsets + // so we need to go back and write that page again + uint64_t next_write_offset_; + bool pending_sync_; + bool pending_fsync_; + const bool direct_io_; + const bool use_os_buffer_; + uint64_t last_sync_size_; + uint64_t bytes_per_sync_; + RateLimiter* rate_limiter_; public: - explicit WritableFileWriter(std::unique_ptr<WritableFile>&& file, - const EnvOptions& options) + WritableFileWriter(std::unique_ptr<WritableFile>&& file, + const EnvOptions& options) : writable_file_(std::move(file)), - cursize_(0), - capacity_(65536), - buf_(new char[capacity_]), + buf_(), filesize_(0), + next_write_offset_(0), pending_sync_(false), pending_fsync_(false), direct_io_(writable_file_->UseDirectIO()), + use_os_buffer_(writable_file_->UseOSBuffer()), last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync), - rate_limiter_(options.rate_limiter) {} + rate_limiter_(options.rate_limiter) { + + buf_.SetAlignment(writable_file_->GetRequiredBufferAlignment()); + buf_.AllocateNewBuffer(65536); + } + + WritableFileWriter(const WritableFileWriter&) = delete; + + WritableFileWriter& operator=(const WritableFileWriter&) = delete; + + ~WritableFileWriter() { Close(); } - ~WritableFileWriter() { Flush(); } Status Append(const Slice& data); Status Flush(); @@ -109,8 +154,13 @@ WritableFile* writable_file() const { return writable_file_.get(); } private: + // Used when OS buffering is OFF and we are writing + // via DMA, such as in Windows unbuffered mode + Status WriteUnbuffered(); + // Normal write + Status WriteBuffered(const char* data, size_t size); Status RangeSync(off_t offset, off_t nbytes); - size_t RequestToken(size_t bytes); + size_t RequestToken(size_t bytes, bool align); Status SyncInternal(bool use_fsync); }; } // namespace rocksdb diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 924c171b7..d1f0dcbec 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -24,6 +24,9 @@ TEST_F(WritableFileWriterTest, RangeSync) { size_ += data.size(); return Status::OK(); } + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } Status Close() override { EXPECT_GE(size_, last_synced_ + kMb); EXPECT_LT(size_, last_synced_ + 2 * kMb); diff --git a/util/memenv.cc b/util/memenv.cc index 72fdbd242..573737023 100644 --- a/util/memenv.cc +++ b/util/memenv.cc @@ -232,7 +232,9 @@ class WritableFileImpl : public WritableFile { virtual Status Append(const Slice& data) override { return file_->Append(data); } - + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } virtual Status Close() override { return Status::OK(); } virtual Status Flush() override { return Status::OK(); } virtual Status Sync() override { return Status::OK(); } diff --git a/util/mock_env.cc b/util/mock_env.cc index 088675071..409e16e3a 100644 --- a/util/mock_env.cc +++ b/util/mock_env.cc @@ -250,7 +250,9 @@ class MockWritableFile : public WritableFile { } return Status::OK(); } - + virtual Status Truncate(uint64_t size) override { + return Status::OK(); + } virtual Status Close() override { return file_->Fsync(); } virtual Status Flush() override { return Status::OK(); } diff --git a/util/testutil.h b/util/testutil.h index 67a2aafad..990a3ba81 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -184,6 +184,10 @@ class StringSink: public WritableFile { const std::string& contents() const { return contents_; } + virtual Status Truncate(uint64_t size) override { + contents_.resize(size); + return Status::OK(); + } virtual Status Close() override { return Status::OK(); } virtual Status Flush() override { if (reader_contents_ != nullptr) { From ddc8b44998077453f02fb8d4de73f23f56bc8d03 Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Fri, 11 Sep 2015 17:36:48 -0700 Subject: [PATCH 2/2] Address code review comments, both GH and internal. Fix compilation issues on GCC/CLANG. Address Windows Release test build issues due to Sync. --- db/db_iter_test.cc | 3 ++ include/rocksdb/env.h | 17 +++++++---- port/win/env_win.cc | 16 +++++------ util/aligned_buffer.h | 16 +++++------ util/file_reader_writer.cc | 58 +++++++++++++++++++++----------------- util/file_reader_writer.h | 2 +- 6 files changed, 64 insertions(+), 48 deletions(-) diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index
73ef91b6a..90ec90831 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -1884,6 +1884,8 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIterator2) { ASSERT_EQ(db_iter_->value().ToString(), "4"); } +#if !(defined NDEBUG) || !defined(OS_WIN) + TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) { // Test Prev() when one child iterator is at its end but more rows // are added. @@ -2234,6 +2236,7 @@ TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +#endif // #if !(defined NDEBUG) || !defined(OS_WIN) } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index b346effba..6300a4703 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -395,7 +395,7 @@ class RandomAccessFile { // Used by the file_reader_writer to decide if the ReadAhead wrapper // should simply forward the call rather than enact buffering or locking. - virtual bool ReaderWriterForward() const { + virtual bool ShouldForwardRawRequest() const { return false; } @@ -448,18 +448,20 @@ class WritableFile { return true; } + const size_t c_DefaultPageSize = 4 * 1024; + // This is needed when you want to allocate an // AlignedBuffer for use with file I/O classes // Used for unbuffered file I/O when UseOSBuffer() returns false virtual size_t GetRequiredBufferAlignment() const { - return 4 * 1024; + return c_DefaultPageSize; } virtual Status Append(const Slice& data) = 0; - // Positional write for unbuffered access; the default returns + // Positioned write for unbuffered access; the default returns // NotSupported, as most files use buffered access by default - virtual Status Append(const Slice& /* data */, uint64_t /* offset */) { + virtual Status PositionedAppend(const Slice& /* data */, uint64_t /* offset */) { return Status::NotSupported(); } @@ -467,7 +469,9 @@ // before closing. It is not always possible to keep track of the file // size due to whole-page writes. The behavior is undefined if called // with other writes to follow. - virtual Status Truncate(uint64_t size) = 0; + virtual Status Truncate(uint64_t size) { + return Status::OK(); + } virtual Status Close() = 0; virtual Status Flush() = 0; virtual Status Sync() = 0; // sync data @@ -868,6 +872,9 @@ class WritableFileWrapper : public WritableFile { explicit WritableFileWrapper(WritableFile* t) : target_(t) { } Status Append(const Slice& data) override { return target_->Append(data); } + Status PositionedAppend(const Slice& data, uint64_t offset) override { + return target_->PositionedAppend(data, offset); + } Status Truncate(uint64_t size) override { return target_->Truncate(size); } Status Close() override { return target_->Close(); } Status Flush() override { return target_->Flush(); } diff --git a/port/win/env_win.cc b/port/win/env_win.cc index c6d0cb3ca..2668ad1d2 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -704,7 +704,7 @@ class WinRandomAccessFile : public RandomAccessFile { // Unbuffered access, use internal buffer for reads if (!use_os_buffer_) { - buffer_.SetAlignment(alignment); + buffer_.Alignment(alignment); // Random read, no need in a big buffer // We read things in database blocks which are likely to be similar to // the alignment we use.
@@ -734,7 +734,7 @@ // Let's see if at least some of the requested data is already // in the buffer if (offset >= buffered_start_ && - offset < (buffered_start_ + buffer_.GetCurrentSize())) { + offset < (buffered_start_ + buffer_.CurrentSize())) { size_t buffer_offset = offset - buffered_start_; r = buffer_.Read(dest, buffer_offset, left); assert(r >= 0); @@ -747,7 +747,7 @@ // Still some left or none was buffered if (left > 0) { // Figure out the start/end offset for reading and amount to read - const size_t alignment = buffer_.GetAlignment(); + const size_t alignment = buffer_.Alignment(); const size_t start_page_start = TruncateToPageBoundary(alignment, offset); const size_t end_page_start = @@ -755,18 +755,18 @@ const size_t actual_bytes_toread = (end_page_start - start_page_start) + alignment; - if (buffer_.GetCapacity() < actual_bytes_toread) { + if (buffer_.Capacity() < actual_bytes_toread) { buffer_.AllocateNewBuffer(actual_bytes_toread); } else { buffer_.Clear(); } SSIZE_T read = 0; - read = pread(hFile_, buffer_.GetDestination(), actual_bytes_toread, + read = pread(hFile_, buffer_.Destination(), actual_bytes_toread, start_page_start); if (read > 0) { - buffer_.SetSize(read); + buffer_.Size(read); buffered_start_ = start_page_start; // Let's figure out how much we read from the users standpoint @@ -797,7 +797,7 @@ return s; } - virtual bool ReaderWriterForward() const override { + virtual bool ShouldForwardRawRequest() const override { return true; } @@ -880,7 +880,7 @@ return s; } - virtual Status Append(const Slice& data, uint64_t offset) override { + virtual Status PositionedAppend(const Slice& data, uint64_t offset) override { Status s; SSIZE_T ret = pwrite(hFile_, data.data(), diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h index 7fa80926c..2244316fe 100644 --- a/util/aligned_buffer.h +++ b/util/aligned_buffer.h @@ -25,7 +25,7 @@ // This class is to manage an aligned user // allocated buffer for unbuffered I/O purposes -// though it does not make a difference if you need a buffer. +// though it can be used for any purpose.
class AlignedBuffer { size_t alignment_; std::unique_ptr<char[]> buf_; size_t capacity_; @@ -58,19 +58,19 @@ public: AlignedBuffer& operator=(const AlignedBuffer&) = delete; - size_t GetAlignment() const { + size_t Alignment() const { return alignment_; } - size_t GetCapacity() const { + size_t Capacity() const { return capacity_; } - size_t GetCurrentSize() const { + size_t CurrentSize() const { return cursize_; } - const char* GetBufferStart() const { + const char* BufferStart() const { return bufstart_; } @@ -78,7 +78,7 @@ public: cursize_ = 0; } - void SetAlignment(size_t alignment) { + void Alignment(size_t alignment) { assert(alignment > 0); assert((alignment & (alignment - 1)) == 0); alignment_ = alignment; @@ -143,11 +143,11 @@ public: } // Returns place to start writing - char* GetDestination() { + char* Destination() { return bufstart_ + cursize_; } - void SetSize(size_t cursize) { + void Size(size_t cursize) { cursize_ = cursize; } }; diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 453bb7461..86d70b62d 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -20,6 +20,11 @@ #include "util/sync_point.h" namespace rocksdb { + +namespace { + const size_t c_OneMb = (1 << 20); +} + Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { Status s = file_->Read(n, result, scratch); IOSTATS_ADD(bytes_read, result->size()); @@ -62,26 +67,26 @@ Status WritableFileWriter::Append(const Slice& data) { // Flush only when I/O is buffered if (use_os_buffer_ && - (buf_.GetCapacity() - buf_.GetCurrentSize()) < left) { - if (buf_.GetCurrentSize() > 0) { + (buf_.Capacity() - buf_.CurrentSize()) < left) { + if (buf_.CurrentSize() > 0) { s = Flush(); if (!s.ok()) { return s; } } - if (buf_.GetCapacity() < c_OneMb) { - size_t desiredCapacity = buf_.GetCapacity() * 2; - desiredCapacity = std::min(desiredCapacity, c_OneMb); + if (buf_.Capacity() < c_OneMb) { + size_t desiredCapacity = buf_.Capacity() * 2; + desiredCapacity = std::min(desiredCapacity, c_OneMb); buf_.AllocateNewBuffer(desiredCapacity); } - assert(buf_.GetCurrentSize() == 0); + assert(buf_.CurrentSize() == 0); } // We never write directly to disk with unbuffered I/O on.
// otherwise we simply use it for its original purpose to accumulate many small // chunks - if (!use_os_buffer_ || (buf_.GetCapacity() >= left)) { + if (!use_os_buffer_ || (buf_.Capacity() >= left)) { while (left > 0) { size_t appended = buf_.Append(src, left); left -= appended; @@ -96,16 +101,16 @@ Status WritableFileWriter::Append(const Slice& data) { // We double the buffer here because // Flush calls do not keep up with the incoming bytes // This is the only place when buffer is changed with unbuffered I/O - if (buf_.GetCapacity() < (1 << 20)) { - size_t desiredCapacity = buf_.GetCapacity() * 2; - desiredCapacity = std::min(desiredCapacity, size_t(1 << 20)); + if (buf_.Capacity() < (1 << 20)) { + size_t desiredCapacity = buf_.Capacity() * 2; + desiredCapacity = std::min(desiredCapacity, c_OneMb); buf_.AllocateNewBuffer(desiredCapacity); } } } } else { // Writing directly to file bypassing the buffer - assert(buf_.GetCurrentSize() == 0); + assert(buf_.CurrentSize() == 0); s = WriteBuffered(src, left); } @@ -153,9 +158,9 @@ Status s; TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2); - if (buf_.GetCurrentSize() > 0) { + if (buf_.CurrentSize() > 0) { if (use_os_buffer_) { - s = WriteBuffered(buf_.GetBufferStart(), buf_.GetCurrentSize()); + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); } else { s = WriteUnbuffered(); } @@ -260,7 +265,7 @@ size_t WritableFileWriter::RequestToken(size_t bytes, bool align) { // Here we may actually require more than burst and block // but we can not write less than one page at a time on unbuffered // thus we may want not to use the rate limiter - size_t alignment = buf_.GetAlignment(); + size_t alignment = buf_.Alignment(); bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); } rate_limiter_->Request(bytes, io_priority); @@ -294,7 +299,7 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { left -= allowed; src += allowed; } - buf_.SetSize(0); + buf_.Size(0); return s; } @@ -311,25 +316,25 @@ Status s; assert(!use_os_buffer_); - const size_t alignment = buf_.GetAlignment(); + const size_t alignment = buf_.Alignment(); assert((next_write_offset_ % alignment) == 0); // Calculate whole page final file advance if all writes succeed size_t file_advance = - TruncateToPageBoundary(alignment, buf_.GetCurrentSize()); + TruncateToPageBoundary(alignment, buf_.CurrentSize()); // Calculate the leftover tail, we write it here padded with zeros BUT we // will write // it again in the future either on Close() OR when the current whole page // fills out - size_t leftover_tail = buf_.GetCurrentSize() - file_advance; + size_t leftover_tail = buf_.CurrentSize() - file_advance; // Round up and pad buf_.PadToAlignmentWith(0); - const char* src = buf_.GetBufferStart(); + const char* src = buf_.BufferStart(); uint64_t write_offset = next_write_offset_; - size_t left = buf_.GetCurrentSize(); + size_t left = buf_.CurrentSize(); while (left > 0) { // Check how much is allowed @@ -339,9 +344,9 @@ IOSTATS_TIMER_GUARD(write_nanos); TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); // Unbuffered writes must be positional - s = writable_file_->Append(Slice(src, size), write_offset); + s = writable_file_->PositionedAppend(Slice(src, size), write_offset); if (!s.ok()) { - buf_.SetSize(file_advance + leftover_tail); + buf_.Size(file_advance + leftover_tail); return s; } } @@ -375,7 +380,7 @@ class
ReadaheadRandomAccessFile : public RandomAccessFile { size_t readahead_size) : file_(std::move(file)), readahead_size_(readahead_size), - forward_calls_(file_->ReaderWriterForward()), + forward_calls_(file_->ShouldForwardRawRequest()), buffer_(new char[readahead_size_]), buffer_offset_(0), buffer_len_(0) {} @@ -404,7 +409,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { // if offset between [buffer_offset_, buffer_offset_ + buffer_len> if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) { uint64_t offset_in_buffer = offset - buffer_offset_; - copied = std::min(static_cast<uint64_t>(buffer_len_)-offset_in_buffer, + copied = std::min(static_cast<uint64_t>(buffer_len_) - offset_in_buffer, static_cast<uint64_t>(n)); memcpy(scratch, buffer_.get() + offset_in_buffer, copied); if (copied == n) { // fully cached @@ -458,8 +463,9 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile( std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size) { - return std::make_unique<ReadaheadRandomAccessFile>( - std::move(file), readahead_size); + std::unique_ptr<RandomAccessFile> result( + new ReadaheadRandomAccessFile(std::move(file), readahead_size)); + return result; } } // namespace rocksdb diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index a6d52c881..f33965dc6 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -122,7 +122,7 @@ class WritableFileWriter { bytes_per_sync_(options.bytes_per_sync), rate_limiter_(options.rate_limiter) { - buf_.SetAlignment(writable_file_->GetRequiredBufferAlignment()); + buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); buf_.AllocateNewBuffer(65536); }
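
Note: the unbuffered write bookkeeping that WritableFileWriter::WriteUnbuffered() performs above is easiest to see end to end in a small example. The following is a minimal, self-contained C++ sketch of the same mechanics, not RocksDB code; the 4KB alignment, the plain std::vector standing in for AlignedBuffer, and the concrete byte counts are illustrative assumptions. It pads the buffered bytes with zeros to a whole page, advances the file only by whole pages, refits the partial tail to the front of the buffer so it is rewritten on the next flush, and relies on Truncate() at Close() to trim the padding.

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

// Helpers mirroring util/aligned_buffer.h; page_size must be a power of two.
static size_t TruncateToPageBoundary(size_t page_size, size_t s) {
  return s - (s & (page_size - 1));
}
static size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }

int main() {
  const size_t kAlignment = 4096;         // GetRequiredBufferAlignment()
  std::vector<char> buf(2 * kAlignment);  // stand-in for AlignedBuffer
  size_t cursize = 5000;                  // bytes accumulated in the buffer
  uint64_t next_write_offset = 0;         // always page-aligned

  // Whole pages the file can advance by if the write succeeds.
  size_t file_advance = TruncateToPageBoundary(kAlignment, cursize);
  size_t leftover_tail = cursize - file_advance;  // rewritten on next flush

  // Pad the tail with zeros so the positioned write covers whole pages.
  size_t padded = Roundup(cursize, kAlignment);
  memset(buf.data() + cursize, 0, padded - cursize);

  // Here the real writer issues PositionedAppend(Slice(buf, padded),
  // next_write_offset) in RequestToken()-sized, alignment-rounded chunks.

  // Refit the tail to the buffer start; the file offset only advances by
  // whole pages, so the tail bytes are written again on the next flush.
  memmove(buf.data(), buf.data() + file_advance, leftover_tail);
  cursize = leftover_tail;
  next_write_offset += file_advance;

  // On Close(), Truncate(filesize) trims the zero padding of the last page.
  printf("advance=%zu tail=%zu next_offset=%llu\n", file_advance,
         leftover_tail, (unsigned long long)next_write_offset);
  return 0;
}

With cursize = 5000 and 4KB pages this prints advance=4096 tail=904 next_offset=4096: one whole page goes to disk, and the 904-byte tail is kept to be padded and rewritten later, which is exactly why the file's logical size must be communicated via Truncate() on close.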
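For the writer-side contract, a hypothetical file class makes the division of labor concrete: WritableFileWriter owns all buffering and alignment, while the file only answers the new virtuals and performs positioned writes. This POSIX-flavored sketch is an assumption for illustration, not code from the patch; the class name, the simplified bool returns (instead of Status), and the pwrite/ftruncate mapping are all hypothetical.

#include <unistd.h>
#include <cstddef>
#include <cstdint>

// Hypothetical example only: an unbuffered file honoring the contract
// the new WritableFile virtuals establish for WritableFileWriter.
class ExampleUnbufferedFile {
 public:
  explicit ExampleUnbufferedFile(int fd) : fd_(fd) {}
  // false steers WritableFileWriter onto the WriteUnbuffered() path.
  bool UseOSBuffer() const { return false; }
  // The writer allocates its AlignedBuffer with this alignment.
  size_t GetRequiredBufferAlignment() const { return 4 * 1024; }
  // Receives whole pages at page-aligned offsets from the writer.
  bool PositionedAppend(const char* data, size_t n, uint64_t offset) {
    return pwrite(fd_, data, n, static_cast<off_t>(offset)) ==
           static_cast<ssize_t>(n);
  }
  // Called from the wrapper's Close() to trim the zero padding.
  bool Truncate(uint64_t size) {
    return ftruncate(fd_, static_cast<off_t>(size)) == 0;
  }

 private:
  int fd_;
};

This is also why WinWritableFile above asserts use_os_buffer_ in Append(): once the wrapper owns the buffer, a plain Append() on an unbuffered file would bypass the alignment guarantees, so unbuffered data must arrive through the positioned overload instead.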