[Rocksdb] Submit mem table flush job in a different thread pool

Summary: As title. This is just a quick hack and not ready for commit. fails a lot of unit test. I will test/debug it directly in ViewState shadow .

Test Plan: Try it in shadow test.

Reviewers: dhruba, xjin

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12933
main
Haobo Xu 11 years ago
parent 658a3ce2fa
commit fa798e9e28
  1. 99
      db/db_impl.cc
  2. 10
      db/db_impl.h
  3. 15
      include/rocksdb/options.h
  4. 3
      util/options.cc

@ -169,12 +169,14 @@ Options SanitizeOptions(const std::string& dbname,
// function.
auto factory = dynamic_cast<PrefixHashRepFactory*>(
result.memtable_factory.get());
if (factory != nullptr &&
if (factory &&
factory->GetTransform() != result.prefix_extractor) {
Log(result.info_log, "A prefix hash representation factory was supplied "
"whose prefix extractor does not match options.prefix_extractor. "
"Falling back to skip list representation factory");
result.memtable_factory = std::make_shared<SkipListFactory>();
} else if (factory) {
Log(result.info_log, "Prefix hash memtable rep is in use.");
}
}
return result;
@ -198,6 +200,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_number_(0),
tmp_batch_(),
bg_compaction_scheduled_(0),
bg_flush_scheduled_(0),
bg_logstats_scheduled_(false),
manual_compaction_(nullptr),
logger_(nullptr),
@ -265,7 +268,9 @@ DBImpl::~DBImpl() {
}
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-nullptr value is ok
while (bg_compaction_scheduled_ || bg_logstats_scheduled_) {
while (bg_compaction_scheduled_ ||
bg_flush_scheduled_ ||
bg_logstats_scheduled_) {
bg_cv_.Wait();
}
mutex_.Unlock();
@ -285,13 +290,17 @@ void DBImpl::TEST_Destroy_DBImpl() {
// wait till all background compactions are done.
mutex_.Lock();
while (bg_compaction_scheduled_ || bg_logstats_scheduled_) {
while (bg_compaction_scheduled_ ||
bg_flush_scheduled_ ||
bg_logstats_scheduled_) {
bg_cv_.Wait();
}
// Prevent new compactions from occuring.
bg_work_gate_closed_ = true;
const int LargeNumber = 10000000;
bg_compaction_scheduled_ += LargeNumber;
mutex_.Unlock();
// force release the lock file.
@ -1015,10 +1024,10 @@ void DBImpl::ReFitLevel(int level, int target_level) {
// wait for all background threads to stop
bg_work_gate_closed_ = true;
while (bg_compaction_scheduled_ > 0) {
while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_) {
Log(options_.info_log,
"RefitLevel: waiting for background threads to stop: %d",
bg_compaction_scheduled_);
"RefitLevel: waiting for background threads to stop: %d %d",
bg_compaction_scheduled_, bg_flush_scheduled_);
bg_cv_.Wait();
}
@ -1369,7 +1378,8 @@ Status DBImpl::TEST_WaitForCompactMemTable() {
Status DBImpl::TEST_WaitForCompact() {
// Wait until the compaction completes
MutexLock l(&mutex_);
while (bg_compaction_scheduled_ && bg_error_.ok()) {
while ((bg_compaction_scheduled_ || bg_flush_scheduled_) &&
bg_error_.ok()) {
bg_cv_.Wait();
}
return bg_error_;
@ -1379,29 +1389,80 @@ void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_work_gate_closed_) {
// gate closed for backgrond work
} else if (bg_compaction_scheduled_ >= options_.max_background_compactions) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge) &&
manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
// No work to be done
} else {
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWork, this);
bool is_flush_pending =
imm_.IsFlushPending(options_.min_write_buffer_number_to_merge);
if (is_flush_pending &&
(bg_flush_scheduled_ < options_.max_background_flushes)) {
// memtable flush needed
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
}
if ((manual_compaction_ ||
versions_->NeedsCompaction() ||
(is_flush_pending && (options_.max_background_flushes <= 0))) &&
bg_compaction_scheduled_ < options_.max_background_compactions) {
// compaction needed, or memtable flush needed but HIGH pool not enabled.
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
}
}
}
void DBImpl::BGWorkFlush(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
}
void DBImpl::BGWorkCompaction(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
}
Status DBImpl::BackgroundFlush() {
Status stat;
while (stat.ok() &&
imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
Log(options_.info_log,
"BackgroundCallFlush doing CompactMemTable, flush slots available %d",
options_.max_background_flushes - bg_flush_scheduled_);
stat = CompactMemTable();
}
return stat;
}
void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
void DBImpl::BackgroundCallFlush() {
assert(bg_flush_scheduled_);
MutexLock l(&mutex_);
if (!shutting_down_.Acquire_Load()) {
Status s = BackgroundFlush();
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Log(options_.info_log, "Waiting after background flush error: %s",
s.ToString().c_str());
mutex_.Unlock();
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
}
bg_flush_scheduled_--;
bg_cv_.SignalAll();
}
void DBImpl::TEST_PurgeObsoleteteWAL() {
PurgeObsoleteWALFiles();
}
void DBImpl::BackgroundCall() {
void DBImpl::BackgroundCallCompaction() {
bool madeProgress = false;
DeletionState deletion_state;
@ -1454,6 +1515,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
*madeProgress = false;
mutex_.AssertHeld();
// TODO: remove memtable flush from formal compaction
while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
Log(options_.info_log,
"BackgroundCompaction doing CompactMemTable, compaction slots available %d",
@ -1830,6 +1892,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
// TODO: remove memtable flush from normal compaction work
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();

@ -187,9 +187,12 @@ class DBImpl : public DB {
void LogDBDeployStats();
void MaybeScheduleCompaction();
static void BGWork(void* db);
void BackgroundCall();
static void BGWorkCompaction(void* db);
static void BGWorkFlush(void* db);
void BackgroundCallCompaction();
void BackgroundCallFlush();
Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state);
Status BackgroundFlush();
void CleanupCompaction(CompactionState* compact);
Status DoCompactionWork(CompactionState* compact);
@ -283,6 +286,9 @@ class DBImpl : public DB {
// count how many background compaction been scheduled or is running?
int bg_compaction_scheduled_;
// number of background memtable flush jobs, submitted to the HIGH pool
int bg_flush_scheduled_;
// Has a background stats log thread scheduled?
bool bg_logstats_scheduled_;

@ -369,10 +369,23 @@ struct Options {
// every compaction run.
uint64_t delete_obsolete_files_period_micros;
// Maximum number of concurrent background compactions.
// Maximum number of concurrent background jobs, submitted to
// the default LOW priority thread pool
// Default: 1
int max_background_compactions;
// Maximum number of concurrent background memtable flush jobs, submitted to
// the HIGH priority thread pool.
// By default, all background jobs (major compaction and memtable flush) go
// to the LOW priority pool. If this option is set to a positive number,
// memtable flush jobs will be submitted to the HIGH priority pool.
// It is important when the same Env is shared by multiple db instances.
// Without a separate pool, long running major compaction jobs could
// potentially block memtable flush jobs of other db instances, leading to
// unnecessary Put stalls.
// Default: 0
int max_background_flushes;
// Specify the maximal size of the info log file. If the log file
// is larger than `max_log_file_size`, a new info log file will
// be created.

@ -54,6 +54,7 @@ Options::Options()
disable_seek_compaction(false),
delete_obsolete_files_period_micros(0),
max_background_compactions(1),
max_background_flushes(0),
max_log_file_size(0),
log_file_time_to_roll(0),
keep_log_file_num(1000),
@ -199,6 +200,8 @@ Options::Dump(Logger* log) const
delete_obsolete_files_period_micros);
Log(log," Options.max_background_compactions: %d",
max_background_compactions);
Log(log," Options.max_background_flushes: %d",
max_background_flushes);
Log(log," Options.soft_rate_limit: %.2f",
soft_rate_limit);
Log(log," Options.hard_rate_limit: %.2f",

Loading…
Cancel
Save