From 63da9cfa2694fcb17c9f3f115debfd895a8213fe Mon Sep 17 00:00:00 2001
From: anand76
Date: Thu, 2 Feb 2023 16:35:27 -0800
Subject: [PATCH] Return any errors returned by ReadAsync to the MultiGet
 caller (#11171)

Summary:
Currently, we incorrectly return a Status::Corruption to the MultiGet caller if the file system ReadAsync cannot issue a read and returns an error for some reason, such as IOStatus::NotSupported(). In this PR, we copy the ReadAsync error to the request status so it can be returned to the user.

Tests:
Update existing unit tests and add a new one for this scenario

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11171

Reviewed By: akankshamahajan15

Differential Revision: D42950057

Pulled By: anand1976

fbshipit-source-id: 85ffcb015fa6c064c311f8a28488fec78c487869
---
 HISTORY.md                |  1 +
 db/db_basic_test.cc       | 90 ++++++++++++++++++++++++++++++++++++++-
 util/async_file_reader.cc | 32 ++++++++------
 3 files changed, 109 insertions(+), 14 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index 165725736..6b74d3ef4 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -11,6 +11,7 @@
 * Fixed a feature interaction bug where for blobs `GetEntity` would expose the blob reference instead of the blob value.
 * Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish
 * Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()`
+* Return the correct error (Status::NotSupported()) to MultiGet caller when ReadOptions::async_io flag is true and IO uring is not enabled. Previously, Status::Corruption() was being returned when the actual failure was lack of async IO support.

 ### Feature Removal
 * Remove RocksDB Lite.
diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index d8caefd59..6bd80bc6c 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -30,6 +30,9 @@ namespace ROCKSDB_NAMESPACE { +static bool enable_io_uring = true; +extern "C" bool RocksDbIOUringEnable() { return enable_io_uring; } + class DBBasicTest : public DBTestBase { public: DBBasicTest() : DBTestBase("db_basic_test", /*env_do_fsync=*/false) {} @@ -2173,6 +2176,7 @@ class DBMultiGetAsyncIOTest : public DBBasicTest, options_.disable_auto_compactions = true; options_.statistics = statistics_; options_.table_factory.reset(NewBlockBasedTableFactory(bbto)); + options_.env = Env::Default(); Reopen(options_); int num_keys = 0; @@ -2239,6 +2243,20 @@ class DBMultiGetAsyncIOTest : public DBBasicTest, const std::shared_ptr& statistics() { return statistics_; } protected: + void PrepareDBForTest() { +#ifdef ROCKSDB_IOURING_PRESENT + Reopen(options_); +#else // ROCKSDB_IOURING_PRESENT + // Warm up the block cache so we don't need to use the IO uring + Iterator* iter = dbfull()->NewIterator(ReadOptions()); + for (iter->SeekToFirst(); iter->Valid() && iter->status().ok(); + iter->Next()) + ; + EXPECT_OK(iter->status()); + delete iter; +#endif // ROCKSDB_IOURING_PRESENT + } + void ReopenDB() { Reopen(options_); } private: @@ -2253,6 +2271,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) { std::vector values(key_strs.size()); std::vector statuses(key_strs.size()); + PrepareDBForTest(); + ReadOptions ro; ro.async_io = true; ro.optimize_multiget_for_io = GetParam(); @@ -2271,6 +2291,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) { statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); // With async IO, lookups will happen in parallel for each key +#ifdef ROCKSDB_IOURING_PRESENT if (GetParam()) { ASSERT_EQ(multiget_io_batch_size.count, 1); ASSERT_EQ(multiget_io_batch_size.max, 3); @@ -2280,6 +2301,11 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL0) { // L0 file ASSERT_EQ(multiget_io_batch_size.count, 
3); } +#else // ROCKSDB_IOURING_PRESENT + if (GetParam()) { + ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); + } +#endif // ROCKSDB_IOURING_PRESENT } TEST_P(DBMultiGetAsyncIOTest, GetFromL1) { @@ -2297,6 +2323,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) { values.resize(keys.size()); statuses.resize(keys.size()); + PrepareDBForTest(); + ReadOptions ro; ro.async_io = true; ro.optimize_multiget_for_io = GetParam(); @@ -2310,6 +2338,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) { ASSERT_EQ(values[1], "val_l1_" + std::to_string(54)); ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); +#ifdef ROCKSDB_IOURING_PRESENT HistogramData multiget_io_batch_size; statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); @@ -2317,9 +2346,11 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) { // A batch of 3 async IOs is expected, one for each overlapping file in L1 ASSERT_EQ(multiget_io_batch_size.count, 1); ASSERT_EQ(multiget_io_batch_size.max, 3); +#endif // ROCKSDB_IOURING_PRESENT ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); } +#ifdef ROCKSDB_IOURING_PRESENT TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) { std::vector key_strs; std::vector keys; @@ -2335,9 +2366,9 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) { values.resize(keys.size()); statuses.resize(keys.size()); + int count = 0; SyncPoint::GetInstance()->SetCallBack( "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) { - static int count = 0; count++; // Fail the last table reader open, which is the 6th SST file // since 3 overlapping L0 files + 3 L1 files containing the keys @@ -2360,7 +2391,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) { }); SyncPoint::GetInstance()->EnableProcessing(); - ReopenDB(); + PrepareDBForTest(); ReadOptions ro; ro.async_io = true; @@ -2382,6 +2413,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) { ASSERT_EQ(multiget_io_batch_size.max, 2); ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); 
} +#endif // ROCKSDB_IOURING_PRESENT TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) { std::vector key_strs; @@ -2399,6 +2431,8 @@ TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) { values.resize(keys.size()); statuses.resize(keys.size()); + PrepareDBForTest(); + ReadOptions ro; ro.async_io = true; ro.optimize_multiget_for_io = GetParam(); @@ -2412,6 +2446,7 @@ TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) { ASSERT_EQ(values[1], "val_l1_" + std::to_string(54)); ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); +#ifdef ROCKSDB_IOURING_PRESENT HistogramData multiget_io_batch_size; statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); @@ -2422,6 +2457,7 @@ TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) { // will lookup 2 files in parallel and issue 2 async reads ASSERT_EQ(multiget_io_batch_size.count, 2); ASSERT_EQ(multiget_io_batch_size.max, 2); +#endif // ROCKSDB_IOURING_PRESENT } TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) { @@ -2440,6 +2476,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) { values.resize(keys.size()); statuses.resize(keys.size()); + PrepareDBForTest(); + ReadOptions ro; ro.async_io = true; ro.optimize_multiget_for_io = GetParam(); @@ -2453,6 +2491,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) { ASSERT_EQ(values[1], "val_l2_" + std::to_string(56)); ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); +#ifdef ROCKSDB_IOURING_PRESENT HistogramData multiget_io_batch_size; statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); @@ -2462,6 +2501,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2) { // Otherwise, the L2 lookup will happen after L1. ASSERT_EQ(multiget_io_batch_size.count, GetParam() ? 1 : 2); ASSERT_EQ(multiget_io_batch_size.max, GetParam() ? 
3 : 2); +#endif // ROCKSDB_IOURING_PRESENT } TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { @@ -2478,6 +2518,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { values.resize(keys.size()); statuses.resize(keys.size()); + PrepareDBForTest(); + ReadOptions ro; ro.async_io = true; ro.optimize_multiget_for_io = GetParam(); @@ -2493,6 +2535,7 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) { ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); } +#ifdef ROCKSDB_IOURING_PRESENT TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) { std::vector key_strs; std::vector keys; @@ -2507,6 +2550,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL2WithRangeDelInL1) { values.resize(keys.size()); statuses.resize(keys.size()); + PrepareDBForTest(); + ReadOptions ro; ro.async_io = true; ro.optimize_multiget_for_io = GetParam(); @@ -2536,6 +2581,8 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) { values.resize(keys.size()); statuses.resize(keys.size()); + PrepareDBForTest(); + ReadOptions ro; ro.async_io = true; ro.optimize_multiget_for_io = GetParam(); @@ -2550,6 +2597,45 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1AndL2WithRangeDelInL1) { // Bloom filters in L0/L1 will avoid the coroutine calls in those levels ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); } +#endif // ROCKSDB_IOURING_PRESENT + +TEST_P(DBMultiGetAsyncIOTest, GetNoIOUring) { + std::vector key_strs; + std::vector keys; + std::vector values; + std::vector statuses; + + key_strs.push_back(Key(33)); + key_strs.push_back(Key(54)); + key_strs.push_back(Key(102)); + keys.push_back(key_strs[0]); + keys.push_back(key_strs[1]); + keys.push_back(key_strs[2]); + values.resize(keys.size()); + statuses.resize(keys.size()); + + enable_io_uring = false; + ReopenDB(); + + ReadOptions ro; + ro.async_io = true; + ro.optimize_multiget_for_io = GetParam(); + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), 
values.data(), statuses.data());
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(statuses[0], Status::NotSupported());
+  ASSERT_EQ(statuses[1], Status::NotSupported());
+  ASSERT_EQ(statuses[2], Status::NotSupported());
+
+  HistogramData multiget_io_batch_size;
+
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+  // A batch of 3 async IOs is expected, one for each overlapping file in L1
+  ASSERT_EQ(multiget_io_batch_size.count, 1);
+  ASSERT_EQ(multiget_io_batch_size.max, 3);
+  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
+}

 INSTANTIATE_TEST_CASE_P(DBMultiGetAsyncIOTest, DBMultiGetAsyncIOTest,
                         testing::Bool());
diff --git a/util/async_file_reader.cc b/util/async_file_reader.cc
index 8401a6b44..080c1ae96 100644
--- a/util/async_file_reader.cc
+++ b/util/async_file_reader.cc
@@ -20,17 +20,20 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
   awaiter->io_handle_.resize(awaiter->num_reqs_);
   awaiter->del_fn_.resize(awaiter->num_reqs_);
   for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
-    awaiter->file_
-        ->ReadAsync(
-            awaiter->read_reqs_[i], awaiter->opts_,
-            [](const FSReadRequest& req, void* cb_arg) {
-              FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
-              read_req->status = req.status;
-              read_req->result = req.result;
-            },
-            &awaiter->read_reqs_[i], &awaiter->io_handle_[i],
-            &awaiter->del_fn_[i], /*aligned_buf=*/nullptr)
-        .PermitUncheckedError();
+    IOStatus s = awaiter->file_->ReadAsync(
+        awaiter->read_reqs_[i], awaiter->opts_,
+        [](const FSReadRequest& req, void* cb_arg) {
+          FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
+          read_req->status = req.status;
+          read_req->result = req.result;
+        },
+        &awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i],
+        /*aligned_buf=*/nullptr);
+    if (!s.ok()) {
+      // For any non-ok status, the FileSystem will not call the callback
+      // So let's update the status ourselves
+      awaiter->read_reqs_[i].status = s;
+    }
   }
   return true;
 }
@@ -41,6 +44,7 @@ void
AsyncFileReader::Wait() {
   }
   ReadAwaiter* waiter;
   std::vector<void*> io_handles;
+  IOStatus s;
   io_handles.reserve(num_reqs_);
   waiter = head_;
   do {
@@ -52,7 +56,7 @@ void AsyncFileReader::Wait() {
   } while (waiter != tail_ && (waiter = waiter->next_));
   if (io_handles.size() > 0) {
     StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS);
-    fs_->Poll(io_handles, io_handles.size()).PermitUncheckedError();
+    s = fs_->Poll(io_handles, io_handles.size());
   }
   do {
     waiter = head_;
@@ -62,6 +66,10 @@ void AsyncFileReader::Wait() {
       if (waiter->io_handle_[i] && waiter->del_fn_[i]) {
         waiter->del_fn_[i](waiter->io_handle_[i]);
       }
+      if (waiter->read_reqs_[i].status.ok() && !s.ok()) {
+        // Override the request status with the Poll error
+        waiter->read_reqs_[i].status = s;
+      }
     }
     waiter->awaiting_coro_.resume();
   } while (waiter != tail_);