diff --git a/HISTORY.md b/HISTORY.md index 99235a33d..23d8717f3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,9 @@ * DBIter::Next() can skip user key checking if previous entry's seqnum is 0. * Merging iterator to avoid child iterator reseek for some cases +### Bug Fixes +* Fix an assertion failure `IsFlushPending() == true` caused by one bg thread releasing the db mutex in ~ColumnFamilyData and another thread clearing `flush_requested_` flag. + ## 6.2.0 (4/30/2019) ### New Features * Add an option `strict_bytes_per_sync` that causes a file-writing thread to block rather than exceed the limit on bytes pending writeback specified by `bytes_per_sync` or `wal_bytes_per_sync`. diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 09c461f8d..c603f60b4 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -514,6 +514,37 @@ TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) { ASSERT_EQ("value", Get(0, "key")); } +TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) { + bool atomic_flush = GetParam(); + Options options = CurrentOptions(); + options.create_if_missing = true; + options.atomic_flush = atomic_flush; + options.max_write_buffer_number = 4; + // Set min_write_buffer_number_to_merge to be greater than 1, so that + // a column family with one memtable in the imm will not cause IsFlushPending + // to return true when flush_requested_ is false. + options.min_write_buffer_number_to_merge = 2; + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_EQ(2, handles_.size()); + ASSERT_OK(dbfull()->PauseBackgroundWork()); + ASSERT_OK(Put(0, "key00", "value00")); + ASSERT_OK(Put(1, "key10", "value10")); + FlushOptions flush_opts; + flush_opts.wait = false; + ASSERT_OK(dbfull()->Flush(flush_opts, handles_)); + ASSERT_OK(Put(0, "key01", "value01")); + // Since max_write_buffer_number is 4, the following flush won't cause write + // stall. + ASSERT_OK(dbfull()->Flush(flush_opts)); + ASSERT_OK(dbfull()->DropColumnFamily(handles_[1])); + ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1])); + handles_[1] = nullptr; + ASSERT_OK(dbfull()->ContinueBackgroundWork()); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0])); + delete handles_[0]; + handles_.clear(); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 1cdadf039..3fbf24e49 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -2082,6 +2082,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, autovector bg_flush_args; std::vector& superversion_contexts = job_context->superversion_contexts; + autovector column_families_not_to_flush; while (!flush_queue_.empty()) { // This cfd is already referenced const FlushRequest& flush_req = PopFirstFromFlushQueue(); @@ -2092,9 +2093,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, ColumnFamilyData* cfd = iter.first; if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) { // can't flush this CF, try next one - if (cfd->Unref()) { - delete cfd; - } + column_families_not_to_flush.push_back(cfd); continue; } superversion_contexts.emplace_back(SuperVersionContext(true)); @@ -2133,6 +2132,11 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, } } } + for (auto cfd : column_families_not_to_flush) { + if (cfd->Unref()) { + delete cfd; + } + } return status; } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 5abe59b36..69beb77f9 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -277,8 +277,12 @@ void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id, AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); const auto& memlist = current_->memlist_; + bool atomic_flush = false; for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { MemTable* m = *it; + if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) { + atomic_flush = true; + } if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) { break; } @@ -292,7 +296,9 @@ void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id, ret->push_back(m); } } - flush_requested_ = false; // start-flush request is complete + if (!atomic_flush || num_flush_not_started_ == 0) { + flush_requested_ = false; // start-flush request is complete + } } void MemTableList::RollbackMemtableFlush(const autovector& mems,