Fix some errors in async prefetching in FilePrefetchBuffer (#9734)

Summary:
In ReadOption `async_io` which prefetches the data asynchronously, db_bench and db_stress runs were failing  because wrong data was prefetched which resulted in Error: Checksum mismatched. Wrong data was copied because capacity was less than actual size needed. It has been fixed in this PR.

Since there are two separate methods for async and sync prefetching, these changes are in async prefetching methods and any changes would not effect normal prefetching. I ran the regressions to make sure normal prefetching is fine.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9734

Test Plan:
1. CircleCI jobs

2.  Ran db_bench
```
. /db_bench -use_existing_db=true
-db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32
-value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680
-duration=120 -ops_between_duration_checks=1 -async_io=1 -adaptive_readahead=1

```
3. Ran db_stress test
```
export CRASH_TEST_EXT_ARGS=" --async_io=1 --adaptive_readahead=1"
make crash_test -j
```

4. Run regressions for async_io disabled.

Old flow without any async changes:
```
./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 7.0
Date:       Thu Mar 17 13:11:34 2022
CPU:        24 * Intel Core Processor (Broadwell)
CPUCache:   16384 KB
Keys:       32 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2594.0 MB (estimated)
FileSize:   1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
DB path: [/tmp/prefix_scan_prefetch_main]
seekrandom   :  483618.390 micros/op 2 ops/sec;  338.9 MB/s (249 of 249 found)
```

With async prefetching changes and async_io disabled to make sure in normal prefetching there is no regression.
 ```
 ./db_bench -use_existing_db=true -db=/tmp/prefix_scan_prefetch_main -benchmarks="seekrandom" -key_size=32 -value_size=512 -num=5000000 -use_direct_reads=true -seek_nexts=327680 -duration=120 -ops_between_duration_checks=1 --async_io=0
Initializing RocksDB Options from the specified file
Initializing RocksDB Options from command-line flags
RocksDB:    version 7.1
Date:       Wed Mar 23 15:56:37 2022
CPU:        24 * Intel Core Processor (Broadwell)
CPUCache:   16384 KB
Keys:       32 bytes each (+ 0 bytes user-defined timestamp)
Values:     512 bytes each (256 bytes after compression)
Entries:    5000000
Prefix:    0 bytes
Keys per prefix:    0
RawSize:    2594.0 MB (estimated)
FileSize:   1373.3 MB (estimated)
Write rate: 0 bytes/second
Read rate: 0 ops/second
Compression: Snappy
Compression sampling rate: 0
Memtablerep: SkipListFactory
Perf Level: 1
------------------------------------------------
DB path: [/tmp/prefix_scan_prefetch_main]
seekrandom   :  481819.816 micros/op 2 ops/sec;  340.2 MB/s (250 of 250 found)
```

Reviewed By: riversand963

Differential Revision: D35058471

Pulled By: akankshamahajan15

fbshipit-source-id: 9233a1e6d97cea0c7a8111bfb9e8ac3251c341ce
main
Akanksha Mahajan 3 years ago committed by Facebook GitHub Bot
parent 37de4e1d08
commit 33f8a08af2
  1. 56
      file/file_prefetch_buffer.cc

@ -66,6 +66,17 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment,
// chunk_len is greater than 0.
bufs_[index].buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
static_cast<size_t>(chunk_len));
} else if (chunk_len > 0) {
// For async prefetching, it doesn't call RefitTail with chunk_len > 0.
// Allocate new buffer if needed because aligned buffer calculate remaining
// buffer as capacity_ - cursize_ which might not be the case in this as we
// are not refitting.
// TODO akanksha: Update the condition when asynchronous prefetching is
// stable.
bufs_[index].buffer_.Alignment(alignment);
bufs_[index].buffer_.AllocateNewBuffer(
static_cast<size_t>(roundup_len), copy_data_to_new_buffer,
chunk_offset_in_buffer, static_cast<size_t>(chunk_len));
}
}
@ -236,34 +247,47 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// Index of second buffer.
uint32_t second = curr_ ^ 1;
// First clear the buffers if it contains outdated data. Outdated data can be
// because previous sequential reads were read from the cache instead of these
// buffer.
{
if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
offset >= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
bufs_[curr_].buffer_.Clear();
}
if (bufs_[second].buffer_.CurrentSize() > 0 &&
offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
bufs_[second].buffer_.Clear();
}
}
// If data is in second buffer, make it curr_. Second buffer can be either
// partial filled or full.
if (bufs_[second].buffer_.CurrentSize() > 0 &&
offset >= bufs_[second].offset_ &&
offset <= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
offset < bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
// Clear the curr_ as buffers have been swapped and curr_ contains the
// outdated data.
// outdated data and switch the buffers.
bufs_[curr_].buffer_.Clear();
// Switch the buffers.
curr_ = curr_ ^ 1;
second = curr_ ^ 1;
}
// If second buffer contains outdated data, clear it for async prefetching.
// Outdated can be because previous sequential reads were read from the cache
// instead of this buffer.
if (bufs_[second].buffer_.CurrentSize() > 0 &&
offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) {
bufs_[second].buffer_.Clear();
// After swap check if all the requested bytes are in curr_, it will go for
// async prefetching only.
if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
offset + length <=
bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) {
offset += length;
length = 0;
prefetch_size -= length;
}
// Data is overlapping i.e. some of the data is in curr_ buffer and remaining
// in second buffer.
if (bufs_[curr_].buffer_.CurrentSize() > 0 &&
bufs_[second].buffer_.CurrentSize() > 0 &&
offset >= bufs_[curr_].offset_ &&
offset < bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() &&
offset + prefetch_size > bufs_[second].offset_) {
offset + length > bufs_[second].offset_) {
// Allocate new buffer to third buffer;
bufs_[2].buffer_.Clear();
bufs_[2].buffer_.Alignment(alignment);
@ -273,12 +297,10 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// Move data from curr_ buffer to third.
CopyDataToBuffer(curr_, offset, length);
if (length == 0) {
// Requested data has been copied and curr_ still has unconsumed data.
return s;
}
CopyDataToBuffer(second, offset, length);
// Length == 0: All the requested data has been copied to third buffer. It
// should go for only async prefetching.
@ -306,6 +328,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
if (length > 0) {
CalculateOffsetAndLen(alignment, offset, roundup_len1, curr_,
false /*refit_tail*/, chunk_len1);
assert(roundup_len1 >= chunk_len1);
read_len1 = static_cast<size_t>(roundup_len1 - chunk_len1);
}
{
@ -316,7 +339,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
Roundup(rounddown_start2 + readahead_size, alignment);
// For length == 0, do the asynchronous prefetching in second instead of
// synchronous prefetching of remaining prefetch_size.
// synchronous prefetching in curr_.
if (length == 0) {
rounddown_start2 =
bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize();
@ -330,8 +353,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// Update the buffer offset.
bufs_[second].offset_ = rounddown_start2;
assert(roundup_len2 >= chunk_len2);
uint64_t read_len2 = static_cast<size_t>(roundup_len2 - chunk_len2);
ReadAsync(opts, reader, rate_limiter_priority, read_len2, chunk_len2,
rounddown_start2, second)
.PermitUncheckedError();
@ -344,7 +367,6 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
return s;
}
}
// Copy remaining requested bytes to third_buffer.
if (copy_to_third_buffer && length > 0) {
CopyDataToBuffer(curr_, offset, length);

Loading…
Cancel
Save