From 5879053fd04c15a4de741bb097a514c2c10a8780 Mon Sep 17 00:00:00 2001 From: Baptiste Lemaire <31483855+bjlemaire@users.noreply.github.com> Date: Thu, 23 Jun 2022 09:42:18 -0700 Subject: [PATCH] Dynamically changeable `MemPurge` option (#10011) Summary: **Summary** Make the mempurge option flag a Mutable Column Family option flag. Therefore, the mempurge feature can be dynamically toggled. **Motivation** RocksDB users prefer having the ability to switch features on and off without having to close and reopen the DB. This is particularly important if the feature causes issues and needs to be turned off. Dynamically changing a DB option flag does not seem currently possible. Moreover, with this new change, the MemPurge feature can be toggled on or off independently between column families, which we see as a major improvement. **Content of this PR** This PR includes removal of the `experimental_mempurge_threshold` flag as a DB option flag, and its re-introduction as a `MutableCFOption` flag. I updated the code to handle dynamic changes of the flag (in particular inside the `FlushJob` file). Additionally, this PR includes a new test to demonstrate the capacity of the code to toggle the MemPurge feature on and off, as well as the addition in the `db_stress` module of 2 different mempurge threshold values (0.0 and 1.0) that can be randomly changed with the `set_option_one_in` flag. This is useful to stress test the dynamic changes. **Benchmarking** I will add numbers to prove that there is no performance impact within the next 12 hours. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/10011 Reviewed By: pdillinger Differential Revision: D36462357 Pulled By: bjlemaire fbshipit-source-id: 5e3d63bdadf085c0572ecc2349e7dd9729ce1802 --- HISTORY.md | 2 + db/c.cc | 15 +- db/c_test.c | 12 + db/column_family.cc | 3 +- db/column_family.h | 5 + db/db_flush_test.cc | 253 +++++++++++++++++- db/db_impl/db_impl.cc | 32 --- db/db_impl/db_impl_compaction_flush.cc | 20 +- db/flush_job.cc | 25 +- db/flush_job.h | 2 +- db_stress_tool/db_stress_test_base.cc | 1 + include/rocksdb/advanced_options.h | 17 ++ include/rocksdb/c.h | 5 + include/rocksdb/options.h | 17 -- java/rocksjni/options.cc | 46 ++++ ...edMutableColumnFamilyOptionsInterface.java | 24 ++ .../java/org/rocksdb/ColumnFamilyOptions.java | 15 ++ .../rocksdb/MutableColumnFamilyOptions.java | 15 +- java/src/main/java/org/rocksdb/Options.java | 14 + .../org/rocksdb/ColumnFamilyOptionsTest.java | 9 + .../MutableColumnFamilyOptionsTest.java | 3 +- .../org/rocksdb/MutableOptionsGetSetTest.java | 10 + .../test/java/org/rocksdb/OptionsTest.java | 9 + options/cf_options.cc | 7 + options/cf_options.h | 19 ++ options/db_options.cc | 7 +- options/db_options.h | 1 - options/options.cc | 5 +- options/options_helper.cc | 2 + options/options_settable_test.cc | 1 + options/options_test.cc | 8 +- 31 files changed, 506 insertions(+), 98 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 599d8ca49..e449a3dea 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,7 @@ # Rocksdb Change Log ## Unreleased +### New Features +* Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`. 
## 7.4.0 (06/19/2022) ### Bug Fixes diff --git a/db/c.cc b/db/c.cc index b3ebe7ccc..3d252fdf6 100644 --- a/db/c.cc +++ b/db/c.cc @@ -3350,11 +3350,6 @@ unsigned char rocksdb_options_get_advise_random_on_open( return opt->rep.advise_random_on_open; } -void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt, - double v) { - opt->rep.experimental_mempurge_threshold = v; -} - void rocksdb_options_set_access_hint_on_compaction_start( rocksdb_options_t* opt, int v) { switch(v) { @@ -3540,6 +3535,16 @@ int rocksdb_options_get_max_background_flushes(rocksdb_options_t* opt) { return opt->rep.max_background_flushes; } +void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt, + double v) { + opt->rep.experimental_mempurge_threshold = v; +} + +double rocksdb_options_get_experimental_mempurge_threshold( + rocksdb_options_t* opt) { + return opt->rep.experimental_mempurge_threshold; +} + void rocksdb_options_set_max_log_file_size(rocksdb_options_t* opt, size_t v) { opt->rep.max_log_file_size = v; } diff --git a/db/c_test.c b/db/c_test.c index c99955018..50246a5ca 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -1921,6 +1921,10 @@ int main(int argc, char** argv) { rocksdb_options_set_wal_compression(o, 1); CheckCondition(1 == rocksdb_options_get_wal_compression(o)); + rocksdb_options_set_experimental_mempurge_threshold(o, 29.0); + CheckCondition(29.0 == + rocksdb_options_get_experimental_mempurge_threshold(o)); + /* Blob Options */ rocksdb_options_set_enable_blob_files(o, 1); CheckCondition(1 == rocksdb_options_get_enable_blob_files(o)); @@ -2051,6 +2055,8 @@ int main(int argc, char** argv) { CheckCondition(4 == rocksdb_options_get_bottommost_compression(copy)); CheckCondition(2 == rocksdb_options_get_compaction_style(copy)); CheckCondition(1 == rocksdb_options_get_atomic_flush(copy)); + CheckCondition(29.0 == + rocksdb_options_get_experimental_mempurge_threshold(copy)); // Copies should be independent. 
rocksdb_options_set_allow_ingest_behind(copy, 0); @@ -2399,6 +2405,12 @@ int main(int argc, char** argv) { CheckCondition(0 == rocksdb_options_get_atomic_flush(copy)); CheckCondition(1 == rocksdb_options_get_atomic_flush(o)); + rocksdb_options_set_experimental_mempurge_threshold(copy, 229.0); + CheckCondition(229.0 == + rocksdb_options_get_experimental_mempurge_threshold(copy)); + CheckCondition(29.0 == + rocksdb_options_get_experimental_mempurge_threshold(o)); + rocksdb_options_destroy(copy); rocksdb_options_destroy(o); } diff --git a/db/column_family.cc b/db/column_family.cc index 27c5dd4c6..952c04def 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -550,7 +550,8 @@ ColumnFamilyData::ColumnFamilyData( prev_compaction_needed_bytes_(0), allow_2pc_(db_options.allow_2pc), last_memtable_id_(0), - db_paths_registered_(false) { + db_paths_registered_(false), + mempurge_used_(false) { if (id_ != kDummyColumnFamilyDataId) { // TODO(cc): RegisterDbPaths can be expensive, considering moving it // outside of this constructor which might be called with db mutex held. diff --git a/db/column_family.h b/db/column_family.h index a2215b0a8..a1d9060b3 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -526,6 +526,10 @@ class ColumnFamilyData { static const uint32_t kDummyColumnFamilyDataId; + // Keep track of whether the mempurge feature was ever used. 
+ void SetMempurgeUsed() { mempurge_used_ = true; } + bool GetMempurgeUsed() { return mempurge_used_; } + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, @@ -626,6 +630,7 @@ class ColumnFamilyData { // For charging memory usage of file metadata created for newly added files to // a Version associated with this CFD std::shared_ptr file_metadata_cache_res_mgr_; + bool mempurge_used_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index e661d74ea..cb1788179 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -777,13 +777,25 @@ TEST_F(DBFlushTest, MemPurgeBasic) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; - // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = 1.0; #ifndef ROCKSDB_LITE + // Initially deactivate the MemPurge prototype. + options.experimental_mempurge_threshold = 0.0; TestFlushListener* listener = new TestFlushListener(options.env, this); options.listeners.emplace_back(listener); +#else + // Activate directly the MemPurge prototype. + // (RocksDB lite does not support dynamic options) + options.experimental_mempurge_threshold = 1.0; #endif // !ROCKSDB_LITE ASSERT_OK(TryReopen(options)); + + // RocksDB lite does not support dynamic options +#ifndef ROCKSDB_LITE + // Dynamically activate the MemPurge prototype without restarting the DB. 
+ ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); + ASSERT_OK(db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "1.0"}})); +#endif + std::atomic mempurge_count{0}; std::atomic sst_count{0}; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( @@ -914,6 +926,234 @@ TEST_F(DBFlushTest, MemPurgeBasic) { Close(); } +// RocksDB lite does not support dynamic options +#ifndef ROCKSDB_LITE +TEST_F(DBFlushTest, MemPurgeBasicToggle) { + Options options = CurrentOptions(); + + // The following options are used to enforce several values that + // may already exist as default values to make this test resilient + // to default value updates in the future. + options.statistics = CreateDBStatistics(); + + // Record all statistics. + options.statistics->set_stats_level(StatsLevel::kAll); + + // create the DB if it's not already present + options.create_if_missing = true; + + // Useful for now as we are trying to compare uncompressed data savings on + // flush(). + options.compression = kNoCompression; + + // Prevent memtable in place updates. Should already be disabled + // (from Wiki: + // In place updates can be enabled by toggling on the bool + // inplace_update_support flag. However, this flag is by default set to + // false + // because this thread-safe in-place update support is not compatible + // with concurrent memtable writes. Note that the bool + // allow_concurrent_memtable_write is set to true by default ) + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). + options.write_buffer_size = 1 << 20; + // Initially deactivate the MemPurge prototype. + // (negative values are equivalent to 0.0). 
+ options.experimental_mempurge_threshold = -25.3; + TestFlushListener* listener = new TestFlushListener(options.env, this); + options.listeners.emplace_back(listener); + + ASSERT_OK(TryReopen(options)); + // Dynamically activate the MemPurge prototype without restarting the DB. + ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); + // Values greater than 1.0 are equivalent to 1.0 + ASSERT_OK( + db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "3.7898"}})); + std::atomic mempurge_count{0}; + std::atomic sst_count{0}; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:MemPurgeSuccessful", + [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + const size_t KVSIZE = 3; + std::vector KEYS(KVSIZE); + for (size_t k = 0; k < KVSIZE; k++) { + KEYS[k] = "IamKey" + std::to_string(k); + } + + std::vector RNDVALS(KVSIZE); + const std::string NOT_FOUND = "NOT_FOUND"; + + // Heavy overwrite workload, + // more than would fit in maximum allowed memtables. + Random rnd(719); + const size_t NUM_REPEAT = 100; + const size_t RAND_VALUES_LENGTH = 10240; + + // Insertion of of K-V pairs, multiple times (overwrites). + for (size_t i = 0; i < NUM_REPEAT; i++) { + for (size_t j = 0; j < KEYS.size(); j++) { + RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEYS[j], RNDVALS[j])); + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + } + for (size_t j = 0; j < KEYS.size(); j++) { + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + } + } + + // Check that there was at least one mempurge + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; + // Check that there was no SST files created during flush. 
+ const uint32_t EXPECTED_SST_COUNT = 0; + + EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT); + + // Dynamically deactivate MemPurge. + ASSERT_OK( + db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "-1023.0"}})); + + // Insertion of of K-V pairs, multiple times (overwrites). + for (size_t i = 0; i < NUM_REPEAT; i++) { + for (size_t j = 0; j < KEYS.size(); j++) { + RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEYS[j], RNDVALS[j])); + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + } + for (size_t j = 0; j < KEYS.size(); j++) { + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + } + } + + // Check that there was at least one mempurge + const uint32_t ZERO = 0; + // Assert that at least one flush to storage has been performed + EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT); + // The mempurge count is expected to be set to 0 when the options are updated. + // We expect no mempurge at all. + EXPECT_EQ(mempurge_count.exchange(0), ZERO); + + Close(); +} +// Closes the "#ifndef ROCKSDB_LITE" +// End of MemPurgeBasicToggle, which is not +// supported with RocksDB LITE because it +// relies on dynamically changing the option +// flag experimental_mempurge_threshold. +#endif + +// At the moment, MemPurge feature is deactivated +// when atomic_flush is enabled. This is because the level +// of garbage between Column Families is not guaranteed to +// be consistent, therefore a CF could hypothetically +// trigger a MemPurge while another CF would trigger +// a regular Flush. +TEST_F(DBFlushTest, MemPurgeWithAtomicFlush) { + Options options = CurrentOptions(); + + // The following options are used to enforce several values that + // may already exist as default values to make this test resilient + // to default value updates in the future. + options.statistics = CreateDBStatistics(); + + // Record all statistics. 
+ options.statistics->set_stats_level(StatsLevel::kAll); + + // create the DB if it's not already present + options.create_if_missing = true; + + // Useful for now as we are trying to compare uncompressed data savings on + // flush(). + options.compression = kNoCompression; + + // Prevent memtable in place updates. Should already be disabled + // (from Wiki: + // In place updates can be enabled by toggling on the bool + // inplace_update_support flag. However, this flag is by default set to + // false + // because this thread-safe in-place update support is not compatible + // with concurrent memtable writes. Note that the bool + // allow_concurrent_memtable_write is set to true by default ) + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 1MB (1MB = 1,048,576 bytes). + options.write_buffer_size = 1 << 20; + // Activate the MemPurge prototype. + options.experimental_mempurge_threshold = 153.245; + // Activate atomic_flush. + options.atomic_flush = true; + + const std::vector new_cf_names = {"pikachu", "eevie"}; + CreateColumnFamilies(new_cf_names, options); + + Close(); + + // 3 CFs: default will be filled with overwrites (would normally trigger + // mempurge) + // new_cf_names[0] will be filled with random values (would trigger + // flush) new_cf_names[1] not filled with anything. 
+ ReopenWithColumnFamilies( + {kDefaultColumnFamilyName, new_cf_names[0], new_cf_names[1]}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + ASSERT_OK(Put(1, "foo", "bar")); + ASSERT_OK(Put(2, "bar", "baz")); + + std::atomic mempurge_count{0}; + std::atomic sst_count{0}; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:MemPurgeSuccessful", + [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + const size_t KVSIZE = 3; + std::vector KEYS(KVSIZE); + for (size_t k = 0; k < KVSIZE; k++) { + KEYS[k] = "IamKey" + std::to_string(k); + } + + std::string RNDKEY; + std::vector RNDVALS(KVSIZE); + const std::string NOT_FOUND = "NOT_FOUND"; + + // Heavy overwrite workload, + // more than would fit in maximum allowed memtables. + Random rnd(106); + const size_t NUM_REPEAT = 100; + const size_t RAND_KEY_LENGTH = 128; + const size_t RAND_VALUES_LENGTH = 10240; + + // Insertion of of K-V pairs, multiple times (overwrites). + for (size_t i = 0; i < NUM_REPEAT; i++) { + for (size_t j = 0; j < KEYS.size(); j++) { + RNDKEY = rnd.RandomString(RAND_KEY_LENGTH); + RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEYS[j], RNDVALS[j])); + ASSERT_OK(Put(1, RNDKEY, RNDVALS[j])); + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + ASSERT_EQ(Get(1, RNDKEY), RNDVALS[j]); + } + } + + // Check that there was no mempurge because atomic_flush option is true. + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 0; + // Check that there was at least one SST files created during flush. 
+ const uint32_t EXPECTED_SST_COUNT = 1; + + EXPECT_EQ(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_GE(sst_count.exchange(0), EXPECTED_SST_COUNT); + + Close(); +} + TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { Options options = CurrentOptions(); @@ -930,7 +1170,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = 1.0; + options.experimental_mempurge_threshold = 15.0; ASSERT_OK(TryReopen(options)); @@ -1137,7 +1377,7 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = 1.0; + options.experimental_mempurge_threshold = 26.55; ASSERT_OK(TryReopen(options)); @@ -1212,8 +1452,9 @@ TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) { // Enforce size of a single MemTable to 128KB. options.write_buffer_size = 128 << 10; - // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = 1.0; + // Activate the MemPurge prototype + // (values >1.0 are equivalent to 1.0). + options.experimental_mempurge_threshold = 2.5; ASSERT_OK(TryReopen(options)); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 15be4dcd8..5f91f53ce 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -578,38 +578,6 @@ Status DBImpl::CloseHelper() { flush_scheduler_.Clear(); trim_history_scheduler_.Clear(); - // For now, simply trigger a manual flush at close time - // on all the column families. - // TODO(bjlemaire): Check if this is needed. 
Also, in the - // future we can contemplate doing a more fine-grained - // flushing by first checking if there is a need for - // flushing (but need to implement something - // else than imm()->IsFlushPending() because the output - // memtables added to imm() don't trigger flushes). - if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { - Status flush_ret; - mutex_.Unlock(); - for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) { - if (immutable_db_options_.atomic_flush) { - flush_ret = AtomicFlushMemTables({cf}, FlushOptions(), - FlushReason::kManualFlush); - if (!flush_ret.ok()) { - ROCKS_LOG_INFO( - immutable_db_options_.info_log, - "Atomic flush memtables failed upon closing (mempurge)."); - } - } else { - flush_ret = - FlushMemTable(cf, FlushOptions(), FlushReason::kManualFlush); - if (!flush_ret.ok()) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Flush memtables failed upon closing (mempurge)."); - } - } - } - mutex_.Lock(); - } - while (!flush_queue_.empty()) { const FlushRequest& flush_req = PopFirstFromFlushQueue(); for (const auto& iter : flush_req) { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 57a750065..0db65301b 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2616,17 +2616,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, assert(flush_req.size() == 1); ColumnFamilyData* cfd = flush_req[0].first; assert(cfd); - // Note: SchedulePendingFlush is always preceded - // with an imm()->FlushRequested() call. However, - // we want to make this code snipper more resilient to - // future changes. Therefore, we add the following if - // statement - note that calling it twice (or more) - // doesn't break anything. - if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { - // If imm() contains silent memtables, - // requesting a flush will mark the imm_needed as true. 
- cfd->imm()->FlushRequested(); - } + if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) { cfd->Ref(); cfd->set_queued_for_flush(true); @@ -2775,11 +2765,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, for (const auto& iter : flush_req) { ColumnFamilyData* cfd = iter.first; - if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { - // If imm() contains silent memtables, - // requesting a flush will mark the imm_needed as true. + if (cfd->GetMempurgeUsed()) { + // If imm() contains silent memtables (e.g.: because + // MemPurge was activated), requesting a flush will + // mark the imm_needed as true. cfd->imm()->FlushRequested(); } + if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) { // can't flush this CF, try next one column_families_not_to_flush.push_back(cfd); diff --git a/db/flush_job.cc b/db/flush_job.cc index bd2fc9681..57d3a510f 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -211,6 +211,13 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, TEST_SYNC_POINT("FlushJob::Start"); db_mutex_->AssertHeld(); assert(pick_memtable_called); + // Mempurge threshold can be dynamically changed. + // For sake of consistency, mempurge_threshold is + // saved locally to maintain consistency in each + // FlushJob::Run call. 
+ double mempurge_threshold = + mutable_cf_options_.experimental_mempurge_threshold; + AutoThreadOperationStageUpdater stage_run( ThreadStatus::STAGE_FLUSH_RUN); if (mems_.empty()) { @@ -238,9 +245,11 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); } Status mempurge_s = Status::NotFound("No MemPurge."); - if ((db_options_.experimental_mempurge_threshold > 0.0) && + if ((mempurge_threshold > 0.0) && (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) && - (!mems_.empty()) && MemPurgeDecider()) { + (!mems_.empty()) && MemPurgeDecider(mempurge_threshold) && + !(db_options_.atomic_flush)) { + cfd_->SetMempurgeUsed(); mempurge_s = MemPurge(); if (!mempurge_s.ok()) { // Mempurge is typically aborted when the output @@ -628,8 +637,7 @@ Status FlushJob::MemPurge() { return s; } -bool FlushJob::MemPurgeDecider() { - double threshold = db_options_.experimental_mempurge_threshold; +bool FlushJob::MemPurgeDecider(double threshold) { // Never trigger mempurge if threshold is not a strictly positive value. if (!(threshold > 0.0)) { return false; @@ -779,10 +787,11 @@ bool FlushJob::MemPurgeDecider() { estimated_useful_payload += (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload); - ROCKS_LOG_INFO( - db_options_.info_log, - "Mempurge sampling - found garbage ratio from sampling: %f.\n", - (payload - useful_payload) * 1.0 / payload); + ROCKS_LOG_INFO(db_options_.info_log, + "Mempurge sampling [CF %s] - found garbage ratio from " + "sampling: %f. 
Threshold is %f\n", + cfd_->GetName().c_str(), + (payload - useful_payload) * 1.0 / payload, threshold); } else { ROCKS_LOG_WARN(db_options_.info_log, "Mempurge sampling: null payload measured, and collected " diff --git a/db/flush_job.h b/db/flush_job.h index 7cfad1398..33c51e645 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -122,7 +122,7 @@ class FlushJob { // recommend all users not to set this flag as true given that the MemPurge // process has not matured yet. Status MemPurge(); - bool MemPurgeDecider(); + bool MemPurgeDecider(double threshold); // The rate limiter priority (io_priority) is determined dynamically here. Env::IOPriority GetRateLimiterPriorityForWrite(); #ifndef ROCKSDB_LITE diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 4879c229c..57ca7e2e2 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -193,6 +193,7 @@ bool StressTest::BuildOptionsTable() { {"memtable_huge_page_size", {"0", std::to_string(2 * 1024 * 1024)}}, {"max_successive_merges", {"0", "2", "4"}}, {"inplace_update_num_locks", {"100", "200", "300"}}, + {"experimental_mempurge_threshold", {"0.0", "1.0"}}, // TODO(ljin): enable test for this option // {"disable_auto_compactions", {"100", "200", "300"}}, {"level0_file_num_compaction_trigger", diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index c484872a0..843ea5e74 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -345,6 +345,23 @@ struct AdvancedColumnFamilyOptions { // Dynamically changeable through SetOptions() API size_t inplace_update_num_locks = 10000; + // [experimental] + // Used to activate or deactive the Mempurge feature (memtable garbage + // collection). (deactivated by default). At every flush, the total useful + // payload (total entries minus garbage entries) is estimated as a ratio + // [useful payload bytes]/[size of a memtable (in bytes)]. 
This ratio is then + // compared to this `threshold` value: + // - if ratio<threshold: the flush is replaced by a mempurge operation + // - else: a regular flush operation takes place. + // Threshold values: + // 0.0: mempurge deactivated (default). + // 1.0: recommended threshold value. + // >1.0 : aggressive mempurge. + // 0 < threshold < 1.0: mempurge triggered only for very low useful payload + // ratios. + // [experimental] + double experimental_mempurge_threshold = 0.0; + // existing_value - pointer to previous value (from both memtable and sst). // nullptr if key doesn't exist // existing_value_size - pointer to size of existing_value). diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 933650d5e..604737167 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -1502,6 +1502,11 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_report_bg_io_stats( extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_report_bg_io_stats( rocksdb_options_t*); +extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t*, double); +extern ROCKSDB_LIBRARY_API double +rocksdb_options_get_experimental_mempurge_threshold(rocksdb_options_t*); + enum { rocksdb_tolerate_corrupted_tail_records_recovery = 0, rocksdb_absolute_consistency_recovery = 1, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index ef1623478..2d273c51d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -865,23 +865,6 @@ struct DBOptions { // Default: true bool advise_random_on_open = true; - // [experimental] - // Used to activate or deactive the Mempurge feature (memtable garbage - // collection). (deactivated by default). At every flush, the total useful - // payload (total entries minus garbage entries) is estimated as a ratio - // [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then - // compared to this `threshold` value: - // - if ratio<threshold: the flush is replaced by a mempurge operation - // - else: a regular flush operation takes place. - // Threshold values: - // 0.0: mempurge deactivated (default). - // 1.0: recommended threshold value. - // >1.0 : aggressive mempurge. - // 0 < threshold < 1.0: mempurge triggered only for very low useful payload - // ratios. 
- // [experimental] - double experimental_mempurge_threshold = 0.0; - // Amount of data to build up in memtables across all column // families before writing to disk. // diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 7a76ea2f5..6fc232c7f 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -3101,6 +3101,29 @@ void Java_org_rocksdb_Options_setMemtablePrefixBloomSizeRatio( static_cast(jmemtable_prefix_bloom_size_ratio); } +/* + * Class: org_rocksdb_Options + * Method: experimentalMempurgeThreshold + * Signature: (J)D + */ +jdouble Java_org_rocksdb_Options_experimentalMempurgeThreshold(JNIEnv*, jobject, + jlong jhandle) { + return reinterpret_cast(jhandle) + ->experimental_mempurge_threshold; +} + +/* + * Class: org_rocksdb_Options + * Method: setExperimentalMempurgeThreshold + * Signature: (JD)V + */ +void Java_org_rocksdb_Options_setExperimentalMempurgeThreshold( + JNIEnv*, jobject, jlong jhandle, jdouble jexperimental_mempurge_threshold) { + reinterpret_cast(jhandle) + ->experimental_mempurge_threshold = + static_cast(jexperimental_mempurge_threshold); +} + /* * Class: org_rocksdb_Options * Method: memtableWholeKeyFiltering @@ -4955,6 +4978,29 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMemtablePrefixBloomSizeRatio( static_cast(jmemtable_prefix_bloom_size_ratio); } +/* + * Class: org_rocksdb_ColumnFamilyOptions + * Method: experimentalMempurgeThreshold + * Signature: (J)D + */ +jdouble Java_org_rocksdb_ColumnFamilyOptions_experimentalMempurgeThreshold( + JNIEnv*, jobject, jlong jhandle) { + return reinterpret_cast(jhandle) + ->experimental_mempurge_threshold; +} + +/* + * Class: org_rocksdb_ColumnFamilyOptions + * Method: setExperimentalMempurgeThreshold + * Signature: (JD)V + */ +void Java_org_rocksdb_ColumnFamilyOptions_setExperimentalMempurgeThreshold( + JNIEnv*, jobject, jlong jhandle, jdouble jexperimental_mempurge_threshold) { + reinterpret_cast(jhandle) + ->experimental_mempurge_threshold = + 
static_cast(jexperimental_mempurge_threshold); +} + /* * Class: org_rocksdb_ColumnFamilyOptions * Method: memtableWholeKeyFiltering diff --git a/java/src/main/java/org/rocksdb/AdvancedMutableColumnFamilyOptionsInterface.java b/java/src/main/java/org/rocksdb/AdvancedMutableColumnFamilyOptionsInterface.java index f9f803d97..928750446 100644 --- a/java/src/main/java/org/rocksdb/AdvancedMutableColumnFamilyOptionsInterface.java +++ b/java/src/main/java/org/rocksdb/AdvancedMutableColumnFamilyOptionsInterface.java @@ -81,6 +81,30 @@ public interface AdvancedMutableColumnFamilyOptionsInterface< */ double memtablePrefixBloomSizeRatio(); + /** + * Threshold used in the MemPurge (memtable garbage collection) + * feature. A value of 0.0 corresponds to no MemPurge, + * a value of 1.0 will trigger a MemPurge as often as possible. + * + * Default: 0.0 (disabled) + * + * @param experimentalMempurgeThreshold the threshold used by + * the MemPurge decider. + * @return the reference to the current options. + */ + T setExperimentalMempurgeThreshold(double experimentalMempurgeThreshold); + + /** + * Threshold used in the MemPurge (memtable garbage collection) + * feature. A value of 0.0 corresponds to no MemPurge, + * a value of 1.0 will trigger a MemPurge as often as possible. + * + * Default: 0 (disabled) + * + * @return the threshold used by the MemPurge decider + */ + double experimentalMempurgeThreshold(); + /** * Enable whole key bloom filter in memtable. Note this will only take effect * if memtable_prefix_bloom_size_ratio is not 0. 
Enabling whole key filtering diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java index 104ba00c2..433fbcf08 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java @@ -683,6 +683,18 @@ public class ColumnFamilyOptions extends RocksObject return memtablePrefixBloomSizeRatio(nativeHandle_); } + @Override + public ColumnFamilyOptions setExperimentalMempurgeThreshold( + final double experimentalMempurgeThreshold) { + setExperimentalMempurgeThreshold(nativeHandle_, experimentalMempurgeThreshold); + return this; + } + + @Override + public double experimentalMempurgeThreshold() { + return experimentalMempurgeThreshold(nativeHandle_); + } + @Override public ColumnFamilyOptions setMemtableWholeKeyFiltering(final boolean memtableWholeKeyFiltering) { setMemtableWholeKeyFiltering(nativeHandle_, memtableWholeKeyFiltering); @@ -1389,6 +1401,9 @@ public class ColumnFamilyOptions extends RocksObject private native void setMemtablePrefixBloomSizeRatio( long handle, double memtablePrefixBloomSizeRatio); private native double memtablePrefixBloomSizeRatio(long handle); + private native void setExperimentalMempurgeThreshold( + long handle, double experimentalMempurgeThreshold); + private native double experimentalMempurgeThreshold(long handle); private native void setMemtableWholeKeyFiltering(long handle, boolean memtableWholeKeyFiltering); private native boolean memtableWholeKeyFiltering(long handle); private native void setBloomLocality( diff --git a/java/src/main/java/org/rocksdb/MutableColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/MutableColumnFamilyOptions.java index 9379c333e..8f55c5201 100644 --- a/java/src/main/java/org/rocksdb/MutableColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/MutableColumnFamilyOptions.java @@ -73,7 +73,8 @@ public class MutableColumnFamilyOptions 
max_successive_merges(ValueType.LONG), @Deprecated filter_deletes(ValueType.BOOLEAN), max_write_buffer_number(ValueType.INT), - inplace_update_num_locks(ValueType.LONG); + inplace_update_num_locks(ValueType.LONG), + experimental_mempurge_threshold(ValueType.DOUBLE); private final ValueType valueType; MemtableOption(final ValueType valueType) { @@ -287,6 +288,18 @@ public class MutableColumnFamilyOptions return getLong(MemtableOption.inplace_update_num_locks); } + @Override + public MutableColumnFamilyOptionsBuilder setExperimentalMempurgeThreshold( + final double experimentalMempurgeThreshold) { + return setDouble( + MemtableOption.experimental_mempurge_threshold, experimentalMempurgeThreshold); + } + + @Override + public double experimentalMempurgeThreshold() { + return getDouble(MemtableOption.experimental_mempurge_threshold); + } + @Override public MutableColumnFamilyOptionsBuilder setDisableAutoCompactions( final boolean disableAutoCompactions) { diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index 4d313b7a4..f7e725f07 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -1623,6 +1623,17 @@ public class Options extends RocksObject return this; } + @Override + public double experimentalMempurgeThreshold() { + return experimentalMempurgeThreshold(nativeHandle_); + } + + @Override + public Options setExperimentalMempurgeThreshold(final double experimentalMempurgeThreshold) { + setExperimentalMempurgeThreshold(nativeHandle_, experimentalMempurgeThreshold); + return this; + } + @Override public boolean memtableWholeKeyFiltering() { return memtableWholeKeyFiltering(nativeHandle_); @@ -2420,6 +2431,9 @@ public class Options extends RocksObject private native void setMemtablePrefixBloomSizeRatio( long handle, double memtablePrefixBloomSizeRatio); private native double memtablePrefixBloomSizeRatio(long handle); + private native void 
setExperimentalMempurgeThreshold( + long handle, double experimentalMempurgeThreshold); + private native double experimentalMempurgeThreshold(long handle); private native void setMemtableWholeKeyFiltering(long handle, boolean memtableWholeKeyFiltering); private native boolean memtableWholeKeyFiltering(long handle); private native void setBloomLocality( diff --git a/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java b/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java index 2ad669d33..7d7581048 100644 --- a/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java +++ b/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java @@ -331,6 +331,15 @@ public class ColumnFamilyOptionsTest { } } + @Test + public void experimentalMempurgeThreshold() { + try (final ColumnFamilyOptions opt = new ColumnFamilyOptions()) { + final double doubleValue = rand.nextDouble(); + opt.setExperimentalMempurgeThreshold(doubleValue); + assertThat(opt.experimentalMempurgeThreshold()).isEqualTo(doubleValue); + } + } + @Test public void memtableWholeKeyFiltering() { try (final ColumnFamilyOptions opt = new ColumnFamilyOptions()) { diff --git a/java/src/test/java/org/rocksdb/MutableColumnFamilyOptionsTest.java b/java/src/test/java/org/rocksdb/MutableColumnFamilyOptionsTest.java index 8792446f1..66b458a9c 100644 --- a/java/src/test/java/org/rocksdb/MutableColumnFamilyOptionsTest.java +++ b/java/src/test/java/org/rocksdb/MutableColumnFamilyOptionsTest.java @@ -119,7 +119,7 @@ public class MutableColumnFamilyOptionsTest { + "min_write_buffer_number_to_merge=1; max_write_buffer_number_to_maintain=0; compaction_filter=nullptr; merge_operator=nullptr; " + "num_levels=7; optimize_filters_for_hits=false; force_consistency_checks=true; table_factory=BlockBasedTable; " + "max_write_buffer_size_to_maintain=0; memtable_insert_with_hint_prefix_extractor=nullptr; level_compaction_dynamic_level_bytes=false; " - + "inplace_update_support=false;"; + + "inplace_update_support=false; 
experimental_mempurge_threshold=0.003"; MutableColumnFamilyOptions.MutableColumnFamilyOptionsBuilder cf = MutableColumnFamilyOptions.parse(optionsString, true); @@ -158,6 +158,7 @@ public class MutableColumnFamilyOptionsTest { assertThat(cf.periodicCompactionSeconds()).isEqualTo(0); assertThat(cf.paranoidFileChecks()).isEqualTo(true); assertThat(cf.memtablePrefixBloomSizeRatio()).isEqualTo(7.5); + assertThat(cf.experimentalMempurgeThreshold()).isEqualTo(0.003); assertThat(cf.maxSequentialSkipInIterations()).isEqualTo(8); assertThat(cf.reportBgIoStats()).isEqualTo(true); } diff --git a/java/src/test/java/org/rocksdb/MutableOptionsGetSetTest.java b/java/src/test/java/org/rocksdb/MutableOptionsGetSetTest.java index 67e740891..6db940619 100644 --- a/java/src/test/java/org/rocksdb/MutableOptionsGetSetTest.java +++ b/java/src/test/java/org/rocksdb/MutableOptionsGetSetTest.java @@ -50,6 +50,7 @@ public class MutableOptionsGetSetTest { .setBlobFileStartingLevel(2) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.17) + .setExperimentalMempurgeThreshold(0.005) .setMemtableWholeKeyFiltering(false) .setMemtableHugePageSize(3) .setMaxSuccessiveMerges(4) @@ -73,6 +74,7 @@ public class MutableOptionsGetSetTest { .setEnableBlobFiles(false) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.236) + .setExperimentalMempurgeThreshold(0.247) .setMemtableWholeKeyFiltering(true) .setMemtableHugePageSize(8) .setMaxSuccessiveMerges(12) @@ -111,6 +113,7 @@ public class MutableOptionsGetSetTest { assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder1.arenaBlockSize()).isEqualTo(42); assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17); + assertThat(builder1.experimentalMempurgeThreshold()).isEqualTo(0.005); assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false); assertThat(builder1.memtableHugePageSize()).isEqualTo(3); assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4); @@ -135,6 +138,7 @@ public class 
MutableOptionsGetSetTest { assertThat(builder2.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder2.arenaBlockSize()).isEqualTo(42); assertThat(builder2.memtablePrefixBloomSizeRatio()).isEqualTo(0.236); + assertThat(builder2.experimentalMempurgeThreshold()).isEqualTo(0.247); assertThat(builder2.memtableWholeKeyFiltering()).isEqualTo(true); assertThat(builder2.memtableHugePageSize()).isEqualTo(8); assertThat(builder2.maxSuccessiveMerges()).isEqualTo(12); @@ -202,6 +206,7 @@ public class MutableOptionsGetSetTest { .setBlobFileStartingLevel(3) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.17) + .setExperimentalMempurgeThreshold(0.005) .setMemtableWholeKeyFiltering(false) .setMemtableHugePageSize(3) .setMaxSuccessiveMerges(4) @@ -229,6 +234,7 @@ public class MutableOptionsGetSetTest { assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder1.arenaBlockSize()).isEqualTo(42); assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17); + assertThat(builder1.experimentalMempurgeThreshold()).isEqualTo(0.005); assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false); assertThat(builder1.memtableHugePageSize()).isEqualTo(3); assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4); @@ -251,6 +257,7 @@ public class MutableOptionsGetSetTest { .setEnableBlobFiles(false) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.236) + .setExperimentalMempurgeThreshold(0.247) .setMemtableWholeKeyFiltering(true) .setMemtableHugePageSize(8) .setMaxSuccessiveMerges(12) @@ -274,6 +281,7 @@ public class MutableOptionsGetSetTest { assertThat(builder2.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder2.arenaBlockSize()).isEqualTo(42); assertThat(builder2.memtablePrefixBloomSizeRatio()).isEqualTo(0.236); + assertThat(builder2.experimentalMempurgeThreshold()).isEqualTo(0.247); assertThat(builder2.memtableWholeKeyFiltering()).isEqualTo(true); assertThat(builder2.memtableHugePageSize()).isEqualTo(8); 
assertThat(builder2.maxSuccessiveMerges()).isEqualTo(12); @@ -324,6 +332,7 @@ public class MutableOptionsGetSetTest { .setBlobFileStartingLevel(4) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.17) + .setExperimentalMempurgeThreshold(0.005) .setMemtableWholeKeyFiltering(false) .setMemtableHugePageSize(3) .setMaxSuccessiveMerges(4) @@ -350,6 +359,7 @@ public class MutableOptionsGetSetTest { assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder1.arenaBlockSize()).isEqualTo(42); assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17); + assertThat(builder1.experimentalMempurgeThreshold()).isEqualTo(0.005); assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false); assertThat(builder1.memtableHugePageSize()).isEqualTo(3); assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4); diff --git a/java/src/test/java/org/rocksdb/OptionsTest.java b/java/src/test/java/org/rocksdb/OptionsTest.java index ea454ab36..612b1bd7c 100644 --- a/java/src/test/java/org/rocksdb/OptionsTest.java +++ b/java/src/test/java/org/rocksdb/OptionsTest.java @@ -274,6 +274,15 @@ public class OptionsTest { } } + @Test + public void experimentalMempurgeThreshold() { + try (final Options opt = new Options()) { + final double doubleValue = rand.nextDouble(); + opt.setExperimentalMempurgeThreshold(doubleValue); + assertThat(opt.experimentalMempurgeThreshold()).isEqualTo(doubleValue); + } + } + @Test public void memtableWholeKeyFiltering() { try (final Options opt = new Options()) { diff --git a/options/cf_options.cc b/options/cf_options.cc index 1e9865265..a681ea848 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -464,6 +464,10 @@ static std::unordered_map offsetof(struct MutableCFOptions, compression_per_level), OptionVerificationType::kNormal, OptionTypeFlags::kMutable, {0, OptionType::kCompressionType})}, + {"experimental_mempurge_threshold", + {offsetof(struct MutableCFOptions, experimental_mempurge_threshold), + 
OptionType::kDouble, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {kOptNameCompOpts, OptionTypeInfo::Struct( kOptNameCompOpts, &compression_options_type_info, @@ -1037,6 +1041,9 @@ void MutableCFOptions::Dump(Logger* log) const { report_bg_io_stats); ROCKS_LOG_INFO(log, " compression: %d", static_cast(compression)); + ROCKS_LOG_INFO(log, + " experimental_mempurge_threshold: %f", + experimental_mempurge_threshold); // Universal Compaction Options ROCKS_LOG_INFO(log, "compaction_options_universal.size_ratio : %d", diff --git a/options/cf_options.h b/options/cf_options.h index bfdc2e102..dccb559a2 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -112,6 +112,8 @@ struct MutableCFOptions { max_successive_merges(options.max_successive_merges), inplace_update_num_locks(options.inplace_update_num_locks), prefix_extractor(options.prefix_extractor), + experimental_mempurge_threshold( + options.experimental_mempurge_threshold), disable_auto_compactions(options.disable_auto_compactions), soft_pending_compaction_bytes_limit( options.soft_pending_compaction_bytes_limit), @@ -170,6 +172,7 @@ struct MutableCFOptions { max_successive_merges(0), inplace_update_num_locks(0), prefix_extractor(nullptr), + experimental_mempurge_threshold(0.0), disable_auto_compactions(false), soft_pending_compaction_bytes_limit(0), hard_pending_compaction_bytes_limit(0), @@ -231,6 +234,22 @@ struct MutableCFOptions { size_t max_successive_merges; size_t inplace_update_num_locks; std::shared_ptr prefix_extractor; + // [experimental] + // Used to activate or deactivate the Mempurge feature (memtable garbage + // collection). (deactivated by default). At every flush, the total useful + // payload (total entries minus garbage entries) is estimated as a ratio + // [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then + // compared to this `threshold` value: + // - if ratio < threshold: the flush is replaced by a mempurge operation + // - else a regular flush operation takes place. + // Threshold values: + // 0.0: mempurge deactivated (default). + // 1.0: recommended threshold value. + // > 1.0: aggressive mempurge. 
+ // 0 < threshold < 1.0: mempurge triggered only for very low useful payload + // ratios. + // [experimental] + double experimental_mempurge_threshold; // Compaction related options bool disable_auto_compactions; diff --git a/options/db_options.cc b/options/db_options.cc index e8846b222..92c56398d 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -208,8 +208,7 @@ static std::unordered_map {0, OptionType::kString, OptionVerificationType::kDeprecated, OptionTypeFlags::kNone}}, {"experimental_mempurge_threshold", - {offsetof(struct ImmutableDBOptions, experimental_mempurge_threshold), - OptionType::kDouble, OptionVerificationType::kNormal, + {0, OptionType::kDouble, OptionVerificationType::kDeprecated, OptionTypeFlags::kNone}}, {"is_fd_close_on_exec", {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec), @@ -716,7 +715,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), advise_random_on_open(options.advise_random_on_open), - experimental_mempurge_threshold(options.experimental_mempurge_threshold), db_write_buffer_size(options.db_write_buffer_size), write_buffer_manager(options.write_buffer_manager), access_hint_on_compaction_start(options.access_hint_on_compaction_start), @@ -847,9 +845,6 @@ void ImmutableDBOptions::Dump(Logger* log) const { is_fd_close_on_exec); ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d", advise_random_on_open); - ROCKS_LOG_HEADER( - log, " Options.experimental_mempurge_threshold: %f", - experimental_mempurge_threshold); ROCKS_LOG_HEADER( log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, db_write_buffer_size); diff --git a/options/db_options.h b/options/db_options.h index a245d63c6..8946f60ff 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -58,7 +58,6 @@ struct ImmutableDBOptions { bool allow_fallocate; bool is_fd_close_on_exec; bool advise_random_on_open; - double 
experimental_mempurge_threshold; size_t db_write_buffer_size; std::shared_ptr write_buffer_manager; DBOptions::AccessHint access_hint_on_compaction_start; diff --git a/options/options.cc b/options/options.cc index bba166be4..2276c1501 100644 --- a/options/options.cc +++ b/options/options.cc @@ -49,6 +49,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) options.max_write_buffer_size_to_maintain), inplace_update_support(options.inplace_update_support), inplace_update_num_locks(options.inplace_update_num_locks), + experimental_mempurge_threshold(options.experimental_mempurge_threshold), inplace_callback(options.inplace_callback), memtable_prefix_bloom_size_ratio( options.memtable_prefix_bloom_size_ratio), @@ -424,12 +425,14 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " blob_cache options: %s", blob_cache->GetPrintableOptions().c_str()); } + ROCKS_LOG_HEADER(log, "Options.experimental_mempurge_threshold: %f", + experimental_mempurge_threshold); } // ColumnFamilyOptions::Dump void Options::Dump(Logger* log) const { DBOptions::Dump(log); ColumnFamilyOptions::Dump(log); -} // Options::Dump +} // Options::Dump void Options::DumpCFOptions(Logger* log) const { ColumnFamilyOptions::Dump(log); diff --git a/options/options_helper.cc b/options/options_helper.cc index 7c76a31ec..44c57b436 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -212,6 +212,8 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions, cf_opts->max_successive_merges = moptions.max_successive_merges; cf_opts->inplace_update_num_locks = moptions.inplace_update_num_locks; cf_opts->prefix_extractor = moptions.prefix_extractor; + cf_opts->experimental_mempurge_threshold = + moptions.experimental_mempurge_threshold; // Compaction related options cf_opts->disable_auto_compactions = moptions.disable_auto_compactions; diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 
42a6fd577..3a156a30a 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -503,6 +503,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "paranoid_file_checks=true;" "force_consistency_checks=true;" "inplace_update_num_locks=7429;" + "experimental_mempurge_threshold=0.0001;" "optimize_filters_for_hits=false;" "level_compaction_dynamic_level_bytes=false;" "inplace_update_support=false;" diff --git a/options/options_test.cc b/options/options_test.cc index 264500758..0eeaca484 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -116,6 +116,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"max_successive_merges", "30"}, {"min_partial_merge_operands", "31"}, {"prefix_extractor", "fixed:31"}, + {"experimental_mempurge_threshold", "0.003"}, {"optimize_filters_for_hits", "true"}, {"enable_blob_files", "true"}, {"min_blob_size", "1K"}, @@ -164,7 +165,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, - {"experimental_mempurge_threshold", "0.0"}, {"use_adaptive_mutex", "false"}, {"compaction_readahead_size", "100"}, {"random_access_max_buffer_size", "3145728"}, @@ -256,6 +256,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_TRUE(new_cf_opt.prefix_extractor != nullptr); ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true); ASSERT_EQ(new_cf_opt.prefix_extractor->AsString(), "rocksdb.FixedPrefix.31"); + ASSERT_EQ(new_cf_opt.experimental_mempurge_threshold, 0.003); ASSERT_EQ(new_cf_opt.enable_blob_files, true); ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10); ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30); @@ -329,7 +330,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); - ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0); 
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728); @@ -2345,6 +2345,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"max_successive_merges", "30"}, {"min_partial_merge_operands", "31"}, {"prefix_extractor", "fixed:31"}, + {"experimental_mempurge_threshold", "0.003"}, {"optimize_filters_for_hits", "true"}, {"enable_blob_files", "true"}, {"min_blob_size", "1K"}, @@ -2393,7 +2394,6 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, - {"experimental_mempurge_threshold", "0.0"}, {"use_adaptive_mutex", "false"}, {"compaction_readahead_size", "100"}, {"random_access_max_buffer_size", "3145728"}, @@ -2479,6 +2479,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_TRUE(new_cf_opt.prefix_extractor != nullptr); ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true); ASSERT_EQ(new_cf_opt.prefix_extractor->AsString(), "rocksdb.FixedPrefix.31"); + ASSERT_EQ(new_cf_opt.experimental_mempurge_threshold, 0.003); ASSERT_EQ(new_cf_opt.enable_blob_files, true); ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10); ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30); @@ -2553,7 +2554,6 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); - ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);