diff --git a/db/db_impl.cc b/db/db_impl.cc index cff2d5a20..1d72a1ea4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -375,7 +375,7 @@ DBImpl::~DBImpl() { mutex_.Lock(); if (flush_on_destroy_) { for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->mem()->GetFirstSequenceNumber() != 0) { + if (!cfd->mem()->IsEmpty()) { cfd->Ref(); mutex_.Unlock(); FlushMemTable(cfd, FlushOptions()); @@ -1905,6 +1905,12 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, { WriteContext context; MutexLock guard_lock(&mutex_); + + if (cfd->imm()->size() == 0 && cfd->mem()->IsEmpty()) { + // Nothing to flush + return Status::OK(); + } + s = BeginWrite(&w, 0); assert(s.ok() && !w.done); // No timeout and nobody should do our job diff --git a/db/db_test.cc b/db/db_test.cc index b30bfd70d..140e87078 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2535,6 +2535,49 @@ class SleepingBackgroundTask { bool done_with_sleep_; }; +TEST(DBTest, FlushEmptyColumnFamily) { + // Block flush thread and disable compaction thread + env_->SetBackgroundThreads(1, Env::HIGH); + env_->SetBackgroundThreads(1, Env::LOW); + SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + SleepingBackgroundTask sleeping_task_high; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, + Env::Priority::HIGH); + + Options options = CurrentOptions(); + // disable compaction + options.disable_auto_compactions = true; + WriteOptions writeOpt = WriteOptions(); + writeOpt.disableWAL = true; + options.max_write_buffer_number = 2; + options.min_write_buffer_number_to_merge = 1; + CreateAndReopenWithCF({"pikachu"}, &options); + + // Compaction can still go through even if no thread can flush the + // mem table. + ASSERT_OK(Flush(0)); + ASSERT_OK(Flush(1)); + + // Insert can go through + ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1")); + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); + + ASSERT_EQ("v1", Get(0, "foo")); + ASSERT_EQ("v1", Get(1, "bar")); + + sleeping_task_high.WakeUp(); + sleeping_task_high.WaitUntilDone(); + + // Flush can still go through. + ASSERT_OK(Flush(0)); + ASSERT_OK(Flush(1)); + + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); +} + TEST(DBTest, GetProperty) { // Set sizes to both background thread pool to be 1 and block them. env_->SetBackgroundThreads(1, Env::HIGH); diff --git a/db/memtable.cc b/db/memtable.cc index e102575a4..1ed0e2cea 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -414,7 +414,7 @@ static bool SaveValue(void* arg, const char* entry) { bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, MergeContext& merge_context, const Options& options) { // The sequence number is updated synchronously in version_set.h - if (first_seqno_ == 0) { + if (IsEmpty()) { // Avoiding recording stats for speed. return false; } diff --git a/db/memtable.h b/db/memtable.h index 2723f30d8..80dcdd42e 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -137,6 +137,9 @@ class MemTable { // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } + // Returns if there is no entry inserted to the mem table. + bool IsEmpty() const { return first_seqno_ == 0; } + // Returns the sequence number of the first element that was inserted // into the memtable SequenceNumber GetFirstSequenceNumber() { return first_seqno_; }