Dynamically changeable `MemPurge` option (#10011)

Summary:
**Summary**
Make the mempurge option flag a mutable column family option flag, so that the mempurge feature can be toggled dynamically at runtime.

**Motivation**
RocksDB users prefer being able to switch features on and off without having to close and reopen the DB. This is particularly important if a feature causes issues and needs to be turned off. Dynamically changing a DB option flag does not currently seem to be possible.
Moreover, with this change, the MemPurge feature can be toggled on or off independently for each column family, which we see as a major improvement.

**Content of this PR**
This PR removes the `experimental_mempurge_threshold` flag as a DB option flag and re-introduces it as a `MutableCFOption` flag. I updated the code to handle dynamic changes of the flag (in particular in `flush_job.cc`). Additionally, this PR adds a new test demonstrating that the MemPurge feature can be toggled on and off, and registers two mempurge threshold values (0.0 and 1.0) in the `db_stress` module that can be randomly switched with the `set_option_one_in` flag, which is useful to stress test the dynamic changes. A minimal usage sketch is shown below.
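The resulting usage pattern looks roughly like the following sketch, adapted from the new test in `db_flush_test.cc`; the DB path, the `main()` scaffolding, and the assertion-based error handling are illustrative only, not part of this PR:

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Start with mempurge deactivated (0.0 is the default).
  options.experimental_mempurge_threshold = 0.0;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/mempurge_demo", &db);
  assert(s.ok());

  // Activate mempurge for the default column family without reopening the DB.
  s = db->SetOptions(db->DefaultColumnFamily(),
                     {{"experimental_mempurge_threshold", "1.0"}});
  assert(s.ok());

  // ... run the write workload ...

  // Deactivate it again, still without reopening.
  s = db->SetOptions(db->DefaultColumnFamily(),
                     {{"experimental_mempurge_threshold", "0.0"}});
  assert(s.ok());

  delete db;
  return 0;
}
```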

**Benchmarking**
Within the next 12 hours, I will add numbers showing that there is no performance impact.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10011

Reviewed By: pdillinger

Differential Revision: D36462357

Pulled By: bjlemaire

fbshipit-source-id: 5e3d63bdadf085c0572ecc2349e7dd9729ce1802
Baptiste Lemaire 3 years ago committed by Facebook GitHub Bot
parent 2352e2dfda
commit 5879053fd0
  1. HISTORY.md (2)
  2. db/c.cc (15)
  3. db/c_test.c (12)
  4. db/column_family.cc (3)
  5. db/column_family.h (5)
  6. db/db_flush_test.cc (253)
  7. db/db_impl/db_impl.cc (32)
  8. db/db_impl/db_impl_compaction_flush.cc (20)
  9. db/flush_job.cc (25)
  10. db/flush_job.h (2)
  11. db_stress_tool/db_stress_test_base.cc (1)
  12. include/rocksdb/advanced_options.h (17)
  13. include/rocksdb/c.h (5)
  14. include/rocksdb/options.h (17)
  15. java/rocksjni/options.cc (46)
  16. java/src/main/java/org/rocksdb/AdvancedMutableColumnFamilyOptionsInterface.java (24)
  17. java/src/main/java/org/rocksdb/ColumnFamilyOptions.java (15)
  18. java/src/main/java/org/rocksdb/MutableColumnFamilyOptions.java (15)
  19. java/src/main/java/org/rocksdb/Options.java (14)
  20. java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java (9)
  21. java/src/test/java/org/rocksdb/MutableColumnFamilyOptionsTest.java (3)
  22. java/src/test/java/org/rocksdb/MutableOptionsGetSetTest.java (10)
  23. java/src/test/java/org/rocksdb/OptionsTest.java (9)
  24. options/cf_options.cc (7)
  25. options/cf_options.h (19)
  26. options/db_options.cc (7)
  27. options/db_options.h (1)
  28. options/options.cc (5)
  29. options/options_helper.cc (2)
  30. options/options_settable_test.cc (1)
  31. options/options_test.cc (8)

@@ -1,5 +1,7 @@
# Rocksdb Change Log
## Unreleased
### New Features
* Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`.
## 7.4.0 (06/19/2022)
### Bug Fixes

@@ -3350,11 +3350,6 @@ unsigned char rocksdb_options_get_advise_random_on_open(
  return opt->rep.advise_random_on_open;
}
void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt,
                                                         double v) {
  opt->rep.experimental_mempurge_threshold = v;
}
void rocksdb_options_set_access_hint_on_compaction_start(
    rocksdb_options_t* opt, int v) {
  switch(v) {
@@ -3540,6 +3535,16 @@ int rocksdb_options_get_max_background_flushes(rocksdb_options_t* opt) {
  return opt->rep.max_background_flushes;
}
void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt,
                                                         double v) {
  opt->rep.experimental_mempurge_threshold = v;
}
double rocksdb_options_get_experimental_mempurge_threshold(
    rocksdb_options_t* opt) {
  return opt->rep.experimental_mempurge_threshold;
}
void rocksdb_options_set_max_log_file_size(rocksdb_options_t* opt, size_t v) {
  opt->rep.max_log_file_size = v;
}

@@ -1921,6 +1921,10 @@ int main(int argc, char** argv) {
  rocksdb_options_set_wal_compression(o, 1);
  CheckCondition(1 == rocksdb_options_get_wal_compression(o));
  rocksdb_options_set_experimental_mempurge_threshold(o, 29.0);
  CheckCondition(29.0 ==
                 rocksdb_options_get_experimental_mempurge_threshold(o));
  /* Blob Options */
  rocksdb_options_set_enable_blob_files(o, 1);
  CheckCondition(1 == rocksdb_options_get_enable_blob_files(o));
@@ -2051,6 +2055,8 @@ int main(int argc, char** argv) {
  CheckCondition(4 == rocksdb_options_get_bottommost_compression(copy));
  CheckCondition(2 == rocksdb_options_get_compaction_style(copy));
  CheckCondition(1 == rocksdb_options_get_atomic_flush(copy));
  CheckCondition(29.0 ==
                 rocksdb_options_get_experimental_mempurge_threshold(copy));
  // Copies should be independent.
  rocksdb_options_set_allow_ingest_behind(copy, 0);
@@ -2399,6 +2405,12 @@ int main(int argc, char** argv) {
  CheckCondition(0 == rocksdb_options_get_atomic_flush(copy));
  CheckCondition(1 == rocksdb_options_get_atomic_flush(o));
  rocksdb_options_set_experimental_mempurge_threshold(copy, 229.0);
  CheckCondition(229.0 ==
                 rocksdb_options_get_experimental_mempurge_threshold(copy));
  CheckCondition(29.0 ==
                 rocksdb_options_get_experimental_mempurge_threshold(o));
  rocksdb_options_destroy(copy);
  rocksdb_options_destroy(o);
}

@@ -550,7 +550,8 @@ ColumnFamilyData::ColumnFamilyData(
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0),
      db_paths_registered_(false) {
      db_paths_registered_(false),
      mempurge_used_(false) {
  if (id_ != kDummyColumnFamilyDataId) {
    // TODO(cc): RegisterDbPaths can be expensive, considering moving it
    // outside of this constructor which might be called with db mutex held.

@@ -526,6 +526,10 @@ class ColumnFamilyData {
  static const uint32_t kDummyColumnFamilyDataId;
  // Keep track of whether the mempurge feature was ever used.
  void SetMempurgeUsed() { mempurge_used_ = true; }
  bool GetMempurgeUsed() { return mempurge_used_; }
 private:
  friend class ColumnFamilySet;
  ColumnFamilyData(uint32_t id, const std::string& name,
@@ -626,6 +630,7 @@ class ColumnFamilyData {
  // For charging memory usage of file metadata created for newly added files to
  // a Version associated with this CFD
  std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
  bool mempurge_used_;
};
// ColumnFamilySet has interesting thread-safety requirements

@@ -777,13 +777,25 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype.
  options.experimental_mempurge_threshold = 1.0;
#ifndef ROCKSDB_LITE
  // Initially deactivate the MemPurge prototype.
  options.experimental_mempurge_threshold = 0.0;
  TestFlushListener* listener = new TestFlushListener(options.env, this);
  options.listeners.emplace_back(listener);
#else
  // Activate directly the MemPurge prototype.
  // (RocksDB lite does not support dynamic options)
  options.experimental_mempurge_threshold = 1.0;
#endif // !ROCKSDB_LITE
  ASSERT_OK(TryReopen(options));
  // RocksDB lite does not support dynamic options
#ifndef ROCKSDB_LITE
  // Dynamically activate the MemPurge prototype without restarting the DB.
  ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
  ASSERT_OK(db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "1.0"}}));
#endif
  std::atomic<uint32_t> mempurge_count{0};
  std::atomic<uint32_t> sst_count{0};
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
@@ -914,6 +926,234 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
  Close();
}
// RocksDB lite does not support dynamic options
#ifndef ROCKSDB_LITE
TEST_F(DBFlushTest, MemPurgeBasicToggle) {
Options options = CurrentOptions();
// The following options are used to enforce several values that
// may already exist as default values to make this test resilient
// to default value updates in the future.
options.statistics = CreateDBStatistics();
// Record all statistics.
options.statistics->set_stats_level(StatsLevel::kAll);
// create the DB if it's not already present
options.create_if_missing = true;
// Useful for now as we are trying to compare uncompressed data savings on
// flush().
options.compression = kNoCompression;
// Prevent memtable in place updates. Should already be disabled
// (from Wiki:
// In place updates can be enabled by toggling on the bool
// inplace_update_support flag. However, this flag is by default set to
// false
// because this thread-safe in-place update support is not compatible
// with concurrent memtable writes. Note that the bool
// allow_concurrent_memtable_write is set to true by default )
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Initially deactivate the MemPurge prototype.
// (negative values are equivalent to 0.0).
options.experimental_mempurge_threshold = -25.3;
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
ASSERT_OK(TryReopen(options));
// Dynamically activate the MemPurge prototype without restarting the DB.
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
// Values greater than 1.0 are equivalent to 1.0
ASSERT_OK(
db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "3.7898"}}));
std::atomic<uint32_t> mempurge_count{0};
std::atomic<uint32_t> sst_count{0};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* /*arg*/) { mempurge_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
const size_t KVSIZE = 3;
std::vector<std::string> KEYS(KVSIZE);
for (size_t k = 0; k < KVSIZE; k++) {
KEYS[k] = "IamKey" + std::to_string(k);
}
std::vector<std::string> RNDVALS(KVSIZE);
const std::string NOT_FOUND = "NOT_FOUND";
// Heavy overwrite workload,
// more than would fit in maximum allowed memtables.
Random rnd(719);
const size_t NUM_REPEAT = 100;
const size_t RAND_VALUES_LENGTH = 10240;
// Insertion of K-V pairs, multiple times (overwrites).
for (size_t i = 0; i < NUM_REPEAT; i++) {
for (size_t j = 0; j < KEYS.size(); j++) {
RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
}
for (size_t j = 0; j < KEYS.size(); j++) {
ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
}
}
// Check that there was at least one mempurge
const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
// Check that there was no SST files created during flush.
const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
// Dynamically deactivate MemPurge.
ASSERT_OK(
db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "-1023.0"}}));
// Insertion of K-V pairs, multiple times (overwrites).
for (size_t i = 0; i < NUM_REPEAT; i++) {
for (size_t j = 0; j < KEYS.size(); j++) {
RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
}
for (size_t j = 0; j < KEYS.size(); j++) {
ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
}
}
// Check that there was at least one mempurge
const uint32_t ZERO = 0;
// Assert that at least one flush to storage has been performed
EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
// The mempurge count is expected to be set to 0 when the options are updated.
// We expect no mempurge at all.
EXPECT_EQ(mempurge_count.exchange(0), ZERO);
Close();
}
// Closes the "#ifndef ROCKSDB_LITE"
// End of MemPurgeBasicToggle, which is not
// supported with RocksDB LITE because it
// relies on dynamically changing the option
// flag experimental_mempurge_threshold.
#endif
// At the moment, MemPurge feature is deactivated
// when atomic_flush is enabled. This is because the level
// of garbage between Column Families is not guaranteed to
// be consistent, therefore a CF could hypothetically
// trigger a MemPurge while another CF would trigger
// a regular Flush.
TEST_F(DBFlushTest, MemPurgeWithAtomicFlush) {
Options options = CurrentOptions();
// The following options are used to enforce several values that
// may already exist as default values to make this test resilient
// to default value updates in the future.
options.statistics = CreateDBStatistics();
// Record all statistics.
options.statistics->set_stats_level(StatsLevel::kAll);
// create the DB if it's not already present
options.create_if_missing = true;
// Useful for now as we are trying to compare uncompressed data savings on
// flush().
options.compression = kNoCompression;
// Prevent memtable in place updates. Should already be disabled
// (from Wiki:
// In place updates can be enabled by toggling on the bool
// inplace_update_support flag. However, this flag is by default set to
// false
// because this thread-safe in-place update support is not compatible
// with concurrent memtable writes. Note that the bool
// allow_concurrent_memtable_write is set to true by default )
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;
// Enforce size of a single MemTable to 64KB (64KB = 65,536 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_mempurge_threshold = 153.245;
// Activate atomic_flush.
options.atomic_flush = true;
const std::vector<std::string> new_cf_names = {"pikachu", "eevie"};
CreateColumnFamilies(new_cf_names, options);
Close();
// 3 CFs: default will be filled with overwrites (would normally trigger
// mempurge)
// new_cf_names[1] will be filled with random values (would trigger
// flush) new_cf_names[2] not filled with anything.
ReopenWithColumnFamilies(
{kDefaultColumnFamilyName, new_cf_names[0], new_cf_names[1]}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(3, num_cfs);
ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Put(2, "bar", "baz"));
std::atomic<uint32_t> mempurge_count{0};
std::atomic<uint32_t> sst_count{0};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* /*arg*/) { mempurge_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
const size_t KVSIZE = 3;
std::vector<std::string> KEYS(KVSIZE);
for (size_t k = 0; k < KVSIZE; k++) {
KEYS[k] = "IamKey" + std::to_string(k);
}
std::string RNDKEY;
std::vector<std::string> RNDVALS(KVSIZE);
const std::string NOT_FOUND = "NOT_FOUND";
// Heavy overwrite workload,
// more than would fit in maximum allowed memtables.
Random rnd(106);
const size_t NUM_REPEAT = 100;
const size_t RAND_KEY_LENGTH = 128;
const size_t RAND_VALUES_LENGTH = 10240;
// Insertion of K-V pairs, multiple times (overwrites).
for (size_t i = 0; i < NUM_REPEAT; i++) {
for (size_t j = 0; j < KEYS.size(); j++) {
RNDKEY = rnd.RandomString(RAND_KEY_LENGTH);
RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
ASSERT_OK(Put(1, RNDKEY, RNDVALS[j]));
ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
ASSERT_EQ(Get(1, RNDKEY), RNDVALS[j]);
}
}
// Check that there was no mempurge because atomic_flush option is true.
const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 0;
// Check that there was at least one SST files created during flush.
const uint32_t EXPECTED_SST_COUNT = 1;
EXPECT_EQ(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_GE(sst_count.exchange(0), EXPECTED_SST_COUNT);
Close();
}
TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
Options options = CurrentOptions(); Options options = CurrentOptions();
@@ -930,7 +1170,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype.
  options.experimental_mempurge_threshold = 1.0;
  options.experimental_mempurge_threshold = 15.0;
  ASSERT_OK(TryReopen(options));
@@ -1137,7 +1377,7 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
  options.write_buffer_size = 1 << 20;
  // Activate the MemPurge prototype.
  options.experimental_mempurge_threshold = 1.0;
  options.experimental_mempurge_threshold = 26.55;
  ASSERT_OK(TryReopen(options));
@@ -1212,8 +1452,9 @@ TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) {
  // Enforce size of a single MemTable to 128KB.
  options.write_buffer_size = 128 << 10;
  // Activate the MemPurge prototype.
  // Activate the MemPurge prototype
  // (values >1.0 are equivalent to 1.0).
  options.experimental_mempurge_threshold = 1.0;
  options.experimental_mempurge_threshold = 2.5;
  ASSERT_OK(TryReopen(options));

@@ -578,38 +578,6 @@ Status DBImpl::CloseHelper() {
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();
// For now, simply trigger a manual flush at close time
// on all the column families.
// TODO(bjlemaire): Check if this is needed. Also, in the
// future we can contemplate doing a more fine-grained
// flushing by first checking if there is a need for
// flushing (but need to implement something
// else than imm()->IsFlushPending() because the output
// memtables added to imm() don't trigger flushes).
if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
Status flush_ret;
mutex_.Unlock();
for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) {
if (immutable_db_options_.atomic_flush) {
flush_ret = AtomicFlushMemTables({cf}, FlushOptions(),
FlushReason::kManualFlush);
if (!flush_ret.ok()) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Atomic flush memtables failed upon closing (mempurge).");
}
} else {
flush_ret =
FlushMemTable(cf, FlushOptions(), FlushReason::kManualFlush);
if (!flush_ret.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Flush memtables failed upon closing (mempurge).");
}
}
}
mutex_.Lock();
}
  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {

@@ -2616,17 +2616,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
  assert(flush_req.size() == 1);
  ColumnFamilyData* cfd = flush_req[0].first;
  assert(cfd);
// Note: SchedulePendingFlush is always preceded
// with an imm()->FlushRequested() call. However,
// we want to make this code snipper more resilient to
// future changes. Therefore, we add the following if
// statement - note that calling it twice (or more)
// doesn't break anything.
if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
// If imm() contains silent memtables,
// requesting a flush will mark the imm_needed as true.
cfd->imm()->FlushRequested();
}
  if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
    cfd->Ref();
    cfd->set_queued_for_flush(true);
@@ -2775,11 +2765,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
  for (const auto& iter : flush_req) {
    ColumnFamilyData* cfd = iter.first;
    if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
      // If imm() contains silent memtables,
      // requesting a flush will mark the imm_needed as true.
    if (cfd->GetMempurgeUsed()) {
      // If imm() contains silent memtables (e.g.: because
      // MemPurge was activated), requesting a flush will
      // mark the imm_needed as true.
      cfd->imm()->FlushRequested();
    }
    if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
      // can't flush this CF, try next one
      column_families_not_to_flush.push_back(cfd);

@@ -211,6 +211,13 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
// Mempurge threshold can be dynamically changed.
// For sake of consistency, mempurge_threshold is
// saved locally to maintain consistency in each
// FlushJob::Run call.
double mempurge_threshold =
mutable_cf_options_.experimental_mempurge_threshold;
  AutoThreadOperationStageUpdater stage_run(
      ThreadStatus::STAGE_FLUSH_RUN);
  if (mems_.empty()) {
@@ -238,9 +245,11 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }
  Status mempurge_s = Status::NotFound("No MemPurge.");
  if ((db_options_.experimental_mempurge_threshold > 0.0) &&
      (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
      (!mems_.empty()) && MemPurgeDecider()) {
  if ((mempurge_threshold > 0.0) &&
      (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
      (!mems_.empty()) && MemPurgeDecider(mempurge_threshold) &&
      !(db_options_.atomic_flush)) {
    cfd_->SetMempurgeUsed();
    mempurge_s = MemPurge();
    if (!mempurge_s.ok()) {
      // Mempurge is typically aborted when the output
@@ -628,8 +637,7 @@ Status FlushJob::MemPurge() {
  return s;
}
bool FlushJob::MemPurgeDecider() {
  double threshold = db_options_.experimental_mempurge_threshold;
bool FlushJob::MemPurgeDecider(double threshold) {
  // Never trigger mempurge if threshold is not a strictly positive value.
  if (!(threshold > 0.0)) {
    return false;
@@ -779,10 +787,11 @@ bool FlushJob::MemPurgeDecider() {
      estimated_useful_payload +=
          (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
      ROCKS_LOG_INFO(
          db_options_.info_log,
          "Mempurge sampling - found garbage ratio from sampling: %f.\n",
          (payload - useful_payload) * 1.0 / payload);
      ROCKS_LOG_INFO(db_options_.info_log,
                     "Mempurge sampling [CF %s] - found garbage ratio from "
                     "sampling: %f. Threshold is %f\n",
                     cfd_->GetName().c_str(),
                     (payload - useful_payload) * 1.0 / payload, threshold);
    } else {
      ROCKS_LOG_WARN(db_options_.info_log,
                     "Mempurge sampling: null payload measured, and collected "

@@ -122,7 +122,7 @@ class FlushJob {
  // recommend all users not to set this flag as true given that the MemPurge
  // process has not matured yet.
  Status MemPurge();
  bool MemPurgeDecider();
  bool MemPurgeDecider(double threshold);
  // The rate limiter priority (io_priority) is determined dynamically here.
  Env::IOPriority GetRateLimiterPriorityForWrite();
#ifndef ROCKSDB_LITE

@@ -193,6 +193,7 @@ bool StressTest::BuildOptionsTable() {
      {"memtable_huge_page_size", {"0", std::to_string(2 * 1024 * 1024)}},
      {"max_successive_merges", {"0", "2", "4"}},
      {"inplace_update_num_locks", {"100", "200", "300"}},
      {"experimental_mempurge_threshold", {"0.0", "1.0"}},
      // TODO(ljin): enable test for this option
      // {"disable_auto_compactions", {"100", "200", "300"}},
      {"level0_file_num_compaction_trigger",

@@ -345,6 +345,23 @@ struct AdvancedColumnFamilyOptions {
  // Dynamically changeable through SetOptions() API
  size_t inplace_update_num_locks = 10000;
// [experimental]
// Used to activate or deactive the Mempurge feature (memtable garbage
// collection). (deactivated by default). At every flush, the total useful
// payload (total entries minus garbage entries) is estimated as a ratio
// [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then
// compared to this `threshold` value:
// - if ratio<threshold: the flush is replaced by a mempurge operation
// - else: a regular flush operation takes place.
// Threshold values:
// 0.0: mempurge deactivated (default).
// 1.0: recommended threshold value.
// >1.0 : aggressive mempurge.
// 0 < threshold < 1.0: mempurge triggered only for very low useful payload
// ratios.
// [experimental]
double experimental_mempurge_threshold = 0.0;
  // existing_value - pointer to previous value (from both memtable and sst).
  // nullptr if key doesn't exist
  // existing_value_size - pointer to size of existing_value).
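To make the threshold semantics documented above concrete, here is a rough sketch of the decision rule as described in that comment (a hypothetical helper for illustration, not the actual `MemPurgeDecider` implementation):

```cpp
// Sketch of the documented semantics: the flush is replaced by a mempurge
// when the estimated useful-payload ratio of the memtable falls below the
// configured threshold.
bool WouldMemPurge(double useful_payload_bytes, double memtable_size_bytes,
                   double threshold) {
  if (!(threshold > 0.0)) {
    return false;  // 0.0 (the default) or negative values: mempurge disabled.
  }
  const double useful_ratio = useful_payload_bytes / memtable_size_bytes;
  // Values >= 1.0 make mempurge increasingly likely; values in (0, 1) only
  // trigger it for memtables that are mostly garbage.
  return useful_ratio < threshold;
}
```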

@@ -1502,6 +1502,11 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_report_bg_io_stats(
extern ROCKSDB_LIBRARY_API unsigned char rocksdb_options_get_report_bg_io_stats(
    rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t*, double);
extern ROCKSDB_LIBRARY_API double
rocksdb_options_get_experimental_mempurge_threshold(rocksdb_options_t*);
enum {
  rocksdb_tolerate_corrupted_tail_records_recovery = 0,
  rocksdb_absolute_consistency_recovery = 1,

@@ -865,23 +865,6 @@ struct DBOptions {
  // Default: true
  bool advise_random_on_open = true;
// [experimental]
// Used to activate or deactive the Mempurge feature (memtable garbage
// collection). (deactivated by default). At every flush, the total useful
// payload (total entries minus garbage entries) is estimated as a ratio
// [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then
// compared to this `threshold` value:
// - if ratio<threshold: the flush is replaced by a mempurge operation
// - else: a regular flush operation takes place.
// Threshold values:
// 0.0: mempurge deactivated (default).
// 1.0: recommended threshold value.
// >1.0 : aggressive mempurge.
// 0 < threshold < 1.0: mempurge triggered only for very low useful payload
// ratios.
// [experimental]
double experimental_mempurge_threshold = 0.0;
  // Amount of data to build up in memtables across all column
  // families before writing to disk.
  //

@ -3101,6 +3101,29 @@ void Java_org_rocksdb_Options_setMemtablePrefixBloomSizeRatio(
static_cast<double>(jmemtable_prefix_bloom_size_ratio); static_cast<double>(jmemtable_prefix_bloom_size_ratio);
} }
/*
* Class: org_rocksdb_Options
* Method: experimentalMempurgeThreshold
* Signature: (J)I
*/
jdouble Java_org_rocksdb_Options_experimentalMempurgeThreshold(JNIEnv*, jobject,
jlong jhandle) {
return reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle)
->experimental_mempurge_threshold;
}
/*
* Class: org_rocksdb_Options
* Method: setExperimentalMempurgeThreshold
* Signature: (JI)V
*/
void Java_org_rocksdb_Options_setExperimentalMempurgeThreshold(
JNIEnv*, jobject, jlong jhandle, jdouble jexperimental_mempurge_threshold) {
reinterpret_cast<ROCKSDB_NAMESPACE::Options*>(jhandle)
->experimental_mempurge_threshold =
static_cast<double>(jexperimental_mempurge_threshold);
}
/* /*
* Class: org_rocksdb_Options * Class: org_rocksdb_Options
* Method: memtableWholeKeyFiltering * Method: memtableWholeKeyFiltering
@ -4955,6 +4978,29 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMemtablePrefixBloomSizeRatio(
static_cast<double>(jmemtable_prefix_bloom_size_ratio); static_cast<double>(jmemtable_prefix_bloom_size_ratio);
} }
/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: experimentalMempurgeThreshold
* Signature: (J)I
*/
jdouble Java_org_rocksdb_ColumnFamilyOptions_experimentalMempurgeThreshold(
JNIEnv*, jobject, jlong jhandle) {
return reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle)
->experimental_mempurge_threshold;
}
/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: setExperimentalMempurgeThreshold
* Signature: (JI)V
*/
void Java_org_rocksdb_ColumnFamilyOptions_setExperimentalMempurgeThreshold(
JNIEnv*, jobject, jlong jhandle, jdouble jexperimental_mempurge_threshold) {
reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jhandle)
->experimental_mempurge_threshold =
static_cast<double>(jexperimental_mempurge_threshold);
}
/* /*
* Class: org_rocksdb_ColumnFamilyOptions * Class: org_rocksdb_ColumnFamilyOptions
* Method: memtableWholeKeyFiltering * Method: memtableWholeKeyFiltering

@ -81,6 +81,30 @@ public interface AdvancedMutableColumnFamilyOptionsInterface<
*/ */
double memtablePrefixBloomSizeRatio(); double memtablePrefixBloomSizeRatio();
/**
* Threshold used in the MemPurge (memtable garbage collection)
* feature. A value of 0.0 corresponds to no MemPurge,
* a value of 1.0 will trigger a MemPurge as often as possible.
*
* Default: 0.0 (disabled)
*
* @param experimentalMempurgeThreshold the threshold used by
* the MemPurge decider.
* @return the reference to the current options.
*/
T setExperimentalMempurgeThreshold(double experimentalMempurgeThreshold);
/**
* Threshold used in the MemPurge (memtable garbage collection)
* feature. A value of 0.0 corresponds to no MemPurge,
* a value of 1.0 will trigger a MemPurge as often as possible.
*
* Default: 0 (disabled)
*
* @return the threshold used by the MemPurge decider
*/
double experimentalMempurgeThreshold();
/** /**
* Enable whole key bloom filter in memtable. Note this will only take effect * Enable whole key bloom filter in memtable. Note this will only take effect
* if memtable_prefix_bloom_size_ratio is not 0. Enabling whole key filtering * if memtable_prefix_bloom_size_ratio is not 0. Enabling whole key filtering

@ -683,6 +683,18 @@ public class ColumnFamilyOptions extends RocksObject
return memtablePrefixBloomSizeRatio(nativeHandle_); return memtablePrefixBloomSizeRatio(nativeHandle_);
} }
@Override
public ColumnFamilyOptions setExperimentalMempurgeThreshold(
final double experimentalMempurgeThreshold) {
setExperimentalMempurgeThreshold(nativeHandle_, experimentalMempurgeThreshold);
return this;
}
@Override
public double experimentalMempurgeThreshold() {
return experimentalMempurgeThreshold(nativeHandle_);
}
@Override @Override
public ColumnFamilyOptions setMemtableWholeKeyFiltering(final boolean memtableWholeKeyFiltering) { public ColumnFamilyOptions setMemtableWholeKeyFiltering(final boolean memtableWholeKeyFiltering) {
setMemtableWholeKeyFiltering(nativeHandle_, memtableWholeKeyFiltering); setMemtableWholeKeyFiltering(nativeHandle_, memtableWholeKeyFiltering);
@ -1389,6 +1401,9 @@ public class ColumnFamilyOptions extends RocksObject
private native void setMemtablePrefixBloomSizeRatio( private native void setMemtablePrefixBloomSizeRatio(
long handle, double memtablePrefixBloomSizeRatio); long handle, double memtablePrefixBloomSizeRatio);
private native double memtablePrefixBloomSizeRatio(long handle); private native double memtablePrefixBloomSizeRatio(long handle);
private native void setExperimentalMempurgeThreshold(
long handle, double experimentalMempurgeThreshold);
private native double experimentalMempurgeThreshold(long handle);
private native void setMemtableWholeKeyFiltering(long handle, boolean memtableWholeKeyFiltering); private native void setMemtableWholeKeyFiltering(long handle, boolean memtableWholeKeyFiltering);
private native boolean memtableWholeKeyFiltering(long handle); private native boolean memtableWholeKeyFiltering(long handle);
private native void setBloomLocality( private native void setBloomLocality(

@@ -73,7 +73,8 @@ public class MutableColumnFamilyOptions
    max_successive_merges(ValueType.LONG),
    @Deprecated filter_deletes(ValueType.BOOLEAN),
    max_write_buffer_number(ValueType.INT),
    inplace_update_num_locks(ValueType.LONG);
    inplace_update_num_locks(ValueType.LONG),
    experimental_mempurge_threshold(ValueType.DOUBLE);
    private final ValueType valueType;
    MemtableOption(final ValueType valueType) {
@ -287,6 +288,18 @@ public class MutableColumnFamilyOptions
return getLong(MemtableOption.inplace_update_num_locks); return getLong(MemtableOption.inplace_update_num_locks);
} }
@Override
public MutableColumnFamilyOptionsBuilder setExperimentalMempurgeThreshold(
final double experimentalMempurgeThreshold) {
return setDouble(
MemtableOption.experimental_mempurge_threshold, experimentalMempurgeThreshold);
}
@Override
public double experimentalMempurgeThreshold() {
return getDouble(MemtableOption.experimental_mempurge_threshold);
}
@Override @Override
public MutableColumnFamilyOptionsBuilder setDisableAutoCompactions( public MutableColumnFamilyOptionsBuilder setDisableAutoCompactions(
final boolean disableAutoCompactions) { final boolean disableAutoCompactions) {

@ -1623,6 +1623,17 @@ public class Options extends RocksObject
return this; return this;
} }
@Override
public double experimentalMempurgeThreshold() {
return experimentalMempurgeThreshold(nativeHandle_);
}
@Override
public Options setExperimentalMempurgeThreshold(final double experimentalMempurgeThreshold) {
setExperimentalMempurgeThreshold(nativeHandle_, experimentalMempurgeThreshold);
return this;
}
@Override @Override
public boolean memtableWholeKeyFiltering() { public boolean memtableWholeKeyFiltering() {
return memtableWholeKeyFiltering(nativeHandle_); return memtableWholeKeyFiltering(nativeHandle_);
@ -2420,6 +2431,9 @@ public class Options extends RocksObject
private native void setMemtablePrefixBloomSizeRatio( private native void setMemtablePrefixBloomSizeRatio(
long handle, double memtablePrefixBloomSizeRatio); long handle, double memtablePrefixBloomSizeRatio);
private native double memtablePrefixBloomSizeRatio(long handle); private native double memtablePrefixBloomSizeRatio(long handle);
private native void setExperimentalMempurgeThreshold(
long handle, double experimentalMempurgeThreshold);
private native double experimentalMempurgeThreshold(long handle);
private native void setMemtableWholeKeyFiltering(long handle, boolean memtableWholeKeyFiltering); private native void setMemtableWholeKeyFiltering(long handle, boolean memtableWholeKeyFiltering);
private native boolean memtableWholeKeyFiltering(long handle); private native boolean memtableWholeKeyFiltering(long handle);
private native void setBloomLocality( private native void setBloomLocality(

@ -331,6 +331,15 @@ public class ColumnFamilyOptionsTest {
} }
} }
@Test
public void experimentalMempurgeThreshold() {
try (final ColumnFamilyOptions opt = new ColumnFamilyOptions()) {
final double doubleValue = rand.nextDouble();
opt.setExperimentalMempurgeThreshold(doubleValue);
assertThat(opt.experimentalMempurgeThreshold()).isEqualTo(doubleValue);
}
}
@Test @Test
public void memtableWholeKeyFiltering() { public void memtableWholeKeyFiltering() {
try (final ColumnFamilyOptions opt = new ColumnFamilyOptions()) { try (final ColumnFamilyOptions opt = new ColumnFamilyOptions()) {

@@ -119,7 +119,7 @@ public class MutableColumnFamilyOptionsTest {
        + "min_write_buffer_number_to_merge=1; max_write_buffer_number_to_maintain=0; compaction_filter=nullptr; merge_operator=nullptr; "
        + "num_levels=7; optimize_filters_for_hits=false; force_consistency_checks=true; table_factory=BlockBasedTable; "
        + "max_write_buffer_size_to_maintain=0; memtable_insert_with_hint_prefix_extractor=nullptr; level_compaction_dynamic_level_bytes=false; "
        + "inplace_update_support=false;";
        + "inplace_update_support=false; experimental_mempurge_threshold=0.003";
    MutableColumnFamilyOptions.MutableColumnFamilyOptionsBuilder cf =
        MutableColumnFamilyOptions.parse(optionsString, true);
@ -158,6 +158,7 @@ public class MutableColumnFamilyOptionsTest {
assertThat(cf.periodicCompactionSeconds()).isEqualTo(0); assertThat(cf.periodicCompactionSeconds()).isEqualTo(0);
assertThat(cf.paranoidFileChecks()).isEqualTo(true); assertThat(cf.paranoidFileChecks()).isEqualTo(true);
assertThat(cf.memtablePrefixBloomSizeRatio()).isEqualTo(7.5); assertThat(cf.memtablePrefixBloomSizeRatio()).isEqualTo(7.5);
assertThat(cf.experimentalMempurgeThreshold()).isEqualTo(0.003);
assertThat(cf.maxSequentialSkipInIterations()).isEqualTo(8); assertThat(cf.maxSequentialSkipInIterations()).isEqualTo(8);
assertThat(cf.reportBgIoStats()).isEqualTo(true); assertThat(cf.reportBgIoStats()).isEqualTo(true);
} }

@ -50,6 +50,7 @@ public class MutableOptionsGetSetTest {
.setBlobFileStartingLevel(2) .setBlobFileStartingLevel(2)
.setArenaBlockSize(42) .setArenaBlockSize(42)
.setMemtablePrefixBloomSizeRatio(0.17) .setMemtablePrefixBloomSizeRatio(0.17)
.setExperimentalMempurgeThreshold(0.005)
.setMemtableWholeKeyFiltering(false) .setMemtableWholeKeyFiltering(false)
.setMemtableHugePageSize(3) .setMemtableHugePageSize(3)
.setMaxSuccessiveMerges(4) .setMaxSuccessiveMerges(4)
@ -73,6 +74,7 @@ public class MutableOptionsGetSetTest {
.setEnableBlobFiles(false) .setEnableBlobFiles(false)
.setArenaBlockSize(42) .setArenaBlockSize(42)
.setMemtablePrefixBloomSizeRatio(0.236) .setMemtablePrefixBloomSizeRatio(0.236)
.setExperimentalMempurgeThreshold(0.247)
.setMemtableWholeKeyFiltering(true) .setMemtableWholeKeyFiltering(true)
.setMemtableHugePageSize(8) .setMemtableHugePageSize(8)
.setMaxSuccessiveMerges(12) .setMaxSuccessiveMerges(12)
@ -111,6 +113,7 @@ public class MutableOptionsGetSetTest {
assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize);
assertThat(builder1.arenaBlockSize()).isEqualTo(42); assertThat(builder1.arenaBlockSize()).isEqualTo(42);
assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17); assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17);
assertThat(builder1.experimentalMempurgeThreshold()).isEqualTo(0.005);
assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false); assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false);
assertThat(builder1.memtableHugePageSize()).isEqualTo(3); assertThat(builder1.memtableHugePageSize()).isEqualTo(3);
assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4); assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4);
@ -135,6 +138,7 @@ public class MutableOptionsGetSetTest {
assertThat(builder2.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder2.minBlobSize()).isEqualTo(minBlobSize);
assertThat(builder2.arenaBlockSize()).isEqualTo(42); assertThat(builder2.arenaBlockSize()).isEqualTo(42);
assertThat(builder2.memtablePrefixBloomSizeRatio()).isEqualTo(0.236); assertThat(builder2.memtablePrefixBloomSizeRatio()).isEqualTo(0.236);
assertThat(builder2.experimentalMempurgeThreshold()).isEqualTo(0.247);
assertThat(builder2.memtableWholeKeyFiltering()).isEqualTo(true); assertThat(builder2.memtableWholeKeyFiltering()).isEqualTo(true);
assertThat(builder2.memtableHugePageSize()).isEqualTo(8); assertThat(builder2.memtableHugePageSize()).isEqualTo(8);
assertThat(builder2.maxSuccessiveMerges()).isEqualTo(12); assertThat(builder2.maxSuccessiveMerges()).isEqualTo(12);
@ -202,6 +206,7 @@ public class MutableOptionsGetSetTest {
.setBlobFileStartingLevel(3) .setBlobFileStartingLevel(3)
.setArenaBlockSize(42) .setArenaBlockSize(42)
.setMemtablePrefixBloomSizeRatio(0.17) .setMemtablePrefixBloomSizeRatio(0.17)
.setExperimentalMempurgeThreshold(0.005)
.setMemtableWholeKeyFiltering(false) .setMemtableWholeKeyFiltering(false)
.setMemtableHugePageSize(3) .setMemtableHugePageSize(3)
.setMaxSuccessiveMerges(4) .setMaxSuccessiveMerges(4)
@ -229,6 +234,7 @@ public class MutableOptionsGetSetTest {
assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize);
assertThat(builder1.arenaBlockSize()).isEqualTo(42); assertThat(builder1.arenaBlockSize()).isEqualTo(42);
assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17); assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17);
assertThat(builder1.experimentalMempurgeThreshold()).isEqualTo(0.005);
assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false); assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false);
assertThat(builder1.memtableHugePageSize()).isEqualTo(3); assertThat(builder1.memtableHugePageSize()).isEqualTo(3);
assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4); assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4);
@ -251,6 +257,7 @@ public class MutableOptionsGetSetTest {
.setEnableBlobFiles(false) .setEnableBlobFiles(false)
.setArenaBlockSize(42) .setArenaBlockSize(42)
.setMemtablePrefixBloomSizeRatio(0.236) .setMemtablePrefixBloomSizeRatio(0.236)
.setExperimentalMempurgeThreshold(0.247)
.setMemtableWholeKeyFiltering(true) .setMemtableWholeKeyFiltering(true)
.setMemtableHugePageSize(8) .setMemtableHugePageSize(8)
.setMaxSuccessiveMerges(12) .setMaxSuccessiveMerges(12)
@ -274,6 +281,7 @@ public class MutableOptionsGetSetTest {
assertThat(builder2.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder2.minBlobSize()).isEqualTo(minBlobSize);
assertThat(builder2.arenaBlockSize()).isEqualTo(42); assertThat(builder2.arenaBlockSize()).isEqualTo(42);
assertThat(builder2.memtablePrefixBloomSizeRatio()).isEqualTo(0.236); assertThat(builder2.memtablePrefixBloomSizeRatio()).isEqualTo(0.236);
assertThat(builder2.experimentalMempurgeThreshold()).isEqualTo(0.247);
assertThat(builder2.memtableWholeKeyFiltering()).isEqualTo(true); assertThat(builder2.memtableWholeKeyFiltering()).isEqualTo(true);
assertThat(builder2.memtableHugePageSize()).isEqualTo(8); assertThat(builder2.memtableHugePageSize()).isEqualTo(8);
assertThat(builder2.maxSuccessiveMerges()).isEqualTo(12); assertThat(builder2.maxSuccessiveMerges()).isEqualTo(12);
@ -324,6 +332,7 @@ public class MutableOptionsGetSetTest {
.setBlobFileStartingLevel(4) .setBlobFileStartingLevel(4)
.setArenaBlockSize(42) .setArenaBlockSize(42)
.setMemtablePrefixBloomSizeRatio(0.17) .setMemtablePrefixBloomSizeRatio(0.17)
.setExperimentalMempurgeThreshold(0.005)
.setMemtableWholeKeyFiltering(false) .setMemtableWholeKeyFiltering(false)
.setMemtableHugePageSize(3) .setMemtableHugePageSize(3)
.setMaxSuccessiveMerges(4) .setMaxSuccessiveMerges(4)
@ -350,6 +359,7 @@ public class MutableOptionsGetSetTest {
assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize); assertThat(builder1.minBlobSize()).isEqualTo(minBlobSize);
assertThat(builder1.arenaBlockSize()).isEqualTo(42); assertThat(builder1.arenaBlockSize()).isEqualTo(42);
assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17); assertThat(builder1.memtablePrefixBloomSizeRatio()).isEqualTo(0.17);
assertThat(builder1.experimentalMempurgeThreshold()).isEqualTo(0.005);
assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false); assertThat(builder1.memtableWholeKeyFiltering()).isEqualTo(false);
assertThat(builder1.memtableHugePageSize()).isEqualTo(3); assertThat(builder1.memtableHugePageSize()).isEqualTo(3);
assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4); assertThat(builder1.maxSuccessiveMerges()).isEqualTo(4);

@ -274,6 +274,15 @@ public class OptionsTest {
} }
} }
@Test
public void experimentalMempurgeThreshold() {
try (final Options opt = new Options()) {
final double doubleValue = rand.nextDouble();
opt.setExperimentalMempurgeThreshold(doubleValue);
assertThat(opt.experimentalMempurgeThreshold()).isEqualTo(doubleValue);
}
}
@Test @Test
public void memtableWholeKeyFiltering() { public void memtableWholeKeyFiltering() {
try (final Options opt = new Options()) { try (final Options opt = new Options()) {

@ -464,6 +464,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
offsetof(struct MutableCFOptions, compression_per_level), offsetof(struct MutableCFOptions, compression_per_level),
OptionVerificationType::kNormal, OptionTypeFlags::kMutable, OptionVerificationType::kNormal, OptionTypeFlags::kMutable,
{0, OptionType::kCompressionType})}, {0, OptionType::kCompressionType})},
{"experimental_mempurge_threshold",
{offsetof(struct MutableCFOptions, experimental_mempurge_threshold),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{kOptNameCompOpts, {kOptNameCompOpts,
OptionTypeInfo::Struct( OptionTypeInfo::Struct(
kOptNameCompOpts, &compression_options_type_info, kOptNameCompOpts, &compression_options_type_info,
@ -1037,6 +1041,9 @@ void MutableCFOptions::Dump(Logger* log) const {
report_bg_io_stats); report_bg_io_stats);
ROCKS_LOG_INFO(log, " compression: %d", ROCKS_LOG_INFO(log, " compression: %d",
static_cast<int>(compression)); static_cast<int>(compression));
ROCKS_LOG_INFO(log,
" experimental_mempurge_threshold: %f",
experimental_mempurge_threshold);
// Universal Compaction Options // Universal Compaction Options
ROCKS_LOG_INFO(log, "compaction_options_universal.size_ratio : %d", ROCKS_LOG_INFO(log, "compaction_options_universal.size_ratio : %d",

@ -112,6 +112,8 @@ struct MutableCFOptions {
max_successive_merges(options.max_successive_merges), max_successive_merges(options.max_successive_merges),
inplace_update_num_locks(options.inplace_update_num_locks), inplace_update_num_locks(options.inplace_update_num_locks),
prefix_extractor(options.prefix_extractor), prefix_extractor(options.prefix_extractor),
experimental_mempurge_threshold(
options.experimental_mempurge_threshold),
disable_auto_compactions(options.disable_auto_compactions), disable_auto_compactions(options.disable_auto_compactions),
soft_pending_compaction_bytes_limit( soft_pending_compaction_bytes_limit(
options.soft_pending_compaction_bytes_limit), options.soft_pending_compaction_bytes_limit),
@ -170,6 +172,7 @@ struct MutableCFOptions {
max_successive_merges(0), max_successive_merges(0),
inplace_update_num_locks(0), inplace_update_num_locks(0),
prefix_extractor(nullptr), prefix_extractor(nullptr),
experimental_mempurge_threshold(0.0),
disable_auto_compactions(false), disable_auto_compactions(false),
soft_pending_compaction_bytes_limit(0), soft_pending_compaction_bytes_limit(0),
hard_pending_compaction_bytes_limit(0), hard_pending_compaction_bytes_limit(0),
@ -231,6 +234,22 @@ struct MutableCFOptions {
size_t max_successive_merges; size_t max_successive_merges;
size_t inplace_update_num_locks; size_t inplace_update_num_locks;
std::shared_ptr<const SliceTransform> prefix_extractor; std::shared_ptr<const SliceTransform> prefix_extractor;
// [experimental]
// Used to activate or deactive the Mempurge feature (memtable garbage
// collection). (deactivated by default). At every flush, the total useful
// payload (total entries minus garbage entries) is estimated as a ratio
// [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then
// compared to this `threshold` value:
// - if ratio<threshold: the flush is replaced by a mempurge operation
// - else: a regular flush operation takes place.
// Threshold values:
// 0.0: mempurge deactivated (default).
// 1.0: recommended threshold value.
// >1.0 : aggressive mempurge.
// 0 < threshold < 1.0: mempurge triggered only for very low useful payload
// ratios.
// [experimental]
double experimental_mempurge_threshold;
// Compaction related options // Compaction related options
bool disable_auto_compactions; bool disable_auto_compactions;

@@ -208,8 +208,7 @@ static std::unordered_map<std::string, OptionTypeInfo>
         {0, OptionType::kString, OptionVerificationType::kDeprecated,
          OptionTypeFlags::kNone}},
        {"experimental_mempurge_threshold",
         {offsetof(struct ImmutableDBOptions, experimental_mempurge_threshold),
          OptionType::kDouble, OptionVerificationType::kNormal,
          OptionTypeFlags::kNone}},
        {"experimental_mempurge_threshold",
         {0, OptionType::kDouble, OptionVerificationType::kDeprecated,
          OptionTypeFlags::kNone}},
        {"is_fd_close_on_exec",
         {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec),
@ -716,7 +715,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
allow_fallocate(options.allow_fallocate), allow_fallocate(options.allow_fallocate),
is_fd_close_on_exec(options.is_fd_close_on_exec), is_fd_close_on_exec(options.is_fd_close_on_exec),
advise_random_on_open(options.advise_random_on_open), advise_random_on_open(options.advise_random_on_open),
experimental_mempurge_threshold(options.experimental_mempurge_threshold),
db_write_buffer_size(options.db_write_buffer_size), db_write_buffer_size(options.db_write_buffer_size),
write_buffer_manager(options.write_buffer_manager), write_buffer_manager(options.write_buffer_manager),
access_hint_on_compaction_start(options.access_hint_on_compaction_start), access_hint_on_compaction_start(options.access_hint_on_compaction_start),
@ -847,9 +845,6 @@ void ImmutableDBOptions::Dump(Logger* log) const {
is_fd_close_on_exec); is_fd_close_on_exec);
ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d", ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d",
advise_random_on_open); advise_random_on_open);
ROCKS_LOG_HEADER(
log, " Options.experimental_mempurge_threshold: %f",
experimental_mempurge_threshold);
ROCKS_LOG_HEADER( ROCKS_LOG_HEADER(
log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt,
db_write_buffer_size); db_write_buffer_size);

@ -58,7 +58,6 @@ struct ImmutableDBOptions {
bool allow_fallocate; bool allow_fallocate;
bool is_fd_close_on_exec; bool is_fd_close_on_exec;
bool advise_random_on_open; bool advise_random_on_open;
double experimental_mempurge_threshold;
size_t db_write_buffer_size; size_t db_write_buffer_size;
std::shared_ptr<WriteBufferManager> write_buffer_manager; std::shared_ptr<WriteBufferManager> write_buffer_manager;
DBOptions::AccessHint access_hint_on_compaction_start; DBOptions::AccessHint access_hint_on_compaction_start;

@ -49,6 +49,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
options.max_write_buffer_size_to_maintain), options.max_write_buffer_size_to_maintain),
inplace_update_support(options.inplace_update_support), inplace_update_support(options.inplace_update_support),
inplace_update_num_locks(options.inplace_update_num_locks), inplace_update_num_locks(options.inplace_update_num_locks),
experimental_mempurge_threshold(options.experimental_mempurge_threshold),
inplace_callback(options.inplace_callback), inplace_callback(options.inplace_callback),
memtable_prefix_bloom_size_ratio( memtable_prefix_bloom_size_ratio(
options.memtable_prefix_bloom_size_ratio), options.memtable_prefix_bloom_size_ratio),
@ -424,12 +425,14 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, " blob_cache options: %s", ROCKS_LOG_HEADER(log, " blob_cache options: %s",
blob_cache->GetPrintableOptions().c_str()); blob_cache->GetPrintableOptions().c_str());
} }
ROCKS_LOG_HEADER(log, "Options.experimental_mempurge_threshold: %f",
experimental_mempurge_threshold);
} // ColumnFamilyOptions::Dump } // ColumnFamilyOptions::Dump
void Options::Dump(Logger* log) const { void Options::Dump(Logger* log) const {
DBOptions::Dump(log); DBOptions::Dump(log);
ColumnFamilyOptions::Dump(log); ColumnFamilyOptions::Dump(log);
} // Options::Dump } // Options::Dump
void Options::DumpCFOptions(Logger* log) const { void Options::DumpCFOptions(Logger* log) const {
ColumnFamilyOptions::Dump(log); ColumnFamilyOptions::Dump(log);

@ -212,6 +212,8 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,
cf_opts->max_successive_merges = moptions.max_successive_merges; cf_opts->max_successive_merges = moptions.max_successive_merges;
cf_opts->inplace_update_num_locks = moptions.inplace_update_num_locks; cf_opts->inplace_update_num_locks = moptions.inplace_update_num_locks;
cf_opts->prefix_extractor = moptions.prefix_extractor; cf_opts->prefix_extractor = moptions.prefix_extractor;
cf_opts->experimental_mempurge_threshold =
moptions.experimental_mempurge_threshold;
// Compaction related options // Compaction related options
cf_opts->disable_auto_compactions = moptions.disable_auto_compactions; cf_opts->disable_auto_compactions = moptions.disable_auto_compactions;

@ -503,6 +503,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"paranoid_file_checks=true;" "paranoid_file_checks=true;"
"force_consistency_checks=true;" "force_consistency_checks=true;"
"inplace_update_num_locks=7429;" "inplace_update_num_locks=7429;"
"experimental_mempurge_threshold=0.0001;"
"optimize_filters_for_hits=false;" "optimize_filters_for_hits=false;"
"level_compaction_dynamic_level_bytes=false;" "level_compaction_dynamic_level_bytes=false;"
"inplace_update_support=false;" "inplace_update_support=false;"

@ -116,6 +116,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"max_successive_merges", "30"}, {"max_successive_merges", "30"},
{"min_partial_merge_operands", "31"}, {"min_partial_merge_operands", "31"},
{"prefix_extractor", "fixed:31"}, {"prefix_extractor", "fixed:31"},
{"experimental_mempurge_threshold", "0.003"},
{"optimize_filters_for_hits", "true"}, {"optimize_filters_for_hits", "true"},
{"enable_blob_files", "true"}, {"enable_blob_files", "true"},
{"min_blob_size", "1K"}, {"min_blob_size", "1K"},
@ -164,7 +165,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"persist_stats_to_disk", "false"}, {"persist_stats_to_disk", "false"},
{"stats_history_buffer_size", "69"}, {"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"}, {"advise_random_on_open", "true"},
{"experimental_mempurge_threshold", "0.0"},
{"use_adaptive_mutex", "false"}, {"use_adaptive_mutex", "false"},
{"compaction_readahead_size", "100"}, {"compaction_readahead_size", "100"},
{"random_access_max_buffer_size", "3145728"}, {"random_access_max_buffer_size", "3145728"},
@ -256,6 +256,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_TRUE(new_cf_opt.prefix_extractor != nullptr); ASSERT_TRUE(new_cf_opt.prefix_extractor != nullptr);
ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true); ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true);
ASSERT_EQ(new_cf_opt.prefix_extractor->AsString(), "rocksdb.FixedPrefix.31"); ASSERT_EQ(new_cf_opt.prefix_extractor->AsString(), "rocksdb.FixedPrefix.31");
ASSERT_EQ(new_cf_opt.experimental_mempurge_threshold, 0.003);
ASSERT_EQ(new_cf_opt.enable_blob_files, true); ASSERT_EQ(new_cf_opt.enable_blob_files, true);
ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10); ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10);
ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30); ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30);
@ -329,7 +330,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true); ASSERT_EQ(new_db_opt.advise_random_on_open, true);
ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728); ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);
@ -2345,6 +2345,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
{"max_successive_merges", "30"}, {"max_successive_merges", "30"},
{"min_partial_merge_operands", "31"}, {"min_partial_merge_operands", "31"},
{"prefix_extractor", "fixed:31"}, {"prefix_extractor", "fixed:31"},
{"experimental_mempurge_threshold", "0.003"},
{"optimize_filters_for_hits", "true"}, {"optimize_filters_for_hits", "true"},
{"enable_blob_files", "true"}, {"enable_blob_files", "true"},
{"min_blob_size", "1K"}, {"min_blob_size", "1K"},
@ -2393,7 +2394,6 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
{"persist_stats_to_disk", "false"}, {"persist_stats_to_disk", "false"},
{"stats_history_buffer_size", "69"}, {"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"}, {"advise_random_on_open", "true"},
{"experimental_mempurge_threshold", "0.0"},
{"use_adaptive_mutex", "false"}, {"use_adaptive_mutex", "false"},
{"compaction_readahead_size", "100"}, {"compaction_readahead_size", "100"},
{"random_access_max_buffer_size", "3145728"}, {"random_access_max_buffer_size", "3145728"},
@ -2479,6 +2479,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
ASSERT_TRUE(new_cf_opt.prefix_extractor != nullptr); ASSERT_TRUE(new_cf_opt.prefix_extractor != nullptr);
ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true); ASSERT_EQ(new_cf_opt.optimize_filters_for_hits, true);
ASSERT_EQ(new_cf_opt.prefix_extractor->AsString(), "rocksdb.FixedPrefix.31"); ASSERT_EQ(new_cf_opt.prefix_extractor->AsString(), "rocksdb.FixedPrefix.31");
ASSERT_EQ(new_cf_opt.experimental_mempurge_threshold, 0.003);
ASSERT_EQ(new_cf_opt.enable_blob_files, true); ASSERT_EQ(new_cf_opt.enable_blob_files, true);
ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10); ASSERT_EQ(new_cf_opt.min_blob_size, 1ULL << 10);
ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30); ASSERT_EQ(new_cf_opt.blob_file_size, 1ULL << 30);
@ -2553,7 +2554,6 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true); ASSERT_EQ(new_db_opt.advise_random_on_open, true);
ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728); ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728);
