From 8ffabdc226d0faf9049edd7a1fe57765e180480a Mon Sep 17 00:00:00 2001 From: anand76 Date: Sun, 4 Dec 2022 22:58:25 -0800 Subject: [PATCH] Fix table cache leak in MultiGet with async_io (#10997) Summary: When MultiGet with the async_io option encounters an IO error in TableCache::GetTableReader, it may result in leakage of table cache handles due to queued coroutines being abandoned. This PR fixes it by ensuring any queued coroutines are run before aborting the MultiGet. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10997 Test Plan: 1. New unit test in db_basic_test 2. asan_crash Reviewed By: pdillinger Differential Revision: D41587244 Pulled By: anand1976 fbshipit-source-id: 900920cd3fba47cb0fc744a62facc5ffe2eccb64 --- HISTORY.md | 1 + db/db_basic_test.cc | 77 ++++++++++++++++++++++++++++++++++++++++++--- db/table_cache.cc | 2 ++ db/version_set.cc | 36 +++++++++++++-------- 4 files changed, 98 insertions(+), 18 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 4d8a72484..b13011eba 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ ### Bug Fixes * Fixed a regression in iterator where range tombstones after `iterate_upper_bound` is processed. +* Fixed a memory leak in MultiGet with async_io read option, caused by IO errors during table file open ## 7.9.0 (11/21/2022) ### Performance Improvements diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 622ea2f6e..a28ac2b88 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2158,11 +2158,11 @@ class DBMultiGetAsyncIOTest : public DBBasicTest, : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) { BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10)); - Options options = CurrentOptions(); - options.disable_auto_compactions = true; - options.statistics = statistics_; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - Reopen(options); + options_ = CurrentOptions(); + options_.disable_auto_compactions = true; + options_.statistics = statistics_; + options_.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options_); int num_keys = 0; // Put all keys in the bottommost level, and overwrite some keys @@ -2227,8 +2227,12 @@ class DBMultiGetAsyncIOTest : public DBBasicTest, const std::shared_ptr& statistics() { return statistics_; } + protected: + void ReopenDB() { Reopen(options_); } + private: std::shared_ptr statistics_; + Options options_; }; TEST_P(DBMultiGetAsyncIOTest, GetFromL0) { @@ -2305,6 +2309,69 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) { ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3); } +TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) { + std::vector key_strs; + std::vector keys; + std::vector values; + std::vector statuses; + + key_strs.push_back(Key(33)); + key_strs.push_back(Key(54)); + key_strs.push_back(Key(102)); + keys.push_back(key_strs[0]); + keys.push_back(key_strs[1]); + keys.push_back(key_strs[2]); + values.resize(keys.size()); + statuses.resize(keys.size()); + + SyncPoint::GetInstance()->SetCallBack( + "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) { + static int count = 0; + count++; + // Fail the last table reader open, which is the 6th SST file + // since 3 overlapping L0 files + 3 L1 files containing the keys + if (count == 6) { + Status* s = static_cast(status); + *s = Status::IOError(); + } + }); + // DB open will create table readers unless we reduce the table cache + // capacity. + // SanitizeOptions will set max_open_files to minimum of 20. Table cache + // is allocated with max_open_files - 10 as capacity. So override + // max_open_files to 11 so table cache capacity will become 1. This will + // prevent file open during DB open and force the file to be opened + // during MultiGet + SyncPoint::GetInstance()->SetCallBack( + "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { + int* max_open_files = (int*)arg; + *max_open_files = 11; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ReopenDB(); + + ReadOptions ro; + ro.async_io = true; + ro.optimize_multiget_for_io = GetParam(); + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data()); + SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_EQ(values.size(), 3); + ASSERT_EQ(statuses[0], Status::OK()); + ASSERT_EQ(statuses[1], Status::OK()); + ASSERT_EQ(statuses[2], Status::IOError()); + + HistogramData multiget_io_batch_size; + + statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); + + // A batch of 3 async IOs is expected, one for each overlapping file in L1 + ASSERT_EQ(multiget_io_batch_size.count, 1); + ASSERT_EQ(multiget_io_batch_size.max, 2); + ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2); +} + TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) { std::vector key_strs; std::vector keys; diff --git a/db/table_cache.cc b/db/table_cache.cc index a9ea14348..c44c4bb84 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -127,6 +127,8 @@ Status TableCache::GetTableReader( FileOptions fopts = file_options; fopts.temperature = file_temperature; Status s = PrepareIOFromReadOptions(ro, ioptions_.clock, fopts.io_options); + TEST_SYNC_POINT_CALLBACK("TableCache::GetTableReader:BeforeOpenFile", + const_cast(&s)); if (s.ok()) { s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr); } diff --git a/db/version_set.cc b/db/version_set.cc index 3b9df2aec..33cd70022 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2539,16 +2539,19 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, } f = fp.GetNextFileInLevel(); } - if (s.ok() && mget_tasks.size() > 0) { + if (mget_tasks.size() > 0) { RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size()); // Collect all results so far std::vector statuses = folly::coro::blockingWait( folly::coro::collectAllRange(std::move(mget_tasks)) .scheduleOn(&range->context()->executor())); - for (Status stat : statuses) { - if (!stat.ok()) { - s = stat; + if (s.ok()) { + for (Status stat : statuses) { + if (!stat.ok()) { + s = std::move(stat); + break; + } } } @@ -2794,6 +2797,9 @@ Status Version::MultiGetAsync( unsigned int num_tasks_queued = 0; to_process.pop_front(); if (batch->IsSearchEnded() || batch->GetRange().empty()) { + // If to_process is empty, i.e no more batches to look at, then we need + // schedule the enqueued coroutines and wait for them. Otherwise, we + // skip this batch and move to the next one in to_process. if (!to_process.empty()) { continue; } @@ -2802,9 +2808,6 @@ Status Version::MultiGetAsync( // to_process s = ProcessBatch(options, batch, mget_tasks, blob_ctxs, batches, waiting, to_process, num_tasks_queued, mget_stats); - if (!s.ok()) { - break; - } // If ProcessBatch didn't enqueue any coroutine tasks, it means all // keys were filtered out. So put the batch back in to_process to // lookup in the next level @@ -2815,8 +2818,10 @@ Status Version::MultiGetAsync( waiting.emplace_back(idx); } } - if (to_process.empty()) { - if (s.ok() && mget_tasks.size() > 0) { + // If ProcessBatch() returned an error, then schedule the enqueued + // coroutines and wait for them, then abort the MultiGet. + if (to_process.empty() || !s.ok()) { + if (mget_tasks.size() > 0) { assert(waiting.size()); RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size()); // Collect all results so far @@ -2824,10 +2829,12 @@ Status Version::MultiGetAsync( folly::coro::collectAllRange(std::move(mget_tasks)) .scheduleOn(&range->context()->executor())); mget_tasks.clear(); - for (Status stat : statuses) { - if (!stat.ok()) { - s = stat; - break; + if (s.ok()) { + for (Status stat : statuses) { + if (!stat.ok()) { + s = std::move(stat); + break; + } } } @@ -2850,6 +2857,9 @@ Status Version::MultiGetAsync( assert(!s.ok() || waiting.size() == 0); } } + if (!s.ok()) { + break; + } } uint64_t num_levels = 0;