Flush only one column family

Summary:
Currently DBImpl::Flush() triggers flushes in all column families.
Instead, it should flush only the column family specified.
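
For illustration, with this change a caller can flush a single column family
through the public DB::Flush(const FlushOptions&, ColumnFamilyHandle*)
overload while the other column families keep their memtables. A minimal
sketch (the function name and handle index are arbitrary):

#include <cassert>
#include <vector>
#include "rocksdb/db.h"

// Flush only one column family's memtable; the others stay in memory.
void FlushSingleCF(rocksdb::DB* db,
                   const std::vector<rocksdb::ColumnFamilyHandle*>& handles) {
  rocksdb::FlushOptions fo;
  fo.wait = true;                                 // block until the flush finishes
  rocksdb::Status s = db->Flush(fo, handles[1]);  // flush this one CF only
  assert(s.ok());
}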

Test Plan: make all check

Reviewers: igor, ljin, yhchiang, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D20841
Stanislau Hlebik 10 years ago
parent 9674c11d01
commit 06a52bda64
  1. db/column_family_test.cc (4 lines changed)
  2. db/db_impl.cc (266 lines changed)
  3. db/db_impl.h (34 lines changed)
  4. db/db_test.cc (35 lines changed)

@@ -913,7 +913,9 @@ TEST(ColumnFamilyTest, DontRollEmptyLogs) {
}
int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls();
// this will trigger the flushes
ASSERT_OK(db_->Write(WriteOptions(), nullptr));
for (size_t i = 0; i <= 4; ++i) {
ASSERT_OK(Flush(i));
}
for (int i = 0; i < 4; ++i) {
dbfull()->TEST_WaitForFlushMemTable(handles_[i]);

@@ -89,6 +89,20 @@ struct DBImpl::Writer {
explicit Writer(port::Mutex* mu) : cv(mu) { }
};
struct DBImpl::WriteContext {
autovector<SuperVersion*> superversions_to_free_;
autovector<log::Writer*> logs_to_free_;
~WriteContext() {
for (auto& sv : superversions_to_free_) {
delete sv;
}
for (auto& log : logs_to_free_) {
delete log;
}
}
};
struct DBImpl::CompactionState {
Compaction* const compaction;
@@ -1843,8 +1857,31 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& options) {
// nullptr batch means just wait for earlier writes to be done
Status s = Write(WriteOptions(), nullptr);
Writer w(&mutex_);
w.batch = nullptr;
w.sync = false;
w.disableWAL = false;
w.in_batch_group = false;
w.done = false;
w.timeout_hint_us = kNoTimeOut;
WriteContext context;
mutex_.Lock();
Status s = BeginWrite(&w, 0);
assert(s.ok() && !w.done); // No timeout and nobody should do our job
// SetNewMemtableAndNewLogFile() will release and reacquire mutex
// during execution
s = SetNewMemtableAndNewLogFile(cfd, &context);
cfd->imm()->FlushRequested();
MaybeScheduleFlushOrCompaction();
assert(!writers_.empty());
assert(writers_.front() == &w);
EndWrite(&w, &w, s);
mutex_.Unlock();
if (s.ok() && options.wait) {
// Wait until the compaction completes
s = WaitForFlushMemTable(cfd);
@@ -3529,10 +3566,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
max_total_in_memory_state_ -= cfd->options()->write_buffer_size *
cfd->options()->max_write_buffer_number;
Log(options_.info_log, "Dropped column family with id %u\n", cfd->GetID());
// Flush the memtables. This will make all WAL files referencing the dropped
// column family obsolete. They will be deleted once the user deletes the
// column family handle
Write(WriteOptions(), nullptr); // ignore error
} else {
Log(options_.info_log, "Dropping column family with id %u FAILED -- %s\n",
cfd->GetID(), s.ToString().c_str());
@@ -3728,38 +3761,22 @@ Status DBImpl::Delete(const WriteOptions& options,
return DB::Delete(options, column_family, key);
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
PERF_TIMER_AUTO(write_pre_and_post_process_time);
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.disableWAL = options.disableWAL;
w.in_batch_group = false;
w.done = false;
w.timeout_hint_us = options.timeout_hint_us;
uint64_t expiration_time = 0;
if (w.timeout_hint_us == 0) {
w.timeout_hint_us = kNoTimeOut;
} else {
expiration_time = env_->NowMicros() + w.timeout_hint_us;
}
w.done = false;
mutex_.Lock();
// REQUIRES: mutex_ is held
Status DBImpl::BeginWrite(Writer* w, uint64_t expiration_time) {
// The following code block pushes the current writer "w" into the writer
// queue "writers_" and waits until one of the following conditions is met:
// 1. the job of "w" has been done by some other writer.
// 2. "w" becomes the first writer in "writers_"
// 3. "w" timed out.
writers_.push_back(&w);
mutex_.AssertHeld();
writers_.push_back(w);
bool timed_out = false;
while (!w.done && &w != writers_.front()) {
while (!w->done && w != writers_.front()) {
if (expiration_time == 0) {
w.cv.Wait();
} else if (w.cv.TimedWait(expiration_time)) {
if (w.in_batch_group) {
w->cv.Wait();
} else if (w->cv.TimedWait(expiration_time)) {
if (w->in_batch_group) {
// then it means the front writer is currently doing the
// write on behalf of this "timed-out" writer. Then it
// should wait until the write completes.
@@ -3771,24 +3788,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
}
if (!options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL);
default_cf_internal_stats_->AddDBStats(
InternalStats::WRITE_WITH_WAL, 1);
}
if (w.done) {
default_cf_internal_stats_->AddDBStats(
InternalStats::WRITE_DONE_BY_OTHER, 1);
mutex_.Unlock();
RecordTick(stats_, WRITE_DONE_BY_OTHER);
return w.status;
} else if (timed_out) {
if (timed_out) {
#ifndef NDEBUG
bool found = false;
#endif
for (auto iter = writers_.begin(); iter != writers_.end(); iter++) {
if (*iter == &w) {
if (*iter == w) {
writers_.erase(iter);
#ifndef NDEBUG
found = true;
@@ -3805,14 +3810,77 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return Status::TimedOut();
}
return Status::OK();
}
// REQUIRES: mutex_ is held
void DBImpl::EndWrite(Writer* w, Writer* last_writer, Status status) {
// Pop out the current writer and all writers being pushed before the
// current writer from the writer queue.
mutex_.AssertHeld();
while (!writers_.empty()) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
}
PERF_TIMER_AUTO(write_pre_and_post_process_time);
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.disableWAL = options.disableWAL;
w.in_batch_group = false;
w.done = false;
w.timeout_hint_us = options.timeout_hint_us;
uint64_t expiration_time = 0;
if (w.timeout_hint_us == 0) {
w.timeout_hint_us = kNoTimeOut;
} else {
expiration_time = env_->NowMicros() + w.timeout_hint_us;
}
if (!options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL);
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
}
WriteContext context;
mutex_.Lock();
Status status = BeginWrite(&w, expiration_time);
assert(status.ok() || status.IsTimedOut());
if (status.IsTimedOut()) {
mutex_.Unlock();
RecordTick(stats_, WRITE_TIMEDOUT);
return Status::TimedOut();
} else {
RecordTick(stats_, WRITE_DONE_BY_SELF);
default_cf_internal_stats_->AddDBStats(
InternalStats::WRITE_DONE_BY_SELF, 1);
}
if (w.done) { // write was done by someone else
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
1);
mutex_.Unlock();
RecordTick(stats_, WRITE_DONE_BY_OTHER);
return w.status;
}
RecordTick(stats_, WRITE_DONE_BY_SELF);
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
// Once it reaches this point, the current writer "w" will try to do its write
// job. It may also pick up some of the remaining writers in the "writers_"
@@ -3836,29 +3904,27 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
}
Status status;
autovector<SuperVersion*> superversions_to_free;
autovector<log::Writer*> logs_to_free;
if (LIKELY(single_column_family_mode_)) {
// fast path
status = MakeRoomForWrite(
default_cf_handle_->cfd(), my_batch == nullptr,
&superversions_to_free, &logs_to_free,
expiration_time);
status = MakeRoomForWrite(default_cf_handle_->cfd(),
&context, expiration_time);
} else {
// refcounting cfd in iteration
bool dead_cfd = false;
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref();
bool force_flush =
my_batch == nullptr ||
(flush_column_family_if_log_file != 0 &&
cfd->GetLogNumber() <= flush_column_family_if_log_file);
if (flush_column_family_if_log_file != 0 &&
cfd->GetLogNumber() <= flush_column_family_if_log_file) {
// log size exceeded the limit and we need to do a flush
// SetNewMemtableAndNewLogFile may temporarily unlock and wait
status = SetNewMemtableAndNewLogFile(cfd, &context);
cfd->imm()->FlushRequested();
MaybeScheduleFlushOrCompaction();
} else {
// May temporarily unlock and wait.
status = MakeRoomForWrite(
cfd, force_flush, &superversions_to_free, &logs_to_free,
expiration_time);
status = MakeRoomForWrite(cfd, &context, expiration_time);
}
if (cfd->Unref()) {
dead_cfd = true;
}
@@ -3873,7 +3939,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
if (status.ok()) {
autovector<WriteBatch*> write_batch_group;
BuildBatchGroup(&last_writer, &write_batch_group);
@@ -3969,36 +4035,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
bg_error_ = status; // stop compaction & fail any further writes
}
// Pop out the current writer and all writers being pushed before the
// current writer from the writer queue.
while (!writers_.empty()) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
EndWrite(&w, last_writer, status);
mutex_.Unlock();
if (status.IsTimedOut()) {
RecordTick(stats_, WRITE_TIMEDOUT);
}
for (auto& sv : superversions_to_free) {
delete sv;
}
for (auto& log : logs_to_free) {
delete log;
}
PERF_TIMER_STOP(write_pre_and_post_process_time);
return status;
}
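
A note on the new WriteContext helper defined near the top of this file's
diff: it is a small RAII holder, so every superversion or log::Writer pushed
into it is deleted automatically when the context goes out of scope. In the
paths shown above the context is declared before mutex_.Lock() and outlives
mutex_.Unlock(), so those deletions happen after the lock is released. A
minimal illustrative sketch (not code from the commit; old_log is a
hypothetical retired log writer):

{
  WriteContext context;            // declared before taking the mutex
  mutex_.Lock();
  // ... work that retires superversions / log writers into the context ...
  context.logs_to_free_.push_back(old_log);   // hypothetical log::Writer*
  mutex_.Unlock();
}                                  // ~WriteContext() deletes everything collected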
@@ -4095,16 +4138,14 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(
ColumnFamilyData* cfd, bool force,
autovector<SuperVersion*>* superversions_to_free,
autovector<log::Writer*>* logs_to_free,
Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd,
WriteContext* context,
uint64_t expiration_time) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
bool allow_hard_rate_limit_delay = !force;
bool allow_soft_rate_limit_delay = !force;
bool allow_delay = true;
bool allow_hard_rate_limit_delay = true;
bool allow_soft_rate_limit_delay = true;
uint64_t rate_limit_delay_millis = 0;
Status s;
double score;
@@ -4145,7 +4186,7 @@ Status DBImpl::MakeRoomForWrite(
cfd->internal_stats()->AddCFStats(
InternalStats::LEVEL0_SLOWDOWN, delayed);
delayed_writes_++;
} else if (!force && !cfd->mem()->ShouldFlush()) {
} else if (!cfd->mem()->ShouldFlush()) {
// There is room in current memtable
if (allow_delay) {
DelayLoggingAndReset();
@@ -4228,6 +4269,21 @@ Status DBImpl::MakeRoomForWrite(
mutex_.Lock();
cfd->internal_stats()->RecordLevelNSlowdown(max_level, elapsed, true);
} else {
s = SetNewMemtableAndNewLogFile(cfd, context);
if (!s.ok()) {
break;
}
MaybeScheduleFlushOrCompaction();
}
}
return s;
}
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
WriteContext* context) {
mutex_.AssertHeld();
unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr;
MemTable* new_mem = nullptr;
@@ -4240,11 +4296,12 @@ Status DBImpl::MakeRoomForWrite(
creating_new_log ? versions_->NewFileNumber() : logfile_number_;
SuperVersion* new_superversion = nullptr;
mutex_.Unlock();
Status s;
{
DelayLoggingAndReset();
if (creating_new_log) {
s = env_->NewWritableFile(
LogFileName(options_.wal_dir, new_log_number), &lfile,
s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
&lfile,
env_->OptimizeForLogWrite(storage_options_));
if (s.ok()) {
// Our final size should be less than write_buffer_size
@@ -4268,12 +4325,12 @@ Status DBImpl::MakeRoomForWrite(
versions_->ReuseLogFileNumber(new_log_number);
assert(!new_mem);
assert(!new_log);
break;
return s;
}
if (creating_new_log) {
logfile_number_ = new_log_number;
assert(new_log != nullptr);
logs_to_free->push_back(log_.release());
context->logs_to_free_.push_back(log_.release());
log_.reset(new_log);
log_empty_ = true;
alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
@@ -4290,20 +4347,13 @@ Status DBImpl::MakeRoomForWrite(
}
cfd->mem()->SetNextLogNumber(logfile_number_);
cfd->imm()->Add(cfd->mem());
if (force) {
cfd->imm()->FlushRequested();
}
new_mem->Ref();
cfd->SetMemtable(new_mem);
Log(options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64 "\n",
cfd->GetName().c_str(), logfile_number_);
force = false; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction();
superversions_to_free->push_back(
context->superversions_to_free_.push_back(
cfd->InstallSuperVersion(new_superversion, &mutex_));
}
}
return s;
}

@@ -309,6 +309,7 @@ class DBImpl : public DB {
friend struct SuperVersion;
struct CompactionState;
struct Writer;
struct WriteContext;
Status NewDB();
@@ -347,13 +348,38 @@ class DBImpl : public DB {
uint64_t SlowdownAmount(int n, double bottom, double top);
// TODO(icanadi) free superversion_to_free and old_log outside of mutex
// Before applying a write operation (such as DBImpl::Write or DBImpl::Flush)
// the thread should grab mutex_ and be the first in the writers queue.
// BeginWrite is used for that.
// Be aware: the writer's job can be done by another thread (see DBImpl::Write
// for an example), so check it via w.done before applying changes.
//
// Writer* w:                writer to be placed in the queue
// uint64_t expiration_time: maximum time to wait in the queue
// See also: EndWrite
Status BeginWrite(Writer* w, uint64_t expiration_time);
// After doing the write job, we need to remove the already-used writers from
// the writers_ queue and notify the new head of the queue.
// EndWrite is used for this.
//
// Writer* w:           the Writer that was added by BeginWrite
// Writer* last_writer: since several Writers can be joined into one group (as
//                      DBImpl::Write does), pass the last writer of that group
//                      to EndWrite (if you don't touch other writers, just
//                      pass w)
// Status status:       status of the write operation
// See also: BeginWrite
void EndWrite(Writer* w, Writer* last_writer, Status status);
Status MakeRoomForWrite(ColumnFamilyData* cfd,
bool force /* flush even if there is room? */,
autovector<SuperVersion*>* superversions_to_free,
autovector<log::Writer*>* logs_to_free,
WriteContext* context,
uint64_t expiration_time);
Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
WriteContext* context);
void BuildBatchGroup(Writer** last_writer,
autovector<WriteBatch*>* write_batch_group);
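
The BeginWrite/EndWrite comments above describe the contract; a simplified
sketch of the intended calling pattern, distilled from the FlushMemTable()
hunk in db_impl.cc (not verbatim code from this commit):

Writer w(&mutex_);
w.batch = nullptr;                  // caller-specific setup
w.timeout_hint_us = kNoTimeOut;
WriteContext context;

mutex_.Lock();
Status s = BeginWrite(&w, /*expiration_time=*/0);  // become front of writers_
assert(s.ok() && !w.done);          // no timeout; nobody else can do this job
s = SetNewMemtableAndNewLogFile(cfd, &context);    // may unlock/relock mutex_
cfd->imm()->FlushRequested();
MaybeScheduleFlushOrCompaction();
EndWrite(&w, &w, s);                // pop ourselves, wake the next writer
mutex_.Unlock();                    // context later frees retired SVs / logs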

@@ -5898,7 +5898,8 @@ TEST(DBTest, CompactOnFlush) {
}
namespace {
std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
std::vector<std::uint64_t> ListSpecificFiles(
Env* env, const std::string& path, const FileType expected_file_type) {
std::vector<std::string> files;
std::vector<uint64_t> log_files;
env->GetChildren(path, &files);
@@ -5906,15 +5907,45 @@ std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
FileType type;
for (size_t i = 0; i < files.size(); ++i) {
if (ParseFileName(files[i], &number, &type)) {
if (type == kLogFile) {
if (type == expected_file_type) {
log_files.push_back(number);
}
}
}
return std::move(log_files);
}
std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
return ListSpecificFiles(env, path, kLogFile);
}
std::vector<std::uint64_t> ListTableFiles(Env* env, const std::string& path) {
return ListSpecificFiles(env, path, kTableFile);
}
} // namespace
TEST(DBTest, FlushOneColumnFamily) {
Options options;
CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
"alyosha", "popovich"},
&options);
ASSERT_OK(Put(0, "Default", "Default"));
ASSERT_OK(Put(1, "pikachu", "pikachu"));
ASSERT_OK(Put(2, "ilya", "ilya"));
ASSERT_OK(Put(3, "muromec", "muromec"));
ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
ASSERT_OK(Put(5, "nikitich", "nikitich"));
ASSERT_OK(Put(6, "alyosha", "alyosha"));
ASSERT_OK(Put(7, "popovich", "popovich"));
for (int i = 0; i < 8; ++i) {
Flush(i);
auto tables = ListTableFiles(env_, dbname_);
ASSERT_EQ(tables.size(), i + 1);
}
}
TEST(DBTest, WALArchivalTtl) {
do {
Options options = CurrentOptions();
