Make mempurge a background process (equivalent to in-memory compaction). (#8505)

Summary:
In https://github.com/facebook/rocksdb/issues/8454, I introduced a new process baptized `MemPurge` (memtable garbage collection). This new PR is built upon this past mempurge prototype.
In this PR, I made the `mempurge` process a background task, which provides superior performance since the mempurge process does not cling on the db_mutex anymore, and addresses severe restrictions from the past iteration (including a scenario where the past mempurge was failling, when a memtable was mempurged but was still referred to by an iterator/snapshot/...).
Now the mempurge process ressembles an in-memory compaction process: the stack of immutable memtables is filtered out, and the useful payload is used to populate an output memtable. If the output memtable is filled at more than 60% capacity (arbitrary heuristic) the mempurge process is aborted and a regular flush process takes place, else the output memtable is kept in the immutable memtable stack. Note that adding this output memtable to the `imm()` memtable stack does not trigger another flush process, so that the flush thread can go to sleep at the end of a successful mempurge.
MemPurge is activated by making the `experimental_allow_mempurge` flag `true`. When activated, the `MemPurge` process will always happen when the flush reason is `kWriteBufferFull`.
The 3 unit tests confirm that this process supports `Put`, `Get`, `Delete`, `DeleteRange` operators and is compatible with `Iterators` and `CompactionFilters`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8505

Reviewed By: pdillinger

Differential Revision: D29619283

Pulled By: bjlemaire

fbshipit-source-id: 8a99bee76b63a8211bff1a00e0ae32360aaece95
main
Baptiste Lemaire 3 years ago committed by Facebook GitHub Bot
parent bb485e986a
commit 837705ad80
  1. 21
      db/column_family.cc
  2. 9
      db/column_family.h
  3. 255
      db/db_flush_test.cc
  4. 33
      db/db_impl/db_impl.cc
  5. 19
      db/db_impl/db_impl.h
  6. 5
      db/db_impl/db_impl_compaction_flush.cc
  7. 219
      db/db_impl/db_impl_write.cc
  8. 256
      db/flush_job.cc
  9. 22
      db/flush_job.h
  10. 6
      db/memtable_list.cc
  11. 6
      db/memtable_list.h

@ -443,7 +443,7 @@ bool SuperVersion::Unref() {
return previous_refs == 1;
}
void SuperVersion::Cleanup(const bool noImmMemoryContribution) {
void SuperVersion::Cleanup() {
assert(refs.load(std::memory_order_relaxed) == 0);
// Since this SuperVersion object is being deleted,
// decrement reference to the immutable MemtableList
@ -451,18 +451,9 @@ void SuperVersion::Cleanup(const bool noImmMemoryContribution) {
imm->Unref(&to_delete);
MemTable* m = mem->Unref();
if (m != nullptr) {
// Typically, if the m memtable was not made
// immutable, and therefore was not added to the
// imm list, it does not contribute to the imm
// memory footprint (and actually is not part of
// the 'imm' MemtableList at all).
// At the moment, noImmMemoryContribution is only
// used by the experimental 'MemPurge' prototype.
if (!noImmMemoryContribution) {
auto* memory_usage = current->cfd()->imm()->current_memory_usage();
assert(*memory_usage >= m->ApproximateMemoryUsage());
*memory_usage -= m->ApproximateMemoryUsage();
}
auto* memory_usage = current->cfd()->imm()->current_memory_usage();
assert(*memory_usage >= m->ApproximateMemoryUsage());
*memory_usage -= m->ApproximateMemoryUsage();
to_delete.push_back(m);
}
current->Unref();
@ -1272,7 +1263,7 @@ void ColumnFamilyData::InstallSuperVersion(
void ColumnFamilyData::InstallSuperVersion(
SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options, bool noImmMemoryContribution) {
const MutableCFOptions& mutable_cf_options) {
SuperVersion* new_superversion = sv_context->new_superversion.release();
new_superversion->db_mutex = db_mutex;
new_superversion->mutable_cf_options = mutable_cf_options;
@ -1302,7 +1293,7 @@ void ColumnFamilyData::InstallSuperVersion(
new_superversion->write_stall_condition, GetName(), ioptions());
}
if (old_superversion->Unref()) {
old_superversion->Cleanup(noImmMemoryContribution);
old_superversion->Cleanup();
sv_context->superversions_to_free.push_back(old_superversion);
}
}

@ -222,10 +222,7 @@ struct SuperVersion {
// Cleanup unrefs mem, imm and current. Also, it stores all memtables
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
// The 'noImmMemoryContribution' is set to true if the memtable being
// dereferenced in this SuperVersion was not added to the Immutable
// memtable list.
void Cleanup(bool noImmMemoryContribution = false);
void Cleanup();
void Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
MemTableListVersion* new_imm, Version* new_current);
@ -457,8 +454,7 @@ class ColumnFamilyData {
// IMPORTANT: Only call this from DBImpl::InstallSuperVersion()
void InstallSuperVersion(SuperVersionContext* sv_context,
InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options,
bool noImmMemoryContribution = false);
const MutableCFOptions& mutable_cf_options);
void InstallSuperVersion(SuperVersionContext* sv_context,
InstrumentedMutex* db_mutex);
@ -524,6 +520,7 @@ class ColumnFamilyData {
}
ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; }
private:
friend class ColumnFamilySet;

@ -692,16 +692,17 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
options.allow_concurrent_memtable_write = true;
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 64 << 20;
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t flush_count = 0;
uint32_t sst_count = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; });
"DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* /*arg*/) { mempurge_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; });
"DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::string KEY1 = "IamKey1";
@ -709,62 +710,120 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
std::string KEY3 = "IamKey3";
std::string KEY4 = "IamKey4";
std::string KEY5 = "IamKey5";
std::string VALUE1 = "IamValue1";
std::string VALUE2 = "IamValue2";
std::string KEY6 = "IamKey6";
std::string KEY7 = "IamKey7";
std::string KEY8 = "IamKey8";
std::string KEY9 = "IamKey9";
std::string RNDKEY1, RNDKEY2, RNDKEY3;
const std::string NOT_FOUND = "NOT_FOUND";
// Check simple operations (put-delete).
ASSERT_OK(Put(KEY1, VALUE1));
ASSERT_OK(Put(KEY2, VALUE2));
ASSERT_OK(Delete(KEY1));
ASSERT_OK(Put(KEY2, VALUE1));
ASSERT_OK(Put(KEY1, VALUE2));
ASSERT_OK(Flush());
ASSERT_EQ(Get(KEY1), VALUE2);
ASSERT_EQ(Get(KEY2), VALUE1);
ASSERT_OK(Delete(KEY1));
ASSERT_EQ(Get(KEY1), NOT_FOUND);
ASSERT_OK(Flush());
ASSERT_EQ(Get(KEY1), NOT_FOUND);
// Heavy overwrite workload,
// more than would fit in maximum allowed memtables.
Random rnd(719);
const size_t NUM_REPEAT = 100000;
const size_t RAND_VALUES_LENGTH = 512;
std::string p_v1, p_v2, p_v3, p_v4, p_v5;
// Insertion of of K-V pairs, multiple times.
// Also insert DeleteRange
const size_t NUM_REPEAT = 100;
const size_t RAND_KEYS_LENGTH = 57;
const size_t RAND_VALUES_LENGTH = 10240;
std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9, p_rv1,
p_rv2, p_rv3;
// Insert a very first set of keys that will be
// mempurged at least once.
p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEY1, p_v1));
ASSERT_OK(Put(KEY2, p_v2));
ASSERT_OK(Put(KEY3, p_v3));
ASSERT_OK(Put(KEY4, p_v4));
ASSERT_EQ(Get(KEY1), p_v1);
ASSERT_EQ(Get(KEY2), p_v2);
ASSERT_EQ(Get(KEY3), p_v3);
ASSERT_EQ(Get(KEY4), p_v4);
// Insertion of of K-V pairs, multiple times (overwrites).
for (size_t i = 0; i < NUM_REPEAT; i++) {
// Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v6 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v7 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v8 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v9 = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEY1, p_v1));
ASSERT_OK(Put(KEY2, p_v2));
ASSERT_OK(Put(KEY3, p_v3));
ASSERT_OK(Put(KEY4, p_v4));
ASSERT_OK(Put(KEY5, p_v5));
ASSERT_OK(Put(KEY6, p_v6));
ASSERT_OK(Put(KEY7, p_v7));
ASSERT_OK(Put(KEY8, p_v8));
ASSERT_OK(Put(KEY9, p_v9));
ASSERT_EQ(Get(KEY1), p_v1);
ASSERT_EQ(Get(KEY2), p_v2);
ASSERT_EQ(Get(KEY3), p_v3);
ASSERT_EQ(Get(KEY4), p_v4);
ASSERT_EQ(Get(KEY5), p_v5);
ASSERT_EQ(Get(KEY6), p_v6);
ASSERT_EQ(Get(KEY7), p_v7);
ASSERT_EQ(Get(KEY8), p_v8);
ASSERT_EQ(Get(KEY9), p_v9);
}
// Check that there was at least one mempurge
const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
// Check that there was no flush to storage.
const uint32_t EXPECTED_FLUSH_COUNT = 0;
// Check that there was no SST files created during flush.
const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT);
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
const uint32_t mempurge_count_record = mempurge_count;
// Insertion of of K-V pairs, no overwrites.
for (size_t i = 0; i < NUM_REPEAT; i++) {
// Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
RNDKEY1 = rnd.RandomString(RAND_KEYS_LENGTH);
RNDKEY2 = rnd.RandomString(RAND_KEYS_LENGTH);
RNDKEY3 = rnd.RandomString(RAND_KEYS_LENGTH);
p_rv1 = rnd.RandomString(RAND_VALUES_LENGTH);
p_rv2 = rnd.RandomString(RAND_VALUES_LENGTH);
p_rv3 = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(RNDKEY1, p_rv1));
ASSERT_OK(Put(RNDKEY2, p_rv2));
ASSERT_OK(Put(RNDKEY3, p_rv3));
ASSERT_EQ(Get(KEY1), p_v1);
ASSERT_EQ(Get(KEY2), p_v2);
ASSERT_EQ(Get(KEY3), p_v3);
ASSERT_EQ(Get(KEY4), p_v4);
ASSERT_EQ(Get(KEY5), p_v5);
ASSERT_EQ(Get(KEY6), p_v6);
ASSERT_EQ(Get(KEY7), p_v7);
ASSERT_EQ(Get(KEY8), p_v8);
ASSERT_EQ(Get(KEY9), p_v9);
ASSERT_EQ(Get(RNDKEY1), p_rv1);
ASSERT_EQ(Get(RNDKEY2), p_rv2);
ASSERT_EQ(Get(RNDKEY3), p_rv3);
}
// Assert that at least one flush to storage has been performed
ASSERT_GT(sst_count, EXPECTED_SST_COUNT);
// (which will consequently increase the number of mempurges recorded too).
ASSERT_EQ(mempurge_count, mempurge_count_record);
// Assert that there is no data corruption, even with
// a flush to storage.
ASSERT_EQ(Get(KEY1), p_v1);
ASSERT_EQ(Get(KEY2), p_v2);
ASSERT_EQ(Get(KEY3), p_v3);
ASSERT_EQ(Get(KEY4), p_v4);
ASSERT_EQ(Get(KEY5), p_v5);
ASSERT_EQ(Get(KEY6), p_v6);
ASSERT_EQ(Get(KEY7), p_v7);
ASSERT_EQ(Get(KEY8), p_v8);
ASSERT_EQ(Get(KEY9), p_v9);
ASSERT_EQ(Get(RNDKEY1), p_rv1);
ASSERT_EQ(Get(RNDKEY2), p_rv2);
ASSERT_EQ(Get(RNDKEY3), p_rv3);
Close();
}
@ -780,17 +839,18 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
options.allow_concurrent_memtable_write = true;
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 64 << 20;
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t flush_count = 0;
uint32_t sst_count = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MemPurge", [&](void* /*arg*/) { mempurge_count++; });
"DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* /*arg*/) { mempurge_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:Flush", [&](void* /*arg*/) { flush_count++; });
"DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::string KEY1 = "ThisIsKey1";
@ -801,9 +861,9 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
const std::string NOT_FOUND = "NOT_FOUND";
Random rnd(117);
const size_t NUM_REPEAT = 200;
const size_t RAND_VALUES_LENGTH = 512;
bool atLeastOneFlush = false;
const size_t NUM_REPEAT = 100;
const size_t RAND_VALUES_LENGTH = 10240;
std::string key, value, p_v1, p_v2, p_v3, p_v3b, p_v4, p_v5;
int count = 0;
const int EXPECTED_COUNT_FORLOOP = 3;
@ -813,6 +873,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
ropt.pin_data = true;
ropt.total_order_seek = true;
Iterator* iter = nullptr;
// Insertion of of K-V pairs, multiple times.
// Also insert DeleteRange
for (size_t i = 0; i < NUM_REPEAT; i++) {
@ -836,12 +897,6 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
KEY3));
ASSERT_OK(Delete(KEY1));
// Flush (MemPurge) with a probability of 50%.
if (rnd.OneIn(2)) {
ASSERT_OK(Flush());
atLeastOneFlush = true;
}
ASSERT_EQ(Get(KEY1), NOT_FOUND);
ASSERT_EQ(Get(KEY2), NOT_FOUND);
ASSERT_EQ(Get(KEY3), p_v3b);
@ -875,19 +930,11 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
// Check that there was at least one mempurge
const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
// Check that there was no flush to storage.
const uint32_t EXPECTED_FLUSH_COUNT = 0;
// Check that there was no SST files created during flush.
const uint32_t EXPECTED_SST_COUNT = 0;
if (atLeastOneFlush) {
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
} else {
// Note that there isn't enough values added to
// automatically trigger a flush/MemPurge in the background.
// Therefore we can make the assumption that if we never
// called "Flush()", no mempurge happened.
EXPECT_EQ(mempurge_count, EXPECTED_FLUSH_COUNT);
}
EXPECT_EQ(flush_count, EXPECTED_FLUSH_COUNT);
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
// Additional test for the iterator+memPurge.
ASSERT_OK(Put(KEY2, p_v2));
@ -911,6 +958,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
ASSERT_EQ(value, NOT_FOUND);
count++;
}
// Expected count here is 4: KEY2, KEY3, KEY4, KEY5.
ASSERT_EQ(count, EXPECTED_COUNT_END);
if (iter) delete iter;
@ -974,6 +1022,10 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
std::string KEY3 = "ThisIsKey3";
std::string KEY4 = "ThisIsKey4";
std::string KEY5 = "ThisIsKey5";
std::string KEY6 = "ThisIsKey6";
std::string KEY7 = "ThisIsKey7";
std::string KEY8 = "ThisIsKey8";
std::string KEY9 = "ThisIsKey9";
const std::string NOT_FOUND = "NOT_FOUND";
options.statistics = CreateDBStatistics();
@ -990,43 +1042,68 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
std::make_shared<ConditionalUpdateFilterFactory>(KEY4);
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 64 << 20;
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t sst_count = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* /*arg*/) { mempurge_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Random rnd(53);
const size_t NUM_REPEAT = 25;
const size_t RAND_VALUES_LENGTH = 128;
std::string p_v1, p_v2, p_v3, p_v4, p_v5;
const size_t NUM_REPEAT = 1000;
const size_t RAND_VALUES_LENGTH = 10240;
std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9;
p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEY1, p_v1));
ASSERT_OK(Put(KEY2, p_v2));
ASSERT_OK(Put(KEY3, p_v3));
ASSERT_OK(Put(KEY4, p_v4));
ASSERT_OK(Put(KEY5, p_v5));
ASSERT_OK(Delete(KEY1));
// Insertion of of K-V pairs, multiple times.
// Also insert DeleteRange
for (size_t i = 0; i < NUM_REPEAT; i++) {
// Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEY1, p_v1));
ASSERT_OK(Put(KEY2, p_v2));
ASSERT_OK(Put(KEY3, p_v3));
ASSERT_OK(Put(KEY4, p_v4));
ASSERT_OK(Put(KEY5, p_v5));
// Create value strings of arbitrary
// length RAND_VALUES_LENGTH bytes.
p_v6 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v7 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v8 = rnd.RandomString(RAND_VALUES_LENGTH);
p_v9 = rnd.RandomString(RAND_VALUES_LENGTH);
ASSERT_OK(Put(KEY6, p_v6));
ASSERT_OK(Put(KEY7, p_v7));
ASSERT_OK(Put(KEY8, p_v8));
ASSERT_OK(Put(KEY9, p_v9));
ASSERT_OK(Delete(KEY7));
}
ASSERT_OK(Delete(KEY1));
// Check that there was at least one mempurge
const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
// Check that there was no SST files created during flush.
const uint32_t EXPECTED_SST_COUNT = 0;
ASSERT_OK(Flush());
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
// Verify that the ConditionalUpdateCompactionFilter
// updated the values of KEY2 and KEY3, and not KEY4 and KEY5.
ASSERT_EQ(Get(KEY1), NOT_FOUND);
ASSERT_EQ(Get(KEY2), NEW_VALUE);
ASSERT_EQ(Get(KEY3), NEW_VALUE);
ASSERT_EQ(Get(KEY4), p_v4);
ASSERT_EQ(Get(KEY5), p_v5);
}
// Verify that the ConditionalUpdateCompactionFilter
// updated the values of KEY2 and KEY3, and not KEY4 and KEY5.
ASSERT_EQ(Get(KEY1), NOT_FOUND);
ASSERT_EQ(Get(KEY2), NEW_VALUE);
ASSERT_EQ(Get(KEY3), NEW_VALUE);
ASSERT_EQ(Get(KEY4), p_v4);
ASSERT_EQ(Get(KEY5), p_v5);
}
TEST_P(DBFlushDirectIOTest, DirectIO) {

@ -548,12 +548,45 @@ Status DBImpl::CloseHelper() {
flush_scheduler_.Clear();
trim_history_scheduler_.Clear();
// For now, simply trigger a manual flush at close time
// on all the column families.
// TODO(bjlemaire): Check if this is needed. Also, in the
// future we can contemplate doing a more fine-grained
// flushing by first checking if there is a need for
// flushing (but need to implement something
// else than imm()->IsFlushPending() because the output
// memtables added to imm() dont trigger flushes).
if (immutable_db_options_.experimental_allow_mempurge) {
Status flush_ret;
mutex_.Unlock();
for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) {
if (immutable_db_options_.atomic_flush) {
flush_ret = AtomicFlushMemTables({cf}, FlushOptions(),
FlushReason::kManualFlush);
if (!flush_ret.ok()) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Atomic flush memtables failed upon closing (mempurge).");
}
} else {
flush_ret =
FlushMemTable(cf, FlushOptions(), FlushReason::kManualFlush);
if (!flush_ret.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Flush memtables failed upon closing (mempurge).");
}
}
}
mutex_.Lock();
}
while (!flush_queue_.empty()) {
const FlushRequest& flush_req = PopFirstFromFlushQueue();
for (const auto& iter : flush_req) {
iter.first->UnrefAndTryDelete();
}
}
while (!compaction_queue_.empty()) {
auto cfd = PopFirstFromCompactionQueue();
cfd->UnrefAndTryDelete();

@ -1612,23 +1612,6 @@ class DBImpl : public DB {
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
// Memtable Garbage Collection algorithm: a MemPurge takes the memtable
// and filters (or "purge") the outdated bytes out of it. The output
// (the filtered bytes, or "useful payload") is then transfered into
// the new memtable "new_mem". This process is typically intended for
// workloads with heavy overwrites to save on IO cost resulting from
// expensive flush operations.
// "MemPurge" is an experimental feature still at a very early stage
// of development. At the moment it is only compatible with the Get, Put,
// Delete operations as well as Iterators and CompactionFilters.
// For this early version, "MemPurge" is called by setting the
// options.experimental_allow_mempurge flag as "true". When this is
// the case, ALL flush operations will be replaced by MemPurge operations.
// (for prototype stress-testing purposes). Therefore, we strongly
// recommend all users not to set this flag as true given that the MemPurge
// process has not matured yet.
Status MemPurge(ColumnFamilyData* cfd, MemTable* new_mem);
void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);
// Force current memtable contents to be flushed.
@ -1854,7 +1837,7 @@ class DBImpl : public DB {
// state needs flush or compaction.
void InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options, bool fromMemPurge = false);
const MutableCFOptions& mutable_cf_options);
bool GetIntPropertyInternal(ColumnFamilyData* cfd,
const DBPropertyInfo& property_info,

@ -3436,7 +3436,7 @@ void DBImpl::BuildCompactionJobInfo(
void DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options, bool fromMemPurge) {
const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld();
// Update max_total_in_memory_state_
@ -3451,8 +3451,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
if (UNLIKELY(sv_context->new_superversion == nullptr)) {
sv_context->NewSuperVersion();
}
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options,
fromMemPurge);
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
// There may be a small data race here. The snapshot tricking bottommost
// compaction may already be released here. But assuming there will always be

@ -1737,184 +1737,6 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
}
#endif // ROCKSDB_LITE
Status DBImpl::MemPurge(ColumnFamilyData* cfd, MemTable* new_mem) {
Status s;
assert(new_mem != nullptr);
JobContext job_context(next_job_id_.fetch_add(1), true);
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot;
SnapshotChecker* snapshot_checker;
GetSnapshotContext(&job_context, &snapshot_seqs,
&earliest_write_conflict_snapshot, &snapshot_checker);
// Grab current memtable
MemTable* m = cfd->mem();
SequenceNumber first_seqno = m->GetFirstSequenceNumber();
SequenceNumber earliest_seqno = m->GetEarliestSequenceNumber();
// Create two iterators, one for the memtable data (contains
// info from puts + deletes), and one for the memtable
// Range Tombstones (from DeleteRanges).
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
std::vector<InternalIterator*> memtables(1, m->NewIterator(ro, &arena));
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
ScopedArenaIterator iter(
NewMergingIterator(&(cfd->internal_comparator()), memtables.data(),
static_cast<int>(memtables.size()), &arena));
auto* ioptions = cfd->ioptions();
// Place iterator at the First (meaning most recent) key node.
iter->SeekToFirst();
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
new CompactionRangeDelAggregator(&(cfd->internal_comparator()),
snapshot_seqs));
for (auto& rd_iter : range_del_iters) {
range_del_agg->AddTombstones(std::move(rd_iter));
}
// If there is valid data in the memtable,
// or at least range tombstones, copy over the info
// to the new memtable.
if (iter->Valid() || !range_del_agg->IsEmpty()) {
std::unique_ptr<CompactionFilter> compaction_filter;
if (ioptions->compaction_filter_factory != nullptr &&
ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
TableFileCreationReason::kFlush)) {
CompactionFilter::Context ctx;
ctx.is_full_compaction = false;
ctx.is_manual_compaction = false;
ctx.column_family_id = cfd->GetID();
ctx.reason = TableFileCreationReason::kFlush;
compaction_filter =
ioptions->compaction_filter_factory->CreateCompactionFilter(ctx);
if (compaction_filter != nullptr &&
!compaction_filter->IgnoreSnapshots()) {
s = Status::NotSupported(
"CompactionFilter::IgnoreSnapshots() = false is not supported "
"anymore.");
return s;
}
}
Env* env = immutable_db_options_.env;
assert(env);
MergeHelper merge(
env, (cfd->internal_comparator()).user_comparator(),
(ioptions->merge_operator).get(), compaction_filter.get(),
ioptions->logger, true /* internal key corruption is not ok */,
snapshot_seqs.empty() ? 0 : snapshot_seqs.back(), snapshot_checker);
CompactionIterator c_iter(
iter.get(), (cfd->internal_comparator()).user_comparator(), &merge,
kMaxSequenceNumber, &snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, env, ShouldReportDetailedTime(env, ioptions->stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
nullptr, ioptions->allow_data_in_errors,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr,
/*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, immutable_db_options_.info_log,
&(cfd->GetFullHistoryTsLow()));
c_iter.SeekToFirst();
mutex_.AssertHeld();
// Set earliest sequence number in the new memtable
// to be equal to the earliest sequence number of the
// memtable being flushed (See later if there is a need
// to update this number!).
new_mem->SetEarliestSequenceNumber(earliest_seqno);
// Likewise for first seq number.
new_mem->SetFirstSequenceNumber(first_seqno);
SequenceNumber new_first_seqno = kMaxSequenceNumber;
// Key transfer
for (; c_iter.Valid(); c_iter.Next()) {
const ParsedInternalKey ikey = c_iter.ikey();
const Slice value = c_iter.value();
new_first_seqno =
ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;
// Should we update "OldestKeyTime" ????
s = new_mem->Add(
ikey.sequence, ikey.type, ikey.user_key, value,
nullptr, // KV protection info set as nullptr since it
// should only be useful for the first add to
// the original memtable.
false, // : allow concurrent_memtable_writes_
// Not seen as necessary for now.
nullptr, // get_post_process_info(m) must be nullptr
// when concurrent_memtable_writes is switched off.
nullptr); // hint, only used when concurrent_memtable_writes_
// is switched on.
if (!s.ok()) {
break;
}
}
// Check status and propagate
// potential error status from c_iter
if (!s.ok()) {
c_iter.status().PermitUncheckedError();
} else if (!c_iter.status().ok()) {
s = c_iter.status();
}
// Range tombstone transfer.
if (s.ok()) {
auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
new_first_seqno =
tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno;
s = new_mem->Add(
tombstone.seq_, // Sequence number
kTypeRangeDeletion, // KV type
tombstone.start_key_, // Key is start key.
tombstone.end_key_, // Value is end key.
nullptr, // KV protection info set as nullptr since it
// should only be useful for the first add to
// the original memtable.
false, // : allow concurrent_memtable_writes_
// Not seen as necessary for now.
nullptr, // get_post_process_info(m) must be nullptr
// when concurrent_memtable_writes is switched off.
nullptr); // hint, only used when concurrent_memtable_writes_
// is switched on.
if (!s.ok()) {
break;
}
}
}
// Rectify the first sequence number, which (unlike the earliest seq
// number) needs to be present in the new memtable.
new_mem->SetFirstSequenceNumber(new_first_seqno);
}
// Note: if the mempurge was ineffective, meaning that there was no
// garbage to remove, and this new_mem needs to be flushed again,
// the new_mem->Add would have updated the flush status when it
// called "UpdateFlushState()" internally at the last Add() call.
// Therefore if the new mem needs to be flushed again, we mark
// the return status as "aborted", which will trigger the regular
// flush operation.
if (s.ok() && new_mem->ShouldScheduleFlush()) {
s = Status::Aborted(Slice("No garbage collected."));
}
return s;
}
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// REQUIRES: this thread is currently at the front of the 2nd writer queue if
@ -2114,48 +1936,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
cfd->mem()->SetNextLogNumber(logfile_number_);
assert(new_mem != nullptr);
// By default, it is assumed that the 'old' memtable
// will be added to the Imm memtable list and will therefore
// contribute to the Imm memory footprint.
bool noImmMemoryContribution = false;
// If MemPurge activated, purge and delete current memtable.
if (immutable_db_options_.experimental_allow_mempurge &&
((cfd->GetFlushReason() == FlushReason::kOthers) ||
(cfd->GetFlushReason() == FlushReason::kManualFlush))) {
Status mempurge_s = MemPurge(cfd, new_mem);
if (mempurge_s.ok()) {
// If mempurge worked successfully,
// create sync point and decrement current memtable reference.
TEST_SYNC_POINT("DBImpl::MemPurge");
cfd->mem()->Unref();
// If the MemPurge is successful, the 'old' (purged) memtable
// is not added to the Imm memtable list and therefore
// does not contribute to the Imm memory cost anymore.
noImmMemoryContribution = true;
} else {
// If mempurge failed, go back to regular mem->imm->flush workflow.
assert(new_mem != nullptr);
delete new_mem;
SuperVersion* new_superversion =
context->superversion_context.new_superversion.release();
if (new_superversion != nullptr) {
delete new_superversion;
}
SequenceNumber seq = versions_->LastSequence();
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
assert(new_mem != nullptr);
context->superversion_context.NewSuperVersion();
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
}
} else {
// Else make the memtable immutable and proceed as usual.
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
}
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
new_mem->Ref();
cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options,
noImmMemoryContribution);
mutable_cf_options);
#ifndef ROCKSDB_LITE
mutex_.Unlock();

@ -227,9 +227,25 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table();
Status mempurge_s = Status::NotFound("No MemPurge.");
if (db_options_.experimental_allow_mempurge &&
(cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
(!mems_.empty())) {
mempurge_s = MemPurge();
if (!mempurge_s.ok()) {
ROCKS_LOG_INFO(db_options_.info_log,
"Mempurge process unsuccessful: %s\n",
mempurge_s.ToString().c_str());
}
}
Status s;
if (mempurge_s.ok()) {
base_->Unref();
s = Status::OK();
} else {
// This will release and re-acquire the mutex.
s = WriteLevel0Table();
}
if (s.ok() && cfd_->IsDropped()) {
s = Status::ColumnFamilyDropped("Column family dropped during compaction");
@ -306,6 +322,237 @@ void FlushJob::Cancel() {
base_->Unref();
}
Status FlushJob::MemPurge() {
Status s;
db_mutex_->AssertHeld();
db_mutex_->Unlock();
assert(!mems_.empty());
MemTable* new_mem = nullptr;
// Create two iterators, one for the memtable data (contains
// info from puts + deletes), and one for the memtable
// Range Tombstones (from DeleteRanges).
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
std::vector<InternalIterator*> memtables;
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
for (MemTable* m : mems_) {
memtables.push_back(m->NewIterator(ro, &arena));
auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
}
assert(!memtables.empty());
SequenceNumber first_seqno = mems_[0]->GetFirstSequenceNumber();
SequenceNumber earliest_seqno = mems_[0]->GetEarliestSequenceNumber();
ScopedArenaIterator iter(
NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
static_cast<int>(memtables.size()), &arena));
auto* ioptions = cfd_->ioptions();
// Place iterator at the First (meaning most recent) key node.
iter->SeekToFirst();
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
new CompactionRangeDelAggregator(&(cfd_->internal_comparator()),
existing_snapshots_));
for (auto& rd_iter : range_del_iters) {
range_del_agg->AddTombstones(std::move(rd_iter));
}
// If there is valid data in the memtable,
// or at least range tombstones, copy over the info
// to the new memtable.
if (iter->Valid() || !range_del_agg->IsEmpty()) {
// Arbitrary heuristic: maxSize is 60% cpacity.
size_t maxSize = ((mutable_cf_options_.write_buffer_size + 6U) / 10U);
std::unique_ptr<CompactionFilter> compaction_filter;
if (ioptions->compaction_filter_factory != nullptr &&
ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
TableFileCreationReason::kFlush)) {
CompactionFilter::Context ctx;
ctx.is_full_compaction = false;
ctx.is_manual_compaction = false;
ctx.column_family_id = cfd_->GetID();
ctx.reason = TableFileCreationReason::kFlush;
compaction_filter =
ioptions->compaction_filter_factory->CreateCompactionFilter(ctx);
if (compaction_filter != nullptr &&
!compaction_filter->IgnoreSnapshots()) {
s = Status::NotSupported(
"CompactionFilter::IgnoreSnapshots() = false is not supported "
"anymore.");
return s;
}
}
// mems are ordered by increasing ID, so mems_[0]->GetID
// returns the smallest memtable ID.
new_mem =
new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()),
mutable_cf_options_, cfd_->write_buffer_mgr(),
mems_[0]->GetEarliestSequenceNumber(), cfd_->GetID());
assert(new_mem != nullptr);
Env* env = db_options_.env;
assert(env);
MergeHelper merge(
env, (cfd_->internal_comparator()).user_comparator(),
(ioptions->merge_operator).get(), compaction_filter.get(),
ioptions->logger, true /* internal key corruption is not ok */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
snapshot_checker_);
CompactionIterator c_iter(
iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
kMaxSequenceNumber, &existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_, env,
ShouldReportDetailedTime(env, ioptions->stats),
true /* internal key corruption is not ok */, range_del_agg.get(),
nullptr, ioptions->allow_data_in_errors,
/*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr,
/*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
/*manual_compaction_canceled=*/nullptr, ioptions->info_log,
&(cfd_->GetFullHistoryTsLow()));
// Set earliest sequence number in the new memtable
// to be equal to the earliest sequence number of the
// memtable being flushed (See later if there is a need
// to update this number!).
new_mem->SetEarliestSequenceNumber(earliest_seqno);
// Likewise for first seq number.
new_mem->SetFirstSequenceNumber(first_seqno);
SequenceNumber new_first_seqno = kMaxSequenceNumber;
c_iter.SeekToFirst();
// Key transfer
for (; c_iter.Valid(); c_iter.Next()) {
const ParsedInternalKey ikey = c_iter.ikey();
const Slice value = c_iter.value();
new_first_seqno =
ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;
// Should we update "OldestKeyTime" ???? -> timestamp appear
// to still be an "experimental" feature.
s = new_mem->Add(
ikey.sequence, ikey.type, ikey.user_key, value,
nullptr, // KV protection info set as nullptr since it
// should only be useful for the first add to
// the original memtable.
false, // : allow concurrent_memtable_writes_
// Not seen as necessary for now.
nullptr, // get_post_process_info(m) must be nullptr
// when concurrent_memtable_writes is switched off.
nullptr); // hint, only used when concurrent_memtable_writes_
// is switched on.
if (!s.ok()) {
break;
}
// If new_mem has size greater than maxSize,
// then rollback to regular flush operation,
// and destroy new_mem.
if (new_mem->ApproximateMemoryUsage() > maxSize) {
s = Status::Aborted("Mempurge filled more than one memtable.");
break;
}
}
// Check status and propagate
// potential error status from c_iter
if (!s.ok()) {
c_iter.status().PermitUncheckedError();
} else if (!c_iter.status().ok()) {
s = c_iter.status();
}
// Range tombstone transfer.
if (s.ok()) {
auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
new_first_seqno =
tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno;
s = new_mem->Add(
tombstone.seq_, // Sequence number
kTypeRangeDeletion, // KV type
tombstone.start_key_, // Key is start key.
tombstone.end_key_, // Value is end key.
nullptr, // KV protection info set as nullptr since it
// should only be useful for the first add to
// the original memtable.
false, // : allow concurrent_memtable_writes_
// Not seen as necessary for now.
nullptr, // get_post_process_info(m) must be nullptr
// when concurrent_memtable_writes is switched off.
nullptr); // hint, only used when concurrent_memtable_writes_
// is switched on.
if (!s.ok()) {
break;
}
// If new_mem has size greater than maxSize,
// then rollback to regular flush operation,
// and destroy new_mem.
if (new_mem->ApproximateMemoryUsage() > maxSize) {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
break;
}
}
}
// If everything happened smoothly and new_mem contains valid data,
// decide if it is flushed to storage or kept in the imm()
// memtable list (memory).
if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) {
// Rectify the first sequence number, which (unlike the earliest seq
// number) needs to be present in the new memtable.
new_mem->SetFirstSequenceNumber(new_first_seqno);
// The new_mem is added to the list of immutable memtables
// only if it filled at less than 60% capacity (arbitrary heuristic).
if (new_mem->ApproximateMemoryUsage() < maxSize) {
db_mutex_->Lock();
cfd_->imm()
->Add(new_mem, &job_context_->memtables_to_free, false /* trigger_flush. Adding this memtable will not trigger any flush */);
new_mem->Ref();
db_mutex_->Unlock();
} else {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
if (new_mem) {
job_context_->memtables_to_free.push_back(new_mem);
}
}
} else {
// In this case, the newly allocated new_mem is empty.
assert(new_mem != nullptr);
job_context_->memtables_to_free.push_back(new_mem);
}
}
// Reacquire the mutex for WriteLevel0 function.
db_mutex_->Lock();
// If mempurge successful, don't write input tables to level0,
// but write any full output table to level0.
if (s.ok()) {
TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful");
} else {
TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
}
return s;
}
Status FlushJob::WriteLevel0Table() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0);
@ -362,7 +609,7 @@ Status FlushJob::WriteLevel0Table() {
{
ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
static_cast<int>(memtables.size()), &arena));
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
@ -470,6 +717,7 @@ Status FlushJob::WriteLevel0Table() {
const bool has_output = meta_.fd.GetFileSize() > 0;
if (s.ok() && has_output) {
TEST_SYNC_POINT("DBImpl::FlushJob:SSTFileCreated");
// if we have more than 1 background thread, then we cannot
// insert files directly into higher levels because some other
// threads could be concurrently producing compacted files for

@ -101,6 +101,28 @@ class FlushJob {
void ReportFlushInputSize(const autovector<MemTable*>& mems);
void RecordFlushIOStats();
Status WriteLevel0Table();
// Memtable Garbage Collection algorithm: a MemPurge takes the list
// of immutable memtables and filters out (or "purge") the outdated bytes
// out of it. The output (the filtered bytes, or "useful payload") is
// then transfered into a new memtable. If this memtable is filled, then
// the mempurge is aborted and rerouted to a regular flush process. Else,
// depending on the heuristics, placed onto the immutable memtable list.
// The addition to the imm list will not trigger a flush operation. The
// flush of the imm list will instead be triggered once the mutable memtable
// is added to the imm list.
// This process is typically intended for workloads with heavy overwrites
// when we want to avoid SSD writes (and reads) as much as possible.
// "MemPurge" is an experimental feature still at a very early stage
// of development. At the moment it is only compatible with the Get, Put,
// Delete operations as well as Iterators and CompactionFilters.
// For this early version, "MemPurge" is called by setting the
// options.experimental_allow_mempurge flag as "true". When this is
// the case, ALL automatic flush operations (kWRiteBufferManagerFull) will
// first go through the MemPurge process. herefore, we strongly
// recommend all users not to set this flag as true given that the MemPurge
// process has not matured yet.
Status MemPurge();
#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
#endif // !ROCKSDB_LITE

@ -516,7 +516,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
}
// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete,
bool trigger_flush) {
assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
InstallNewVersion();
// this method is used to move mutable memtable into an immutable list.
@ -527,7 +528,8 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
current_->Add(m, to_delete);
m->MarkImmutable();
num_flush_not_started_++;
if (num_flush_not_started_ == 1) {
if (num_flush_not_started_ > 0 && trigger_flush) {
imm_flush_needed.store(true, std::memory_order_release);
}
UpdateCachedValuesFromMemTableListVersion();

@ -272,7 +272,11 @@ class MemTableList {
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().
void Add(MemTable* m, autovector<MemTable*>* to_delete);
// By default, adding memtables will flag that the memtable list needs to be
// flushed, but in certain situations, like after a mempurge, we may want to
// avoid flushing the memtable list upon addition of a memtable.
void Add(MemTable* m, autovector<MemTable*>* to_delete,
bool trigger_flush = true);
// Returns an estimate of the number of bytes of data in use.
size_t ApproximateMemoryUsage();

Loading…
Cancel
Save