From ebc8a799801d1eed26d9042c95b879fc83aa0b3f Mon Sep 17 00:00:00 2001 From: Yulia Kartseva Date: Sat, 18 Feb 2017 11:54:49 -0800 Subject: [PATCH] alignment is on in ReadaheadRandomAccessFile::Read() Summary: Closes https://github.com/facebook/rocksdb/pull/1857 Differential Revision: D4534518 Pulled By: wat-ze-hex fbshipit-source-id: b456946 --- util/file_reader_writer.cc | 74 +++++++++++++++------- util/file_reader_writer_test.cc | 106 +++++++++++++++++++++++++++++++- 2 files changed, 157 insertions(+), 23 deletions(-) diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index 642864598..0184bed74 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -431,13 +431,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { ReadaheadRandomAccessFile(std::unique_ptr&& file, size_t readahead_size) : file_(std::move(file)), - readahead_size_(readahead_size), + alignment_(file_->GetRequiredBufferAlignment()), + readahead_size_(Roundup(readahead_size, alignment_)), forward_calls_(file_->ShouldForwardRawRequest()), buffer_(), buffer_offset_(0), buffer_len_(0) { if (!forward_calls_) { - buffer_.reset(new char[readahead_size_]); + buffer_.Alignment(alignment_); + buffer_.AllocateNewBuffer(readahead_size_ + alignment_); } else if (readahead_size_ > 0) { file_->EnableReadAhead(); } @@ -463,31 +465,45 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { std::unique_lock lk(lock_); - size_t copied = 0; - // if offset between [buffer_offset_, buffer_offset_ + buffer_len> - if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) { - uint64_t offset_in_buffer = offset - buffer_offset_; - copied = std::min(buffer_len_ - static_cast(offset_in_buffer), n); - memcpy(scratch, buffer_.get() + offset_in_buffer, copied); - if (copied == n) { - // fully cached - *result = Slice(scratch, n); - return Status::OK(); - } + size_t cached_len = 0; + // Check if there is a cache hit, means that [offset, offset + n) is either + // complitely or partially in the buffer + // If it's completely cached, including end of file case when offset + n is + // greater than EOF, return + if (TryReadFromCache_(offset, n, &cached_len, scratch) && + (cached_len == n || + // End of file + buffer_len_ < readahead_size_ + alignment_)) { + *result = Slice(scratch, cached_len); + return Status::OK(); } + size_t advanced_offset = offset + cached_len; + // In the case of cache hit advanced_offset is already aligned, means that + // chunk_offset equals to advanced_offset + size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset); Slice readahead_result; - Status s = file_->Read(offset + copied, readahead_size_, &readahead_result, - buffer_.get()); + Status s = file_->Read(chunk_offset, readahead_size_ + alignment_, + &readahead_result, buffer_.BufferStart()); if (!s.ok()) { return s; } + // In the case of cache miss, i.e. when cached_len equals 0, an offset can + // exceed the file end position, so the following check is required + if (advanced_offset < chunk_offset + readahead_result.size()) { + // In the case of cache miss, the first chunk_padding bytes in buffer_ are + // stored for alignment only and must be skipped + size_t chunk_padding = advanced_offset - chunk_offset; + auto remaining_len = + std::min(readahead_result.size() - chunk_padding, n - cached_len); + memcpy(scratch + cached_len, readahead_result.data() + chunk_padding, + remaining_len); + *result = Slice(scratch, cached_len + remaining_len); + } else { + *result = Slice(scratch, cached_len); + } - auto left_to_copy = std::min(readahead_result.size(), n - copied); - memcpy(scratch + copied, readahead_result.data(), left_to_copy); - *result = Slice(scratch, copied + left_to_copy); - - if (readahead_result.data() == buffer_.get()) { - buffer_offset_ = offset + copied; + if (readahead_result.data() == buffer_.BufferStart()) { + buffer_offset_ = chunk_offset; buffer_len_ = readahead_result.size(); } else { buffer_len_ = 0; @@ -507,12 +523,26 @@ class ReadaheadRandomAccessFile : public RandomAccessFile { } private: + bool TryReadFromCache_(uint64_t offset, size_t n, size_t* cached_len, + char* scratch) const { + if (offset < buffer_offset_ || offset >= buffer_offset_ + buffer_len_) { + *cached_len = 0; + return false; + } + uint64_t offset_in_buffer = offset - buffer_offset_; + *cached_len = + std::min(buffer_len_ - static_cast(offset_in_buffer), n); + memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); + return true; + } + std::unique_ptr file_; + const size_t alignment_; size_t readahead_size_; const bool forward_calls_; mutable std::mutex lock_; - mutable std::unique_ptr buffer_; + mutable AlignedBuffer buffer_; mutable uint64_t buffer_offset_; mutable size_t buffer_len_; }; diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index af6b825f6..1294871ab 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -3,10 +3,12 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // -#include #include "util/file_reader_writer.h" +#include +#include #include "util/random.h" #include "util/testharness.h" +#include "util/testutil.h" namespace rocksdb { @@ -127,6 +129,108 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) { } #endif +class ReadaheadRandomAccessFileTest + : public testing::Test, + public testing::WithParamInterface { + public: + static std::vector GetReadaheadSizeList() { + return {1lu << 12, 1lu << 16}; + } + virtual void SetUp() override { + readahead_size_ = GetParam(); + scratch_.reset(new char[2 * readahead_size_]); + ResetSourceStr(); + } + ReadaheadRandomAccessFileTest() : control_contents_() {} + std::string Read(uint64_t offset, size_t n) { + Slice result; + test_read_holder_->Read(offset, n, &result, scratch_.get()); + return std::string(result.data(), result.size()); + } + void ResetSourceStr(const std::string& str = "") { + auto write_holder = std::unique_ptr( + test::GetWritableFileWriter(new test::StringSink(&control_contents_))); + write_holder->Append(Slice(str)); + write_holder->Flush(); + auto read_holder = std::unique_ptr( + new test::StringSource(control_contents_)); + test_read_holder_ = + NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_); + } + size_t GetReadaheadSize() const { return readahead_size_; } + + private: + size_t readahead_size_; + Slice control_contents_; + std::unique_ptr test_read_holder_; + std::unique_ptr scratch_; +}; + +TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStrTest) { + ASSERT_EQ("", Read(0, 1)); + ASSERT_EQ("", Read(0, 0)); + ASSERT_EQ("", Read(13, 13)); +} + +TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSizeTest) { + std::string str = "abcdefghijklmnopqrs"; + ResetSourceStr(str); + ASSERT_EQ(str.substr(3, 4), Read(3, 4)); + ASSERT_EQ(str.substr(0, 3), Read(0, 3)); + ASSERT_EQ(str, Read(0, str.size())); + ASSERT_EQ(str.substr(7, std::min(static_cast(str.size()) - 7, 30)), + Read(7, 30)); + ASSERT_EQ("", Read(100, 100)); +} + +TEST_P(ReadaheadRandomAccessFileTest, + SourceStrLenCanBeGreaterThanReadaheadSizeTest) { + Random rng(42); + for (int k = 0; k < 100; ++k) { + size_t strLen = k * GetReadaheadSize() + + rng.Uniform(static_cast(GetReadaheadSize())); + std::string str = + test::RandomHumanReadableString(&rng, static_cast(strLen)); + ResetSourceStr(str); + for (int test = 1; test <= 100; ++test) { + size_t offset = rng.Uniform(static_cast(strLen)); + size_t n = rng.Uniform(static_cast(GetReadaheadSize())); + ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)), + Read(offset, n)); + } + } +} + +TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) { + Random rng(7); + size_t strLen = 4 * GetReadaheadSize() + + rng.Uniform(static_cast(GetReadaheadSize())); + std::string str = + test::RandomHumanReadableString(&rng, static_cast(strLen)); + ResetSourceStr(str); + for (int test = 1; test <= 100; ++test) { + size_t offset = rng.Uniform(static_cast(strLen)); + size_t n = + GetReadaheadSize() + rng.Uniform(static_cast(GetReadaheadSize())); + ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)), + Read(offset, n)); + } +} + +INSTANTIATE_TEST_CASE_P( + EmptySourceStrTest, ReadaheadRandomAccessFileTest, + ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + SourceStrLenLessThanReadaheadSizeTest, ReadaheadRandomAccessFileTest, + ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + SourceStrLenCanBeGreaterThanReadaheadSizeTest, + ReadaheadRandomAccessFileTest, + ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); +INSTANTIATE_TEST_CASE_P( + NExceedReadaheadTest, ReadaheadRandomAccessFileTest, + ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList())); + } // namespace rocksdb int main(int argc, char** argv) {