From 0175d58c3c8dc19e4e2126267c7995bf0a3c3106 Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Tue, 13 Jun 2017 04:34:51 -0700 Subject: [PATCH] Make direct I/O write use incremental buffer Summary: Currently for direct I/O, the large maximum buffer is always allocated. This will be wasteful if users flush the data in much smaller chunks. This diff fixes this by changing how the incremental buffer works. When we enlarge the buffer, we try to copy the existing data in the buffer to the enlarged buffer, rather than flush the buffer first. This makes sure that no extra I/O is introduced because of buffer enlargement. Closes https://github.com/facebook/rocksdb/pull/2403 Differential Revision: D5178403 Pulled By: siying fbshipit-source-id: a8fe1e7304bdb8cab2973340022fe80ff83449fd --- util/aligned_buffer.h | 31 ++++++++----- util/file_reader_writer.cc | 31 ++++++------- util/file_reader_writer.h | 4 +- util/file_reader_writer_test.cc | 80 +++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 28 deletions(-) diff --git a/util/aligned_buffer.h b/util/aligned_buffer.h index 5924879cc..0c2ec5059 100644 --- a/util/aligned_buffer.h +++ b/util/aligned_buffer.h @@ -97,20 +97,31 @@ public: } // Allocates a new buffer and sets bufstart_ to the aligned first byte - void AllocateNewBuffer(size_t requestedCapacity) { - + void AllocateNewBuffer(size_t requested_capacity, bool copy_data = false) { assert(alignment_ > 0); assert((alignment_ & (alignment_ - 1)) == 0); - size_t size = Roundup(requestedCapacity, alignment_); - buf_.reset(new char[size + alignment_]); + if (copy_data && requested_capacity < cursize_) { + // If we are downsizing to a capacity that is smaller than the current + // data in the buffer. Ignore the request. 
+ return; + } - char* p = buf_.get(); - bufstart_ = reinterpret_cast( - (reinterpret_cast(p)+(alignment_ - 1)) & - ~static_cast(alignment_ - 1)); - capacity_ = size; - cursize_ = 0; + size_t new_capacity = Roundup(requested_capacity, alignment_); + char* new_buf = new char[new_capacity + alignment_]; + char* new_bufstart = reinterpret_cast( + (reinterpret_cast(new_buf) + (alignment_ - 1)) & + ~static_cast(alignment_ - 1)); + + if (copy_data) { + memcpy(new_bufstart, bufstart_, cursize_); + } else { + cursize_ = 0; + } + + bufstart_ = new_bufstart; + capacity_ = new_capacity; + buf_.reset(new_buf); } // Used for write // Returns the number of bytes appended diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 0e5d40f67..a29fe9715 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -124,6 +124,22 @@ Status WritableFileWriter::Append(const Slice& data) { writable_file_->PrepareWrite(static_cast(GetFileSize()), left); } + // See whether we need to enlarge the buffer to avoid the flush + if (buf_.Capacity() - buf_.CurrentSize() < left) { + for (size_t cap = buf_.Capacity(); + cap < max_buffer_size_; // There is still room to increase + cap *= 2) { + // See whether the next available size is large enough. + // Buffer will never be increased to more than max_buffer_size_. 
+ size_t desired_capacity = std::min(cap * 2, max_buffer_size_); + if (desired_capacity - buf_.CurrentSize() >= left || + (use_direct_io() && desired_capacity == max_buffer_size_)) { + buf_.AllocateNewBuffer(desired_capacity, true); + break; + } + } + } + // Flush only when buffered I/O if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) { if (buf_.CurrentSize() > 0) { @@ -132,12 +148,6 @@ Status WritableFileWriter::Append(const Slice& data) { return s; } } - - if (buf_.Capacity() < max_buffer_size_) { - size_t desiredCapacity = buf_.Capacity() * 2; - desiredCapacity = std::min(desiredCapacity, max_buffer_size_); - buf_.AllocateNewBuffer(desiredCapacity); - } assert(buf_.CurrentSize() == 0); } @@ -155,15 +165,6 @@ Status WritableFileWriter::Append(const Slice& data) { if (!s.ok()) { break; } - - // We double the buffer here because - // Flush calls do not keep up with the incoming bytes - // This is the only place when buffer is changed with direct I/O - if (buf_.Capacity() < max_buffer_size_) { - size_t desiredCapacity = buf_.Capacity() * 2; - desiredCapacity = std::min(desiredCapacity, max_buffer_size_); - buf_.AllocateNewBuffer(desiredCapacity); - } } } } else { diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 2823463e0..4a82e1ddc 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -146,9 +146,7 @@ class WritableFileWriter { rate_limiter_(options.rate_limiter), stats_(stats) { buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); - buf_.AllocateNewBuffer(use_direct_io() - ? 
max_buffer_size_ - : std::min((size_t)65536, max_buffer_size_)); + buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); } WritableFileWriter(const WritableFileWriter&) = delete; diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 02a2d3b59..00a32fc25 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -89,6 +89,86 @@ TEST_F(WritableFileWriterTest, RangeSync) { writer->Close(); } +TEST_F(WritableFileWriterTest, IncrementalBuffer) { + class FakeWF : public WritableFile { + public: + explicit FakeWF(std::string* _file_data, bool _use_direct_io, + bool _no_flush) + : file_data_(_file_data), + use_direct_io_(_use_direct_io), + no_flush_(_no_flush) {} + ~FakeWF() {} + + Status Append(const Slice& data) override { + file_data_->append(data.data(), data.size()); + size_ += data.size(); + return Status::OK(); + } + Status PositionedAppend(const Slice& data, uint64_t pos) override { + EXPECT_TRUE(pos % 512 == 0); + EXPECT_TRUE(data.size() % 512 == 0); + file_data_->resize(pos); + file_data_->append(data.data(), data.size()); + size_ += data.size(); + return Status::OK(); + } + + virtual Status Truncate(uint64_t size) override { + file_data_->resize(size); + return Status::OK(); + } + Status Close() override { return Status::OK(); } + Status Flush() override { return Status::OK(); } + Status Sync() override { return Status::OK(); } + Status Fsync() override { return Status::OK(); } + void SetIOPriority(Env::IOPriority pri) override {} + uint64_t GetFileSize() override { return size_; } + void GetPreallocationStatus(size_t* block_size, + size_t* last_allocated_block) override {} + size_t GetUniqueId(char* id, size_t max_size) const override { return 0; } + Status InvalidateCache(size_t offset, size_t length) override { + return Status::OK(); + } + bool use_direct_io() const override { return use_direct_io_; } + + std::string* file_data_; + bool use_direct_io_; + bool no_flush_; + size_t size_ = 0; 
+ }; + + Random r(301); + const int kNumAttempts = 50; + for (int attempt = 0; attempt < kNumAttempts; attempt++) { + bool no_flush = (attempt % 3 == 0); + EnvOptions env_options; + env_options.writable_file_max_buffer_size = + (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024; + std::string actual; + unique_ptr wf(new FakeWF(&actual, attempt % 2 == 1, no_flush)); + unique_ptr writer( + new WritableFileWriter(std::move(wf), env_options)); + + std::string target; + for (int i = 0; i < 20; i++) { + uint32_t num = r.Skewed(16) * 100 + r.Uniform(100); + std::string random_string; + test::RandomString(&r, num, &random_string); + writer->Append(Slice(random_string.c_str(), num)); + target.append(random_string.c_str(), num); + + // In some attempts, flush in a chance of 1/10. + if (!no_flush && r.Uniform(10) == 0) { + writer->Flush(); + } + } + writer->Flush(); + writer->Close(); + ASSERT_EQ(target.size(), actual.size()); + ASSERT_EQ(target, actual); + } +} + #ifndef ROCKSDB_LITE TEST_F(WritableFileWriterTest, AppendStatusReturn) { class FakeWF : public WritableFile {