Compact multiple memtables before flushing to storage.

Summary:
Merge multiple multiple memtables in memory before writing it
out to a file in L0.

There is a new config parameter min_write_buffer_number_to_merge
that specifies the number of write buffers that should be merged
together to a single file in storage. The system will not flush
wrte buffers to storage unless at least these many buffers have
accumulated in memory.
The default value of this new parameter is 1, which means that
a write buffer will be immediately flushed to disk as soon it is
ready.

Test Plan: make check

Differential Revision: https://reviews.facebook.net/D11241
main
Dhruba Borthakur 12 years ago
parent f561b3a324
commit 6acbe0fc45
  1. 17
      db/db_bench.cc
  2. 40
      db/db_impl.cc
  3. 2
      db/db_impl.h
  4. 3
      db/memtable.cc
  5. 11
      db/memtable.h
  6. 115
      db/memtablelist.cc
  7. 11
      db/memtablelist.h
  8. 5
      db/version_edit.h
  9. 9
      include/leveldb/options.h
  10. 3
      util/options.cc

@ -122,6 +122,16 @@ static int FLAGS_write_buffer_size = 0;
// This is initialized to default value of 2 in "main" function.
static int FLAGS_max_write_buffer_number = 0;
// The minimum number of write buffers that will be merged together
// before writing to storage. This is cheap because it is an
// in-memory merge. If this feature is not enabled, then all these
// write buffers are fushed to L0 as seperate files and this increases
// read amplification because a get request has to check in all of these
// files. Also, an in-memory merge may result in writing lesser
// data to storage if there are duplicate records in each of these
// individual write buffers.
static int FLAGS_min_write_buffer_number_to_merge = 0;
// The maximum number of concurrent background compactions
// that can occur in parallel.
// This is initialized to default value of 1 in "main" function.
@ -1122,6 +1132,8 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
}
options.write_buffer_size = FLAGS_write_buffer_size;
options.max_write_buffer_number = FLAGS_max_write_buffer_number;
options.min_write_buffer_number_to_merge =
FLAGS_min_write_buffer_number_to_merge;
options.max_background_compactions = FLAGS_max_background_compactions;
options.block_size = FLAGS_block_size;
options.filter_policy = filter_policy_;
@ -1999,6 +2011,8 @@ int main(int argc, char** argv) {
FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
FLAGS_max_write_buffer_number = leveldb::Options().max_write_buffer_number;
FLAGS_min_write_buffer_number_to_merge =
leveldb::Options().min_write_buffer_number_to_merge;
FLAGS_open_files = leveldb::Options().max_open_files;
FLAGS_max_background_compactions =
leveldb::Options().max_background_compactions;
@ -2055,6 +2069,9 @@ int main(int argc, char** argv) {
FLAGS_write_buffer_size = n;
} else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) {
FLAGS_max_write_buffer_number = n;
} else if (sscanf(argv[i], "--min_write_buffer_number_to_merge=%d%c",
&n, &junk) == 1) {
FLAGS_min_write_buffer_number_to_merge = n;
} else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) {
FLAGS_max_background_compactions = n;
} else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) {

@ -126,6 +126,9 @@ Options SanitizeOptions(const std::string& dbname,
ClipToRange(&result.write_buffer_size, ((size_t)64)<<10,
((size_t)64)<<30);
ClipToRange(&result.block_size, 1<<10, 4<<20);
result.min_write_buffer_number_to_merge = std::min(
result.min_write_buffer_number_to_merge, result.max_write_buffer_number-1);
if (result.info_log == nullptr) {
Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env,
result, &result.info_log);
@ -217,7 +220,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
DBImpl::~DBImpl() {
// Wait for background work to finish
if (flush_on_destroy_) {
if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) {
FlushMemTable(FlushOptions());
}
mutex_.Lock();
@ -794,7 +797,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
}
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
uint64_t* filenumber) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
@ -802,15 +805,21 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
meta.number = versions_->NewFileNumber();
*filenumber = meta.number;
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
std::vector<Iterator*> list;
for (MemTable* m : mems) {
list.push_back(m->NewIterator());
}
Iterator* iter = NewMergingIterator(&internal_comparator_, &list[0],
list.size());
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable =
mem->GetFirstSequenceNumber();
mems[0]->GetFirstSequenceNumber();
Log(options_.info_log, "Level-0 flush table #%llu: started",
(unsigned long long) meta.number);
Version* base = versions_->current();
base->Ref();
base->Ref(); // it is likely that we do not need this reference
Status s;
{
mutex_.Unlock();
@ -868,7 +877,7 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
mutex_.AssertHeld();
assert(imm_.size() != 0);
if (!imm_.IsFlushPending()) {
if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
Log(options_.info_log, "Memcompaction already in progress");
Status s = Status::IOError("Memcompaction already in progress");
return s;
@ -877,19 +886,21 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
// Save the contents of the earliest memtable as a new Table
// This will release and re-acquire the mutex.
uint64_t file_number;
MemTable* m = imm_.PickMemtableToFlush();
if (m == nullptr) {
std::vector<MemTable*> mems;
imm_.PickMemtablesToFlush(&mems);
if (mems.empty()) {
Log(options_.info_log, "Nothing in memstore to flush");
Status s = Status::IOError("Nothing in memstore to flush");
return s;
}
// record the logfile_number_ before we release the mutex
MemTable* m = mems[0];
VersionEdit* edit = m->GetEdits();
edit->SetPrevLogNumber(0);
edit->SetLogNumber(logfile_number_); // Earlier logs no longer needed
edit->SetLogNumber(m->GetLogNumber()); // Earlier logs no longer needed
Status s = WriteLevel0Table(m, edit, &file_number);
Status s = WriteLevel0Table(mems, edit, &file_number);
if (s.ok() && shutting_down_.Acquire_Load()) {
s = Status::IOError(
@ -899,7 +910,7 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
// Replace immutable memtable with the generated Table
s = imm_.InstallMemtableFlushResults(
m, versions_.get(), s, &mutex_, options_.info_log.get(),
mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_);
if (s.ok()) {
@ -1256,7 +1267,7 @@ void DBImpl::MaybeScheduleCompaction() {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (!imm_.IsFlushPending() &&
} else if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge) &&
manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
// No work to be done
@ -1327,7 +1338,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
*madeProgress = false;
mutex_.AssertHeld();
while (imm_.IsFlushPending()) {
while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
Log(options_.info_log,
"BackgroundCompaction doing CompactMemTable, compaction slots available %d",
options_.max_background_compactions - bg_compaction_scheduled_);
@ -1691,7 +1702,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_.IsFlushPending()) {
if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
CompactMemTable();
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
@ -2422,6 +2433,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
logfile_number_ = new_log_number;
log_.reset(new log::Writer(std::move(lfile)));
mem_->SetLogNumber(logfile_number_);
imm_.Add(mem_);
mem_ = new MemTable(internal_comparator_, NumberLevels());
mem_->Ref();

@ -152,7 +152,7 @@ class DBImpl : public DB {
// for the entire period. The second method WriteLevel0Table supports
// concurrent flush memtables to storage.
Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit);
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Status WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
uint64_t* filenumber);
Status MakeRoomForWrite(bool force /* compact even if there is room? */);

@ -27,7 +27,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel)
flush_completed_(false),
file_number_(0),
edit_(numlevel),
first_seqno_(0) {
first_seqno_(0),
mem_logfile_number_(0) {
}
MemTable::~MemTable() {

@ -77,6 +77,14 @@ class MemTable {
// into the memtable
SequenceNumber GetFirstSequenceNumber() { return first_seqno_; }
// Returns the logfile number that can be safely deleted when this
// memstore is flushed to storage
uint64_t GetLogNumber() { return mem_logfile_number_; }
// Sets the logfile number that can be safely deleted when this
// memstore is flushed to storage
void SetLogNumber(uint64_t num) { mem_logfile_number_ = num; }
private:
~MemTable(); // Private since only Unref() should be used to delete it
@ -108,6 +116,9 @@ class MemTable {
// The sequence number of the kv that was inserted first
SequenceNumber first_seqno_;
// The log files earlier than this number can be deleted.
uint64_t mem_logfile_number_;
// No copying allowed
MemTable(const MemTable&);
void operator=(const MemTable&);

@ -43,17 +43,16 @@ int MemTableList::size() {
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() {
if (num_flush_not_started_ > 0) {
bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) {
if (num_flush_not_started_ >= min_write_buffer_number_to_merge) {
assert(imm_flush_needed.NoBarrier_Load() != nullptr);
return true;
}
return false;
}
// Returns the earliest memtable that needs to be flushed.
// Returns null, if no such memtable exist.
MemTable* MemTableList::PickMemtableToFlush() {
// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {
for (list<MemTable*>::reverse_iterator it = memlist_.rbegin();
it != memlist_.rend(); it++) {
MemTable* m = *it;
@ -64,37 +63,48 @@ MemTable* MemTableList::PickMemtableToFlush() {
imm_flush_needed.Release_Store(nullptr);
}
m->flush_in_progress_ = true; // flushing will start very soon
return m;
ret->push_back(m);
}
}
return nullptr;
}
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(MemTable* m,
Status MemTableList::InstallMemtableFlushResults(
const std::vector<MemTable*> &mems,
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs) {
mu->AssertHeld();
assert(m->flush_in_progress_);
assert(m->file_number_ == 0);
// If the flush was not successful, then just reset state.
// Maybe a suceeding attempt to flush will be successful.
if (!flushStatus.ok()) {
m->flush_in_progress_ = false;
m->flush_completed_ = false;
m->edit_.Clear();
num_flush_not_started_++;
imm_flush_needed.Release_Store((void *)1);
pending_outputs.erase(file_number);
for (MemTable* m : mems) {
assert(m->flush_in_progress_);
assert(m->file_number_ == 0);
m->flush_in_progress_ = false;
m->flush_completed_ = false;
m->edit_.Clear();
num_flush_not_started_++;
imm_flush_needed.Release_Store((void *)1);
pending_outputs.erase(file_number);
}
return flushStatus;
}
// flush was sucessful
m->flush_completed_ = true;
m->file_number_ = file_number;
bool first = true;
for (MemTable* m : mems) {
// All the edits are associated with the first memtable of this batch.
assert(first || m->GetEdits()->NumEntries() == 0);
first = false;
m->flush_completed_ = true;
m->file_number_ = file_number;
}
// if some other thread is already commiting, then return
Status s;
@ -106,12 +116,15 @@ Status MemTableList::InstallMemtableFlushResults(MemTable* m,
commit_in_progress_ = true;
// scan all memtables from the earliest, and commit those
// (in that order) that have finished flushing.
while (!memlist_.empty()) {
m = memlist_.back(); // get the last element
// (in that order) that have finished flushing. Memetables
// are always committed in the order that they were created.
while (!memlist_.empty() && s.ok()) {
MemTable* m = memlist_.back(); // get the last element
if (!m->flush_completed_) {
break;
}
first = true;
Log(info_log,
"Level-0 commit table #%llu: started",
(unsigned long long)m->file_number_);
@ -119,33 +132,39 @@ Status MemTableList::InstallMemtableFlushResults(MemTable* m,
// this can release and reacquire the mutex.
s = vset->LogAndApply(&m->edit_, mu);
if (s.ok()) { // commit new state
Log(info_log, "Level-0 commit table #%llu: done",
(unsigned long long)m->file_number_);
memlist_.remove(m);
assert(m->file_number_ > 0);
// pending_outputs can be cleared only after the newly created file
// has been written to a committed version so that other concurrently
// executing compaction threads do not mistakenly assume that this
// file is not live.
pending_outputs.erase(m->file_number_);
m->Unref();
size_--;
} else {
//commit failed. setup state so that we can flush again.
Log(info_log, "Level-0 commit table #%llu: failed",
(unsigned long long)m->file_number_);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
pending_outputs.erase(m->file_number_);
m->file_number_ = 0;
imm_flush_needed.Release_Store((void *)1);
s = Status::IOError("Unable to commit flushed memtable");
break;
}
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
do {
if (s.ok()) { // commit new state
Log(info_log, "Level-0 commit table #%llu: done %s",
(unsigned long long)m->file_number_,
first ? "": "bulk");
memlist_.remove(m);
assert(m->file_number_ > 0);
// pending_outputs can be cleared only after the newly created file
// has been written to a committed version so that other concurrently
// executing compaction threads do not mistakenly assume that this
// file is not live.
pending_outputs.erase(m->file_number_);
m->Unref();
size_--;
} else {
//commit failed. setup state so that we can flush again.
Log(info_log, "Level-0 commit table #%llu: failed",
(unsigned long long)m->file_number_);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();
num_flush_not_started_++;
pending_outputs.erase(m->file_number_);
m->file_number_ = 0;
imm_flush_needed.Release_Store((void *)1);
s = Status::IOError("Unable to commit flushed memtable");
}
first = false;
} while (!memlist_.empty() && (m = memlist_.back()) &&
m->file_number_ == file_number);
}
commit_in_progress_ = false;
return s;

@ -18,7 +18,7 @@ class Mutex;
class MemTableListIterator;
//
// This class stores refeernces to all the immutable memtables.
// This class stores references to all the immutable memtables.
// The memtables are flushed to L0 as soon as possible and in
// any order. If there are more than one immutable memtable, their
// flushes can occur concurrently. However, they are 'committed'
@ -49,14 +49,13 @@ class MemTableList {
// Returns true if there is at least one memtable on which flush has
// not yet started.
bool IsFlushPending();
bool IsFlushPending(int min_write_buffer_number_to_merge);
// Returns the earliest memtable that needs to be flushed.
// Returns null, if no such memtable exist.
MemTable* PickMemtableToFlush();
// Returns the earliest memtables that needs to be flushed.
void PickMemtablesToFlush(std::vector<MemTable*>* mems);
// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(MemTable* m,
Status InstallMemtableFlushResults(const std::vector<MemTable*> &m,
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,

@ -81,6 +81,11 @@ class VersionEdit {
deleted_files_.insert(std::make_pair(level, file));
}
// Number of edits
int NumEntries() {
return new_files_.size() + deleted_files_.size();
}
void EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);

@ -131,6 +131,15 @@ struct Options {
// Default: 2
int max_write_buffer_number;
// The minimum number of write buffers that will be merged together
// before writing to storage. If set to 1, then
// all write buffers are fushed to L0 as individual files and this increases
// read amplification because a get request has to check in all of these
// files. Also, an in-memory merge may result in writing lesser
// data to storage if there are duplicate records in each of these
// individual write buffers. Default: 1
int min_write_buffer_number_to_merge;
// Number of open files that can be used by the DB. You may need to
// increase this if your database has a large working set (budget
// one open file per 2MB of working set).

@ -26,6 +26,7 @@ Options::Options()
info_log(nullptr),
write_buffer_size(4<<20),
max_write_buffer_number(2),
min_write_buffer_number_to_merge(1),
max_open_files(1000),
block_size(4096),
block_restart_interval(16),
@ -127,6 +128,8 @@ Options::Dump(Logger* log) const
Log(log," Options.allow_readahead: %d", allow_readahead);
Log(log," Options.allow_mmap_reads: %d", allow_mmap_reads);
Log(log," Options.allow_mmap_writes: %d", allow_mmap_writes);
Log(log," Options.min_write_buffer_number_to_merge: %d",
min_write_buffer_number_to_merge);
Log(log," Options.allow_readahead_compactions: %d",
allow_readahead_compactions);
Log(log," Options.purge_redundant_kvs_while_flush: %d",

Loading…
Cancel
Save