Return any errors returned by ReadAsync to the MultiGet caller (#11171)

Summary:
Currently, we incorrectly return a Status::Corruption to the MultiGet caller if the file system ReadAsync cannot issue a read and returns an error for some reason, such as IOStatus::NotSupported(). In this PR, we copy the ReadAsync error to the request status so it can be returned to the user.

Tests:
Update existing unit tests and add a new one for this scenario

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11171

Reviewed By: akankshamahajan15

Differential Revision: D42950057

Pulled By: anand1976

fbshipit-source-id: 85ffcb015fa6c064c311f8a28488fec78c487869
oxigraph-8.1.1
anand76 2 years ago committed by Facebook GitHub Bot
parent 701a19cc83
commit 63da9cfa26
  1. 1
      HISTORY.md
  2. 90
      db/db_basic_test.cc
  3. 20
      util/async_file_reader.cc

@ -11,6 +11,7 @@
* Fixed a feature interaction bug where for blobs `GetEntity` would expose the blob reference instead of the blob value. * Fixed a feature interaction bug where for blobs `GetEntity` would expose the blob reference instead of the blob value.
* Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish * Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish
* Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()` * Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()`
* Return the correct error (Status::NotSupported()) to MultiGet caller when ReadOptions::async_io flag is true and IO uring is not enabled. Previously, Status::Corruption() was being returned when the actual failure was lack of async IO support.
### Feature Removal ### Feature Removal
* Remove RocksDB Lite. * Remove RocksDB Lite.

@ -30,6 +30,9 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
static bool enable_io_uring = true;
extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; }
class DBBasicTest : public DBTestBase { class DBBasicTest : public DBTestBase {
public: public:
DBBasicTest() : DBTestBase("db_basic_test", /*env_do_fsync=*/false) {} DBBasicTest() : DBTestBase("db_basic_test", /*env_do_fsync=*/false) {}
@ -2173,6 +2176,7 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
options_.disable_auto_compactions = true; options_.disable_auto_compactions = true;
options_.statistics = statistics_; options_.statistics = statistics_;
options_.table_factory.reset(NewBlockBasedTableFactory(bbto)); options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
options_.env = Env::Default();
Reopen(options_); Reopen(options_);
int num_keys = 0; int num_keys = 0;
@ -2239,6 +2243,20 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
const std::shared_ptr<Statistics>& statistics() { return statistics_; } const std::shared_ptr<Statistics>& statistics() { return statistics_; }
protected: protected:
void PrepareDBForTest() {
#ifdef ROCKSDB_IOURING_PRESENT
Reopen(options_);
#else // ROCKSDB_IOURING_PRESENT
// Warm up the block cache so we don't need to use the IO uring
Iterator* iter = dbfull()->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid() && iter->status().ok();
iter->Next())
;
EXPECT_OK(iter->status());
delete iter;
#endif // ROCKSDB_IOURING_PRESENT
}
void ReopenDB() { Reopen(options_); } void ReopenDB() { Reopen(options_); }
private: private:
@ -2253,6 +2271,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
std::vector<PinnableSlice> values(key_strs.size()); std::vector<PinnableSlice> values(key_strs.size());
std::vector<Status> statuses(key_strs.size()); std::vector<Status> statuses(key_strs.size());
PrepareDBForTest();
ReadOptions ro; ReadOptions ro;
ro.async_io = true; ro.async_io = true;
ro.optimize_multiget_for_io = GetParam(); ro.optimize_multiget_for_io = GetParam();
@ -2271,6 +2291,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
// With async IO, lookups will happen in parallel for each key // With async IO, lookups will happen in parallel for each key
#ifdef ROCKSDB_IOURING_PRESENT
if (GetParam()) { if (GetParam()) {
ASSERT_EQ(multiget_io_batch_size.count, 1); ASSERT_EQ(multiget_io_batch_size.count, 1);
ASSERT_EQ(multiget_io_batch_size.max, 3); ASSERT_EQ(multiget_io_batch_size.max, 3);
@ -2280,6 +2301,11 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
// L0 file // L0 file
ASSERT_EQ(multiget_io_batch_size.count, 3); ASSERT_EQ(multiget_io_batch_size.count, 3);
} }
#else // ROCKSDB_IOURING_PRESENT
if (GetParam()) {
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
}
#endif // ROCKSDB_IOURING_PRESENT
} }
TEST_P(DBMultiGetAsyncIOTest, GetFromL1) { TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
@ -2297,6 +2323,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
values.resize(keys.size()); values.resize(keys.size());
statuses.resize(keys.size()); statuses.resize(keys.size());
PrepareDBForTest();
ReadOptions ro; ReadOptions ro;
ro.async_io = true; ro.async_io = true;
ro.optimize_multiget_for_io = GetParam(); ro.optimize_multiget_for_io = GetParam();
@ -2310,6 +2338,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
ASSERT_EQ(values[1], "val_l1_" + std::to_string(54)); ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
#ifdef ROCKSDB_IOURING_PRESENT
HistogramData multiget_io_batch_size; HistogramData multiget_io_batch_size;
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
@ -2317,9 +2346,11 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
// A batch of 3 async IOs is expected, one for each overlapping file in L1 // A batch of 3 async IOs is expected, one for each overlapping file in L1
ASSERT_EQ(multiget_io_batch_size.count, 1); ASSERT_EQ(multiget_io_batch_size.count, 1);
ASSERT_EQ(multiget_io_batch_size.max, 3); ASSERT_EQ(multiget_io_batch_size.max, 3);
#endif // ROCKSDB_IOURING_PRESENT
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
} }
#ifdef ROCKSDB_IOURING_PRESENT
TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) { TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
std::vector<std::string> key_strs; std::vector<std::string> key_strs;
std::vector<Slice> keys; std::vector<Slice> keys;
@ -2335,9 +2366,9 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
values.resize(keys.size()); values.resize(keys.size());
statuses.resize(keys.size()); statuses.resize(keys.size());
int count = 0;
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"TableCache::GetTableReader:BeforeOpenFile", [&](void* status) { "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
static int count = 0;
count++; count++;
// Fail the last table reader open, which is the 6th SST file // Fail the last table reader open, which is the 6th SST file
// since 3 overlapping L0 files + 3 L1 files containing the keys // since 3 overlapping L0 files + 3 L1 files containing the keys
@ -2360,7 +2391,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
ReopenDB(); PrepareDBForTest();
ReadOptions ro; ReadOptions ro;
ro.async_io = true; ro.async_io = true;
@ -2382,6 +2413,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
ASSERT_EQ(multiget_io_batch_size.max, 2); ASSERT_EQ(multiget_io_batch_size.max, 2);
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
} }
#endif // ROCKSDB_IOURING_PRESENT
TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) { TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
std::vector<std::string> key_strs; std::vector<std::string> key_strs;
@ -2399,6 +2431,8 @@ TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
values.resize(keys.size()); values.resize(keys.size());
statuses.resize(keys.size()); statuses.resize(keys.size());
PrepareDBForTest();
ReadOptions ro; ReadOptions ro;
ro.async_io = true; ro.async_io = true;
ro.optimize_multiget_for_io = GetParam(); ro.optimize_multiget_for_io = GetParam();
@ -2412,6 +2446,7 @@ TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
ASSERT_EQ(values[1], "val_l1_" + std::to_string(54)); ASSERT_EQ(values[1], "val_l1_" + std::to_string(54));
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
#ifdef ROCKSDB_IOURING_PRESENT
HistogramData multiget_io_batch_size; HistogramData multiget_io_batch_size;
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
@ -2422,6 +2457,7 @@ TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
// will lookup 2 files in parallel and issue 2 async reads // will lookup 2 files in parallel and issue 2 async reads
ASSERT_EQ(multiget_io_batch_size.count, 2); ASSERT_EQ(multiget_io_batch_size.count, 2);
ASSERT_EQ(multiget_io_batch_size.max, 2); ASSERT_EQ(multiget_io_batch_size.max, 2);
#endif // ROCKSDB_IOURING_PRESENT
} }
TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) { TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
@ -2440,6 +2476,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
values.resize(keys.size()); values.resize(keys.size());
statuses.resize(keys.size()); statuses.resize(keys.size());
PrepareDBForTest();
ReadOptions ro; ReadOptions ro;
ro.async_io = true; ro.async_io = true;
ro.optimize_multiget_for_io = GetParam(); ro.optimize_multiget_for_io = GetParam();
@ -2453,6 +2491,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
ASSERT_EQ(values[1], "val_l2_" + std::to_string(56)); ASSERT_EQ(values[1], "val_l2_" + std::to_string(56));
ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); ASSERT_EQ(values[2], "val_l1_" + std::to_string(102));
#ifdef ROCKSDB_IOURING_PRESENT
HistogramData multiget_io_batch_size; HistogramData multiget_io_batch_size;
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
@ -2462,6 +2501,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
// Otherwise, the L2 lookup will happen after L1. // Otherwise, the L2 lookup will happen after L1.
ASSERT_EQ(multiget_io_batch_size.count, GetParam() ? 1 : 2); ASSERT_EQ(multiget_io_batch_size.count, GetParam() ? 1 : 2);
ASSERT_EQ(multiget_io_batch_size.max, GetParam() ? 3 : 2); ASSERT_EQ(multiget_io_batch_size.max, GetParam() ? 3 : 2);
#endif // ROCKSDB_IOURING_PRESENT
} }
TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
@ -2478,6 +2518,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
values.resize(keys.size()); values.resize(keys.size());
statuses.resize(keys.size()); statuses.resize(keys.size());
PrepareDBForTest();
ReadOptions ro; ReadOptions ro;
ro.async_io = true; ro.async_io = true;
ro.optimize_multiget_for_io = GetParam(); ro.optimize_multiget_for_io = GetParam();
@ -2493,6 +2535,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
} }
#ifdef ROCKSDB_IOURING_PRESENT
TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) { TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
std::vector<std::string> key_strs; std::vector<std::string> key_strs;
std::vector<Slice> keys; std::vector<Slice> keys;
@ -2507,6 +2550,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) {
values.resize(keys.size()); values.resize(keys.size());
statuses.resize(keys.size()); statuses.resize(keys.size());
PrepareDBForTest();
ReadOptions ro; ReadOptions ro;
ro.async_io = true; ro.async_io = true;
ro.optimize_multiget_for_io = GetParam(); ro.optimize_multiget_for_io = GetParam();
@ -2536,6 +2581,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) {
values.resize(keys.size()); values.resize(keys.size());
statuses.resize(keys.size()); statuses.resize(keys.size());
PrepareDBForTest();
ReadOptions ro; ReadOptions ro;
ro.async_io = true; ro.async_io = true;
ro.optimize_multiget_for_io = GetParam(); ro.optimize_multiget_for_io = GetParam();
@ -2550,6 +2597,45 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) {
// Bloom filters in L0/L1 will avoid the coroutine calls in those levels // Bloom filters in L0/L1 will avoid the coroutine calls in those levels
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
} }
#endif // ROCKSDB_IOURING_PRESENT
TEST_P(DBMultiGetAsyncIOTest, GetNoIOUring) {
std::vector<std::string> key_strs;
std::vector<Slice> keys;
std::vector<PinnableSlice> values;
std::vector<Status> statuses;
key_strs.push_back(Key(33));
key_strs.push_back(Key(54));
key_strs.push_back(Key(102));
keys.push_back(key_strs[0]);
keys.push_back(key_strs[1]);
keys.push_back(key_strs[2]);
values.resize(keys.size());
statuses.resize(keys.size());
enable_io_uring = false;
ReopenDB();
ReadOptions ro;
ro.async_io = true;
ro.optimize_multiget_for_io = GetParam();
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data());
ASSERT_EQ(values.size(), 3);
ASSERT_EQ(statuses[0], Status::NotSupported());
ASSERT_EQ(statuses[1], Status::NotSupported());
ASSERT_EQ(statuses[2], Status::NotSupported());
HistogramData multiget_io_batch_size;
statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
// A batch of 3 async IOs is expected, one for each overlapping file in L1
ASSERT_EQ(multiget_io_batch_size.count, 1);
ASSERT_EQ(multiget_io_batch_size.max, 3);
ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
}
INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest, INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest,
testing::Bool()); testing::Bool());

@ -20,17 +20,20 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
awaiter->io_handle_.resize(awaiter->num_reqs_); awaiter->io_handle_.resize(awaiter->num_reqs_);
awaiter->del_fn_.resize(awaiter->num_reqs_); awaiter->del_fn_.resize(awaiter->num_reqs_);
for (size_t i = 0; i < awaiter->num_reqs_; ++i) { for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
awaiter->file_ IOStatus s = awaiter->file_->ReadAsync(
->ReadAsync(
awaiter->read_reqs_[i], awaiter->opts_, awaiter->read_reqs_[i], awaiter->opts_,
[](const FSReadRequest& req, void* cb_arg) { [](const FSReadRequest& req, void* cb_arg) {
FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg); FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
read_req->status = req.status; read_req->status = req.status;
read_req->result = req.result; read_req->result = req.result;
}, },
&awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i],
&awaiter->del_fn_[i], /*aligned_buf=*/nullptr) /*aligned_buf=*/nullptr);
.PermitUncheckedError(); if (!s.ok()) {
// For any non-ok status, the FileSystem will not call the callback
// So let's update the status ourselves
awaiter->read_reqs_[i].status = s;
}
} }
return true; return true;
} }
@ -41,6 +44,7 @@ void AsyncFileReader::Wait() {
} }
ReadAwaiter* waiter; ReadAwaiter* waiter;
std::vector<void*> io_handles; std::vector<void*> io_handles;
IOStatus s;
io_handles.reserve(num_reqs_); io_handles.reserve(num_reqs_);
waiter = head_; waiter = head_;
do { do {
@ -52,7 +56,7 @@ void AsyncFileReader::Wait() {
} while (waiter != tail_ && (waiter = waiter->next_)); } while (waiter != tail_ && (waiter = waiter->next_));
if (io_handles.size() > 0) { if (io_handles.size() > 0) {
StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS); StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS);
fs_->Poll(io_handles, io_handles.size()).PermitUncheckedError(); s = fs_->Poll(io_handles, io_handles.size());
} }
do { do {
waiter = head_; waiter = head_;
@ -62,6 +66,10 @@ void AsyncFileReader::Wait() {
if (waiter->io_handle_[i] && waiter->del_fn_[i]) { if (waiter->io_handle_[i] && waiter->del_fn_[i]) {
waiter->del_fn_[i](waiter->io_handle_[i]); waiter->del_fn_[i](waiter->io_handle_[i]);
} }
if (waiter->read_reqs_[i].status.ok() && !s.ok()) {
// Override the request status with the Poll error
waiter->read_reqs_[i].status = s;
}
} }
waiter->awaiting_coro_.resume(); waiter->awaiting_coro_.resume();
} while (waiter != tail_); } while (waiter != tail_);

Loading…
Cancel
Save