Make direct I/O write use incremental buffer

Summary:
Currently for direct I/O, the large maximum buffer is always allocated. This will be wasteful if users flush the data in much smaller chunks. This diff fix this by changing the behavior of incremental buffer works. When we enlarge buffer, we try to copy the existing data in the buffer to the enlarged buffer, rather than flush the buffer first. This can make sure that no extra I/O is introduced because of buffer enlargement.
Closes https://github.com/facebook/rocksdb/pull/2403

Differential Revision: D5178403

Pulled By: siying

fbshipit-source-id: a8fe1e7304bdb8cab2973340022fe80ff83449fd
main
Siying Dong 8 years ago committed by Facebook Github Bot
parent 7a270069b3
commit 0175d58c3c
  1. 31
      util/aligned_buffer.h
  2. 31
      util/file_reader_writer.cc
  3. 4
      util/file_reader_writer.h
  4. 80
      util/file_reader_writer_test.cc

@ -97,20 +97,31 @@ public:
}
// Allocates a new buffer and sets bufstart_ to the aligned first byte
void AllocateNewBuffer(size_t requestedCapacity) {
void AllocateNewBuffer(size_t requested_capacity, bool copy_data = false) {
assert(alignment_ > 0);
assert((alignment_ & (alignment_ - 1)) == 0);
size_t size = Roundup(requestedCapacity, alignment_);
buf_.reset(new char[size + alignment_]);
if (copy_data && requested_capacity < cursize_) {
// If we are downsizing to a capacity that is smaller than the current
// data in the buffer. Ignore the request.
return;
}
char* p = buf_.get();
bufstart_ = reinterpret_cast<char*>(
(reinterpret_cast<uintptr_t>(p)+(alignment_ - 1)) &
~static_cast<uintptr_t>(alignment_ - 1));
capacity_ = size;
cursize_ = 0;
size_t new_capacity = Roundup(requested_capacity, alignment_);
char* new_buf = new char[new_capacity + alignment_];
char* new_bufstart = reinterpret_cast<char*>(
(reinterpret_cast<uintptr_t>(new_buf) + (alignment_ - 1)) &
~static_cast<uintptr_t>(alignment_ - 1));
if (copy_data) {
memcpy(new_bufstart, bufstart_, cursize_);
} else {
cursize_ = 0;
}
bufstart_ = new_bufstart;
capacity_ = new_capacity;
buf_.reset(new_buf);
}
// Used for write
// Returns the number of bytes appended

@ -124,6 +124,22 @@ Status WritableFileWriter::Append(const Slice& data) {
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left);
}
// See whether we need to enlarge the buffer to avoid the flush
if (buf_.Capacity() - buf_.CurrentSize() < left) {
for (size_t cap = buf_.Capacity();
cap < max_buffer_size_; // There is still room to increase
cap *= 2) {
// See whether the next available size is large enough.
// Buffer will never be increased to more than max_buffer_size_.
size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
if (desired_capacity - buf_.CurrentSize() >= left ||
(use_direct_io() && desired_capacity == max_buffer_size_)) {
buf_.AllocateNewBuffer(desired_capacity, true);
break;
}
}
}
// Flush only when buffered I/O
if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
if (buf_.CurrentSize() > 0) {
@ -132,12 +148,6 @@ Status WritableFileWriter::Append(const Slice& data) {
return s;
}
}
if (buf_.Capacity() < max_buffer_size_) {
size_t desiredCapacity = buf_.Capacity() * 2;
desiredCapacity = std::min(desiredCapacity, max_buffer_size_);
buf_.AllocateNewBuffer(desiredCapacity);
}
assert(buf_.CurrentSize() == 0);
}
@ -155,15 +165,6 @@ Status WritableFileWriter::Append(const Slice& data) {
if (!s.ok()) {
break;
}
// We double the buffer here because
// Flush calls do not keep up with the incoming bytes
// This is the only place when buffer is changed with direct I/O
if (buf_.Capacity() < max_buffer_size_) {
size_t desiredCapacity = buf_.Capacity() * 2;
desiredCapacity = std::min(desiredCapacity, max_buffer_size_);
buf_.AllocateNewBuffer(desiredCapacity);
}
}
}
} else {

@ -146,9 +146,7 @@ class WritableFileWriter {
rate_limiter_(options.rate_limiter),
stats_(stats) {
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
buf_.AllocateNewBuffer(use_direct_io()
? max_buffer_size_
: std::min((size_t)65536, max_buffer_size_));
buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
}
WritableFileWriter(const WritableFileWriter&) = delete;

@ -89,6 +89,86 @@ TEST_F(WritableFileWriterTest, RangeSync) {
writer->Close();
}
TEST_F(WritableFileWriterTest, IncrementalBuffer) {
class FakeWF : public WritableFile {
public:
explicit FakeWF(std::string* _file_data, bool _use_direct_io,
bool _no_flush)
: file_data_(_file_data),
use_direct_io_(_use_direct_io),
no_flush_(_no_flush) {}
~FakeWF() {}
Status Append(const Slice& data) override {
file_data_->append(data.data(), data.size());
size_ += data.size();
return Status::OK();
}
Status PositionedAppend(const Slice& data, uint64_t pos) override {
EXPECT_TRUE(pos % 512 == 0);
EXPECT_TRUE(data.size() % 512 == 0);
file_data_->resize(pos);
file_data_->append(data.data(), data.size());
size_ += data.size();
return Status::OK();
}
virtual Status Truncate(uint64_t size) override {
file_data_->resize(size);
return Status::OK();
}
Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }
Status Fsync() override { return Status::OK(); }
void SetIOPriority(Env::IOPriority pri) override {}
uint64_t GetFileSize() override { return size_; }
void GetPreallocationStatus(size_t* block_size,
size_t* last_allocated_block) override {}
size_t GetUniqueId(char* id, size_t max_size) const override { return 0; }
Status InvalidateCache(size_t offset, size_t length) override {
return Status::OK();
}
bool use_direct_io() const override { return use_direct_io_; }
std::string* file_data_;
bool use_direct_io_;
bool no_flush_;
size_t size_ = 0;
};
Random r(301);
const int kNumAttempts = 50;
for (int attempt = 0; attempt < kNumAttempts; attempt++) {
bool no_flush = (attempt % 3 == 0);
EnvOptions env_options;
env_options.writable_file_max_buffer_size =
(attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024;
std::string actual;
unique_ptr<FakeWF> wf(new FakeWF(&actual, attempt % 2 == 1, no_flush));
unique_ptr<WritableFileWriter> writer(
new WritableFileWriter(std::move(wf), env_options));
std::string target;
for (int i = 0; i < 20; i++) {
uint32_t num = r.Skewed(16) * 100 + r.Uniform(100);
std::string random_string;
test::RandomString(&r, num, &random_string);
writer->Append(Slice(random_string.c_str(), num));
target.append(random_string.c_str(), num);
// In some attempts, flush in a chance of 1/10.
if (!no_flush && r.Uniform(10) == 0) {
writer->Flush();
}
}
writer->Flush();
writer->Close();
ASSERT_EQ(target.size(), actual.size());
ASSERT_EQ(target, actual);
}
}
#ifndef ROCKSDB_LITE
TEST_F(WritableFileWriterTest, AppendStatusReturn) {
class FakeWF : public WritableFile {

Loading…
Cancel
Save