Push model for flushing memtables

Summary:
When memtable is full it calls the registered callback. That callback then registers column family as needing the flush. Every write checks if there are some column families that need to be flushed. This completely eliminates the need for MakeRoomForWrite() function and simplifies our Write code-path.

There is some complexity with the concurrency when the column family is dropped. I made it a bit less complex by dropping the column family from the write thread in https://reviews.facebook.net/D22965. Let me know if you want to discuss this.

Test Plan: make check works. I'll also run db_stress with creating and dropping column families for a while.

Reviewers: yhchiang, sdong, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23067
main
Igor Canadi 10 years ago
parent 059e584dd3
commit 3d9e6f7759
  1. 12
      db/column_family.cc
  2. 11
      db/column_family.h
  3. 117
      db/db_impl.cc
  4. 5
      db/db_impl.h
  5. 63
      db/db_test.cc
  6. 62
      db/flush_scheduler.cc
  7. 39
      db/flush_scheduler.h
  8. 1
      db/log_and_apply_bench.cc
  9. 10
      db/memtable.cc
  10. 13
      db/memtable.h
  11. 3
      db/write_batch.cc
  12. 3
      db/write_batch_internal.h

@ -660,6 +660,11 @@ bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
column_family_set_->Lock();
current_ = column_family_set_->GetColumnFamily(column_family_id);
column_family_set_->Unlock();
// TODO(icanadi) Maybe remove column family from the hash table when it's
// dropped?
if (current_ != nullptr && current_->IsDropped()) {
current_ = nullptr;
}
}
handle_.SetCFD(current_);
return current_ != nullptr;
@ -685,6 +690,13 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
return &handle_;
}
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
if (current_ != nullptr && current_->mem()->ShouldScheduleFlush()) {
flush_scheduler_->ScheduleFlush(current_);
current_->mem()->MarkFlushScheduled();
}
}
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {

@ -22,6 +22,7 @@
#include "db/write_controller.h"
#include "db/table_cache.h"
#include "util/thread_local.h"
#include "db/flush_scheduler.h"
namespace rocksdb {
@ -394,8 +395,11 @@ class ColumnFamilySet {
// memtables of different column families (specified by ID in the write batch)
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
public:
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
: column_family_set_(column_family_set), current_(nullptr) {}
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set,
FlushScheduler* flush_scheduler)
: column_family_set_(column_family_set),
current_(nullptr),
flush_scheduler_(flush_scheduler) {}
// sets current_ to ColumnFamilyData with column_family_id
// returns false if column family doesn't exist
@ -414,9 +418,12 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
// Returns column family handle for the selected column family
virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;
virtual void CheckMemtableFull() override;
private:
ColumnFamilySet* column_family_set_;
ColumnFamilyData* current_;
FlushScheduler* flush_scheduler_;
ColumnFamilyHandleInternal handle_;
};

@ -361,8 +361,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_controller_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
column_family_memtables_.reset(new ColumnFamilyMemTablesImpl(
versions_->GetColumnFamilySet(), &flush_scheduler_));
DumpLeveldbBuildVersion(db_options_.info_log.get());
DumpDBFileSummary(db_options_, dbname_);
@ -392,6 +392,8 @@ DBImpl::~DBImpl() {
bg_cv_.Wait();
}
flush_scheduler_.Clear();
if (default_cf_handle_ != nullptr) {
// we need to delete handle outside of lock because it does its own locking
mutex_.Unlock();
@ -1336,10 +1338,12 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
if (!read_only) {
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->ShouldFlush()) {
// we can do this because this is called before client has access to the
// DB and there is only a single thread operating on DB
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
cfd->Unref();
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
@ -1356,8 +1360,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
}
}
}
flush_scheduler_.Clear();
if (versions_->LastSequence() < *max_sequence) {
versions_->SetLastSequence(*max_sequence);
}
@ -2201,7 +2205,7 @@ void DBImpl::BackgroundCallCompaction() {
}
if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) {
// signal if
// * madeProgress -- need to wakeup MakeRoomForWrite
// * madeProgress -- need to wakeup DelayWrite
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is
@ -2622,7 +2626,7 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
cfd->Ref();
FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer);
cfd->Unref();
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary
}
mutex_.Unlock();
log_buffer->FlushBufferToLog();
@ -3959,10 +3963,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
w.timeout_hint_us = options.timeout_hint_us;
uint64_t expiration_time = 0;
bool has_timeout = false;
if (w.timeout_hint_us == 0) {
w.timeout_hint_us = kNoTimeOut;
} else {
expiration_time = env_->NowMicros() + w.timeout_hint_us;
has_timeout = true;
}
if (!options.disableWAL) {
@ -3997,56 +4003,48 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
assert(!single_column_family_mode_ ||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
uint64_t flush_column_family_if_log_file = 0;
uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0)
? 4 * max_total_in_memory_state_
: db_options_.max_total_wal_size;
if (UNLIKELY(!single_column_family_mode_) &&
alive_log_files_.begin()->getting_flushed == false &&
total_log_size_ > max_total_wal_size) {
flush_column_family_if_log_file = alive_log_files_.begin()->number;
uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
alive_log_files_.begin()->getting_flushed = true;
Log(db_options_.info_log,
"Flushing all column families with data in WAL number %" PRIu64
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
}
if (write_controller_.IsStopped() || write_controller_.GetDelay() > 0) {
DelayWrite(expiration_time);
}
if (LIKELY(single_column_family_mode_)) {
// fast path
status = MakeRoomForWrite(default_cf_handle_->cfd(),
&context, expiration_time);
} else {
// refcounting cfd in iteration
bool dead_cfd = false;
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref();
if (flush_column_family_if_log_file != 0 &&
cfd->GetLogNumber() <= flush_column_family_if_log_file) {
// log size excedded limit and we need to do flush
// SetNewMemtableAndNewLogFie may temporarily unlock and wait
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
status = SetNewMemtableAndNewLogFile(cfd, &context);
if (!status.ok()) {
break;
}
cfd->imm()->FlushRequested();
}
}
MaybeScheduleFlushOrCompaction();
} else {
// May temporarily unlock and wait.
status = MakeRoomForWrite(cfd, &context, expiration_time);
}
if (cfd->Unref()) {
dead_cfd = true;
}
if (!status.ok()) {
break;
if (UNLIKELY(status.ok() && !bg_error_.ok())) {
status = bg_error_;
}
if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
status = ScheduleFlushes(&context);
}
if (dead_cfd) {
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
if (UNLIKELY(status.ok()) &&
(write_controller_.IsStopped() || write_controller_.GetDelay() > 0)) {
DelayWrite(expiration_time);
}
if (UNLIKELY(status.ok() && has_timeout &&
env_->NowMicros() > expiration_time)) {
status = Status::TimedOut();
}
uint64_t last_sequence = versions_->LastSequence();
@ -4241,36 +4239,23 @@ void DBImpl::DelayWrite(uint64_t expiration_time) {
}
}
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd,
WriteContext* context,
uint64_t expiration_time) {
mutex_.AssertHeld();
assert(!writers_.empty());
Status s;
bool has_timeout = (expiration_time > 0);
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (has_timeout && env_->NowMicros() > expiration_time) {
s = Status::TimedOut();
break;
} else if (!cfd->mem()->ShouldFlush()) {
// There is room in current memtable
break;
} else {
s = SetNewMemtableAndNewLogFile(cfd, context);
if (!s.ok()) {
break;
Status DBImpl::ScheduleFlushes(WriteContext* context) {
bool schedule_bg_work = false;
ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
schedule_bg_work = true;
auto status = SetNewMemtableAndNewLogFile(cfd, context);
if (cfd->Unref()) {
delete cfd;
}
MaybeScheduleFlushOrCompaction();
if (!status.ok()) {
return status;
}
}
return s;
if (schedule_bg_work) {
MaybeScheduleFlushOrCompaction();
}
return Status::OK();
}
// REQUIRES: mutex_ is held

@ -33,6 +33,7 @@
#include "util/scoped_arena_iterator.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
namespace rocksdb {
@ -399,8 +400,7 @@ class DBImpl : public DB {
void DelayWrite(uint64_t expiration_time);
Status MakeRoomForWrite(ColumnFamilyData* cfd, WriteContext* context,
uint64_t expiration_time);
Status ScheduleFlushes(WriteContext* context);
Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
WriteContext* context);
@ -557,6 +557,7 @@ class DBImpl : public DB {
WriteBatch tmp_batch_;
WriteController write_controller_;
FlushScheduler flush_scheduler_;
SnapshotList snapshots_;

@ -1151,6 +1151,17 @@ void VerifyTableProperties(DB* db, uint64_t expected_entries_size) {
ASSERT_EQ(props.size(), unique_entries.size());
ASSERT_EQ(expected_entries_size, sum);
}
uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,
std::string column_family_name) {
std::vector<LiveFileMetaData> metadata;
db->GetLiveFilesMetaData(&metadata);
uint64_t result = 0;
for (auto& fileMetadata : metadata) {
result += (fileMetadata.column_family_name == column_family_name);
}
return result;
}
} // namespace
TEST(DBTest, Empty) {
@ -2777,6 +2788,41 @@ TEST(DBTest, RecoverDuringMemtableCompaction) {
} while (ChangeOptions());
}
TEST(DBTest, FlushSchedule) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.level0_stop_writes_trigger = 1 << 10;
options.level0_slowdown_writes_trigger = 1 << 10;
options.min_write_buffer_number_to_merge = 1;
options.max_write_buffer_number = 2;
options.write_buffer_size = 100 * 1000;
CreateAndReopenWithCF({"pikachu"}, &options);
std::vector<std::thread> threads;
std::atomic<int> thread_num;
// each column family will have 5 thread, each thread generating 2 memtables.
// each column family should end up with 10 table files
for (int i = 0; i < 10; ++i) {
threads.emplace_back([&]() {
int a = thread_num.fetch_add(1);
Random rnd(a);
// this should fill up 2 memtables
for (int k = 0; k < 5000; ++k) {
Put(a & 1, RandomString(&rnd, 13), "");
}
});
}
for (auto& t : threads) {
t.join();
}
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(10));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(10));
}
TEST(DBTest, MinorCompactionsHappen) {
do {
Options options;
@ -6171,17 +6217,6 @@ std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
std::vector<std::uint64_t> ListTableFiles(Env* env, const std::string& path) {
return ListSpecificFiles(env, path, kTableFile);
}
uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,
std::string column_family_name) {
std::vector<LiveFileMetaData> metadata;
db->GetLiveFilesMetaData(&metadata);
uint64_t result = 0;
for (auto& fileMetadata : metadata) {
result += (fileMetadata.column_family_name == column_family_name);
}
return result;
}
} // namespace
TEST(DBTest, FlushOneColumnFamily) {
@ -6465,7 +6500,7 @@ TEST(DBTest, PurgeInfoLogs) {
ASSERT_EQ(5, info_log_count);
Destroy(&options);
// For mode (1), test DestoryDB() to delete all the logs under DB dir.
// For mode (1), test DestroyDB() to delete all the logs under DB dir.
// For mode (2), no info log file should have been put under DB dir.
std::vector<std::string> db_files;
env_->GetChildren(dbname_, &db_files);
@ -7894,10 +7929,6 @@ TEST(DBTest, SimpleWriteTimeoutTest) {
// fill the two write buffers
ASSERT_OK(Put(Key(1), Key(1) + std::string(100000, 'v'), write_opt));
ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt));
// this will switch the previous memtable, but will not cause block because
// DelayWrite() is called before MakeRoomForWrite()
// TODO(icanadi) remove this as part of https://reviews.facebook.net/D23067
ASSERT_OK(Put(Key(3), Key(3), write_opt));
// As the only two write buffers are full in this moment, the third
// Put is expected to be timed-out.
write_opt.timeout_hint_us = 50;

@ -0,0 +1,62 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/flush_scheduler.h"
#include <cassert>
#include "db/column_family.h"
namespace rocksdb {
void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
#ifndef NDEBUG
assert(column_families_set_.find(cfd) == column_families_set_.end());
column_families_set_.insert(cfd);
#endif // NDEBUG
cfd->Ref();
column_families_.push_back(cfd);
}
ColumnFamilyData* FlushScheduler::GetNextColumnFamily() {
ColumnFamilyData* cfd = nullptr;
while (column_families_.size() > 0) {
cfd = column_families_.front();
column_families_.pop_front();
if (cfd->IsDropped()) {
if (cfd->Unref()) {
delete cfd;
}
} else {
break;
}
}
#ifndef NDEBUG
if (cfd != nullptr) {
auto itr = column_families_set_.find(cfd);
assert(itr != column_families_set_.end());
column_families_set_.erase(itr);
}
#endif // NDEBUG
return cfd;
}
bool FlushScheduler::Empty() { return column_families_.empty(); }
void FlushScheduler::Clear() {
for (auto cfd : column_families_) {
#ifndef NDEBUG
auto itr = column_families_set_.find(cfd);
assert(itr != column_families_set_.end());
column_families_set_.erase(itr);
#endif // NDEBUG
if (cfd->Unref()) {
delete cfd;
}
}
column_families_.clear();
}
} // namespace rocksdb

@ -0,0 +1,39 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <stdint.h>
#include <deque>
#include <set>
#include <vector>
namespace rocksdb {
class ColumnFamilyData;
// This class is thread-compatible. It's should only be accessed from single
// write thread (between BeginWrite() and EndWrite())
class FlushScheduler {
public:
FlushScheduler() = default;
~FlushScheduler() = default;
void ScheduleFlush(ColumnFamilyData* cfd);
// Returns Ref()-ed column family. Client needs to Unref()
ColumnFamilyData* GetNextColumnFamily();
bool Empty();
void Clear();
private:
std::deque<ColumnFamilyData*> column_families_;
#ifndef NDEBUG
std::set<ColumnFamilyData*> column_families_set_;
#endif // NDEBUG
};
} // namespace rocksdb

@ -9,6 +9,7 @@
#include "util/testharness.h"
#include "util/benchharness.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "util/mutexlock.h"
namespace rocksdb {

@ -54,8 +54,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
kArenaBlockSize(OptimizeBlockSize(moptions.arena_block_size)),
arena_(moptions.arena_block_size),
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &arena_, ioptions.prefix_extractor,
ioptions.info_log)),
comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log)),
num_entries_(0),
flush_in_progress_(false),
flush_completed_(false),
@ -65,7 +64,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
locks_(moptions.inplace_update_support ? moptions.inplace_update_num_locks
: 0),
prefix_extractor_(ioptions.prefix_extractor),
should_flush_(ShouldFlushNow()) {
should_flush_(ShouldFlushNow()),
flush_scheduled_(false) {
// if should_flush_ == true without an entry inserted, something must have
// gone wrong already.
assert(!should_flush_);
@ -79,9 +79,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
}
}
MemTable::~MemTable() {
assert(refs_ == 0);
}
MemTable::~MemTable() { assert(refs_ == 0); }
size_t MemTable::ApproximateMemoryUsage() {
size_t arena_usage = arena_.ApproximateMemoryUsage();

@ -10,7 +10,9 @@
#pragma once
#include <string>
#include <memory>
#include <functional>
#include <deque>
#include <vector>
#include "db/dbformat.h"
#include "db/skiplist.h"
#include "db/version_edit.h"
@ -86,7 +88,11 @@ class MemTable {
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldFlush() const { return should_flush_; }
bool ShouldScheduleFlush() const {
return flush_scheduled_ == false && should_flush_;
}
void MarkFlushScheduled() { flush_scheduled_ = true; }
// Return an iterator that yields the contents of the memtable.
//
@ -194,7 +200,7 @@ class MemTable {
const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
private:
// Dynamically check if we can add more incoming entries.
// Dynamically check if we can add more incoming entries
bool ShouldFlushNow() const;
friend class MemTableIterator;
@ -238,6 +244,9 @@ class MemTable {
// a flag indicating if a memtable has met the criteria to flush
bool should_flush_;
// a flag indicating if flush has been scheduled
bool flush_scheduled_;
};
extern const char* EncodeKey(std::string* scratch, const Slice& target);

@ -394,6 +394,7 @@ class MemTableInserter : public WriteBatch::Handler {
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
sequence_++;
cf_mems_->CheckMemtableFull();
return Status::OK();
}
@ -465,6 +466,7 @@ class MemTableInserter : public WriteBatch::Handler {
}
sequence_++;
cf_mems_->CheckMemtableFull();
return Status::OK();
}
@ -494,6 +496,7 @@ class MemTableInserter : public WriteBatch::Handler {
}
mem->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
cf_mems_->CheckMemtableFull();
return Status::OK();
}
};

@ -28,6 +28,7 @@ class ColumnFamilyMemTables {
virtual MemTable* GetMemTable() const = 0;
virtual const Options* GetOptions() const = 0;
virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
virtual void CheckMemtableFull() = 0;
};
class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
@ -54,6 +55,8 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }
void CheckMemtableFull() override {}
private:
bool ok_;
MemTable* mem_;

Loading…
Cancel
Save