diff --git a/db/db_impl.cc b/db/db_impl.cc index 9b139ceb2..abcc76140 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -119,13 +119,12 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) db_lock_(NULL), shutting_down_(NULL), bg_cv_(&mutex_), - compacting_cv_(&mutex_), mem_(new MemTable(internal_comparator_)), imm_(NULL), logfile_(NULL), log_(NULL), bg_compaction_scheduled_(false), - compacting_(false) { + manual_compaction_(NULL) { mem_->Ref(); has_imm_.Release_Store(NULL); @@ -141,10 +140,8 @@ DBImpl::~DBImpl() { // Wait for background work to finish mutex_.Lock(); shutting_down_.Release_Store(this); // Any non-NULL value is ok - if (bg_compaction_scheduled_) { - while (bg_compaction_scheduled_) { - bg_cv_.Wait(); - } + while (bg_compaction_scheduled_) { + bg_cv_.Wait(); } mutex_.Unlock(); @@ -437,7 +434,6 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) { Status DBImpl::CompactMemTable() { mutex_.AssertHeld(); assert(imm_ != NULL); - assert(compacting_); // Save the contents of the memtable as a new Table VersionEdit edit; @@ -457,7 +453,6 @@ Status DBImpl::CompactMemTable() { DeleteObsoleteFiles(); } - compacting_cv_.SignalAll(); // Wake up waiter even if there was an error return s; } @@ -466,22 +461,18 @@ void DBImpl::TEST_CompactRange( const std::string& begin, const std::string& end) { MutexLock l(&mutex_); - while (compacting_) { - compacting_cv_.Wait(); + while (manual_compaction_ != NULL) { + bg_cv_.Wait(); } - Compaction* c = versions_->CompactRange( - level, - InternalKey(begin, kMaxSequenceNumber, kValueTypeForSeek), - InternalKey(end, 0, static_cast(0))); - - if (c != NULL) { - CompactionState* compact = new CompactionState(c); - DoCompactionWork(compact); // Ignore error in test compaction - CleanupCompaction(compact); - } - - // Start any background compaction that may have been delayed by this thread + ManualCompaction manual; + manual.level = level; + manual.begin = begin; + manual.end = end; + manual_compaction_ = &manual; MaybeScheduleCompaction(); + while (manual_compaction_ == &manual) { + bg_cv_.Wait(); + } } Status DBImpl::TEST_CompactMemTable() { @@ -490,7 +481,7 @@ Status DBImpl::TEST_CompactMemTable() { if (s.ok()) { // Wait until the compaction completes while (imm_ != NULL && bg_error_.ok()) { - compacting_cv_.Wait(); + bg_cv_.Wait(); } if (imm_ != NULL) { s = bg_error_; @@ -503,11 +494,11 @@ void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (bg_compaction_scheduled_) { // Already scheduled - } else if (compacting_) { - // Some other thread is running a compaction. Do not conflict with it. } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions - } else if (imm_ == NULL && !versions_->NeedsCompaction()) { + } else if (imm_ == NULL && + manual_compaction_ == NULL && + !versions_->NeedsCompaction()) { // No work to be done } else { bg_compaction_scheduled_ = true; @@ -522,38 +513,41 @@ void DBImpl::BGWork(void* db) { void DBImpl::BackgroundCall() { MutexLock l(&mutex_); assert(bg_compaction_scheduled_); - if (!shutting_down_.Acquire_Load() && - !compacting_) { + if (!shutting_down_.Acquire_Load()) { BackgroundCompaction(); } bg_compaction_scheduled_ = false; - bg_cv_.SignalAll(); // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); + bg_cv_.SignalAll(); } void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); - assert(!compacting_); if (imm_ != NULL) { - compacting_ = true; CompactMemTable(); - compacting_ = false; - compacting_cv_.SignalAll(); return; } - Compaction* c = versions_->PickCompaction(); - if (c == NULL) { - // Nothing to do - return; + Compaction* c; + bool is_manual = (manual_compaction_ != NULL); + if (is_manual) { + const ManualCompaction* m = manual_compaction_; + c = versions_->CompactRange( + m->level, + InternalKey(m->begin, kMaxSequenceNumber, kValueTypeForSeek), + InternalKey(m->end, 0, static_cast(0))); + } else { + c = versions_->PickCompaction(); } Status status; - if (c->IsTrivialMove()) { + if (c == NULL) { + // Nothing to do + } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); @@ -561,11 +555,13 @@ void DBImpl::BackgroundCompaction() { c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); status = versions_->LogAndApply(c->edit()); - Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n", + VersionSet::LevelSummaryStorage tmp; + Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast(f->number), c->level() + 1, static_cast(f->file_size), - status.ToString().c_str()); + status.ToString().c_str(), + versions_->LevelSummary(&tmp)); } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); @@ -584,6 +580,11 @@ void DBImpl::BackgroundCompaction() { bg_error_ = status; } } + + if (is_manual) { + // Mark it as done + manual_compaction_ = NULL; + } } void DBImpl::CleanupCompaction(CompactionState* compact) { @@ -734,7 +735,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } // Release mutex while we're actually doing the compaction work - compacting_ = true; mutex_.Unlock(); Iterator* input = versions_->MakeInputIterator(compact->compaction); @@ -751,7 +751,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { mutex_.Lock(); if (imm_ != NULL) { CompactMemTable(); - compacting_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } mutex_.Unlock(); imm_micros += (env_->NowMicros() - imm_start); @@ -867,8 +867,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (status.ok()) { status = InstallCompactionResults(compact); } - compacting_ = false; - compacting_cv_.SignalAll(); VersionSet::LevelSummaryStorage tmp; Log(env_, options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); @@ -1038,10 +1036,11 @@ Status DBImpl::MakeRoomForWrite(bool force) { } else if (imm_ != NULL) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. - compacting_cv_.Wait(); + bg_cv_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // There are too many level-0 files. - compacting_cv_.Wait(); + Log(env_, options_.info_log, "waiting...\n"); + bg_cv_.Wait(); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); diff --git a/db/db_impl.h b/db/db_impl.h index c23ae0094..84ce1546c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -119,8 +119,7 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; port::AtomicPointer shutting_down_; - port::CondVar bg_cv_; // Signalled when !bg_compaction_scheduled_ - port::CondVar compacting_cv_; // Signalled when !compacting_ + port::CondVar bg_cv_; // Signalled when background work finishes MemTable* mem_; MemTable* imm_; // Memtable being compacted port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ @@ -135,8 +134,13 @@ class DBImpl : public DB { // Has a background compaction been scheduled or is running? bool bg_compaction_scheduled_; - // Is there a compaction running? - bool compacting_; + // Information for a manual compaction + struct ManualCompaction { + int level; + std::string begin; + std::string end; + }; + ManualCompaction* manual_compaction_; VersionSet* versions_;