Fix table cache leak in MultiGet with async_io (#10997)

Summary:
When MultiGet with the async_io option encounters an IO error in TableCache::GetTableReader, it may leak table cache handles because queued coroutines are abandoned without ever being run. This PR fixes it by ensuring any queued coroutines are run to completion before aborting the MultiGet.
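The sketch below is only an illustration of the pattern the fix enforces, not the RocksDB implementation (which queues folly coroutine tasks and drains them with folly::coro::blockingWait in Version::MultiGetAsync). It substitutes deferred std::future tasks and a hypothetical minimal Status struct for the real machinery: once an error shows up, every queued task is still run to completion, so no task is abandoned before it can release what it holds (in RocksDB, a table cache handle), and only then is the first error propagated.

```cpp
// Minimal, self-contained sketch (NOT RocksDB code): "Status" here is a
// hypothetical two-field struct, and deferred std::future tasks stand in
// for the queued folly coroutines of the real implementation.
#include <future>
#include <iostream>
#include <string>
#include <vector>

struct Status {
  bool ok_;
  std::string msg_;
  bool ok() const { return ok_; }
  static Status OK() { return {true, ""}; }
  static Status IOError(const std::string& msg) { return {false, msg}; }
};

int main() {
  // Queue three lookups; the last one fails, like a table file open hitting
  // an IO error. Each task only "releases its handle" when it actually runs.
  std::vector<std::future<Status>> queued_tasks;
  for (int i = 0; i < 3; ++i) {
    queued_tasks.emplace_back(std::async(std::launch::deferred, [i] {
      std::cout << "task " << i << " ran and released its handle\n";
      return i == 2 ? Status::IOError("table file open failed") : Status::OK();
    }));
  }

  // The pattern the fix enforces: run every queued task to completion before
  // aborting, so none is abandoned while holding a handle; remember only the
  // first error and propagate it afterwards.
  Status s = Status::OK();
  for (auto& task : queued_tasks) {
    Status stat = task.get();
    if (s.ok() && !stat.ok()) {
      s = stat;
    }
  }
  if (s.ok()) {
    std::cout << "MultiGet ok\n";
  } else {
    std::cout << "aborting MultiGet: " << s.msg_ << "\n";
  }
  return 0;
}
```

The actual change in the diff below follows the same drain-then-abort ordering: the `if (to_process.empty() || !s.ok())` branch in Version::MultiGetAsync awaits all enqueued coroutines and collects their statuses before breaking out of the loop.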

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10997

Test Plan:
1. New unit test in db_basic_test
2. asan_crash

Reviewed By: pdillinger

Differential Revision: D41587244

Pulled By: anand1976

fbshipit-source-id: 900920cd3fba47cb0fc744a62facc5ffe2eccb64
Branch: main
Author: anand76 (committed 2 years ago by Facebook GitHub Bot)
Parent: 95bf302189
Commit: 8ffabdc226
Files changed (changed line counts):
1. HISTORY.md (1)
2. db/db_basic_test.cc (77)
3. db/table_cache.cc (2)
4. db/version_set.cc (26)

HISTORY.md
@@ -5,6 +5,7 @@
 ### Bug Fixes
 * Fixed a regression in iterator where range tombstones after `iterate_upper_bound` is processed.
+* Fixed a memory leak in MultiGet with async_io read option, caused by IO errors during table file open
 ## 7.9.0 (11/21/2022)
 ### Performance Improvements

db/db_basic_test.cc
@@ -2158,11 +2158,11 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
       : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) {
     BlockBasedTableOptions bbto;
     bbto.filter_policy.reset(NewBloomFilterPolicy(10));
-    Options options = CurrentOptions();
-    options.disable_auto_compactions = true;
-    options.statistics = statistics_;
-    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-    Reopen(options);
+    options_ = CurrentOptions();
+    options_.disable_auto_compactions = true;
+    options_.statistics = statistics_;
+    options_.table_factory.reset(NewBlockBasedTableFactory(bbto));
+    Reopen(options_);
     int num_keys = 0;

     // Put all keys in the bottommost level, and overwrite some keys
@@ -2227,8 +2227,12 @@ class DBMultiGetAsyncIOTest : public DBBasicTest,
   const std::shared_ptr<Statistics>& statistics() { return statistics_; }

+ protected:
+  void ReopenDB() { Reopen(options_); }
+
  private:
   std::shared_ptr<Statistics> statistics_;
+  Options options_;
 };

 TEST_P(DBMultiGetAsyncIOTest, GetFromL0) {
@@ -2305,6 +2309,69 @@ TEST_P(DBMultiGetAsyncIOTest, GetFromL1) {
   ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 3);
 }
+TEST_P(DBMultiGetAsyncIOTest, GetFromL1Error) {
+  std::vector<std::string> key_strs;
+  std::vector<Slice> keys;
+  std::vector<PinnableSlice> values;
+  std::vector<Status> statuses;
+  key_strs.push_back(Key(33));
+  key_strs.push_back(Key(54));
+  key_strs.push_back(Key(102));
+  keys.push_back(key_strs[0]);
+  keys.push_back(key_strs[1]);
+  keys.push_back(key_strs[2]);
+  values.resize(keys.size());
+  statuses.resize(keys.size());
+  SyncPoint::GetInstance()->SetCallBack(
+      "TableCache::GetTableReader:BeforeOpenFile", [&](void* status) {
+        static int count = 0;
+        count++;
+        // Fail the last table reader open, which is the 6th SST file
+        // since 3 overlapping L0 files + 3 L1 files containing the keys
+        if (count == 6) {
+          Status* s = static_cast<Status*>(status);
+          *s = Status::IOError();
+        }
+      });
+  // DB open will create table readers unless we reduce the table cache
+  // capacity.
+  // SanitizeOptions will set max_open_files to minimum of 20. Table cache
+  // is allocated with max_open_files - 10 as capacity. So override
+  // max_open_files to 11 so table cache capacity will become 1. This will
+  // prevent file open during DB open and force the file to be opened
+  // during MultiGet
+  SyncPoint::GetInstance()->SetCallBack(
+      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+        int* max_open_files = (int*)arg;
+        *max_open_files = 11;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  ReopenDB();
+  ReadOptions ro;
+  ro.async_io = true;
+  ro.optimize_multiget_for_io = GetParam();
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data());
+  SyncPoint::GetInstance()->DisableProcessing();
+  ASSERT_EQ(values.size(), 3);
+  ASSERT_EQ(statuses[0], Status::OK());
+  ASSERT_EQ(statuses[1], Status::OK());
+  ASSERT_EQ(statuses[2], Status::IOError());
+  HistogramData multiget_io_batch_size;
+  statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size);
+  // A batch of 3 async IOs is expected, one for each overlapping file in L1
+  ASSERT_EQ(multiget_io_batch_size.count, 1);
+  ASSERT_EQ(multiget_io_batch_size.max, 2);
+  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
+}
 TEST_P(DBMultiGetAsyncIOTest, LastKeyInFile) {
   std::vector<std::string> key_strs;
   std::vector<Slice> keys;

db/table_cache.cc
@@ -127,6 +127,8 @@ Status TableCache::GetTableReader(
   FileOptions fopts = file_options;
   fopts.temperature = file_temperature;
   Status s = PrepareIOFromReadOptions(ro, ioptions_.clock, fopts.io_options);
+  TEST_SYNC_POINT_CALLBACK("TableCache::GetTableReader:BeforeOpenFile",
+                           const_cast<Status*>(&s));
   if (s.ok()) {
     s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
   }

db/version_set.cc
@@ -2539,16 +2539,19 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
       }
       f = fp.GetNextFileInLevel();
     }
-    if (s.ok() && mget_tasks.size() > 0) {
+    if (mget_tasks.size() > 0) {
       RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT,
                  mget_tasks.size());
       // Collect all results so far
       std::vector<Status> statuses = folly::coro::blockingWait(
           folly::coro::collectAllRange(std::move(mget_tasks))
               .scheduleOn(&range->context()->executor()));
-      for (Status stat : statuses) {
-        if (!stat.ok()) {
-          s = stat;
-          break;
+      if (s.ok()) {
+        for (Status stat : statuses) {
+          if (!stat.ok()) {
+            s = std::move(stat);
+            break;
+          }
         }
       }
@@ -2794,6 +2797,9 @@ Status Version::MultiGetAsync(
     unsigned int num_tasks_queued = 0;
     to_process.pop_front();
     if (batch->IsSearchEnded() || batch->GetRange().empty()) {
+      // If to_process is empty, i.e no more batches to look at, then we need
+      // schedule the enqueued coroutines and wait for them. Otherwise, we
+      // skip this batch and move to the next one in to_process.
       if (!to_process.empty()) {
         continue;
       }
@@ -2802,9 +2808,6 @@ Status Version::MultiGetAsync(
       // to_process
       s = ProcessBatch(options, batch, mget_tasks, blob_ctxs, batches, waiting,
                        to_process, num_tasks_queued, mget_stats);
-      if (!s.ok()) {
-        break;
-      }
       // If ProcessBatch didn't enqueue any coroutine tasks, it means all
       // keys were filtered out. So put the batch back in to_process to
       // lookup in the next level
@@ -2815,8 +2818,10 @@ Status Version::MultiGetAsync(
         waiting.emplace_back(idx);
       }
     }
-    if (to_process.empty()) {
-      if (s.ok() && mget_tasks.size() > 0) {
+    // If ProcessBatch() returned an error, then schedule the enqueued
+    // coroutines and wait for them, then abort the MultiGet.
+    if (to_process.empty() || !s.ok()) {
+      if (mget_tasks.size() > 0) {
         assert(waiting.size());
         RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size());
         // Collect all results so far
@@ -2824,12 +2829,14 @@ Status Version::MultiGetAsync(
             folly::coro::collectAllRange(std::move(mget_tasks))
                 .scheduleOn(&range->context()->executor()));
         mget_tasks.clear();
-        for (Status stat : statuses) {
-          if (!stat.ok()) {
-            s = stat;
-            break;
+        if (s.ok()) {
+          for (Status stat : statuses) {
+            if (!stat.ok()) {
+              s = std::move(stat);
+              break;
+            }
           }
         }

         if (!s.ok()) {
           break;
@@ -2850,6 +2857,9 @@ Status Version::MultiGetAsync(
         assert(!s.ok() || waiting.size() == 0);
       }
     }
+    if (!s.ok()) {
+      break;
+    }
   }

   uint64_t num_levels = 0;
