Added support for sequential read-ahead file (#5580)

Summary:
Added support for sequential read-ahead file that can prefetch the read data and later serve it from internal cache buffer.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5580

Differential Revision: D16287082

Pulled By: elipoz

fbshipit-source-id: a3e7ad9643d377d39352ff63058ce050ec31dcf3
main
Eli Pozniansky 6 years ago committed by Facebook Github Bot
parent 699a569c52
commit 0f4d90e6e4
  1. 21
      test_util/testutil.h
  2. 212
      util/file_reader_writer.cc
  3. 18
      util/file_reader_writer.h
  4. 119
      util/file_reader_writer_test.cc

@ -492,13 +492,11 @@ inline std::string EncodeInt(uint64_t x) {
return result;
}
class StringEnv : public EnvWrapper {
public:
class SeqStringSource : public SequentialFile {
public:
explicit SeqStringSource(const std::string& data)
: data_(data), offset_(0) {}
~SeqStringSource() {}
~SeqStringSource() override {}
Status Read(size_t n, Slice* result, char* scratch) override {
std::string output;
if (offset_ < data_.size()) {
@ -527,6 +525,8 @@ class StringEnv : public EnvWrapper {
size_t offset_;
};
class StringEnv : public EnvWrapper {
public:
class StringSink : public WritableFile {
public:
explicit StringSink(std::string* contents)
@ -548,7 +548,7 @@ class StringEnv : public EnvWrapper {
};
explicit StringEnv(Env* t) : EnvWrapper(t) {}
virtual ~StringEnv() {}
~StringEnv() override {}
const std::string& GetContent(const std::string& f) { return files_[f]; }
@ -582,7 +582,8 @@ class StringEnv : public EnvWrapper {
const EnvOptions& /*options*/) override {
return Status::NotSupported();
}
Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
Status NewWritableFile(const std::string& f,
std::unique_ptr<WritableFile>* r,
const EnvOptions& /*options*/) override {
auto iter = files_.find(f);
if (iter != files_.end()) {
@ -591,7 +592,8 @@ class StringEnv : public EnvWrapper {
r->reset(new StringSink(&files_[f]));
return Status::OK();
}
virtual Status NewDirectory(const std::string& /*name*/,
virtual Status NewDirectory(
const std::string& /*name*/,
std::unique_ptr<Directory>* /*result*/) override {
return Status::NotSupported();
}
@ -637,7 +639,8 @@ class StringEnv : public EnvWrapper {
return Status::NotSupported();
}
Status LinkFile(const std::string& /*s*/, const std::string& /*t*/) override {
Status LinkFile(const std::string& /*s*/,
const std::string& /*t*/) override {
return Status::NotSupported();
}
@ -645,7 +648,9 @@ class StringEnv : public EnvWrapper {
return Status::NotSupported();
}
Status UnlockFile(FileLock* /*l*/) override { return Status::NotSupported(); }
Status UnlockFile(FileLock* /*l*/) override {
return Status::NotSupported();
}
protected:
std::unordered_map<std::string, std::string> files_;

@ -639,6 +639,7 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
// Read-ahead only make sense if we have some slack left after reading
if (n + alignment_ >= readahead_size_) {
return file_->Read(offset, n, result, scratch);
}
@ -646,14 +647,13 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
std::unique_lock<std::mutex> lk(lock_);
size_t cached_len = 0;
// Check if there is a cache hit, means that [offset, offset + n) is either
// completely or partially in the buffer
// Check if there is a cache hit, meaning that [offset, offset + n) is either
// completely or partially in the buffer.
// If it's completely cached, including end of file case when offset + n is
// greater than EOF, return
// greater than EOF, then return.
if (TryReadFromCache(offset, n, &cached_len, scratch) &&
(cached_len == n ||
// End of file
buffer_.CurrentSize() < readahead_size_)) {
(cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
// We read exactly what we needed, or we hit end of file - return.
*result = Slice(scratch, cached_len);
return Status::OK();
}
@ -661,25 +661,14 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
// In the case of cache hit advanced_offset is already aligned, means that
// chunk_offset equals to advanced_offset
size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset);
Slice readahead_result;
Status s = ReadIntoBuffer(chunk_offset, readahead_size_);
if (s.ok()) {
// In the case of cache miss, i.e. when cached_len equals 0, an offset can
// exceed the file end position, so the following check is required
if (advanced_offset < chunk_offset + buffer_.CurrentSize()) {
// In the case of cache miss, the first chunk_padding bytes in buffer_
// are
// stored for alignment only and must be skipped
size_t chunk_padding = advanced_offset - chunk_offset;
auto remaining_len =
std::min(buffer_.CurrentSize() - chunk_padding, n - cached_len);
memcpy(scratch + cached_len, buffer_.BufferStart() + chunk_padding,
remaining_len);
// The data we need is now in cache, so we can safely read it
size_t remaining_len;
TryReadFromCache(advanced_offset, n - cached_len, &remaining_len,
scratch + cached_len);
*result = Slice(scratch, cached_len + remaining_len);
} else {
*result = Slice(scratch, cached_len);
}
}
return s;
}
@ -690,6 +679,9 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
// `Read()` assumes a smaller prefetch buffer indicates EOF was reached.
return Status::OK();
}
std::unique_lock<std::mutex> lk(lock_);
size_t offset_ = static_cast<size_t>(offset);
size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_);
if (prefetch_offset == buffer_offset_) {
@ -706,12 +698,18 @@ class ReadaheadRandomAccessFile : public RandomAccessFile {
void Hint(AccessPattern pattern) override { file_->Hint(pattern); }
Status InvalidateCache(size_t offset, size_t length) override {
std::unique_lock<std::mutex> lk(lock_);
buffer_.Clear();
return file_->InvalidateCache(offset, length);
}
bool use_direct_io() const override { return file_->use_direct_io(); }
private:
// Tries to read from buffer_ n bytes starting at offset. If anything was read
// from the cache, it sets cached_len to the number of bytes actually read,
// copies these number of bytes to scratch and returns true.
// If nothing was read sets cached_len to 0 and returns false.
bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len,
char* scratch) const {
if (offset < buffer_offset_ ||
@ -726,6 +724,9 @@ private:
return true;
}
// Reads into buffer_ the next n bytes from file_ starting at offset.
// Can actually read less if EOF was reached.
// Returns the status of the read operastion on the file.
Status ReadIntoBuffer(uint64_t offset, size_t n) const {
if (n > buffer_.Capacity()) {
n = buffer_.Capacity();
@ -742,14 +743,171 @@ private:
return s;
}
std::unique_ptr<RandomAccessFile> file_;
const std::unique_ptr<RandomAccessFile> file_;
const size_t alignment_;
size_t readahead_size_;
const size_t readahead_size_;
mutable std::mutex lock_;
// The buffer storing the prefetched data
mutable AlignedBuffer buffer_;
// The offset in file_, corresponding to data stored in buffer_
mutable uint64_t buffer_offset_;
};
// This class wraps a SequentialFile, exposing same API, with the differenece
// of being able to prefetch up to readahead_size bytes and then serve them
// from memory, avoiding the entire round-trip if, for example, the data for the
// file is actually remote.
class ReadaheadSequentialFile : public SequentialFile {
public:
ReadaheadSequentialFile(std::unique_ptr<SequentialFile>&& file,
size_t readahead_size)
: file_(std::move(file)),
alignment_(file_->GetRequiredBufferAlignment()),
readahead_size_(Roundup(readahead_size, alignment_)),
buffer_(),
buffer_offset_(0),
read_offset_(0) {
buffer_.Alignment(alignment_);
buffer_.AllocateNewBuffer(readahead_size_);
}
ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;
ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;
Status Read(size_t n, Slice* result, char* scratch) override {
std::unique_lock<std::mutex> lk(lock_);
size_t cached_len = 0;
// Check if there is a cache hit, meaning that [offset, offset + n) is
// either completely or partially in the buffer. If it's completely cached,
// including end of file case when offset + n is greater than EOF, then
// return.
if (TryReadFromCache(n, &cached_len, scratch) &&
(cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
// We read exactly what we needed, or we hit end of file - return.
*result = Slice(scratch, cached_len);
return Status::OK();
}
n -= cached_len;
Status s;
// Read-ahead only make sense if we have some slack left after reading
if (n + alignment_ >= readahead_size_) {
s = file_->Read(n, result, scratch + cached_len);
if (s.ok()) {
read_offset_ += result->size();
*result = Slice(scratch, cached_len + result->size());
}
buffer_.Clear();
return s;
}
s = ReadIntoBuffer(readahead_size_);
if (s.ok()) {
// The data we need is now in cache, so we can safely read it
size_t remaining_len;
TryReadFromCache(n, &remaining_len, scratch + cached_len);
*result = Slice(scratch, cached_len + remaining_len);
}
return s;
}
Status Skip(uint64_t n) override {
std::unique_lock<std::mutex> lk(lock_);
Status s = Status::OK();
// First check if we need to skip already cached data
if (buffer_.CurrentSize() > 0) {
// Do we need to skip beyond cached data?
if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
// Yes. Skip whaterver is in memory and adjust offset accordingly
n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
read_offset_ = buffer_offset_ + buffer_.CurrentSize();
} else {
// No. The entire section to be skipped is entirely i cache.
read_offset_ += n;
n = 0;
}
}
if (n > 0) {
// We still need to skip more, so call the file API for skipping
s = file_->Skip(n);
if (s.ok()) {
read_offset_ += n;
}
buffer_.Clear();
}
return s;
}
Status PositionedRead(uint64_t offset, size_t n, Slice* result,
char* scratch) override {
return file_->PositionedRead(offset, n, result, scratch);
}
Status InvalidateCache(size_t offset, size_t length) override {
std::unique_lock<std::mutex> lk(lock_);
buffer_.Clear();
return file_->InvalidateCache(offset, length);
}
bool use_direct_io() const override { return file_->use_direct_io(); }
private:
// Tries to read from buffer_ n bytes. If anything was read from the cache, it
// sets cached_len to the number of bytes actually read, copies these number
// of bytes to scratch and returns true.
// If nothing was read sets cached_len to 0 and returns false.
bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
if (read_offset_ < buffer_offset_ ||
read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
*cached_len = 0;
return false;
}
uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
*cached_len = std::min(
buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
read_offset_ += *cached_len;
return true;
}
// Reads into buffer_ the next n bytes from file_.
// Can actually read less if EOF was reached.
// Returns the status of the read operastion on the file.
Status ReadIntoBuffer(size_t n) {
if (n > buffer_.Capacity()) {
n = buffer_.Capacity();
}
assert(IsFileSectorAligned(n, alignment_));
Slice result;
Status s = file_->Read(n, &result, buffer_.BufferStart());
if (s.ok()) {
buffer_offset_ = read_offset_;
buffer_.Size(result.size());
assert(buffer_.BufferStart() == result.data());
}
return s;
}
const std::unique_ptr<SequentialFile> file_;
const size_t alignment_;
const size_t readahead_size_;
std::mutex lock_;
// The buffer storing the prefetched data
AlignedBuffer buffer_;
// The offset in file_, corresponding to data stored in buffer_
uint64_t buffer_offset_;
// The offset up to which data was read from file_. In fact, it can be larger
// than the actual file size, since the file_->Skip(n) call doesn't return the
// actual number of bytes that were skipped, which can be less than n.
// This is not a problemm since read_offset_ is monotonically increasing and
// its only use is to figure out if next piece of data should be read from
// buffer_ or file_ directly.
uint64_t read_offset_;
};
} // namespace
Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,
@ -866,6 +1024,14 @@ std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
return result;
}
std::unique_ptr<SequentialFile>
SequentialFileReader::NewReadaheadSequentialFile(
std::unique_ptr<SequentialFile>&& file, size_t readahead_size) {
std::unique_ptr<SequentialFile> result(
new ReadaheadSequentialFile(std::move(file), readahead_size));
return result;
}
Status NewWritableFile(Env* env, const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) {

@ -43,12 +43,18 @@ class SequentialFileReader {
private:
std::unique_ptr<SequentialFile> file_;
std::string file_name_;
std::atomic<size_t> offset_; // read offset
std::atomic<size_t> offset_{0}; // read offset
public:
explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file,
const std::string& _file_name)
: file_(std::move(_file)), file_name_(_file_name), offset_(0) {}
: file_(std::move(_file)), file_name_(_file_name) {}
explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file,
const std::string& _file_name,
size_t _readahead_size)
: file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size)),
file_name_(_file_name) {}
SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
@ -66,13 +72,17 @@ class SequentialFileReader {
Status Skip(uint64_t n);
void Rewind();
SequentialFile* file() { return file_.get(); }
std::string file_name() { return file_name_; }
bool use_direct_io() const { return file_->use_direct_io(); }
private:
// NewReadaheadSequentialFile provides a wrapper over SequentialFile to
// always prefetch additional data with every read.
static std::unique_ptr<SequentialFile> NewReadaheadSequentialFile(
std::unique_ptr<SequentialFile>&& file, size_t readahead_size);
};
// RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is

@ -275,7 +275,7 @@ TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSizeTest) {
}
TEST_P(ReadaheadRandomAccessFileTest,
SourceStrLenCanBeGreaterThanReadaheadSizeTest) {
SourceStrLenGreaterThanReadaheadSizeTest) {
Random rng(42);
for (int k = 0; k < 100; ++k) {
size_t strLen = k * GetReadaheadSize() +
@ -286,13 +286,13 @@ TEST_P(ReadaheadRandomAccessFileTest,
for (int test = 1; test <= 100; ++test) {
size_t offset = rng.Uniform(static_cast<int>(strLen));
size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
Read(offset, n));
}
}
}
TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) {
TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSizeTest) {
Random rng(7);
size_t strLen = 4 * GetReadaheadSize() +
rng.Uniform(static_cast<int>(GetReadaheadSize()));
@ -303,7 +303,7 @@ TEST_P(ReadaheadRandomAccessFileTest, NExceedReadaheadTest) {
size_t offset = rng.Uniform(static_cast<int>(strLen));
size_t n =
GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
ASSERT_EQ(str.substr(offset, std::min(n, str.size() - offset)),
ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
Read(offset, n));
}
}
@ -315,13 +315,118 @@ INSTANTIATE_TEST_CASE_P(
SourceStrLenLessThanReadaheadSizeTest, ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
SourceStrLenCanBeGreaterThanReadaheadSizeTest,
ReadaheadRandomAccessFileTest,
SourceStrLenGreaterThanReadaheadSizeTest, ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
NExceedReadaheadTest, ReadaheadRandomAccessFileTest,
ReadExceedsReadaheadSizeTest, ReadaheadRandomAccessFileTest,
::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
class ReadaheadSequentialFileTest : public testing::Test,
public testing::WithParamInterface<size_t> {
public:
static std::vector<size_t> GetReadaheadSizeList() {
return {1lu << 12, 1lu << 16};
}
void SetUp() override {
readahead_size_ = GetParam();
scratch_.reset(new char[2 * readahead_size_]);
ResetSourceStr();
}
ReadaheadSequentialFileTest() {}
std::string Read(size_t n) {
Slice result;
test_read_holder_->Read(n, &result, scratch_.get());
return std::string(result.data(), result.size());
}
void Skip(size_t n) { test_read_holder_->Skip(n); }
void ResetSourceStr(const std::string& str = "") {
auto read_holder =
std::unique_ptr<SequentialFile>(new test::SeqStringSource(str));
test_read_holder_.reset(new SequentialFileReader(std::move(read_holder),
"test", readahead_size_));
}
size_t GetReadaheadSize() const { return readahead_size_; }
private:
size_t readahead_size_;
std::unique_ptr<SequentialFileReader> test_read_holder_;
std::unique_ptr<char[]> scratch_;
};
TEST_P(ReadaheadSequentialFileTest, EmptySourceStrTest) {
ASSERT_EQ("", Read(0));
ASSERT_EQ("", Read(1));
ASSERT_EQ("", Read(13));
}
TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSizeTest) {
std::string str = "abcdefghijklmnopqrs";
ResetSourceStr(str);
ASSERT_EQ(str.substr(0, 3), Read(3));
ASSERT_EQ(str.substr(3, 1), Read(1));
ASSERT_EQ(str.substr(4), Read(str.size()));
ASSERT_EQ("", Read(100));
}
TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSizeTest) {
Random rng(42);
for (int s = 0; s < 1; ++s) {
for (int k = 0; k < 100; ++k) {
size_t strLen = k * GetReadaheadSize() +
rng.Uniform(static_cast<int>(GetReadaheadSize()));
std::string str =
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
ResetSourceStr(str);
size_t offset = 0;
for (int test = 1; test <= 100; ++test) {
size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
if (s && test % 2) {
Skip(n);
} else {
ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
}
offset = std::min(offset + n, strLen);
}
}
}
}
TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSizeTest) {
Random rng(42);
for (int s = 0; s < 1; ++s) {
for (int k = 0; k < 100; ++k) {
size_t strLen = k * GetReadaheadSize() +
rng.Uniform(static_cast<int>(GetReadaheadSize()));
std::string str =
test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
ResetSourceStr(str);
size_t offset = 0;
for (int test = 1; test <= 100; ++test) {
size_t n = GetReadaheadSize() +
rng.Uniform(static_cast<int>(GetReadaheadSize()));
if (s && test % 2) {
Skip(n);
} else {
ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
}
offset = std::min(offset + n, strLen);
}
}
}
}
INSTANTIATE_TEST_CASE_P(
EmptySourceStrTest, ReadaheadSequentialFileTest,
::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
SourceStrLenLessThanReadaheadSizeTest, ReadaheadSequentialFileTest,
::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
SourceStrLenGreaterThanReadaheadSizeTest, ReadaheadSequentialFileTest,
::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
INSTANTIATE_TEST_CASE_P(
ReadExceedsReadaheadSizeTest, ReadaheadSequentialFileTest,
::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
} // namespace rocksdb
int main(int argc, char** argv) {

Loading…
Cancel
Save