Improve direct IO range scan performance with readahead (#3884)

Summary:
This PR extends the improvements in #3282 to also work when using Direct IO.
We see **4.5X performance improvement** in seekrandom benchmark doing long range scans, when using direct reads, on flash.

**Description:**
This change improves the performance of iterators doing long range scans (e.g. big/full index or table scans in MyRocks) by using readahead and prefetching additional data on each disk IO, and storing in a local buffer. This prefetching is automatically enabled on noticing more than 2 IOs for the same table file during iteration. The readahead size starts with 8KB and is exponentially increased on each additional sequential IO, up to a max of 256 KB. This helps in cutting down the number of IOs needed to complete the range scan.

**Implementation Details:**
- Used `FilePrefetchBuffer` as the underlying buffer to store the readahead data. `FilePrefetchBuffer` can now take file_reader, readahead_size and max_readahead_size as input to the constructor, and automatically do readahead.
- `FilePrefetchBuffer::TryReadFromCache` can now call `FilePrefetchBuffer::Prefetch` if readahead is enabled.
- `AlignedBuffer` (which is the underlying store for `FilePrefetchBuffer`) now takes a few additional args in `AlignedBuffer::AllocateNewBuffer` to allow copying data from the old buffer.
- Made sure not to re-read partial chunks of data that were already available in the buffer, from device again.
- Fixed a couple of cases where `AlignedBuffer::cursize_` was not being properly kept up-to-date.

**Constraints:**
- Similar to #3282, this gets currently enabled only when ReadOptions.readahead_size = 0 (which is the default value).
- Since the prefetched data is stored in a temporary buffer allocated on heap, this could increase the memory usage if you have many iterators doing long range scans simultaneously.
- Enabled only for user reads, and disabled for compactions. Compaction reads are controlled by the options `use_direct_io_for_flush_and_compaction` and `compaction_readahead_size`, and the current feature takes precautions not to mess with them.

**Benchmarks:**
I used the same benchmark as used in #3282.
Data fill:
```
TEST_TMPDIR=/data/users/$USER/benchmarks/iter ./db_bench -benchmarks=fillrandom -num=1000000000 -compression_type="none" -level_compaction_dynamic_level_bytes
```

Do a long range scan: Seekrandom with large number of nexts
```
TEST_TMPDIR=/data/users/$USER/benchmarks/iter ./db_bench -benchmarks=seekrandom -use_direct_reads -duration=60 -num=1000000000 -use_existing_db -seek_nexts=10000 -statistics -histogram
```

```
Before:
seekrandom   :   37939.906 micros/op 26 ops/sec;   29.2 MB/s (1636 of 1999 found)
With this change:
seekrandom   :   8527.720 micros/op 117 ops/sec;  129.7 MB/s (6530 of 7999 found)
```
~4.5X perf improvement. Taken on an average of 3 runs.
Closes https://github.com/facebook/rocksdb/pull/3884

Differential Revision: D8082143

Pulled By: sagar0

fbshipit-source-id: 4d7a8561cbac03478663713df4d31ad2620253bb
main
Sagar Vemuri 7 years ago committed by Facebook Github Bot
parent 524c6e6b72
commit 7103559f49
  1. 1
      HISTORY.md
  2. 53
      table/block_based_table_reader.cc
  3. 26
      table/block_based_table_reader.h
  4. 16
      util/aligned_buffer.h
  5. 73
      util/file_reader_writer.cc
  6. 17
      util/file_reader_writer.h

@ -6,6 +6,7 @@
### New Features ### New Features
* Changes the format of index blocks by storing the key in their raw form rather than converting them to InternalKey. This saves 8 bytes per index key. The feature is backward compatbile but not forward compatible. It is disabled by default unless format_version 3 or above is used. * Changes the format of index blocks by storing the key in their raw form rather than converting them to InternalKey. This saves 8 bytes per index key. The feature is backward compatbile but not forward compatible. It is disabled by default unless format_version 3 or above is used.
* Improve the performance of iterators doing long range scans by using readahead, when using direct IO.
### Bug Fixes ### Bug Fixes
* fix deadlock with enable_pipelined_write=true and max_successive_merges > 0 * fix deadlock with enable_pipelined_write=true and max_successive_merges > 0

@ -1004,6 +1004,7 @@ void BlockBasedTable::SetupForCompaction() {
default: default:
assert(false); assert(false);
} }
rep_->for_compaction = true;
} }
std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties() std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()
@ -1532,14 +1533,16 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
BlockIter* BlockBasedTable::NewDataBlockIterator( BlockIter* BlockBasedTable::NewDataBlockIterator(
Rep* rep, const ReadOptions& ro, const Slice& index_value, Rep* rep, const ReadOptions& ro, const Slice& index_value,
BlockIter* input_iter, bool is_index, bool key_includes_seq, BlockIter* input_iter, bool is_index, bool key_includes_seq,
GetContext* get_context) { GetContext* get_context,
FilePrefetchBuffer* prefetch_buffer) {
BlockHandle handle; BlockHandle handle;
Slice input = index_value; Slice input = index_value;
// We intentionally allow extra stuff in index_value so that we // We intentionally allow extra stuff in index_value so that we
// can add more features in the future. // can add more features in the future.
Status s = handle.DecodeFrom(&input); Status s = handle.DecodeFrom(&input);
return NewDataBlockIterator(rep, ro, handle, input_iter, is_index, return NewDataBlockIterator(rep, ro, handle, input_iter, is_index,
key_includes_seq, get_context, s); key_includes_seq, get_context, s,
prefetch_buffer);
} }
// Convert an index iterator value (i.e., an encoded BlockHandle) // Convert an index iterator value (i.e., an encoded BlockHandle)
@ -1549,7 +1552,7 @@ BlockIter* BlockBasedTable::NewDataBlockIterator(
BlockIter* BlockBasedTable::NewDataBlockIterator( BlockIter* BlockBasedTable::NewDataBlockIterator(
Rep* rep, const ReadOptions& ro, const BlockHandle& handle, Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
BlockIter* input_iter, bool is_index, bool key_includes_seq, BlockIter* input_iter, bool is_index, bool key_includes_seq,
GetContext* get_context, Status s) { GetContext* get_context, Status s, FilePrefetchBuffer* prefetch_buffer) {
PERF_TIMER_GUARD(new_table_block_iter_nanos); PERF_TIMER_GUARD(new_table_block_iter_nanos);
const bool no_io = (ro.read_tier == kBlockCacheTier); const bool no_io = (ro.read_tier == kBlockCacheTier);
@ -1560,7 +1563,7 @@ BlockIter* BlockBasedTable::NewDataBlockIterator(
if (rep->compression_dict_block) { if (rep->compression_dict_block) {
compression_dict = rep->compression_dict_block->data; compression_dict = rep->compression_dict_block->data;
} }
s = MaybeLoadDataBlockToCache(nullptr /*prefetch_buffer*/, rep, ro, handle, s = MaybeLoadDataBlockToCache(prefetch_buffer, rep, ro, handle,
compression_dict, &block, is_index, compression_dict, &block, is_index,
get_context); get_context);
} }
@ -1583,8 +1586,8 @@ BlockIter* BlockBasedTable::NewDataBlockIterator(
StopWatch sw(rep->ioptions.env, rep->ioptions.statistics, StopWatch sw(rep->ioptions.env, rep->ioptions.statistics,
READ_BLOCK_GET_MICROS); READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile( s = ReadBlockFromFile(
rep->file.get(), nullptr /* prefetch_buffer */, rep->footer, ro, rep->file.get(), prefetch_buffer, rep->footer, ro, handle,
handle, &block_value, rep->ioptions, rep->blocks_maybe_compressed, &block_value, rep->ioptions, rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options, compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit); rep->table_options.read_amp_bytes_per_bit);
@ -1975,31 +1978,39 @@ void BlockBasedTableIterator::InitDataBlock() {
auto* rep = table_->get_rep(); auto* rep = table_->get_rep();
// Automatically prefetch additional data when a range scan (iterator) does // Automatically prefetch additional data when a range scan (iterator) does
// more than 2 sequential IOs. This is enabled only when // more than 2 sequential IOs. This is enabled only for user reads and when
// ReadOptions.readahead_size is 0. // ReadOptions.readahead_size is 0.
if (read_options_.readahead_size == 0) { if (!rep->for_compaction && read_options_.readahead_size == 0) {
if (num_file_reads_ < 2) {
num_file_reads_++; num_file_reads_++;
} else if (data_block_handle.offset() + if (num_file_reads_ > 2) {
if (!rep->file->use_direct_io() &&
(data_block_handle.offset() +
static_cast<size_t>(data_block_handle.size()) + static_cast<size_t>(data_block_handle.size()) +
kBlockTrailerSize > kBlockTrailerSize >
readahead_limit_) { readahead_limit_)) {
num_file_reads_++; // Buffered I/O
// Do not readahead more than kMaxReadaheadSize. // Discarding the return status of Prefetch calls intentionally, as we
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_); // can fallback to reading from disk if Prefetch fails.
table_->get_rep()->file->Prefetch(data_block_handle.offset(), rep->file->Prefetch(data_block_handle.offset(), readahead_size_);
readahead_size_); readahead_limit_ =
readahead_limit_ = static_cast<size_t>(data_block_handle.offset() static_cast<size_t>(data_block_handle.offset() + readahead_size_);
+ readahead_size_); // Keep exponentially increasing readahead size until
// Keep exponentially increasing readahead size until kMaxReadaheadSize. // kMaxReadaheadSize.
readahead_size_ *= 2; readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ * 2);
} else if (rep->file->use_direct_io() && !prefetch_buffer_) {
// Direct I/O
// Let FilePrefetchBuffer take care of the readahead.
prefetch_buffer_.reset(new FilePrefetchBuffer(
rep->file.get(), kInitReadaheadSize, kMaxReadaheadSize));
}
} }
} }
BlockBasedTable::NewDataBlockIterator(rep, read_options_, data_block_handle, BlockBasedTable::NewDataBlockIterator(rep, read_options_, data_block_handle,
&data_block_iter_, is_index_, &data_block_iter_, is_index_,
key_includes_seq_, key_includes_seq_,
/* get_context */ nullptr, s); /* get_context */ nullptr, s,
prefetch_buffer_.get());
block_iter_points_to_real_block_ = true; block_iter_points_to_real_block_ = true;
} }
} }

@ -213,19 +213,16 @@ class BlockBasedTable : public TableReader {
Rep* get_rep() { return rep_; } Rep* get_rep() { return rep_; }
// input_iter: if it is not null, update this one and return it as Iterator // input_iter: if it is not null, update this one and return it as Iterator
static BlockIter* NewDataBlockIterator(Rep* rep, const ReadOptions& ro, static BlockIter* NewDataBlockIterator(
const Slice& index_value, Rep* rep, const ReadOptions& ro, const Slice& index_value,
BlockIter* input_iter = nullptr, BlockIter* input_iter = nullptr, bool is_index = false,
bool is_index = false, bool key_includes_seq = true, GetContext* get_context = nullptr,
bool key_includes_seq = true, FilePrefetchBuffer* prefetch_buffer = nullptr);
GetContext* get_context = nullptr); static BlockIter* NewDataBlockIterator(
static BlockIter* NewDataBlockIterator(Rep* rep, const ReadOptions& ro, Rep* rep, const ReadOptions& ro, const BlockHandle& block_hanlde,
const BlockHandle& block_hanlde, BlockIter* input_iter = nullptr, bool is_index = false,
BlockIter* input_iter = nullptr, bool key_includes_seq = true, GetContext* get_context = nullptr,
bool is_index = false, Status s = Status(), FilePrefetchBuffer* prefetch_buffer = nullptr);
bool key_includes_seq = true,
GetContext* get_context = nullptr,
Status s = Status());
class PartitionedIndexIteratorState; class PartitionedIndexIteratorState;
@ -505,6 +502,8 @@ struct BlockBasedTable::Rep {
bool blocks_maybe_compressed = true; bool blocks_maybe_compressed = true;
bool closed = false; bool closed = false;
bool for_compaction = false;
}; };
class BlockBasedTableIterator : public InternalIterator { class BlockBasedTableIterator : public InternalIterator {
@ -632,6 +631,7 @@ class BlockBasedTableIterator : public InternalIterator {
size_t readahead_size_ = kInitReadaheadSize; size_t readahead_size_ = kInitReadaheadSize;
size_t readahead_limit_ = 0; size_t readahead_limit_ = 0;
int num_file_reads_ = 0; int num_file_reads_ = 0;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -96,12 +96,19 @@ public:
alignment_ = alignment; alignment_ = alignment;
} }
// Allocates a new buffer and sets bufstart_ to the aligned first byte // Allocates a new buffer and sets bufstart_ to the aligned first byte.
void AllocateNewBuffer(size_t requested_capacity, bool copy_data = false) { // requested_capacity: requested new buffer capacity. This capacity will be
// rounded up based on alignment.
// copy_data: Copy data from old buffer to new buffer.
// copy_offset: Copy data from this offset in old buffer.
// copy_len: Number of bytes to copy.
void AllocateNewBuffer(size_t requested_capacity, bool copy_data = false,
uint64_t copy_offset = 0, size_t copy_len = 0) {
assert(alignment_ > 0); assert(alignment_ > 0);
assert((alignment_ & (alignment_ - 1)) == 0); assert((alignment_ & (alignment_ - 1)) == 0);
if (copy_data && requested_capacity < cursize_) { copy_len = copy_len > 0 ? copy_len : cursize_;
if (copy_data && requested_capacity < copy_len) {
// If we are downsizing to a capacity that is smaller than the current // If we are downsizing to a capacity that is smaller than the current
// data in the buffer. Ignore the request. // data in the buffer. Ignore the request.
return; return;
@ -114,7 +121,8 @@ public:
~static_cast<uintptr_t>(alignment_ - 1)); ~static_cast<uintptr_t>(alignment_ - 1));
if (copy_data) { if (copy_data) {
memcpy(new_bufstart, bufstart_, cursize_); memcpy(new_bufstart, bufstart_ + copy_offset, copy_len);
cursize_ = copy_len;
} else { } else {
cursize_ = 0; cursize_ = 0;
} }

@ -647,24 +647,85 @@ Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,
uint64_t roundup_len = roundup_end - rounddown_offset; uint64_t roundup_len = roundup_end - rounddown_offset;
assert(roundup_len >= alignment); assert(roundup_len >= alignment);
assert(roundup_len % alignment == 0); assert(roundup_len % alignment == 0);
// Check if requested bytes are in the existing buffer_.
// If all bytes exist -- return.
// If only a few bytes exist -- reuse them & read only what is really needed.
// This is typically the case of incremental reading of data.
// If no bytes exist in buffer -- full pread.
Status s;
uint64_t chunk_offset_in_buffer = 0;
uint64_t chunk_len = 0;
bool copy_data_to_new_buffer = false;
if (buffer_len_ > 0 && offset >= buffer_offset_ &&
offset <= buffer_offset_ + buffer_len_) {
if (offset + n <= buffer_offset_ + buffer_len_) {
// All requested bytes are already in the buffer. So no need to Read
// again.
return s;
} else {
// Only a few requested bytes are in the buffer. memmove those chunk of
// bytes to the beginning, and memcpy them back into the new buffer if a
// new buffer is created.
chunk_offset_in_buffer = Rounddown(offset - buffer_offset_, alignment);
chunk_len = buffer_len_ - chunk_offset_in_buffer;
assert(chunk_offset_in_buffer % alignment == 0);
assert(chunk_len % alignment == 0);
copy_data_to_new_buffer = true;
}
}
// Create a new buffer only if current capacity is not sufficient, and memcopy
// bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
if (buffer_.Capacity() < roundup_len) {
buffer_.Alignment(alignment); buffer_.Alignment(alignment);
buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len)); buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len),
copy_data_to_new_buffer, chunk_offset_in_buffer,
chunk_len);
} else if (chunk_len > 0) {
// New buffer not needed. But memmove bytes from tail to the beginning since
// chunk_len is greater than 0.
buffer_.RefitTail(chunk_offset_in_buffer, chunk_len);
}
Slice result; Slice result;
Status s = reader->Read(rounddown_offset, static_cast<size_t>(roundup_len), s = reader->Read(rounddown_offset + chunk_len,
&result, buffer_.BufferStart()); static_cast<size_t>(roundup_len - chunk_len), &result,
buffer_.BufferStart() + chunk_len);
if (s.ok()) { if (s.ok()) {
buffer_offset_ = rounddown_offset; buffer_offset_ = rounddown_offset;
buffer_len_ = result.size(); buffer_len_ = chunk_len + result.size();
buffer_.Size(buffer_len_);
} }
return s; return s;
} }
bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n, bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n,
Slice* result) const { Slice* result) {
if (offset < buffer_offset_ || offset + n > buffer_offset_ + buffer_len_) { if (offset < buffer_offset_) {
return false; return false;
} }
// If the buffer contains only a few of the requested bytes:
// If readahead is enabled: prefetch the remaining bytes + readadhead bytes
// and satisfy the request.
// If readahead is not enabled: return false.
if (offset + n > buffer_offset_ + buffer_len_) {
if (readahead_size_ > 0) {
assert(file_reader_ != nullptr);
assert(max_readahead_size_ >= readahead_size_);
Status s = Prefetch(file_reader_, offset, n + readahead_size_);
if (!s.ok()) {
return false;
}
readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
} else {
return false;
}
}
uint64_t offset_in_buffer = offset - buffer_offset_; uint64_t offset_in_buffer = offset - buffer_offset_;
*result = Slice(buffer_.BufferStart() + offset_in_buffer, n); *result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
return true; return true;

@ -207,16 +207,29 @@ class WritableFileWriter {
Status SyncInternal(bool use_fsync); Status SyncInternal(bool use_fsync);
}; };
// FilePrefetchBuffer can automatically do the readahead if file_reader,
// readahead_size, and max_readahead_size are passed in.
// max_readahead_size should be greater than or equal to readahead_size.
// readahead_size will be doubled on every IO, until max_readahead_size.
class FilePrefetchBuffer { class FilePrefetchBuffer {
public: public:
FilePrefetchBuffer() : buffer_offset_(0), buffer_len_(0) {} FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr,
size_t readadhead_size = 0, size_t max_readahead_size = 0)
: buffer_offset_(0),
buffer_len_(0),
file_reader_(file_reader),
readahead_size_(readadhead_size),
max_readahead_size_(max_readahead_size) {}
Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n); Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n);
bool TryReadFromCache(uint64_t offset, size_t n, Slice* result) const; bool TryReadFromCache(uint64_t offset, size_t n, Slice* result);
private: private:
AlignedBuffer buffer_; AlignedBuffer buffer_;
uint64_t buffer_offset_; uint64_t buffer_offset_;
size_t buffer_len_; size_t buffer_len_;
RandomAccessFileReader* file_reader_;
size_t readahead_size_;
size_t max_readahead_size_;
}; };
extern Status NewWritableFile(Env* env, const std::string& fname, extern Status NewWritableFile(Env* env, const std::string& fname,

Loading…
Cancel
Save