From 77f479951579322a0a095a6f5a82ca8c2532d5fa Mon Sep 17 00:00:00 2001 From: mpoeter Date: Tue, 14 Jun 2022 21:29:52 -0700 Subject: [PATCH] Fix potential leak when reusing PinnableSlice instances. (#10166) Summary: `PinnableSlice` may hold a handle to a cache value which must be released to correctly decrement the ref-counter. However, when `PinnableSlice` variables are reused, e.g. like this: ``` PinnableSlice pin_slice; db.Get("foo", &pin_slice); db.Get("foo", &pin_slice); ``` then the second `Get` simply overwrites the old value in `pin_slice` and the handle returned by the first `Get` is _not_ released. This PR adds `Reset` calls to the `Get`/`MultiGet` calls that accept `PinnableSlice` arguments to ensure proper cleanup of old values. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10166 Reviewed By: hx235 Differential Revision: D37151632 Pulled By: ajkr fbshipit-source-id: 9dd3c3288300f560531b843f67db11aeb569a9ff --- db/db_impl/db_impl.cc | 4 ++ db/db_test.cc | 126 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 106 insertions(+), 24 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 147517f2b..298adaf04 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1742,6 +1742,8 @@ Status DBImpl::Get(const ReadOptions& read_options, Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value, std::string* timestamp) { + assert(value != nullptr); + value->Reset(); GetImplOptions get_impl_options; get_impl_options.column_family = column_family; get_impl_options.value = value; @@ -2349,6 +2351,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, autovector sorted_keys; sorted_keys.resize(num_keys); for (size_t i = 0; i < num_keys; ++i) { + values[i].Reset(); key_context.emplace_back(column_families[i], keys[i], &values[i], timestamps ? ×tamps[i] : nullptr, &statuses[i]); @@ -2495,6 +2498,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, autovector sorted_keys; sorted_keys.resize(num_keys); for (size_t i = 0; i < num_keys; ++i) { + values[i].Reset(); key_context.emplace_back(column_family, keys[i], &values[i], timestamps ? ×tamps[i] : nullptr, &statuses[i]); diff --git a/db/db_test.cc b/db/db_test.cc index 0cff96d3e..3fb685680 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -321,7 +321,7 @@ TEST_F(DBTest, MixedSlowdownOptions) { // We need the 2nd write to trigger delay. This is because delay is // estimated based on the last write size which is 0 for the first write. ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2")); - token.reset(); + token.reset(); for (auto& t : threads) { t.join(); @@ -379,7 +379,7 @@ TEST_F(DBTest, MixedSlowdownOptionsInQueue) { // We need the 2nd write to trigger delay. This is because delay is // estimated based on the last write size which is 0 for the first write. ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2")); - token.reset(); + token.reset(); for (auto& t : threads) { t.join(); @@ -448,7 +448,7 @@ TEST_F(DBTest, MixedSlowdownOptionsStop) { // We need the 2nd write to trigger delay. This is because delay is // estimated based on the last write size which is 0 for the first write. ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2")); - token.reset(); + token.reset(); for (auto& t : threads) { t.join(); @@ -483,7 +483,6 @@ TEST_F(DBTest, LevelLimitReopen) { } #endif // ROCKSDB_LITE - TEST_F(DBTest, PutSingleDeleteGet) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); @@ -782,7 +781,6 @@ TEST_F(DBTest, GetFromImmutableLayer) { } while (ChangeOptions()); } - TEST_F(DBTest, GetLevel0Ordering) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); @@ -3807,7 +3805,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) { options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB options.compaction_options_fifo.allow_compaction = false; - options.ttl = 1 * 60 * 60 ; // 1 hour + options.ttl = 1 * 60 * 60; // 1 hour options = CurrentOptions(options); DestroyAndReopen(options); @@ -3881,7 +3879,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) { options.write_buffer_size = 10 << 10; // 10KB options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB options.compaction_options_fifo.allow_compaction = false; - options.ttl = 1 * 60 * 60; // 1 hour + options.ttl = 1 * 60 * 60; // 1 hour options = CurrentOptions(options); DestroyAndReopen(options); @@ -6070,7 +6068,6 @@ TEST_F(DBTest, DISABLED_SuggestCompactRangeTest) { ASSERT_EQ(1, NumTableFilesAtLevel(1)); } - TEST_F(DBTest, PromoteL0) { Options options = CurrentOptions(); options.disable_auto_compactions = true; @@ -6251,13 +6248,12 @@ TEST_F(DBTest, CompactFilesShouldTriggerAutoCompaction) { SyncPoint::GetInstance()->EnableProcessing(); port::Thread manual_compaction_thread([&]() { - auto s = db_->CompactFiles(CompactionOptions(), - db_->DefaultColumnFamily(), input_files, 0); - ASSERT_OK(s); + auto s = db_->CompactFiles(CompactionOptions(), db_->DefaultColumnFamily(), + input_files, 0); + ASSERT_OK(s); }); - TEST_SYNC_POINT( - "DBTest::CompactFilesShouldTriggerAutoCompaction:Begin"); + TEST_SYNC_POINT("DBTest::CompactFilesShouldTriggerAutoCompaction:Begin"); // generate enough files to trigger compaction for (int i = 0; i < 20; ++i) { for (int j = 0; j < 2; ++j) { @@ -6267,16 +6263,15 @@ TEST_F(DBTest, CompactFilesShouldTriggerAutoCompaction) { } db_->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta_data); ASSERT_GT(cf_meta_data.levels[0].files.size(), - options.level0_file_num_compaction_trigger); - TEST_SYNC_POINT( - "DBTest::CompactFilesShouldTriggerAutoCompaction:End"); + options.level0_file_num_compaction_trigger); + TEST_SYNC_POINT("DBTest::CompactFilesShouldTriggerAutoCompaction:End"); manual_compaction_thread.join(); ASSERT_OK(dbfull()->TEST_WaitForCompact()); db_->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta_data); ASSERT_LE(cf_meta_data.levels[0].files.size(), - options.level0_file_num_compaction_trigger); + options.level0_file_num_compaction_trigger); } #endif // ROCKSDB_LITE @@ -6501,8 +6496,9 @@ class WriteStallListener : public EventListener { MutexLock l(&mutex_); return expected == condition_; } + private: - port::Mutex mutex_; + port::Mutex mutex_; WriteStallCondition condition_; }; @@ -6730,7 +6726,8 @@ TEST_F(DBTest, LastWriteBufferDelay) { sleeping_task.WakeUp(); sleeping_task.WaitUntilDone(); } -#endif // !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) +#endif // !defined(ROCKSDB_LITE) && + // !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) TEST_F(DBTest, FailWhenCompressionNotSupportedTest) { CompressionType compressions[] = {kZlibCompression, kBZip2Compression, @@ -6815,6 +6812,89 @@ TEST_F(DBTest, PinnableSliceAndRowCache) { 1); } +TEST_F(DBTest, ReusePinnableSlice) { + Options options = CurrentOptions(); + options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); + options.row_cache = NewLRUCache(8192); + DestroyAndReopen(options); + + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Flush()); + + ASSERT_EQ(Get("foo"), "bar"); + ASSERT_EQ( + reinterpret_cast(options.row_cache.get())->TEST_GetLRUSize(), + 1); + + { + PinnableSlice pin_slice; + ASSERT_EQ(Get("foo", &pin_slice), Status::OK()); + ASSERT_EQ(Get("foo", &pin_slice), Status::OK()); + ASSERT_EQ(pin_slice.ToString(), "bar"); + + // Entry is already in cache, lookup will remove the element from lru + ASSERT_EQ( + reinterpret_cast(options.row_cache.get())->TEST_GetLRUSize(), + 0); + } + // After PinnableSlice destruction element is added back in LRU + ASSERT_EQ( + reinterpret_cast(options.row_cache.get())->TEST_GetLRUSize(), + 1); + + { + std::vector multiget_keys; + multiget_keys.push_back("foo"); + std::vector multiget_values(1); + std::vector statuses({Status::NotFound()}); + ReadOptions ropt; + dbfull()->MultiGet(ropt, dbfull()->DefaultColumnFamily(), + multiget_keys.size(), multiget_keys.data(), + multiget_values.data(), statuses.data()); + ASSERT_EQ(Status::OK(), statuses[0]); + dbfull()->MultiGet(ropt, dbfull()->DefaultColumnFamily(), + multiget_keys.size(), multiget_keys.data(), + multiget_values.data(), statuses.data()); + ASSERT_EQ(Status::OK(), statuses[0]); + + // Entry is already in cache, lookup will remove the element from lru + ASSERT_EQ( + reinterpret_cast(options.row_cache.get())->TEST_GetLRUSize(), + 0); + } + // After PinnableSlice destruction element is added back in LRU + ASSERT_EQ( + reinterpret_cast(options.row_cache.get())->TEST_GetLRUSize(), + 1); + + { + std::vector multiget_cfs; + multiget_cfs.push_back(dbfull()->DefaultColumnFamily()); + std::vector multiget_keys; + multiget_keys.push_back("foo"); + std::vector multiget_values(1); + std::vector statuses({Status::NotFound()}); + ReadOptions ropt; + dbfull()->MultiGet(ropt, multiget_keys.size(), multiget_cfs.data(), + multiget_keys.data(), multiget_values.data(), + statuses.data()); + ASSERT_EQ(Status::OK(), statuses[0]); + dbfull()->MultiGet(ropt, multiget_keys.size(), multiget_cfs.data(), + multiget_keys.data(), multiget_values.data(), + statuses.data()); + ASSERT_EQ(Status::OK(), statuses[0]); + + // Entry is already in cache, lookup will remove the element from lru + ASSERT_EQ( + reinterpret_cast(options.row_cache.get())->TEST_GetLRUSize(), + 0); + } + // After PinnableSlice destruction element is added back in LRU + ASSERT_EQ( + reinterpret_cast(options.row_cache.get())->TEST_GetLRUSize(), + 1); +} + #endif // ROCKSDB_LITE TEST_F(DBTest, DeletingOldWalAfterDrop) { @@ -6894,9 +6974,7 @@ TEST_F(DBTest, PauseBackgroundWorkTest) { TEST_F(DBTest, ThreadLocalPtrDeadlock) { std::atomic flushes_done{0}; std::atomic threads_destroyed{0}; - auto done = [&] { - return flushes_done.load() > 10; - }; + auto done = [&] { return flushes_done.load() > 10; }; port::Thread flushing_thread([&] { for (int i = 0; !done(); ++i) { @@ -6909,7 +6987,7 @@ TEST_F(DBTest, ThreadLocalPtrDeadlock) { }); std::vector thread_spawning_threads(10); - for (auto& t: thread_spawning_threads) { + for (auto& t : thread_spawning_threads) { t = port::Thread([&] { while (!done()) { { @@ -6925,7 +7003,7 @@ TEST_F(DBTest, ThreadLocalPtrDeadlock) { }); } - for (auto& t: thread_spawning_threads) { + for (auto& t : thread_spawning_threads) { t.join(); } flushing_thread.join();