diff --git a/HISTORY.md b/HISTORY.md
index 4d8a72484..b13011eba 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -5,6 +5,7 @@
 ### Bug Fixes
 * Fixed a regression in iterator where range tombstones after `iterate_upper_bound` is processed.
+* Fixed a memory leak in MultiGet with async_io read option, caused by IO errors during table file open

 ## 7.9.0 (11/21/2022)
 ### Performance Improvements
diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index 622ea2f6e..a28ac2b88 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -2158,11 +2158,11 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
       : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
     BlockBasedTableOptions bbto;
     bbto.filter_policy.reset(NewBloomFilterPolicy(10));
-    Options options = CurrentOptions();
-    options.disable_auto_compactions = true;
-    options.statistics = statistics_;
-    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-    Reopen(options);
+    options_ = CurrentOptions();
+    options_.disable_auto_compactions = true;
+    options_.statistics = statistics_;
+    options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
+    Reopen(options_);
     int num_keys = 0;

     // Put all keys in the bottommost level, and overwrite some keys
@@ -2227,8 +2227,12 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,

   const std::shared_ptr<Statistics>& statistics() { return statistics_; }

+ protected:
+  void ReopenDB() { Reopen(options_); }
+
  private:
   std::shared_ptr<Statistics> statistics_;
+  Options options_;
 };

 TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
@@ -2305,6 +2309,69 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
 }

+TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
+  std::vector<std::string> key_strs;
+  std::vector<Slice> keys;
+  std::vector<PinnableSlice> values;
+  std::vector<Status> statuses;
+
+  key_strs.push_back(Key(33));
+  key_strs.push_back(Key(54));
+  key_strs.push_back(Key(102));
+  keys.push_back(key_strs[0]);
+  keys.push_back(key_strs[1]);
+  keys.push_back(key_strs[2]);
+  values.resize(keys.size());
+  statuses.resize(keys.size());
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
+        static int count = 0;
+        count++;
+        // Fail the last table reader open, which is the 6th SST file, since
+        // there are 3 overlapping L0 files + 3 L1 files containing the keys
+        if (count == 6) {
+          Status* s = static_cast<Status*>(status);
+          *s = Status::IOError();
+        }
+      });
+  // DB open will create table readers unless we reduce the table cache
+  // capacity.
+  // SanitizeOptions will set max_open_files to a minimum of 20. The table
+  // cache is allocated with max_open_files - 10 as its capacity, so override
+  // max_open_files to 11 to make the table cache capacity 1.
+  // This will prevent file opens during DB open and force the files to be
+  // opened during MultiGet.
+  SyncPoint::GetInstance()->SetCallBack(
+      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+        int* max_open_files = (int*)arg;
+        *max_open_files = 11;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ReopenDB();
+
+  ReadOptions ro;
+  ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  SyncPoint::GetInstance()->DisableProcessing();
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(statuses[0], Status::OK());
+  ASSERT_EQ(statuses[1], Status::OK());
+  ASSERT_EQ(statuses[2], Status::IOError());
+
+  HistogramData multiget_io_batch_size;
+
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+
+  // A batch of 2 async IOs is expected, one for each L1 file that opened OK
+  ASSERT_EQ(multiget_io_batch_size.count, 1);
+  ASSERT_EQ(multiget_io_batch_size.max, 2);
+  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
+}
+
 TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;
diff --git a/db/table_cache.cc b/db/table_cache.cc
index a9ea14348..c44c4bb84 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -127,6 +127,8 @@ Status TableCache::GetTableReader(
   FileOptions fopts = file_options;
   fopts.temperature = file_temperature;
   Status s = PrepareIOFromReadOptions(ro, ioptions_.clock, fopts.io_options);
+  TEST_SYNC_POINT_CALLBACK("TableCache::GetTableReader:BeforeOpenFile",
+                           const_cast<Status*>(&s));
   if (s.ok()) {
     s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
   }
diff --git a/db/version_set.cc b/db/version_set.cc
index 3b9df2aec..33cd70022 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2539,16 +2539,19 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
       }
       f = fp.GetNextFileInLevel();
     }
-    if (s.ok() && mget_tasks.size() > 0) {
+    if (mget_tasks.size() > 0) {
       RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size());
       // Collect all results so far
       std::vector<Status> statuses = folly::coro::blockingWait(
           folly::coro::collectAllRange(std::move(mget_tasks))
              .scheduleOn(&range->context()->executor()));
-      for (Status stat : statuses) {
-        if (!stat.ok()) {
-          s = stat;
+      if (s.ok()) {
+        for (Status stat : statuses) {
+          if (!stat.ok()) {
+            s = std::move(stat);
+            break;
+          }
         }
       }

@@ -2794,6 +2797,9 @@ Status Version::MultiGetAsync(
     unsigned int num_tasks_queued = 0;
     to_process.pop_front();
     if (batch->IsSearchEnded() || batch->GetRange().empty()) {
+      // If to_process is empty, i.e. no more batches to look at, then we
+      // need to schedule the enqueued coroutines and wait for them.
+      // Otherwise, we skip this batch and move on to the next one in to_process.
       if (!to_process.empty()) {
         continue;
       }
@@ -2802,9 +2808,6 @@ Status Version::MultiGetAsync(
       // to_process
      s = ProcessBatch(options, batch, mget_tasks, blob_ctxs, batches, waiting,
                       to_process, num_tasks_queued, mget_stats);
-      if (!s.ok()) {
-        break;
-      }
       // If ProcessBatch didn't enqueue any coroutine tasks, it means all
       // keys were filtered out. So put the batch back in to_process to
       // lookup in the next level
@@ -2815,8 +2818,10 @@ Status Version::MultiGetAsync(
         waiting.emplace_back(idx);
       }
     }
-    if (to_process.empty()) {
-      if (s.ok() && mget_tasks.size() > 0) {
+    // If ProcessBatch() returned an error, then schedule the enqueued
+    // coroutines and wait for them, then abort the MultiGet.
+    if (to_process.empty() || !s.ok()) {
+      if (mget_tasks.size() > 0) {
         assert(waiting.size());
         RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size());
         // Collect all results so far
@@ -2824,10 +2829,12 @@ Status Version::MultiGetAsync(
             folly::coro::collectAllRange(std::move(mget_tasks))
                 .scheduleOn(&range->context()->executor()));
         mget_tasks.clear();
-        for (Status stat : statuses) {
-          if (!stat.ok()) {
-            s = stat;
-            break;
+        if (s.ok()) {
+          for (Status stat : statuses) {
+            if (!stat.ok()) {
+              s = std::move(stat);
+              break;
+            }
           }
         }

@@ -2850,6 +2857,9 @@ Status Version::MultiGetAsync(
         assert(!s.ok() || waiting.size() == 0);
       }
     }
+    if (!s.ok()) {
+      break;
+    }
   }

   uint64_t num_levels = 0;
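
The version_set.cc changes above all follow the same pattern: once coroutine lookups have been enqueued in mget_tasks, they are always scheduled and awaited, even if an earlier step (such as a table file open) has already failed, and only then is the first error propagated. Below is a minimal standalone sketch of that ordering; it is not taken from the patch, the names (Status, MultiGetSketch) are hypothetical, and std::async stands in for the folly::coro tasks that the real code collects with folly::coro::blockingWait().

#include <future>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for rocksdb::Status, just enough for the sketch.
struct Status {
  bool ok = true;
  std::string msg;
};

Status MultiGetSketch() {
  std::vector<std::future<Status>> tasks;
  Status s;

  // Launch one lookup per file; the last one fails, mimicking the injected
  // IOError on the final table file open in the test above.
  for (int i = 0; i < 3; ++i) {
    tasks.emplace_back(std::async(std::launch::async, [i]() -> Status {
      if (i == 2) {
        return {false, "IO error"};
      }
      return {};
    }));
  }

  // Pre-fix, an error in s caused an early break that abandoned the enqueued
  // tasks. Post-fix, every launched task is drained first, regardless of s.
  std::vector<Status> statuses;
  for (auto& task : tasks) {
    statuses.push_back(task.get());
  }

  // Only after all tasks have completed is the first error picked up.
  if (s.ok) {
    for (Status& stat : statuses) {
      if (!stat.ok) {
        s = std::move(stat);
        break;
      }
    }
  }
  return s;
}

int main() {
  // With this ordering, every launched lookup has completed by the time the
  // error (if any) is returned to the caller.
  return MultiGetSketch().ok ? 0 : 1;
}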