From 205083297452c1b206d2eb192b6e5f7580045e53 Mon Sep 17 00:00:00 2001 From: Andres Noetzli Date: Mon, 24 Aug 2015 17:04:18 -0700 Subject: [PATCH] Fixing race condition in DBTest.DynamicMemtableOptions Summary: This patch fixes a race condition in DBTEst.DynamicMemtableOptions. In rare cases, it was possible that the main thread would fill up both memtables before the flush job acquired its work. Then, the flush job was flushing both memtables together, producing only one L0 file while the test expected two. Now, the test waits for flushes to finish earlier, to make sure that the memtables are flushed in separate flush jobs. Test Plan: Insert "usleep(10000);" after "IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);" in BGWorkFlush() to make the issue more likely. Then test with: make db_test && time while ./db_test --gtest_filter=*DynamicMemtableOptions; do true; done Reviewers: rven, sdong, yhchiang, anthony, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D45429 --- db/db_bench.cc | 13 ++++++------- db/db_impl.cc | 35 ++++++++++++++++++----------------- db/db_test.cc | 12 +++++++++++- db/flush_job.cc | 1 + db/memtable_list.cc | 4 ++-- 5 files changed, 38 insertions(+), 27 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 4fad681a8..7832be6f4 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -223,9 +223,7 @@ static bool ValidateKeySize(const char* flagname, int32_t value) { static bool ValidateUint32Range(const char* flagname, uint64_t value) { if (value > std::numeric_limits::max()) { - fprintf(stderr, - "Invalid value for --%s: %lu, overflow\n", - flagname, + fprintf(stderr, "Invalid value for --%s: %lu, overflow\n", flagname, (unsigned long)value); return false; } @@ -298,10 +296,11 @@ DEFINE_int32(max_background_compactions, " that can occur in parallel."); DEFINE_uint64(subcompactions, 1, - "Maximum number of subcompactions to divide L0-L1 compactions " - "into."); -static const bool FLAGS_subcompactions_dummy __attribute__((unused)) = - RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range); + "Maximum number of subcompactions to divide L0-L1 compactions " + "into."); +static const bool FLAGS_subcompactions_dummy + __attribute__((unused)) = RegisterFlagValidator(&FLAGS_subcompactions, + &ValidateUint32Range); DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes, diff --git a/db/db_impl.cc b/db/db_impl.cc index acc093bef..411edbbc1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1338,7 +1338,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, Status DBImpl::FlushMemTableToOutputFile( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, - bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) { + bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); @@ -1363,8 +1363,8 @@ Status DBImpl::FlushMemTableToOutputFile( if (s.ok()) { InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context, mutable_cf_options); - if (madeProgress) { - *madeProgress = 1; + if (made_progress) { + *made_progress = 1; } VersionStorageInfo::LevelSummaryStorage tmp; LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(), @@ -2276,7 +2276,7 @@ void DBImpl::BGWorkCompaction(void* db) { reinterpret_cast(db)->BackgroundCallCompaction(); } -Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context, +Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); @@ -2317,7 +2317,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context, cfd->GetName().c_str(), db_options_.max_background_flushes - bg_flush_scheduled_, db_options_.max_background_compactions - bg_compaction_scheduled_); - status = FlushMemTableToOutputFile(cfd, mutable_cf_options, madeProgress, + status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, job_context, log_buffer); if (cfd->Unref()) { delete cfd; @@ -2327,7 +2327,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context, } void DBImpl::BackgroundCallFlush() { - bool madeProgress = false; + bool made_progress = false; JobContext job_context(next_job_id_.fetch_add(1), true); assert(bg_flush_scheduled_); @@ -2338,7 +2338,7 @@ void DBImpl::BackgroundCallFlush() { auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); - Status s = BackgroundFlush(&madeProgress, &job_context, &log_buffer); + Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer); if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background flush in // case this is an environmental problem and we do not want to @@ -2392,7 +2392,7 @@ void DBImpl::BackgroundCallFlush() { } void DBImpl::BackgroundCallCompaction() { - bool madeProgress = false; + bool made_progress = false; JobContext job_context(next_job_id_.fetch_add(1), true); MaybeDumpStats(); @@ -2404,7 +2404,7 @@ void DBImpl::BackgroundCallCompaction() { CaptureCurrentFileNumberInPendingOutputs(); assert(bg_compaction_scheduled_); - Status s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer); + Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer); if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -2453,9 +2453,9 @@ void DBImpl::BackgroundCallCompaction() { // See if there's more work to be done MaybeScheduleFlushOrCompaction(); - if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) { + if (made_progress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) { // signal if - // * madeProgress -- need to wakeup DelayWrite + // * made_progress -- need to wakeup DelayWrite // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl // * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction // If none of this is true, there is no need to signal since nobody is @@ -2469,9 +2469,10 @@ void DBImpl::BackgroundCallCompaction() { } } -Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, +Status DBImpl::BackgroundCompaction(bool* made_progress, + JobContext* job_context, LogBuffer* log_buffer) { - *madeProgress = false; + *made_progress = false; mutex_.AssertHeld(); bool is_manual = (manual_compaction_ != nullptr) && @@ -2608,7 +2609,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, LogToBuffer(log_buffer, "[%s] Deleted %d files\n", c->column_family_data()->GetName().c_str(), c->num_input_files(0)); - *madeProgress = true; + *made_progress = true; } else if (!trivial_move_disallowed && c->IsTrivialMove()) { TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove"); // Instrument for event update @@ -2665,7 +2666,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, c->column_family_data()->GetName().c_str(), moved_files, c->output_level(), moved_bytes, status.ToString().c_str(), c->column_family_data()->current()->storage_info()->LevelSummary(&tmp)); - *madeProgress = true; + *made_progress = true; // Clear Instrument ThreadStatusUtil::ResetThreadStatus(); @@ -2694,14 +2695,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, InstallSuperVersionAndScheduleWorkWrapper( c->column_family_data(), job_context, *c->mutable_cf_options()); } - *madeProgress = true; + *made_progress = true; } if (c != nullptr) { NotifyOnCompactionCompleted( c->column_family_data(), c.get(), status, compaction_job_stats, job_context->job_id); c->ReleaseCompactionFiles(status); - *madeProgress = true; + *made_progress = true; } // this will unref its input_version and column_family_data c.reset(); diff --git a/db/db_test.cc b/db/db_test.cc index 75124fa92..e56b0aca3 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -6113,6 +6113,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { const uint64_t k64KB = 1 << 16; const uint64_t k128KB = 1 << 17; const uint64_t k5KB = 5 * 1024; + const int kNumPutsBeforeWaitForFlush = 64; Options options; options.env = env_; options.create_if_missing = true; @@ -6130,6 +6131,15 @@ TEST_F(DBTest, DynamicMemtableOptions) { Random rnd(301); for (int i = 0; i < size; i++) { ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); + + // The following condition prevents a race condition between flush jobs + // acquiring work and this thread filling up multiple memtables. Without + // this, the flush might produce less files than expected because + // multiple memtables are flushed into a single L0 file. This race + // condition affects assertion (A). + if (i % kNumPutsBeforeWaitForFlush == kNumPutsBeforeWaitForFlush - 1) { + dbfull()->TEST_WaitForFlushMemTable(); + } } dbfull()->TEST_WaitForFlushMemTable(); }; @@ -6153,7 +6163,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { // the next memtable will be 128KB in size. Write 256KB total, we should // have a 64KB L0 file, a 128KB L0 file, and a memtable with 64KB data gen_l0_kb(256); - ASSERT_EQ(NumTableFilesAtLevel(0), 2); + ASSERT_EQ(NumTableFilesAtLevel(0), 2); // (A) ASSERT_LT(SizeAtLevel(0), k128KB + k64KB + 2 * k5KB); ASSERT_GT(SizeAtLevel(0), k128KB + k64KB - 2 * k5KB); diff --git a/db/flush_job.cc b/db/flush_job.cc index ae986e8f9..20cf7f860 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -218,6 +218,7 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, << total_num_entries << "num_deletes" << total_num_deletes << "memory_usage" << total_memory_usage; + TableFileCreationInfo info; { ScopedArenaIterator iter( diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 7ac3af60a..b2bbbd165 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -324,7 +324,7 @@ Status MemTableList::InstallMemtableFlushResults( // All the later memtables that have the same filenum // are part of the same batch. They can be committed now. - uint64_t mem_id = 1; // how many memtables has been flushed. + uint64_t mem_id = 1; // how many memtables have been flushed. do { if (s.ok()) { // commit new state LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 @@ -333,7 +333,7 @@ Status MemTableList::InstallMemtableFlushResults( assert(m->file_number_ > 0); current_->Remove(m, to_delete); } else { - //commit failed. setup state so that we can flush again. + // commit failed. setup state so that we can flush again. LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64 ": memtable #%" PRIu64 " failed", m->file_number_, mem_id);