Fix stress test failure "Corruption: checksum mismatch" or "Iterator Diverged" with async_io enabled (#10032)

Summary:
In case of non sequential reads with `async_io`, `FilePRefetchBuffer::TryReadFromCacheAsync` can be called for previous blocks with `offset < bufs_[curr_].offset_` which wasn't handled correctly resulting wrong data being returned from buffer.

Since `FilePRefetchBuffer::PrefetchAsync` can be called for any data block, it sets `prev_len_` to 0  indicating `FilePRefetchBuffer::TryReadFromCacheAsync` to go for the prefetching even though offset < bufs_[curr_].offset_  This is because async prefetching is always done in second buffer (to avoid mutex) even though curr_ is empty leading to  offset < bufs_[curr_].offset_ in some cases.
If prev_len_ is non zero then `TryReadFromCacheAsync` returns false if `offset < bufs_[curr_].offset_ && prev_len != 0` indicating reads are not sequential and previous call wasn't PrefetchAsync.

-  This PR also simplifies `FilePRefetchBuffer::TryReadFromCacheAsync` as it was getting complicated covering different scenarios based on `async_io` enabled/disabled. If `for_compaction` is set true, it now calls `FilePRefetchBufferTryReadFromCache` following synchronous flow as before. Its decided in BlockFetcher.cc

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10032

Test Plan:
1.  export CRASH_TEST_EXT_ARGS=" --async_io=1"
     make crash_test -j completed successfully locally
2. make crash_test -j completed successfully locally
3. Reran CircleCi mini crashtest job 4 - 5 times.
4. Updated prefetch_test for more coverage.

Reviewed By: anand1976

Differential Revision: D36579858

Pulled By: akankshamahajan15

fbshipit-source-id: 0c428d62b45e12e082a83acf533a5e37a584bedf
main
Akanksha Mahajan 2 years ago committed by Facebook GitHub Bot
parent bea5831bff
commit a479c2c2b2
  1. 51
      file/file_prefetch_buffer.cc
  2. 4
      file/file_prefetch_buffer.h
  3. 53
      file/prefetch_test.cc
  4. 31
      table/block_fetcher.cc

@ -447,18 +447,23 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
bool FilePrefetchBuffer::TryReadFromCacheAsync(
const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority, bool for_compaction /* = false */
) {
Env::IOPriority rate_limiter_priority) {
assert(async_io_);
if (track_min_offset_ && offset < min_offset_read_) {
min_offset_read_ = static_cast<size_t>(offset);
}
if (!enable_) {
return false;
}
// In case of async_io_, offset can be less than bufs_[curr_].offset_ because
// of reads not sequential and PrefetchAsync can be called for any block and
// RocksDB will call TryReadFromCacheAsync after PrefetchAsync to Poll for
// requested bytes. IsEligibleForPrefetch API will return false in case reads
// are not sequential and Non sequential reads will be handled there.
if (!enable_ || (offset < bufs_[curr_].offset_ && async_io_ == false)) {
// requested bytes.
if (bufs_[curr_].buffer_.CurrentSize() > 0 && offset < bufs_[curr_].offset_ &&
prev_len_ != 0) {
return false;
}
@ -476,31 +481,19 @@ bool FilePrefetchBuffer::TryReadFromCacheAsync(
Status s;
assert(reader != nullptr);
assert(max_readahead_size_ >= readahead_size_);
if (for_compaction) {
s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
rate_limiter_priority);
} else {
if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
}
// async prefetching is enabled if it's implicit_auto_readahead_ or
// explicit readahead_size_ is passed along with ReadOptions.async_io =
// true.
if (async_io_) {
// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously.
s = PrefetchAsyncInternal(opts, reader, offset, n,
readahead_size_ / 2, rate_limiter_priority,
copy_to_third_buffer);
} else {
s = Prefetch(opts, reader, offset, n + readahead_size_,
rate_limiter_priority);
if (implicit_auto_readahead_) {
if (!IsEligibleForPrefetch(offset, n)) {
// Ignore status as Prefetch is not called.
s.PermitUncheckedError();
return false;
}
}
// Prefetch n + readahead_size_/2 synchronously as remaining
// readahead_size_/2 will be prefetched asynchronously.
s = PrefetchAsyncInternal(opts, reader, offset, n, readahead_size_ / 2,
rate_limiter_priority, copy_to_third_buffer);
if (!s.ok()) {
if (status) {
*status = s;
@ -574,7 +567,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts,
// Index of second buffer.
uint32_t second = curr_ ^ 1;
// Since PrefetchAsync can be called on non sequqential reads. So offset can
// Since PrefetchAsync can be called on non sequential reads. So offset can
// be less than buffers' offset. In that case it clears the buffer and
// prefetch that block.
if (bufs_[curr_].buffer_.CurrentSize() > 0 && offset < bufs_[curr_].offset_) {

@ -89,6 +89,7 @@ class FilePrefetchBuffer {
// while curr_ is being consumed. If data is overlapping in two buffers,
// data is copied to third buffer to return continuous buffer.
bufs_.resize(3);
(void)async_io_;
}
~FilePrefetchBuffer() {
@ -170,8 +171,7 @@ class FilePrefetchBuffer {
bool TryReadFromCacheAsync(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t n, Slice* result, Status* status,
Env::IOPriority rate_limiter_priority,
bool for_compaction);
Env::IOPriority rate_limiter_priority);
// The minimum `offset` ever passed to TryReadFromCache(). This will nly be
// tracked if track_min_offset = true.

@ -1252,7 +1252,6 @@ TEST_P(PrefetchTest2, DecreaseReadAheadIfInCache) {
} else {
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
//"ASSERT_EQ(expected_hits, get_perf_context()->bloom_sst_hit_count);")
}
}
@ -1349,12 +1348,15 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
// Read the keys.
{
ASSERT_OK(options.statistics->Reset());
get_perf_context()->Reset();
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
num_keys++;
}
ASSERT_EQ(num_keys, total_keys);
ASSERT_GT(buff_prefetch_count, 0);
@ -1375,6 +1377,55 @@ TEST_P(PrefetchTestWithPosix, ReadAsyncWithPosixFS) {
}
ASSERT_GT(prefetched_bytes_discarded.count, 0);
}
ASSERT_EQ(get_perf_context()->number_async_seek, 0);
}
{
// Read the keys using seek.
{
ASSERT_OK(options.statistics->Reset());
get_perf_context()->Reset();
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
int num_keys = 0;
iter->Seek(BuildKey(450));
while (iter->Valid()) {
ASSERT_OK(iter->status());
num_keys++;
iter->Next();
}
ASSERT_OK(iter->status());
iter->Seek(BuildKey(450));
while (iter->Valid()) {
ASSERT_OK(iter->status());
num_keys++;
iter->Prev();
}
ASSERT_EQ(num_keys, total_keys + 1);
ASSERT_GT(buff_prefetch_count, 0);
// Check stats to make sure async prefetch is done.
{
HistogramData async_read_bytes;
options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
HistogramData prefetched_bytes_discarded;
options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED,
&prefetched_bytes_discarded);
// Not all platforms support iouring. In that case, ReadAsync in posix
// won't submit async requests.
if (read_async_called) {
ASSERT_GT(async_read_bytes.count, 0);
ASSERT_GT(get_perf_context()->number_async_seek, 0);
} else {
ASSERT_EQ(async_read_bytes.count, 0);
ASSERT_EQ(get_perf_context()->number_async_seek, 0);
}
ASSERT_GT(prefetched_bytes_discarded.count, 0);
}
}
}
SyncPoint::GetInstance()->DisableProcessing();

@ -9,6 +9,7 @@
#include "table/block_fetcher.h"
#include <cassert>
#include <cinttypes>
#include <string>
@ -72,10 +73,10 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
if (io_s.ok()) {
bool read_from_prefetch_buffer = false;
if (read_options_.async_io) {
if (read_options_.async_io && !for_compaction_) {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
&io_s, read_options_.rate_limiter_priority, for_compaction_);
&io_s, read_options_.rate_limiter_priority);
} else {
read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
@ -349,20 +350,20 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
#endif // NDEBUG
return IOStatus::OK();
} else if (!TryGetCompressedBlockFromPersistentCache()) {
if (prefetch_buffer_ != nullptr) {
assert(prefetch_buffer_ != nullptr);
if (!for_compaction_) {
IOOptions opts;
IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
if (!io_s.ok()) {
return io_s;
}
io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
opts, file_, handle_.offset(), block_size_with_trailer_,
read_options_.rate_limiter_priority, &slice_));
if (io_s.IsTryAgain()) {
return io_s;
}
if (io_s.ok()) {
io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
opts, file_, handle_.offset(), block_size_with_trailer_,
read_options_.rate_limiter_priority, &slice_));
if (io_s.IsTryAgain()) {
return io_s;
}
if (!io_s.ok()) {
// Fallback to sequential reading of data blocks.
return ReadBlockContents();
}
// Data Block is already in prefetch.
got_from_prefetch_buffer_ = true;
ProcessTrailerIfPresent();
@ -388,8 +389,12 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
GetBlockContents();
}
InsertUncompressedBlockToPersistentCacheIfNeeded();
return io_status_;
}
}
// Fallback to sequential reading of data blocks in case of io_s returns
// error or for_compaction_is true.
return ReadBlockContents();
}
return io_status_;
}

Loading…
Cancel
Save