diff --git a/HISTORY.md b/HISTORY.md index 599d8ca49..e449a3dea 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,7 @@ # Rocksdb Change Log ## Unreleased +### New Features +* Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`. ## 7.4.0 (06/19/2022) ### Bug Fixes diff --git a/db/c.cc b/db/c.cc index b3ebe7ccc..3d252fdf6 100644 --- a/db/c.cc +++ b/db/c.cc @@ -3350,11 +3350,6 @@ unsigned char rocksdb_options_get_advise_random_on_open( return opt->rep.advise_random_on_open; } -void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt, - double v) { - opt->rep.experimental_mempurge_threshold = v; -} - void rocksdb_options_set_access_hint_on_compaction_start( rocksdb_options_t* opt, int v) { switch(v) { @@ -3540,6 +3535,16 @@ int rocksdb_options_get_max_background_flushes(rocksdb_options_t* opt) { return opt->rep.max_background_flushes; } +void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt, + double v) { + opt->rep.experimental_mempurge_threshold = v; +} + +double rocksdb_options_get_experimental_mempurge_threshold( + rocksdb_options_t* opt) { + return opt->rep.experimental_mempurge_threshold; +} + void rocksdb_options_set_max_log_file_size(rocksdb_options_t* opt, size_t v) { opt->rep.max_log_file_size = v; } diff --git a/db/c_test.c b/db/c_test.c index c99955018..50246a5ca 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -1921,6 +1921,10 @@ int main(int argc, char** argv) { rocksdb_options_set_wal_compression(o, 1); CheckCondition(1 == rocksdb_options_get_wal_compression(o)); + rocksdb_options_set_experimental_mempurge_threshold(o, 29.0); + CheckCondition(29.0 == + rocksdb_options_get_experimental_mempurge_threshold(o)); + /* Blob Options */ rocksdb_options_set_enable_blob_files(o, 1); CheckCondition(1 == rocksdb_options_get_enable_blob_files(o)); @@ -2051,6 +2055,8 @@ int main(int argc, char** argv) { CheckCondition(4 == rocksdb_options_get_bottommost_compression(copy)); CheckCondition(2 == rocksdb_options_get_compaction_style(copy)); CheckCondition(1 == rocksdb_options_get_atomic_flush(copy)); + CheckCondition(29.0 == + rocksdb_options_get_experimental_mempurge_threshold(copy)); // Copies should be independent. rocksdb_options_set_allow_ingest_behind(copy, 0); @@ -2399,6 +2405,12 @@ int main(int argc, char** argv) { CheckCondition(0 == rocksdb_options_get_atomic_flush(copy)); CheckCondition(1 == rocksdb_options_get_atomic_flush(o)); + rocksdb_options_set_experimental_mempurge_threshold(copy, 229.0); + CheckCondition(229.0 == + rocksdb_options_get_experimental_mempurge_threshold(copy)); + CheckCondition(29.0 == + rocksdb_options_get_experimental_mempurge_threshold(o)); + rocksdb_options_destroy(copy); rocksdb_options_destroy(o); } diff --git a/db/column_family.cc b/db/column_family.cc index 27c5dd4c6..952c04def 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -550,7 +550,8 @@ ColumnFamilyData::ColumnFamilyData( prev_compaction_needed_bytes_(0), allow_2pc_(db_options.allow_2pc), last_memtable_id_(0), - db_paths_registered_(false) { + db_paths_registered_(false), + mempurge_used_(false) { if (id_ != kDummyColumnFamilyDataId) { // TODO(cc): RegisterDbPaths can be expensive, considering moving it // outside of this constructor which might be called with db mutex held. diff --git a/db/column_family.h b/db/column_family.h index a2215b0a8..a1d9060b3 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -526,6 +526,10 @@ class ColumnFamilyData { static const uint32_t kDummyColumnFamilyDataId; + // Keep track of whether the mempurge feature was ever used. + void SetMempurgeUsed() { mempurge_used_ = true; } + bool GetMempurgeUsed() { return mempurge_used_; } + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, @@ -626,6 +630,7 @@ class ColumnFamilyData { // For charging memory usage of file metadata created for newly added files to // a Version associated with this CFD std::shared_ptr file_metadata_cache_res_mgr_; + bool mempurge_used_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index e661d74ea..cb1788179 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -777,13 +777,25 @@ TEST_F(DBFlushTest, MemPurgeBasic) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; - // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = 1.0; #ifndef ROCKSDB_LITE + // Initially deactivate the MemPurge prototype. + options.experimental_mempurge_threshold = 0.0; TestFlushListener* listener = new TestFlushListener(options.env, this); options.listeners.emplace_back(listener); +#else + // Activate directly the MemPurge prototype. + // (RocksDB lite does not support dynamic options) + options.experimental_mempurge_threshold = 1.0; #endif // !ROCKSDB_LITE ASSERT_OK(TryReopen(options)); + + // RocksDB lite does not support dynamic options +#ifndef ROCKSDB_LITE + // Dynamically activate the MemPurge prototype without restarting the DB. + ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); + ASSERT_OK(db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "1.0"}})); +#endif + std::atomic mempurge_count{0}; std::atomic sst_count{0}; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( @@ -914,6 +926,234 @@ TEST_F(DBFlushTest, MemPurgeBasic) { Close(); } +// RocksDB lite does not support dynamic options +#ifndef ROCKSDB_LITE +TEST_F(DBFlushTest, MemPurgeBasicToggle) { + Options options = CurrentOptions(); + + // The following options are used to enforce several values that + // may already exist as default values to make this test resilient + // to default value updates in the future. + options.statistics = CreateDBStatistics(); + + // Record all statistics. + options.statistics->set_stats_level(StatsLevel::kAll); + + // create the DB if it's not already present + options.create_if_missing = true; + + // Useful for now as we are trying to compare uncompressed data savings on + // flush(). + options.compression = kNoCompression; + + // Prevent memtable in place updates. Should already be disabled + // (from Wiki: + // In place updates can be enabled by toggling on the bool + // inplace_update_support flag. However, this flag is by default set to + // false + // because this thread-safe in-place update support is not compatible + // with concurrent memtable writes. Note that the bool + // allow_concurrent_memtable_write is set to true by default ) + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). + options.write_buffer_size = 1 << 20; + // Initially deactivate the MemPurge prototype. + // (negative values are equivalent to 0.0). + options.experimental_mempurge_threshold = -25.3; + TestFlushListener* listener = new TestFlushListener(options.env, this); + options.listeners.emplace_back(listener); + + ASSERT_OK(TryReopen(options)); + // Dynamically activate the MemPurge prototype without restarting the DB. + ColumnFamilyHandle* cfh = db_->DefaultColumnFamily(); + // Values greater than 1.0 are equivalent to 1.0 + ASSERT_OK( + db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "3.7898"}})); + std::atomic mempurge_count{0}; + std::atomic sst_count{0}; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:MemPurgeSuccessful", + [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + const size_t KVSIZE = 3; + std::vector KEYS(KVSIZE); + for (size_t k = 0; k < KVSIZE; k++) { + KEYS[k] = "IamKey" + std::to_string(k); + } + + std::vector RNDVALS(KVSIZE); + const std::string NOT_FOUND = "NOT_FOUND"; + + // Heavy overwrite workload, + // more than would fit in maximum allowed memtables. + Random rnd(719); + const size_t NUM_REPEAT = 100; + const size_t RAND_VALUES_LENGTH = 10240; + + // Insertion of of K-V pairs, multiple times (overwrites). + for (size_t i = 0; i < NUM_REPEAT; i++) { + for (size_t j = 0; j < KEYS.size(); j++) { + RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEYS[j], RNDVALS[j])); + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + } + for (size_t j = 0; j < KEYS.size(); j++) { + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + } + } + + // Check that there was at least one mempurge + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1; + // Check that there was no SST files created during flush. + const uint32_t EXPECTED_SST_COUNT = 0; + + EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT); + + // Dynamically deactivate MemPurge. + ASSERT_OK( + db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "-1023.0"}})); + + // Insertion of of K-V pairs, multiple times (overwrites). + for (size_t i = 0; i < NUM_REPEAT; i++) { + for (size_t j = 0; j < KEYS.size(); j++) { + RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEYS[j], RNDVALS[j])); + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + } + for (size_t j = 0; j < KEYS.size(); j++) { + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + } + } + + // Check that there was at least one mempurge + const uint32_t ZERO = 0; + // Assert that at least one flush to storage has been performed + EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT); + // The mempurge count is expected to be set to 0 when the options are updated. + // We expect no mempurge at all. + EXPECT_EQ(mempurge_count.exchange(0), ZERO); + + Close(); +} +// Closes the "#ifndef ROCKSDB_LITE" +// End of MemPurgeBasicToggle, which is not +// supported with RocksDB LITE because it +// relies on dynamically changing the option +// flag experimental_mempurge_threshold. +#endif + +// At the moment, MemPurge feature is deactivated +// when atomic_flush is enabled. This is because the level +// of garbage between Column Families is not guaranteed to +// be consistent, therefore a CF could hypothetically +// trigger a MemPurge while another CF would trigger +// a regular Flush. +TEST_F(DBFlushTest, MemPurgeWithAtomicFlush) { + Options options = CurrentOptions(); + + // The following options are used to enforce several values that + // may already exist as default values to make this test resilient + // to default value updates in the future. + options.statistics = CreateDBStatistics(); + + // Record all statistics. + options.statistics->set_stats_level(StatsLevel::kAll); + + // create the DB if it's not already present + options.create_if_missing = true; + + // Useful for now as we are trying to compare uncompressed data savings on + // flush(). + options.compression = kNoCompression; + + // Prevent memtable in place updates. Should already be disabled + // (from Wiki: + // In place updates can be enabled by toggling on the bool + // inplace_update_support flag. However, this flag is by default set to + // false + // because this thread-safe in-place update support is not compatible + // with concurrent memtable writes. Note that the bool + // allow_concurrent_memtable_write is set to true by default ) + options.inplace_update_support = false; + options.allow_concurrent_memtable_write = true; + + // Enforce size of a single MemTable to 64KB (64KB = 65,536 bytes). + options.write_buffer_size = 1 << 20; + // Activate the MemPurge prototype. + options.experimental_mempurge_threshold = 153.245; + // Activate atomic_flush. + options.atomic_flush = true; + + const std::vector new_cf_names = {"pikachu", "eevie"}; + CreateColumnFamilies(new_cf_names, options); + + Close(); + + // 3 CFs: default will be filled with overwrites (would normally trigger + // mempurge) + // new_cf_names[1] will be filled with random values (would trigger + // flush) new_cf_names[2] not filled with anything. + ReopenWithColumnFamilies( + {kDefaultColumnFamilyName, new_cf_names[0], new_cf_names[1]}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + ASSERT_OK(Put(1, "foo", "bar")); + ASSERT_OK(Put(2, "bar", "baz")); + + std::atomic mempurge_count{0}; + std::atomic sst_count{0}; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:MemPurgeSuccessful", + [&](void* /*arg*/) { mempurge_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + const size_t KVSIZE = 3; + std::vector KEYS(KVSIZE); + for (size_t k = 0; k < KVSIZE; k++) { + KEYS[k] = "IamKey" + std::to_string(k); + } + + std::string RNDKEY; + std::vector RNDVALS(KVSIZE); + const std::string NOT_FOUND = "NOT_FOUND"; + + // Heavy overwrite workload, + // more than would fit in maximum allowed memtables. + Random rnd(106); + const size_t NUM_REPEAT = 100; + const size_t RAND_KEY_LENGTH = 128; + const size_t RAND_VALUES_LENGTH = 10240; + + // Insertion of of K-V pairs, multiple times (overwrites). + for (size_t i = 0; i < NUM_REPEAT; i++) { + for (size_t j = 0; j < KEYS.size(); j++) { + RNDKEY = rnd.RandomString(RAND_KEY_LENGTH); + RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH); + ASSERT_OK(Put(KEYS[j], RNDVALS[j])); + ASSERT_OK(Put(1, RNDKEY, RNDVALS[j])); + ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]); + ASSERT_EQ(Get(1, RNDKEY), RNDVALS[j]); + } + } + + // Check that there was no mempurge because atomic_flush option is true. + const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 0; + // Check that there was at least one SST files created during flush. + const uint32_t EXPECTED_SST_COUNT = 1; + + EXPECT_EQ(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT); + EXPECT_GE(sst_count.exchange(0), EXPECTED_SST_COUNT); + + Close(); +} + TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { Options options = CurrentOptions(); @@ -930,7 +1170,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = 1.0; + options.experimental_mempurge_threshold = 15.0; ASSERT_OK(TryReopen(options)); @@ -1137,7 +1377,7 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = 1.0; + options.experimental_mempurge_threshold = 26.55; ASSERT_OK(TryReopen(options)); @@ -1212,8 +1452,9 @@ TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) { // Enforce size of a single MemTable to 128KB. options.write_buffer_size = 128 << 10; - // Activate the MemPurge prototype. - options.experimental_mempurge_threshold = 1.0; + // Activate the MemPurge prototype + // (values >1.0 are equivalent to 1.0). + options.experimental_mempurge_threshold = 2.5; ASSERT_OK(TryReopen(options)); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 15be4dcd8..5f91f53ce 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -578,38 +578,6 @@ Status DBImpl::CloseHelper() { flush_scheduler_.Clear(); trim_history_scheduler_.Clear(); - // For now, simply trigger a manual flush at close time - // on all the column families. - // TODO(bjlemaire): Check if this is needed. Also, in the - // future we can contemplate doing a more fine-grained - // flushing by first checking if there is a need for - // flushing (but need to implement something - // else than imm()->IsFlushPending() because the output - // memtables added to imm() don't trigger flushes). - if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { - Status flush_ret; - mutex_.Unlock(); - for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) { - if (immutable_db_options_.atomic_flush) { - flush_ret = AtomicFlushMemTables({cf}, FlushOptions(), - FlushReason::kManualFlush); - if (!flush_ret.ok()) { - ROCKS_LOG_INFO( - immutable_db_options_.info_log, - "Atomic flush memtables failed upon closing (mempurge)."); - } - } else { - flush_ret = - FlushMemTable(cf, FlushOptions(), FlushReason::kManualFlush); - if (!flush_ret.ok()) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Flush memtables failed upon closing (mempurge)."); - } - } - } - mutex_.Lock(); - } - while (!flush_queue_.empty()) { const FlushRequest& flush_req = PopFirstFromFlushQueue(); for (const auto& iter : flush_req) { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 57a750065..0db65301b 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2616,17 +2616,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, assert(flush_req.size() == 1); ColumnFamilyData* cfd = flush_req[0].first; assert(cfd); - // Note: SchedulePendingFlush is always preceded - // with an imm()->FlushRequested() call. However, - // we want to make this code snipper more resilient to - // future changes. Therefore, we add the following if - // statement - note that calling it twice (or more) - // doesn't break anything. - if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { - // If imm() contains silent memtables, - // requesting a flush will mark the imm_needed as true. - cfd->imm()->FlushRequested(); - } + if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) { cfd->Ref(); cfd->set_queued_for_flush(true); @@ -2775,11 +2765,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, for (const auto& iter : flush_req) { ColumnFamilyData* cfd = iter.first; - if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { - // If imm() contains silent memtables, - // requesting a flush will mark the imm_needed as true. + if (cfd->GetMempurgeUsed()) { + // If imm() contains silent memtables (e.g.: because + // MemPurge was activated), requesting a flush will + // mark the imm_needed as true. cfd->imm()->FlushRequested(); } + if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) { // can't flush this CF, try next one column_families_not_to_flush.push_back(cfd); diff --git a/db/flush_job.cc b/db/flush_job.cc index bd2fc9681..57d3a510f 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -211,6 +211,13 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, TEST_SYNC_POINT("FlushJob::Start"); db_mutex_->AssertHeld(); assert(pick_memtable_called); + // Mempurge threshold can be dynamically changed. + // For sake of consistency, mempurge_threshold is + // saved locally to maintain consistency in each + // FlushJob::Run call. + double mempurge_threshold = + mutable_cf_options_.experimental_mempurge_threshold; + AutoThreadOperationStageUpdater stage_run( ThreadStatus::STAGE_FLUSH_RUN); if (mems_.empty()) { @@ -238,9 +245,11 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); } Status mempurge_s = Status::NotFound("No MemPurge."); - if ((db_options_.experimental_mempurge_threshold > 0.0) && + if ((mempurge_threshold > 0.0) && (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) && - (!mems_.empty()) && MemPurgeDecider()) { + (!mems_.empty()) && MemPurgeDecider(mempurge_threshold) && + !(db_options_.atomic_flush)) { + cfd_->SetMempurgeUsed(); mempurge_s = MemPurge(); if (!mempurge_s.ok()) { // Mempurge is typically aborted when the output @@ -628,8 +637,7 @@ Status FlushJob::MemPurge() { return s; } -bool FlushJob::MemPurgeDecider() { - double threshold = db_options_.experimental_mempurge_threshold; +bool FlushJob::MemPurgeDecider(double threshold) { // Never trigger mempurge if threshold is not a strictly positive value. if (!(threshold > 0.0)) { return false; @@ -779,10 +787,11 @@ bool FlushJob::MemPurgeDecider() { estimated_useful_payload += (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload); - ROCKS_LOG_INFO( - db_options_.info_log, - "Mempurge sampling - found garbage ratio from sampling: %f.\n", - (payload - useful_payload) * 1.0 / payload); + ROCKS_LOG_INFO(db_options_.info_log, + "Mempurge sampling [CF %s] - found garbage ratio from " + "sampling: %f. Threshold is %f\n", + cfd_->GetName().c_str(), + (payload - useful_payload) * 1.0 / payload, threshold); } else { ROCKS_LOG_WARN(db_options_.info_log, "Mempurge sampling: null payload measured, and collected " diff --git a/db/flush_job.h b/db/flush_job.h index 7cfad1398..33c51e645 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -122,7 +122,7 @@ class FlushJob { // recommend all users not to set this flag as true given that the MemPurge // process has not matured yet. Status MemPurge(); - bool MemPurgeDecider(); + bool MemPurgeDecider(double threshold); // The rate limiter priority (io_priority) is determined dynamically here. Env::IOPriority GetRateLimiterPriorityForWrite(); #ifndef ROCKSDB_LITE diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 4879c229c..57ca7e2e2 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -193,6 +193,7 @@ bool StressTest::BuildOptionsTable() { {"memtable_huge_page_size", {"0", std::to_string(2 * 1024 * 1024)}}, {"max_successive_merges", {"0", "2", "4"}}, {"inplace_update_num_locks", {"100", "200", "300"}}, + {"experimental_mempurge_threshold", {"0.0", "1.0"}}, // TODO(ljin): enable test for this option // {"disable_auto_compactions", {"100", "200", "300"}}, {"level0_file_num_compaction_trigger", diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index c484872a0..843ea5e74 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -345,6 +345,23 @@ struct AdvancedColumnFamilyOptions { // Dynamically changeable through SetOptions() API size_t inplace_update_num_locks = 10000; + // [experimental] + // Used to activate or deactive the Mempurge feature (memtable garbage + // collection). (deactivated by default). At every flush, the total useful + // payload (total entries minus garbage entries) is estimated as a ratio + // [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then + // compared to this `threshold` value: + // - if ratio1.0 : aggressive mempurge. + // 0 < threshold < 1.0: mempurge triggered only for very low useful payload + // ratios. + // [experimental] + double experimental_mempurge_threshold = 0.0; + // existing_value - pointer to previous value (from both memtable and sst). // nullptr if key doesn't exist // existing_value_size - pointer to size of existing_value). diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 933650d5e..604737167 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -1502,6 +1502,11 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_report_bg_io_stats( extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_report_bg_io_stats( rocksdb_options_t*); +extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t*, double); +extern ROCKSDB_LIBRARY_API double +rocksdb_options_get_experimental_mempurge_threshold(rocksdb_options_t*); + enum { rocksdb_tolerate_corrupted_tail_records_recovery = 0, rocksdb_absolute_consistency_recovery = 1, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index ef1623478..2d273c51d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -865,23 +865,6 @@ struct DBOptions { // Default: true bool advise_random_on_open = true; - // [experimental] - // Used to activate or deactive the Mempurge feature (memtable garbage - // collection). (deactivated by default). At every flush, the total useful - // payload (total entries minus garbage entries) is estimated as a ratio - // [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then - // compared to this `threshold` value: - // - if ratio1.0 : aggressive mempurge. - // 0 < threshold < 1.0: mempurge triggered only for very low useful payload - // ratios. - // [experimental] - double experimental_mempurge_threshold = 0.0; - // Amount of data to build up in memtables across all column // families before writing to disk. // diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 7a76ea2f5..6fc232c7f 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -3101,6 +3101,29 @@ void Java_org_rocksdb_Options_setMemtablePrefixBloomSizeRatio( static_cast(jmemtable_prefix_bloom_size_ratio); } +/* + * Class: org_rocksdb_Options + * Method: experimentalMempurgeThreshold + * Signature: (J)I + */ +jdouble Java_org_rocksdb_Options_experimentalMempurgeThreshold(JNIEnv*, jobject, + jlong jhandle) { + return reinterpret_cast(jhandle) + ->experimental_mempurge_threshold; +} + +/* + * Class: org_rocksdb_Options + * Method: setExperimentalMempurgeThreshold + * Signature: (JI)V + */ +void Java_org_rocksdb_Options_setExperimentalMempurgeThreshold( + JNIEnv*, jobject, jlong jhandle, jdouble jexperimental_mempurge_threshold) { + reinterpret_cast(jhandle) + ->experimental_mempurge_threshold = + static_cast(jexperimental_mempurge_threshold); +} + /* * Class: org_rocksdb_Options * Method: memtableWholeKeyFiltering @@ -4955,6 +4978,29 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMemtablePrefixBloomSizeRatio( static_cast(jmemtable_prefix_bloom_size_ratio); } +/* + * Class: org_rocksdb_ColumnFamilyOptions + * Method: experimentalMempurgeThreshold + * Signature: (J)I + */ +jdouble Java_org_rocksdb_ColumnFamilyOptions_experimentalMempurgeThreshold( + JNIEnv*, jobject, jlong jhandle) { + return reinterpret_cast(jhandle) + ->experimental_mempurge_threshold; +} + +/* + * Class: org_rocksdb_ColumnFamilyOptions + * Method: setExperimentalMempurgeThreshold + * Signature: (JI)V + */ +void Java_org_rocksdb_ColumnFamilyOptions_setExperimentalMempurgeThreshold( + JNIEnv*, jobject, jlong jhandle, jdouble jexperimental_mempurge_threshold) { + reinterpret_cast(jhandle) + ->experimental_mempurge_threshold = + static_cast(jexperimental_mempurge_threshold); +} + /* * Class: org_rocksdb_ColumnFamilyOptions * Method: memtableWholeKeyFiltering diff --git a/java/src/main/java/org/rocksdb/AdvancedMutableColumnFamilyOptionsInterface.java b/java/src/main/java/org/rocksdb/AdvancedMutableColumnFamilyOptionsInterface.java index f9f803d97..928750446 100644 --- a/java/src/main/java/org/rocksdb/AdvancedMutableColumnFamilyOptionsInterface.java +++ b/java/src/main/java/org/rocksdb/AdvancedMutableColumnFamilyOptionsInterface.java @@ -81,6 +81,30 @@ public interface AdvancedMutableColumnFamilyOptionsInterface< */ double memtablePrefixBloomSizeRatio(); + /** + * Threshold used in the MemPurge (memtable garbage collection) + * feature. A value of 0.0 corresponds to no MemPurge, + * a value of 1.0 will trigger a MemPurge as often as possible. + * + * Default: 0.0 (disabled) + * + * @param experimentalMempurgeThreshold the threshold used by + * the MemPurge decider. + * @return the reference to the current options. + */ + T setExperimentalMempurgeThreshold(double experimentalMempurgeThreshold); + + /** + * Threshold used in the MemPurge (memtable garbage collection) + * feature. A value of 0.0 corresponds to no MemPurge, + * a value of 1.0 will trigger a MemPurge as often as possible. + * + * Default: 0 (disabled) + * + * @return the threshold used by the MemPurge decider + */ + double experimentalMempurgeThreshold(); + /** * Enable whole key bloom filter in memtable. Note this will only take effect * if memtable_prefix_bloom_size_ratio is not 0. Enabling whole key filtering diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java index 104ba00c2..433fbcf08 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java @@ -683,6 +683,18 @@ public class ColumnFamilyOptions extends RocksObject return memtablePrefixBloomSizeRatio(nativeHandle_); } + @Override + public ColumnFamilyOptions setExperimentalMempurgeThreshold( + final double experimentalMempurgeThreshold) { + setExperimentalMempurgeThreshold(nativeHandle_, experimentalMempurgeThreshold); + return this; + } + + @Override + public double experimentalMempurgeThreshold() { + return experimentalMempurgeThreshold(nativeHandle_); + } + @Override public ColumnFamilyOptions setMemtableWholeKeyFiltering(final boolean memtableWholeKeyFiltering) { setMemtableWholeKeyFiltering(nativeHandle_, memtableWholeKeyFiltering); @@ -1389,6 +1401,9 @@ public class ColumnFamilyOptions extends RocksObject private native void setMemtablePrefixBloomSizeRatio( long handle, double memtablePrefixBloomSizeRatio); private native double memtablePrefixBloomSizeRatio(long handle); + private native void setExperimentalMempurgeThreshold( + long handle, double experimentalMempurgeThreshold); + private native double experimentalMempurgeThreshold(long handle); private native void setMemtableWholeKeyFiltering(long handle, boolean memtableWholeKeyFiltering); private native boolean memtableWholeKeyFiltering(long handle); private native void setBloomLocality( diff --git a/java/src/main/java/org/rocksdb/MutableColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/MutableColumnFamilyOptions.java index 9379c333e..8f55c5201 100644 --- a/java/src/main/java/org/rocksdb/MutableColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/MutableColumnFamilyOptions.java @@ -73,7 +73,8 @@ public class MutableColumnFamilyOptions max_successive_merges(ValueType.LONG), @Deprecated filter_deletes(ValueType.BOOLEAN), max_write_buffer_number(ValueType.INT), - inplace_update_num_locks(ValueType.LONG); + inplace_update_num_locks(ValueType.LONG), + experimental_mempurge_threshold(ValueType.DOUBLE); private final ValueType valueType; MemtableOption(final ValueType valueType) { @@ -287,6 +288,18 @@ public class MutableColumnFamilyOptions return getLong(MemtableOption.inplace_update_num_locks); } + @Override + public MutableColumnFamilyOptionsBuilder setExperimentalMempurgeThreshold( + final double experimentalMempurgeThreshold) { + return setDouble( + MemtableOption.experimental_mempurge_threshold, experimentalMempurgeThreshold); + } + + @Override + public double experimentalMempurgeThreshold() { + return getDouble(MemtableOption.experimental_mempurge_threshold); + } + @Override public MutableColumnFamilyOptionsBuilder setDisableAutoCompactions( final boolean disableAutoCompactions) { diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index 4d313b7a4..f7e725f07 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -1623,6 +1623,17 @@ public class Options extends RocksObject return this; } + @Override + public double experimentalMempurgeThreshold() { + return experimentalMempurgeThreshold(nativeHandle_); + } + + @Override + public Options setExperimentalMempurgeThreshold(final double experimentalMempurgeThreshold) { + setExperimentalMempurgeThreshold(nativeHandle_, experimentalMempurgeThreshold); + return this; + } + @Override public boolean memtableWholeKeyFiltering() { return memtableWholeKeyFiltering(nativeHandle_); @@ -2420,6 +2431,9 @@ public class Options extends RocksObject private native void setMemtablePrefixBloomSizeRatio( long handle, double memtablePrefixBloomSizeRatio); private native double memtablePrefixBloomSizeRatio(long handle); + private native void setExperimentalMempurgeThreshold( + long handle, double experimentalMempurgeThreshold); + private native double experimentalMempurgeThreshold(long handle); private native void setMemtableWholeKeyFiltering(long handle, boolean memtableWholeKeyFiltering); private native boolean memtableWholeKeyFiltering(long handle); private native void setBloomLocality( diff --git a/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java b/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java index 2ad669d33..7d7581048 100644 --- a/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java +++ b/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java @@ -331,6 +331,15 @@ public class ColumnFamilyOptionsTest { } } + @Test + public void experimentalMempurgeThreshold() { + try (final ColumnFamilyOptions opt = new ColumnFamilyOptions()) { + final double doubleValue = rand.nextDouble(); + opt.setExperimentalMempurgeThreshold(doubleValue); + assertThat(opt.experimentalMempurgeThreshold()).isEqualTo(doubleValue); + } + } + @Test public void memtableWholeKeyFiltering() { try (final ColumnFamilyOptions opt = new ColumnFamilyOptions()) { diff --git a/java/src/test/java/org/rocksdb/MutableColumnFamilyOptionsTest.java b/java/src/test/java/org/rocksdb/MutableColumnFamilyOptionsTest.java index 8792446f1..66b458a9c 100644 --- a/java/src/test/java/org/rocksdb/MutableColumnFamilyOptionsTest.java +++ b/java/src/test/java/org/rocksdb/MutableColumnFamilyOptionsTest.java @@ -119,7 +119,7 @@ public class MutableColumnFamilyOptionsTest { + "min_write_buffer_number_to_merge=1; max_write_buffer_number_to_maintain=0; compaction_filter=nullptr; merge_operator=nullptr; " + "num_levels=7; optimize_filters_for_hits=false; force_consistency_checks=true; table_factory=BlockBasedTable; " + "max_write_buffer_size_to_maintain=0; memtable_insert_with_hint_prefix_extractor=nullptr; level_compaction_dynamic_level_bytes=false; " - + "inplace_update_support=false;"; + + "inplace_update_support=false; experimental_mempurge_threshold=0.003"; MutableColumnFamilyOptions.MutableColumnFamilyOptionsBuilder cf = MutableColumnFamilyOptions.parse(optionsString, true); @@ -158,6 +158,7 @@ public class MutableColumnFamilyOptionsTest { assertThat(cf.periodicCompactionSeconds()).isEqualTo(0); assertThat(cf.paranoidFileChecks()).isEqualTo(true); assertThat(cf.memtablePrefixBloomSizeRatio()).isEqualTo(7.5); + assertThat(cf.experimentalMempurgeThreshold()).isEqualTo(0.003); assertThat(cf.maxSequentialSkipInIterations()).isEqualTo(8); assertThat(cf.reportBgIoStats()).isEqualTo(true); } diff --git a/java/src/test/java/org/rocksdb/MutableOptionsGetSetTest.java b/java/src/test/java/org/rocksdb/MutableOptionsGetSetTest.java index 67e740891..6db940619 100644 --- a/java/src/test/java/org/rocksdb/MutableOptionsGetSetTest.java +++ b/java/src/test/java/org/rocksdb/MutableOptionsGetSetTest.java @@ -50,6 +50,7 @@ public class MutableOptionsGetSetTest { .setBlobFileStartingLevel(2) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.17) + .setExperimentalMempurgeThreshold(0.005) .setMemtableWholeKeyFiltering(false) .setMemtableHugePageSize(3) .setMaxSuccessiveMerges(4) @@ -73,6 +74,7 @@ public class MutableOptionsGetSetTest { .setEnableBlobFiles(false) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.236) + .setExperimentalMempurgeThreshold(0.247) .setMemtableWholeKeyFiltering(true) .setMemtableHugePageSize(8) .setMaxSuccessiveMerges(12) @@ -111,6 +113,7 @@ public class MutableOptionsGetSetTest { assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder1.arenaBlockSize()).isEqualTo(42); assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17); + assertThat(builder1.experimentalMempurgeThreshold()).isEqualTo(0.005); assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false); assertThat(builder1.memtableHugePageSize()).isEqualTo(3); assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4); @@ -135,6 +138,7 @@ public class MutableOptionsGetSetTest { assertThat(builder2.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder2.arenaBlockSize()).isEqualTo(42); assertThat(builder2.memtablePrefixBloomSizeRatio()).isEqualTo(0.236); + assertThat(builder2.experimentalMempurgeThreshold()).isEqualTo(0.247); assertThat(builder2.memtableWholeKeyFiltering()).isEqualTo(true); assertThat(builder2.memtableHugePageSize()).isEqualTo(8); assertThat(builder2.maxSuccessiveMerges()).isEqualTo(12); @@ -202,6 +206,7 @@ public class MutableOptionsGetSetTest { .setBlobFileStartingLevel(3) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.17) + .setExperimentalMempurgeThreshold(0.005) .setMemtableWholeKeyFiltering(false) .setMemtableHugePageSize(3) .setMaxSuccessiveMerges(4) @@ -229,6 +234,7 @@ public class MutableOptionsGetSetTest { assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder1.arenaBlockSize()).isEqualTo(42); assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17); + assertThat(builder1.experimentalMempurgeThreshold()).isEqualTo(0.005); assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false); assertThat(builder1.memtableHugePageSize()).isEqualTo(3); assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4); @@ -251,6 +257,7 @@ public class MutableOptionsGetSetTest { .setEnableBlobFiles(false) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.236) + .setExperimentalMempurgeThreshold(0.247) .setMemtableWholeKeyFiltering(true) .setMemtableHugePageSize(8) .setMaxSuccessiveMerges(12) @@ -274,6 +281,7 @@ public class MutableOptionsGetSetTest { assertThat(builder2.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder2.arenaBlockSize()).isEqualTo(42); assertThat(builder2.memtablePrefixBloomSizeRatio()).isEqualTo(0.236); + assertThat(builder2.experimentalMempurgeThreshold()).isEqualTo(0.247); assertThat(builder2.memtableWholeKeyFiltering()).isEqualTo(true); assertThat(builder2.memtableHugePageSize()).isEqualTo(8); assertThat(builder2.maxSuccessiveMerges()).isEqualTo(12); @@ -324,6 +332,7 @@ public class MutableOptionsGetSetTest { .setBlobFileStartingLevel(4) .setArenaBlockSize(42) .setMemtablePrefixBloomSizeRatio(0.17) + .setExperimentalMempurgeThreshold(0.005) .setMemtableWholeKeyFiltering(false) .setMemtableHugePageSize(3) .setMaxSuccessiveMerges(4) @@ -350,6 +359,7 @@ public class MutableOptionsGetSetTest { assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder1.arenaBlockSize()).isEqualTo(42); assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17); + assertThat(builder1.experimentalMempurgeThreshold()).isEqualTo(0.005); assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false); assertThat(builder1.memtableHugePageSize()).isEqualTo(3); assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4); diff --git a/java/src/test/java/org/rocksdb/OptionsTest.java b/java/src/test/java/org/rocksdb/OptionsTest.java index ea454ab36..612b1bd7c 100644 --- a/java/src/test/java/org/rocksdb/OptionsTest.java +++ b/java/src/test/java/org/rocksdb/OptionsTest.java @@ -274,6 +274,15 @@ public class OptionsTest { } } + @Test + public void experimentalMempurgeThreshold() { + try (final Options opt = new Options()) { + final double doubleValue = rand.nextDouble(); + opt.setExperimentalMempurgeThreshold(doubleValue); + assertThat(opt.experimentalMempurgeThreshold()).isEqualTo(doubleValue); + } + } + @Test public void memtableWholeKeyFiltering() { try (final Options opt = new Options()) { diff --git a/options/cf_options.cc b/options/cf_options.cc index 1e9865265..a681ea848 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -464,6 +464,10 @@ static std::unordered_map offsetof(struct MutableCFOptions, compression_per_level), OptionVerificationType::kNormal, OptionTypeFlags::kMutable, {0, OptionType::kCompressionType})}, + {"experimental_mempurge_threshold", + {offsetof(struct MutableCFOptions, experimental_mempurge_threshold), + OptionType::kDouble, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {kOptNameCompOpts, OptionTypeInfo::Struct( kOptNameCompOpts, &compression_options_type_info, @@ -1037,6 +1041,9 @@ void MutableCFOptions::Dump(Logger* log) const { report_bg_io_stats); ROCKS_LOG_INFO(log, " compression: %d", static_cast(compression)); + ROCKS_LOG_INFO(log, + " experimental_mempurge_threshold: %f", + experimental_mempurge_threshold); // Universal Compaction Options ROCKS_LOG_INFO(log, "compaction_options_universal.size_ratio : %d", diff --git a/options/cf_options.h b/options/cf_options.h index bfdc2e102..dccb559a2 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -112,6 +112,8 @@ struct MutableCFOptions { max_successive_merges(options.max_successive_merges), inplace_update_num_locks(options.inplace_update_num_locks), prefix_extractor(options.prefix_extractor), + experimental_mempurge_threshold( + options.experimental_mempurge_threshold), disable_auto_compactions(options.disable_auto_compactions), soft_pending_compaction_bytes_limit( options.soft_pending_compaction_bytes_limit), @@ -170,6 +172,7 @@ struct MutableCFOptions { max_successive_merges(0), inplace_update_num_locks(0), prefix_extractor(nullptr), + experimental_mempurge_threshold(0.0), disable_auto_compactions(false), soft_pending_compaction_bytes_limit(0), hard_pending_compaction_bytes_limit(0), @@ -231,6 +234,22 @@ struct MutableCFOptions { size_t max_successive_merges; size_t inplace_update_num_locks; std::shared_ptr prefix_extractor; + // [experimental] + // Used to activate or deactive the Mempurge feature (memtable garbage + // collection). (deactivated by default). At every flush, the total useful + // payload (total entries minus garbage entries) is estimated as a ratio + // [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then + // compared to this `threshold` value: + // - if ratio1.0 : aggressive mempurge. + // 0 < threshold < 1.0: mempurge triggered only for very low useful payload + // ratios. + // [experimental] + double experimental_mempurge_threshold; // Compaction related options bool disable_auto_compactions; diff --git a/options/db_options.cc b/options/db_options.cc index e8846b222..92c56398d 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -208,8 +208,7 @@ static std::unordered_map {0, OptionType::kString, OptionVerificationType::kDeprecated, OptionTypeFlags::kNone}}, {"experimental_mempurge_threshold", - {offsetof(struct ImmutableDBOptions, experimental_mempurge_threshold), - OptionType::kDouble, OptionVerificationType::kNormal, + {0, OptionType::kDouble, OptionVerificationType::kDeprecated, OptionTypeFlags::kNone}}, {"is_fd_close_on_exec", {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec), @@ -716,7 +715,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), advise_random_on_open(options.advise_random_on_open), - experimental_mempurge_threshold(options.experimental_mempurge_threshold), db_write_buffer_size(options.db_write_buffer_size), write_buffer_manager(options.write_buffer_manager), access_hint_on_compaction_start(options.access_hint_on_compaction_start), @@ -847,9 +845,6 @@ void ImmutableDBOptions::Dump(Logger* log) const { is_fd_close_on_exec); ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d", advise_random_on_open); - ROCKS_LOG_HEADER( - log, " Options.experimental_mempurge_threshold: %f", - experimental_mempurge_threshold); ROCKS_LOG_HEADER( log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, db_write_buffer_size); diff --git a/options/db_options.h b/options/db_options.h index a245d63c6..8946f60ff 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -58,7 +58,6 @@ struct ImmutableDBOptions { bool allow_fallocate; bool is_fd_close_on_exec; bool advise_random_on_open; - double experimental_mempurge_threshold; size_t db_write_buffer_size; std::shared_ptr write_buffer_manager; DBOptions::AccessHint access_hint_on_compaction_start; diff --git a/options/options.cc b/options/options.cc index bba166be4..2276c1501 100644 --- a/options/options.cc +++ b/options/options.cc @@ -49,6 +49,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) options.max_write_buffer_size_to_maintain), inplace_update_support(options.inplace_update_support), inplace_update_num_locks(options.inplace_update_num_locks), + experimental_mempurge_threshold(options.experimental_mempurge_threshold), inplace_callback(options.inplace_callback), memtable_prefix_bloom_size_ratio( options.memtable_prefix_bloom_size_ratio), @@ -424,12 +425,14 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " blob_cache options: %s", blob_cache->GetPrintableOptions().c_str()); } + ROCKS_LOG_HEADER(log, "Options.experimental_mempurge_threshold: %f", + experimental_mempurge_threshold); } // ColumnFamilyOptions::Dump void Options::Dump(Logger* log) const { DBOptions::Dump(log); ColumnFamilyOptions::Dump(log); -} // Options::Dump +} // Options::Dump void Options::DumpCFOptions(Logger* log) const { ColumnFamilyOptions::Dump(log); diff --git a/options/options_helper.cc b/options/options_helper.cc index 7c76a31ec..44c57b436 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -212,6 +212,8 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions, cf_opts->max_successive_merges = moptions.max_successive_merges; cf_opts->inplace_update_num_locks = moptions.inplace_update_num_locks; cf_opts->prefix_extractor = moptions.prefix_extractor; + cf_opts->experimental_mempurge_threshold = + moptions.experimental_mempurge_threshold; // Compaction related options cf_opts->disable_auto_compactions = moptions.disable_auto_compactions; diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 42a6fd577..3a156a30a 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -503,6 +503,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "paranoid_file_checks=true;" "force_consistency_checks=true;" "inplace_update_num_locks=7429;" + "experimental_mempurge_threshold=0.0001;" "optimize_filters_for_hits=false;" "level_compaction_dynamic_level_bytes=false;" "inplace_update_support=false;" diff --git a/options/options_test.cc b/options/options_test.cc index 264500758..0eeaca484 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -116,6 +116,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"max_successive_merges", "30"}, {"min_partial_merge_operands", "31"}, {"prefix_extractor", "fixed:31"}, + {"experimental_mempurge_threshold", "0.003"}, {"optimize_filters_for_hits", "true"}, {"enable_blob_files", "true"}, {"min_blob_size", "1K"}, @@ -164,7 +165,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, - {"experimental_mempurge_threshold", "0.0"}, {"use_adaptive_mutex", "false"}, {"compaction_readahead_size", "100"}, {"random_access_max_buffer_size", "3145728"}, @@ -256,6 +256,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_TRUE(new_cf_opt.prefix_extractor != nullptr); ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true); ASSERT_EQ(new_cf_opt.prefix_extractor->AsString(), "rocksdb.FixedPrefix.31"); + ASSERT_EQ(new_cf_opt.experimental_mempurge_threshold, 0.003); ASSERT_EQ(new_cf_opt.enable_blob_files, true); ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10); ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30); @@ -329,7 +330,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); - ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728); @@ -2345,6 +2345,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"max_successive_merges", "30"}, {"min_partial_merge_operands", "31"}, {"prefix_extractor", "fixed:31"}, + {"experimental_mempurge_threshold", "0.003"}, {"optimize_filters_for_hits", "true"}, {"enable_blob_files", "true"}, {"min_blob_size", "1K"}, @@ -2393,7 +2394,6 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, - {"experimental_mempurge_threshold", "0.0"}, {"use_adaptive_mutex", "false"}, {"compaction_readahead_size", "100"}, {"random_access_max_buffer_size", "3145728"}, @@ -2479,6 +2479,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_TRUE(new_cf_opt.prefix_extractor != nullptr); ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true); ASSERT_EQ(new_cf_opt.prefix_extractor->AsString(), "rocksdb.FixedPrefix.31"); + ASSERT_EQ(new_cf_opt.experimental_mempurge_threshold, 0.003); ASSERT_EQ(new_cf_opt.enable_blob_files, true); ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10); ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30); @@ -2553,7 +2554,6 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); - ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);