Flush triggered by DB write buffer size picks the oldest unflushed CF

Summary:
Previously, when DB write buffer size triggers, we always pick the CF with most data in its memtable to flush. This approach can minimize total flush happens. Change the behavior to always pick the oldest unflushed CF, which makes it the same behavior when max_total_wal_size hits. This approach will minimize size used by max_total_wal_size.
Closes https://github.com/facebook/rocksdb/pull/1987

Differential Revision: D4703214

Pulled By: siying

fbshipit-source-id: 9ff8b09
main
Siying Dong 7 years ago committed by Facebook Github Bot
parent 6908e24b56
commit 8f5bf04468
  1. 38
      db/db_impl.cc
  2. 143
      db/db_test2.cc
  3. 5
      db/memtable.cc
  4. 8
      db/memtable.h

@ -4758,8 +4758,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
write_buffer_manager_->buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
ColumnFamilyData* largest_cfd = nullptr;
size_t largest_cfd_size = 0;
ColumnFamilyData* cfd_picked = nullptr;
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
@ -4768,18 +4768,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (!cfd->mem()->IsEmpty()) {
// We only consider active mem table, hoping immutable memtable is
// already in the process of flushing.
size_t cfd_size = cfd->mem()->ApproximateMemoryUsage();
if (largest_cfd == nullptr || cfd_size > largest_cfd_size) {
largest_cfd = cfd;
largest_cfd_size = cfd_size;
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
seq_num_for_cf_picked = seq;
}
}
}
if (largest_cfd != nullptr) {
status = SwitchMemtable(largest_cfd, &context);
if (cfd_picked != nullptr) {
status = SwitchMemtable(cfd_picked, &context);
if (status.ok()) {
largest_cfd->imm()->FlushRequested();
SchedulePendingFlush(largest_cfd);
cfd_picked->imm()->FlushRequested();
SchedulePendingFlush(cfd_picked);
MaybeScheduleFlushOrCompaction();
}
}
@ -5317,17 +5317,21 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
log_dir_synced_ = false;
logs_.emplace_back(logfile_number_, new_log);
alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
// all this is just optimization to delete logs that
// are no longer needed -- if CF is empty, that means it
// doesn't need that particular log to stay alive, so we just
// advance the log number. no need to persist this in the manifest
if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
loop_cfd->imm()->NumNotFlushed() == 0) {
}
for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
// all this is just optimization to delete logs that
// are no longer needed -- if CF is empty, that means it
// doesn't need that particular log to stay alive, so we just
// advance the log number. no need to persist this in the manifest
if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
loop_cfd->imm()->NumNotFlushed() == 0) {
if (creating_new_log) {
loop_cfd->SetLogNumber(logfile_number_);
}
loop_cfd->mem()->SetCreationSeq(versions_->LastSequence());
}
}
cfd->mem()->SetNextLogNumber(logfile_number_);
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
new_mem->Ref();

@ -183,19 +183,32 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
options.write_buffer_size = 500000; // this is never hit
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
// Trigger a flush on CF "nikitich"
ASSERT_OK(Put(0, Key(1), DummyString(1)));
ASSERT_OK(Put(1, Key(1), DummyString(1)));
ASSERT_OK(Put(3, Key(1), DummyString(90000)));
ASSERT_OK(Put(2, Key(2), DummyString(20000)));
ASSERT_OK(Put(2, Key(1), DummyString(1)));
WriteOptions wo;
wo.disableWAL = true;
// Create some data and flush "default" and "nikitich" so that they
// are newer CFs created.
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
Flush(3);
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
Flush(0);
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
ASSERT_OK(Put(0, Key(1), DummyString(60000), wo));
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
// No flush should trigger
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
@ -204,99 +217,85 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
static_cast<uint64_t>(1));
}
// "dobrynia": 20KB
// Flush 'dobrynia'
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(2), DummyString(70000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
// Trigger a flush. Flushing "nikitich".
ASSERT_OK(Put(3, Key(2), DummyString(30000), wo));
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(2));
}
// "nikitich" still has data of 80KB
// Inserting Data in "dobrynia" triggers "nikitich" flushing.
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(2), DummyString(40000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
// Without hitting the threshold, no flush should trigger.
ASSERT_OK(Put(2, Key(1), DummyString(30000), wo));
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "dobrynia" still has 40KB
ASSERT_OK(Put(1, Key(2), DummyString(20000)));
ASSERT_OK(Put(0, Key(1), DummyString(10000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
// Hit the write buffer limit again. "default"
// will have been flushed.
ASSERT_OK(Put(2, Key(2), DummyString(10000), wo));
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
// This should triggers no flush
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB
ASSERT_OK(Put(1, Key(2), DummyString(40000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
// Trigger another flush. This time "dobrynia". "pikachu" should not
// be flushed, althrough it was never flushed.
ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
ASSERT_OK(Put(2, Key(1), DummyString(80000), wo));
ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
// This should triggers flush of "pikachu"
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "default": 10KB, "dobrynia": 40KB
// Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on
// closure.
ASSERT_OK(Put(3, Key(1), DummyString(1)));
ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
options);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(3));
}
}
INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
@ -314,66 +313,60 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
ASSERT_OK(DB::Open(options, dbname2, &db2));
WriteOptions wo;
wo.disableWAL = true;
// Trigger a flush on cf2
ASSERT_OK(Put(0, Key(1), DummyString(1)));
ASSERT_OK(Put(1, Key(1), DummyString(1)));
ASSERT_OK(Put(2, Key(1), DummyString(90000)));
ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
// Insert to DB2
ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));
ASSERT_OK(Put(2, Key(1), DummyString(1)));
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") +
GetNumberOfSstFilesForColumnFamily(db_, "cf1") +
GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
static_cast<uint64_t>(0));
}
// db2: 20KB
ASSERT_OK(db2->Put(wo, Key(2), DummyString(40000)));
ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000)));
ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
// Triggering to flush another CF in DB1
ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000)));
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(0));
}
//
// Inserting Data in db2 and db_ triggers flushing in db_.
ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000)));
ASSERT_OK(Put(2, Key(2), DummyString(45000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
// Triggering flush in DB2.
ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000)));
ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
static_cast<uint64_t>(2));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
static_cast<uint64_t>(1));
}

@ -61,7 +61,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBufferManager* write_buffer_manager,
SequenceNumber earliest_seq)
SequenceNumber latest_seq)
: comparator_(cmp),
moptions_(ioptions, mutable_cf_options),
refs_(0),
@ -83,7 +83,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
flush_completed_(false),
file_number_(0),
first_seqno_(0),
earliest_seqno_(earliest_seq),
earliest_seqno_(latest_seq),
creation_seq_(latest_seq),
mem_next_logfile_number_(0),
min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support

@ -286,6 +286,12 @@ class MemTable {
return earliest_seqno_.load(std::memory_order_relaxed);
}
// DB's latest sequence ID when the memtable is created. This number
// may be updated to a more recent one before any key is inserted.
SequenceNumber GetCreationSeq() const { return creation_seq_; }
void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }
// Returns the next active logfile number when this memtable is about to
// be flushed to storage
// REQUIRES: external synchronization to prevent simultaneous
@ -381,6 +387,8 @@ class MemTable {
// if not set.
std::atomic<SequenceNumber> earliest_seqno_;
SequenceNumber creation_seq_;
// The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_;

Loading…
Cancel
Save