Refactor flush request queueing and processing (#3952)

Summary:
RocksDB currently queues individual column family for flushing. This is not sufficient to support the needs of some applications that want to enforce order/dependency between column families, given that multiple foreground and background activities can trigger flushing in RocksDB.

This PR aims to address this limitation. Each flush request is described as a `FlushRequest` that can contain multiple column families. A background flushing thread pops one flush request from the queue at a time and processes it.

This PR does not enable atomic_flush yet, but is a subset of [PR 3752](https://github.com/facebook/rocksdb/pull/3752).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/3952

Differential Revision: D8529933

Pulled By: riversand963

fbshipit-source-id: 78908a21e389a3a3f7de2a79bae0cd13af5f3539
main
Yanqin Jin 7 years ago committed by Facebook Github Bot
parent 17f9a181d5
commit 7daae512d2
  1. 5
      db/db_impl.cc
  2. 59
      db/db_impl.h
  3. 187
      db/db_impl_compaction_flush.cc
  4. 51
      db/db_impl_write.cc

@ -348,11 +348,14 @@ Status DBImpl::CloseHelper() {
flush_scheduler_.Clear();
while (!flush_queue_.empty()) {
auto cfd = PopFirstFromFlushQueue();
const FlushRequest& flush_req = PopFirstFromFlushQueue();
for (const auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
if (cfd->Unref()) {
delete cfd;
}
}
}
while (!compaction_queue_.empty()) {
auto cfd = PopFirstFromCompactionQueue();
if (cfd->Unref()) {

@ -884,12 +884,41 @@ class DBImpl : public DB {
Status SyncClosedLogs(JobContext* job_context);
// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
// log-file/memtable and writes a new descriptor iff successful. Then
// installs a new super version for the column family.
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
bool* madeProgress, JobContext* job_context,
SuperVersionContext* superversion_context,
LogBuffer* log_buffer);
// Argument required by background flush thread.
struct BGFlushArg {
BGFlushArg()
: cfd_(nullptr), memtable_id_(0), superversion_context_(nullptr) {}
BGFlushArg(ColumnFamilyData* cfd, uint64_t memtable_id,
SuperVersionContext* superversion_context)
: cfd_(cfd),
memtable_id_(memtable_id),
superversion_context_(superversion_context) {}
// Column family to flush.
ColumnFamilyData* cfd_;
// Maximum ID of memtable to flush. In this column family, memtables with
// IDs smaller than this value must be flushed before this flush completes.
uint64_t memtable_id_;
// Pointer to a SuperVersionContext object. After flush completes, RocksDB
// installs a new superversion for the column family. This operation
// requires a SuperVersionContext object (currently embedded in JobContext).
SuperVersionContext* superversion_context_;
};
// Flush the memtables of (multiple) column families to multiple files on
// persistent storage.
Status FlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer);
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only);
@ -911,8 +940,7 @@ class DBImpl : public DB {
Status ScheduleFlushes(WriteContext* context);
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
FlushReason flush_reason = FlushReason::kOthers);
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
@ -923,7 +951,13 @@ class DBImpl : public DB {
// gets flush. Otherwise, wait until the column family don't have any
// memtable pending flush.
Status WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id = nullptr);
const uint64_t* flush_memtable_id = nullptr) {
return WaitForFlushMemTables({cfd}, {flush_memtable_id});
}
// Wait for memtables to be flushed for multiple column families.
Status WaitForFlushMemTables(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const uint64_t*>& flush_memtable_ids);
// REQUIRES: mutex locked
Status SwitchWAL(WriteContext* write_context);
@ -979,7 +1013,17 @@ class DBImpl : public DB {
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
void MaybeScheduleFlushOrCompaction();
void SchedulePendingFlush(ColumnFamilyData* cfd, FlushReason flush_reason);
// A flush request specifies the column families to flush as well as the
// largest memtable id to persist for each column family. Once all the
// memtables whose IDs are smaller than or equal to this per-column-family
// specified value, this flush request is considered to have completed its
// work of flushing this column family. After completing the work for all
// column families in this request, this flush is considered complete.
typedef std::vector<std::pair<ColumnFamilyData*, uint64_t>> FlushRequest;
void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);
void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id);
@ -1021,8 +1065,7 @@ class DBImpl : public DB {
// helper functions for adding and removing from flush & compaction queues
void AddToCompactionQueue(ColumnFamilyData* cfd);
ColumnFamilyData* PopFirstFromCompactionQueue();
void AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason);
ColumnFamilyData* PopFirstFromFlushQueue();
FlushRequest PopFirstFromFlushQueue();
// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
@ -1255,7 +1298,7 @@ class DBImpl : public DB {
// in MaybeScheduleFlushOrCompaction()
// invariant(column family present in flush_queue_ <==>
// ColumnFamilyData::pending_flush_ == true)
std::deque<ColumnFamilyData*> flush_queue_;
std::deque<FlushRequest> flush_queue_;
// invariant(column family present in compaction_queue_ <==>
// ColumnFamilyData::pending_compaction_ == true)
std::deque<ColumnFamilyData*> compaction_queue_;

@ -104,7 +104,8 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
bool* made_progress, JobContext* job_context,
SuperVersionContext* superversion_context, LogBuffer* log_buffer) {
mutex_.AssertHeld();
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
@ -160,8 +161,8 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (s.ok()) {
InstallSuperVersionAndScheduleWork(
cfd, &job_context->superversion_contexts[0], mutable_cf_options);
InstallSuperVersionAndScheduleWork(cfd, superversion_context,
mutable_cf_options);
if (made_progress) {
*made_progress = 1;
}
@ -200,6 +201,25 @@ Status DBImpl::FlushMemTableToOutputFile(
return s;
}
Status DBImpl::FlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer) {
Status s;
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
const MutableCFOptions& mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
SuperVersionContext* superversion_context = arg.superversion_context_;
s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
job_context, superversion_context,
log_buffer);
if (!s.ok()) {
break;
}
}
return s;
}
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
@ -1077,63 +1097,93 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
FlushReason flush_reason, bool writes_stopped) {
Status s;
uint64_t flush_memtable_id = 0;
FlushRequest flush_req;
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() &&
cached_recoverable_state_empty_.load()) {
// Nothing to flush
return Status::OK();
}
WriteThread::Writer w;
if (!writes_stopped) {
write_thread_.EnterUnbatched(&w, &mutex_);
}
// SwitchMemtable() will release and reacquire mutex during execution
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
s = SwitchMemtable(cfd, &context);
flush_memtable_id = cfd->imm()->GetLatestMemTableID();
flush_req.emplace_back(cfd, flush_memtable_id);
}
if (s.ok() && !flush_req.empty()) {
for (auto& elem : flush_req) {
ColumnFamilyData* loop_cfd = elem.first;
loop_cfd->imm()->FlushRequested();
}
SchedulePendingFlush(flush_req, flush_reason);
MaybeScheduleFlushOrCompaction();
}
if (!writes_stopped) {
write_thread_.ExitUnbatched(&w);
}
cfd->imm()->FlushRequested();
// schedule flush
SchedulePendingFlush(cfd, flush_reason);
MaybeScheduleFlushOrCompaction();
}
if (s.ok() && flush_options.wait) {
// Wait until the compaction completes
s = WaitForFlushMemTable(cfd, &flush_memtable_id);
autovector<ColumnFamilyData*> cfds;
autovector<const uint64_t*> flush_memtable_ids;
for (auto& iter : flush_req) {
cfds.push_back(iter.first);
flush_memtable_ids.push_back(&(iter.second));
}
s = WaitForFlushMemTables(cfds, flush_memtable_ids);
}
TEST_SYNC_POINT("FlushMemTableFinished");
return s;
}
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id) {
Status s;
// Wait for memtables to be flushed for multiple column families.
// let N = cfds.size()
// for i in [0, N),
// 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
// have to be flushed for THIS column family;
// 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
// family have to be flushed.
// Finish waiting when ALL column families finish flushing memtables.
Status DBImpl::WaitForFlushMemTables(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const uint64_t*>& flush_memtable_ids) {
int num = static_cast<int>(cfds.size());
// Wait until the compaction completes
InstrumentedMutexLock l(&mutex_);
while (cfd->imm()->NumNotFlushed() > 0 && !error_handler_.IsDBStopped() &&
(flush_memtable_id == nullptr ||
cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) {
while (!error_handler_.IsDBStopped()) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
if (cfd->IsDropped()) {
// FlushJob cannot flush a dropped CF, if we did not break here
// we will loop forever since cfd->imm()->NumNotFlushed() will never
// drop to zero
// Number of column families that have been dropped.
int num_dropped = 0;
// Number of column families that have finished flush.
int num_finished = 0;
for (int i = 0; i < num; ++i) {
if (cfds[i]->IsDropped()) {
++num_dropped;
} else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
(flush_memtable_ids[i] != nullptr &&
cfds[i]->imm()->GetEarliestMemTableID() >
*flush_memtable_ids[i])) {
++num_finished;
}
}
if (1 == num_dropped && 1 == num) {
return Status::InvalidArgument("Cannot flush a dropped CF");
}
// Column families involved in this flush request have either been dropped
// or finished flush. Then it's time to finish waiting.
if (num_dropped + num_finished == num) {
break;
}
bg_cv_.Wait();
}
Status s;
if (error_handler_.IsDBStopped()) {
s = error_handler_.GetBGError();
}
@ -1172,7 +1222,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ < bg_job_limits.max_flushes) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
}
@ -1183,7 +1232,6 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ <
bg_job_limits.max_flushes) {
unscheduled_flushes_--;
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
}
@ -1260,30 +1308,28 @@ ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
return cfd;
}
void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason) {
assert(!cfd->queued_for_flush());
cfd->Ref();
flush_queue_.push_back(cfd);
cfd->set_queued_for_flush(true);
cfd->SetFlushReason(flush_reason);
}
ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() {
DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
assert(!flush_queue_.empty());
auto cfd = *flush_queue_.begin();
FlushRequest flush_req = flush_queue_.front();
assert(unscheduled_flushes_ >= static_cast<int>(flush_req.size()));
unscheduled_flushes_ -= static_cast<int>(flush_req.size());
flush_queue_.pop_front();
assert(cfd->queued_for_flush());
cfd->set_queued_for_flush(false);
// TODO: need to unset flush reason?
return cfd;
return flush_req;
}
void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd,
void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
FlushReason flush_reason) {
if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
AddToFlushQueue(cfd, flush_reason);
++unscheduled_flushes_;
if (flush_req.empty()) {
return;
}
for (auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
cfd->Ref();
cfd->SetFlushReason(flush_reason);
}
unscheduled_flushes_ += static_cast<int>(flush_req.size());
flush_queue_.push_back(flush_req);
}
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
@ -1367,40 +1413,55 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
return status;
}
ColumnFamilyData* cfd = nullptr;
autovector<BGFlushArg> bg_flush_args;
std::vector<SuperVersionContext>& superversion_contexts =
job_context->superversion_contexts;
while (!flush_queue_.empty()) {
// This cfd is already referenced
auto first_cfd = PopFirstFromFlushQueue();
const FlushRequest& flush_req = PopFirstFromFlushQueue();
superversion_contexts.clear();
superversion_contexts.reserve(flush_req.size());
if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
for (const auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
// can't flush this CF, try next one
if (first_cfd->Unref()) {
delete first_cfd;
if (cfd->Unref()) {
delete cfd;
}
continue;
}
// found a flush!
cfd = first_cfd;
superversion_contexts.emplace_back(SuperVersionContext(true));
bg_flush_args.emplace_back(cfd, iter.second,
&(superversion_contexts.back()));
}
if (!bg_flush_args.empty()) {
break;
}
}
if (cfd != nullptr) {
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
if (!bg_flush_args.empty()) {
auto bg_job_limits = GetBGJobLimits();
for (const auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
ROCKS_LOG_BUFFER(
log_buffer,
"Calling FlushMemTableToOutputFile with column "
"family [%s], flush slots available %d, compaction slots available %d, "
"family [%s], flush slots available %d, compaction slots available "
"%d, "
"flush slots scheduled %d, compaction slots scheduled %d",
cfd->GetName().c_str(), bg_job_limits.max_flushes,
bg_job_limits.max_compactions, bg_flush_scheduled_,
bg_compaction_scheduled_);
status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
}
status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer);
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
if (cfd->Unref()) {
delete cfd;
arg.cfd_ = nullptr;
}
}
}
return status;
@ -2080,7 +2141,10 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
void DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options, FlushReason flush_reason) {
const MutableCFOptions& mutable_cf_options,
FlushReason /* flush_reason */) {
// TODO(yanqin) investigate if 'flush_reason' can be removed since it's not
// used.
mutex_.AssertHeld();
// Update max_total_in_memory_state_
@ -2099,7 +2163,6 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
// Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions.
SchedulePendingFlush(cfd, flush_reason);
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();

@ -1064,6 +1064,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
FlushRequest flush_req;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
@ -1073,11 +1074,14 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
if (!status.ok()) {
break;
}
flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
cfd->imm()->FlushRequested();
SchedulePendingFlush(cfd, FlushReason::kWriteBufferManager);
}
}
if (status.ok()) {
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
MaybeScheduleFlushOrCompaction();
}
return status;
}
@ -1116,15 +1120,27 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
}
}
}
autovector<ColumnFamilyData*> cfds;
if (cfd_picked != nullptr) {
status = SwitchMemtable(cfd_picked, write_context,
FlushReason::kWriteBufferFull);
cfds.push_back(cfd_picked);
}
FlushRequest flush_req;
for (const auto cfd : cfds) {
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
cfd->Unref();
if (!status.ok()) {
break;
}
uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
cfd->imm()->FlushRequested();
flush_req.emplace_back(cfd, flush_memtable_id);
}
if (status.ok()) {
cfd_picked->imm()->FlushRequested();
SchedulePendingFlush(cfd_picked, FlushReason::kWriteBufferFull);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}
}
return status;
}
@ -1219,16 +1235,28 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd;
FlushRequest flush_req;
Status status;
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
auto status = SwitchMemtable(cfd, context, FlushReason::kWriteBufferFull);
status = SwitchMemtable(cfd, context);
bool should_schedule = true;
if (cfd->Unref()) {
delete cfd;
should_schedule = false;
}
if (!status.ok()) {
return status;
break;
}
if (should_schedule) {
uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
flush_req.emplace_back(cfd, flush_memtable_id);
}
return Status::OK();
}
if (status.ok()) {
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}
return status;
}
#ifndef ROCKSDB_LITE
@ -1249,8 +1277,7 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
FlushReason flush_reason) {
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
mutex_.AssertHeld();
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
@ -1422,7 +1449,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
new_mem->Ref();
cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options, flush_reason);
mutable_cf_options);
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}

Loading…
Cancel
Save