alignment is on in ReadaheadRandomAccessFile::Read()

Summary: Closes https://github.com/facebook/rocksdb/pull/1857

Differential Revision: D4534518

Pulled By: wat-ze-hex

fbshipit-source-id: b456946
main
Yulia Kartseva 8 years ago committed by Facebook Github Bot
parent 381fd32247
commit ebc8a79980
  1. 72
      util/file_reader_writer.cc
  2. 106
      util/file_reader_writer_test.cc

@ -431,13 +431,15 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
ReadaheadRandomAccessFile(std::unique_ptr<RandomAccessFile>&& file,
size_t readahead_size)
: file_(std::move(file)),
readahead_size_(readahead_size),
alignment_(file_->GetRequiredBufferAlignment()),
readahead_size_(Roundup(readahead_size, alignment_)),
forward_calls_(file_->ShouldForwardRawRequest()),
buffer_(),
buffer_offset_(0),
buffer_len_(0) {
if (!forward_calls_) {
buffer_.reset(new char[readahead_size_]);
buffer_.Alignment(alignment_);
buffer_.AllocateNewBuffer(readahead_size_ + alignment_);
} else if (readahead_size_ > 0) {
file_->EnableReadAhead();
}
@ -463,31 +465,45 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
std::unique_lock<std::mutex> lk(lock_);
size_t copied = 0;
// if offset between [buffer_offset_, buffer_offset_ + buffer_len>
if (offset >= buffer_offset_ && offset < buffer_len_ + buffer_offset_) {
uint64_t offset_in_buffer = offset - buffer_offset_;
copied = std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
memcpy(scratch, buffer_.get() + offset_in_buffer, copied);
if (copied == n) {
// fully cached
*result = Slice(scratch, n);
size_t cached_len = 0;
// Check if there is a cache hit, means that [offset, offset + n) is either
// complitely or partially in the buffer
// If it's completely cached, including end of file case when offset + n is
// greater than EOF, return
if (TryReadFromCache_(offset, n, &cached_len, scratch) &&
(cached_len == n ||
// End of file
buffer_len_ < readahead_size_ + alignment_)) {
*result = Slice(scratch, cached_len);
return Status::OK();
}
}
size_t advanced_offset = offset + cached_len;
// In the case of cache hit advanced_offset is already aligned, means that
// chunk_offset equals to advanced_offset
size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
Slice readahead_result;
Status s = file_->Read(offset + copied, readahead_size_, &readahead_result,
buffer_.get());
Status s = file_->Read(chunk_offset, readahead_size_ + alignment_,
&readahead_result, buffer_.BufferStart());
if (!s.ok()) {
return s;
}
// In the case of cache miss, i.e. when cached_len equals 0, an offset can
// exceed the file end position, so the following check is required
if (advanced_offset < chunk_offset + readahead_result.size()) {
// In the case of cache miss, the first chunk_padding bytes in buffer_ are
// stored for alignment only and must be skipped
size_t chunk_padding = advanced_offset - chunk_offset;
auto remaining_len =
std::min(readahead_result.size() - chunk_padding, n - cached_len);
memcpy(scratch + cached_len, readahead_result.data() + chunk_padding,
remaining_len);
*result = Slice(scratch, cached_len + remaining_len);
} else {
*result = Slice(scratch, cached_len);
}
auto left_to_copy = std::min(readahead_result.size(), n - copied);
memcpy(scratch + copied, readahead_result.data(), left_to_copy);
*result = Slice(scratch, copied + left_to_copy);
if (readahead_result.data() == buffer_.get()) {
buffer_offset_ = offset + copied;
if (readahead_result.data() == buffer_.BufferStart()) {
buffer_offset_ = chunk_offset;
buffer_len_ = readahead_result.size();
} else {
buffer_len_ = 0;
@ -507,12 +523,26 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
}
private:
bool TryReadFromCache_(uint64_t offset, size_t n, size_t* cached_len,
char* scratch) const {
if (offset < buffer_offset_ || offset >= buffer_offset_ + buffer_len_) {
*cached_len = 0;
return false;
}
uint64_t offset_in_buffer = offset - buffer_offset_;
*cached_len =
std::min(buffer_len_ - static_cast<size_t>(offset_in_buffer), n);
memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
return true;
}
std::unique_ptr<RandomAccessFile> file_;
const size_t alignment_;
size_t readahead_size_;
const bool forward_calls_;
mutable std::mutex lock_;
mutable std::unique_ptr<char[]> buffer_;
mutable AlignedBuffer buffer_;
mutable uint64_t buffer_offset_;
mutable size_t buffer_len_;
};

@ -3,10 +3,12 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include <vector>
#include "util/file_reader_writer.h"
#include <algorithm>
#include <vector>
#include "util/random.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
@ -127,6 +129,108 @@ TEST_F(WritableFileWriterTest, AppendStatusReturn) {
}
#endif
class ReadaheadRandomAccessFileTest
: public testing::Test,
public testing::WithParamInterface<size_t> {
public:
static std::vector<size_t> GetReadaheadSizeList() {
return {1lu << 12, 1lu << 16};
}
virtual void SetUp() override {
readahead_size_ = GetParam();
scratch_.reset(new char[2 * readahead_size_]);
ResetSourceStr();
}
ReadaheadRandomAccessFileTest() : control_contents_() {}
std::string Read(uint64_t offset, size_t n) {
Slice result;
test_read_holder_->Read(offset, n, &result, scratch_.get());
return std::string(result.data(), result.size());
}
void ResetSourceStr(const std::string& str = "") {
auto write_holder = std::unique_ptr<WritableFileWriter>(
test::GetWritableFileWriter(new test::StringSink(&control_contents_)));
write_holder->Append(Slice(str));
write_holder->Flush();
auto read_holder = std::unique_ptr<RandomAccessFile>(
new test::StringSource(control_contents_));
test_read_holder_ =
NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
}
size_t GetReadaheadSize() const { return readahead_size_; }
private:
size_t readahead_size_;
Slice control_contents_;
std::unique_ptr<RandomAccessFile> test_read_holder_;
std::unique_ptr<char[]> scratch_;
};
TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStrTest) {
ASSERT_EQ("", Read(0, 1));
ASSERT_EQ("", Read(0, 0));
ASSERT_EQ("", Read(13, 13));
}
TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSizeTest) {
std::string str = "abcdefghijklmnopqrs";
ResetSourceStr(str);
ASSERT_EQ(str.substr(3, 4), Read(3, 4));
ASSERT_EQ(str.substr(0, 3), Read(0, 3));
ASSERT_EQ(str, Read(0, str.size()));
ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
Read(7, 30));
ASSERT_EQ("", Read(100, 100));
}
TEST_P(ReadaheadRandomAccessFileTest,
SourceStrLenCanBeGreaterThanReadaheadSizeTest) {
Random rng(42);
for (int k = 0; k < 100; ++k) {
size_t strLen = k * GetReadaheadSize() +
rng.Uniform(static_cast<int>(GetReadaheadSize()));
std::string str =
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
ResetSourceStr(str);
for (int test = 1; test <= 100; ++test) {
size_t offset = rng.Uniform(static_cast<int>(strLen));
size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
Read(offset, n));
}
}
}
TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) {
Random rng(7);
size_t strLen = 4 * GetReadaheadSize() +
rng.Uniform(static_cast<int>(GetReadaheadSize()));
std::string str =
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
ResetSourceStr(str);
for (int test = 1; test <= 100; ++test) {
size_t offset = rng.Uniform(static_cast<int>(strLen));
size_t n =
GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
Read(offset, n));
}
}
INSTANTIATE_TEST_CASE_P(
EmptySourceStrTest, ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
SourceStrLenLessThanReadaheadSizeTest, ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
SourceStrLenCanBeGreaterThanReadaheadSizeTest,
ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
NExceedReadaheadTest, ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
} // namespace rocksdb
int main(int argc, char** argv) {

Loading…
Cancel
Save