diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 518cabf11..90e6be6fb 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -159,10 +159,10 @@ Status DBImpl::FlushMemTableToOutputFile( FlushJob flush_job( dbname_, cfd, immutable_db_options_, mutable_cf_options, - nullptr /* memtable_id */, file_options_for_compaction_, versions_.get(), - &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker, job_context, log_buffer, directories_.GetDbDir(), - GetDataDir(cfd, 0U), + port::kMaxUint64 /* memtable_id */, file_options_for_compaction_, + versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, + earliest_write_conflict_snapshot, snapshot_checker, job_context, + log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U), GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, true /* sync_output_directory */, true /* write_manifest */, thread_pri, @@ -313,30 +313,22 @@ Status DBImpl::FlushMemTablesToOutputFiles( return AtomicFlushMemTablesToOutputFiles( bg_flush_args, made_progress, job_context, log_buffer, thread_pri); } + assert(bg_flush_args.size() == 1); std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; SnapshotChecker* snapshot_checker; GetSnapshotContext(job_context, &snapshot_seqs, &earliest_write_conflict_snapshot, &snapshot_checker); - Status status; - for (auto& arg : bg_flush_args) { - ColumnFamilyData* cfd = arg.cfd_; - MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); - SuperVersionContext* superversion_context = arg.superversion_context_; - Status s = FlushMemTableToOutputFile( - cfd, mutable_cf_options, made_progress, job_context, - superversion_context, snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker, log_buffer, thread_pri); - if (!s.ok()) { - status = s; - if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) { - // At this point, DB is not shutting down, nor is cfd dropped. - // Something is wrong, thus we break out of the loop. - break; - } - } - } - return status; + const auto& bg_flush_arg = bg_flush_args[0]; + ColumnFamilyData* cfd = bg_flush_arg.cfd_; + MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); + SuperVersionContext* superversion_context = + bg_flush_arg.superversion_context_; + Status s = FlushMemTableToOutputFile( + cfd, mutable_cf_options, made_progress, job_context, superversion_context, + snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, + log_buffer, thread_pri); + return s; } /* @@ -399,7 +391,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions()); const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back(); - const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_); + uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_; jobs.emplace_back(new FlushJob( dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_, @@ -1697,8 +1689,9 @@ void DBImpl::GenerateFlushRequest(const autovector& cfds, Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& flush_options, FlushReason flush_reason, bool writes_stopped) { + // This method should not be called if atomic_flush is true. + assert(!immutable_db_options_.atomic_flush); Status s; - uint64_t flush_memtable_id = 0; if (!flush_options.allow_write_stall) { bool flush_needed = true; s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed); @@ -1708,7 +1701,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, } } - FlushRequest flush_req; + autovector flush_reqs; + autovector memtable_ids_to_wait; { WriteContext context; InstrumentedMutexLock guard_lock(&mutex_); @@ -1730,11 +1724,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, assert(cfd->imm()->NumNotFlushed() > 0); } } + const uint64_t flush_memtable_id = port::kMaxUint64; if (s.ok()) { if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) { - flush_memtable_id = cfd->imm()->GetLatestMemTableID(); - flush_req.emplace_back(cfd, flush_memtable_id); + FlushRequest req{{cfd, flush_memtable_id}}; + flush_reqs.emplace_back(std::move(req)); + memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID()); } if (immutable_db_options_.persist_stats_to_disk && flush_reason != FlushReason::kErrorRecoveryRetryFlush) { @@ -1760,15 +1756,19 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, "to avoid holding old logs", cfd->GetName().c_str()); s = SwitchMemtable(cfd_stats, &context); - flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID(); - flush_req.emplace_back(cfd_stats, flush_memtable_id); + FlushRequest req{{cfd_stats, flush_memtable_id}}; + flush_reqs.emplace_back(std::move(req)); + memtable_ids_to_wait.emplace_back( + cfd->imm()->GetLatestMemTableID()); } } } } - if (s.ok() && !flush_req.empty()) { - for (auto& elem : flush_req) { - ColumnFamilyData* loop_cfd = elem.first; + + if (s.ok() && !flush_reqs.empty()) { + for (const auto& req : flush_reqs) { + assert(req.size() == 1); + ColumnFamilyData* loop_cfd = req[0].first; loop_cfd->imm()->FlushRequested(); } // If the caller wants to wait for this flush to complete, it indicates @@ -1776,12 +1776,15 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, // other threads which may drop the column family concurrently. // Therefore, we increase the cfd's ref count. if (flush_options.wait) { - for (auto& elem : flush_req) { - ColumnFamilyData* loop_cfd = elem.first; + for (const auto& req : flush_reqs) { + assert(req.size() == 1); + ColumnFamilyData* loop_cfd = req[0].first; loop_cfd->Ref(); } } - SchedulePendingFlush(flush_req, flush_reason); + for (const auto& req : flush_reqs) { + SchedulePendingFlush(req, flush_reason); + } MaybeScheduleFlushOrCompaction(); } @@ -1797,9 +1800,11 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, if (s.ok() && flush_options.wait) { autovector cfds; autovector flush_memtable_ids; - for (auto& iter : flush_req) { - cfds.push_back(iter.first); - flush_memtable_ids.push_back(&(iter.second)); + assert(flush_reqs.size() == memtable_ids_to_wait.size()); + for (size_t i = 0; i < flush_reqs.size(); ++i) { + assert(flush_reqs[i].size() == 1); + cfds.push_back(flush_reqs[i][0].first); + flush_memtable_ids.push_back(&(memtable_ids_to_wait[i])); } s = WaitForFlushMemTables( cfds, flush_memtable_ids, @@ -2224,6 +2229,17 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { assert(!flush_queue_.empty()); FlushRequest flush_req = flush_queue_.front(); flush_queue_.pop_front(); + if (!immutable_db_options_.atomic_flush) { + assert(flush_req.size() == 1); + } + for (const auto& elem : flush_req) { + if (!immutable_db_options_.atomic_flush) { + ColumnFamilyData* cfd = elem.first; + assert(cfd); + assert(cfd->queued_for_flush()); + cfd->set_queued_for_flush(false); + } + } // TODO: need to unset flush reason? return flush_req; } @@ -2256,19 +2272,36 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue( void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, FlushReason flush_reason) { + mutex_.AssertHeld(); if (flush_req.empty()) { return; } - for (auto& iter : flush_req) { - ColumnFamilyData* cfd = iter.first; - cfd->Ref(); - cfd->SetFlushReason(flush_reason); + if (!immutable_db_options_.atomic_flush) { + // For the non-atomic flush case, we never schedule multiple column + // families in the same flush request. + assert(flush_req.size() == 1); + ColumnFamilyData* cfd = flush_req[0].first; + assert(cfd); + if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) { + cfd->Ref(); + cfd->set_queued_for_flush(true); + cfd->SetFlushReason(flush_reason); + ++unscheduled_flushes_; + flush_queue_.push_back(flush_req); + } + } else { + for (auto& iter : flush_req) { + ColumnFamilyData* cfd = iter.first; + cfd->Ref(); + cfd->SetFlushReason(flush_reason); + } + ++unscheduled_flushes_; + flush_queue_.push_back(flush_req); } - ++unscheduled_flushes_; - flush_queue_.push_back(flush_req); } void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { + mutex_.AssertHeld(); if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) { AddToCompactionQueue(cfd); ++unscheduled_compactions_; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 1cab2b6c0..02ef5671e 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1335,10 +1335,17 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { } for (auto cfd : cfds) { cfd->imm()->FlushRequested(); + if (!immutable_db_options_.atomic_flush) { + FlushRequest flush_req; + GenerateFlushRequest({cfd}, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); + } + } + if (immutable_db_options_.atomic_flush) { + FlushRequest flush_req; + GenerateFlushRequest(cfds, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); } - FlushRequest flush_req; - GenerateFlushRequest(cfds, &flush_req); - SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); MaybeScheduleFlushOrCompaction(); } return status; @@ -1414,10 +1421,17 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { } for (const auto cfd : cfds) { cfd->imm()->FlushRequested(); + if (!immutable_db_options_.atomic_flush) { + FlushRequest flush_req; + GenerateFlushRequest({cfd}, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); + } + } + if (immutable_db_options_.atomic_flush) { + FlushRequest flush_req; + GenerateFlushRequest(cfds, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); } - FlushRequest flush_req; - GenerateFlushRequest(cfds, &flush_req); - SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); MaybeScheduleFlushOrCompaction(); } return status; @@ -1641,10 +1655,16 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { if (status.ok()) { if (immutable_db_options_.atomic_flush) { AssignAtomicFlushSeq(cfds); + FlushRequest flush_req; + GenerateFlushRequest(cfds, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); + } else { + for (auto* cfd : cfds) { + FlushRequest flush_req; + GenerateFlushRequest({cfd}, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); + } } - FlushRequest flush_req; - GenerateFlushRequest(cfds, &flush_req); - SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); MaybeScheduleFlushOrCompaction(); } return status; diff --git a/db/db_impl/db_secondary_test.cc b/db/db_impl/db_secondary_test.cc index bad0153a6..281cf76be 100644 --- a/db/db_impl/db_secondary_test.cc +++ b/db/db_impl/db_secondary_test.cc @@ -748,10 +748,12 @@ TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { } }; for (int k = 0; k != 8; ++k) { - ASSERT_OK( - Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); - ASSERT_OK( - Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); + for (int j = 0; j < 2; ++j) { + ASSERT_OK(Put(0 /*cf*/, "key" + std::to_string(k), + "value" + std::to_string(k))); + ASSERT_OK(Put(1 /*cf*/, "key" + std::to_string(k), + "value" + std::to_string(k))); + } TEST_SYNC_POINT( "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"); ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index f5f97dc36..eb7481ab6 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1646,7 +1646,7 @@ TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) { void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override { count++; - assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason); + ASSERT_EQ(FlushReason::kWriteBufferManager, flush_job_info.flush_reason); } }; std::shared_ptr test_listener = @@ -1690,7 +1690,9 @@ TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) { 1 * kMB); // Write one more key to trigger flush. ASSERT_OK(Put(0, "foo", "v2")); - dbfull()->TEST_WaitForFlushMemTable(); + for (auto* h : handles_) { + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(h)); + } // Flushed two column families. ASSERT_EQ(2, test_listener->count.load()); } diff --git a/db/flush_job.cc b/db/flush_job.cc index 6b943a567..d596dc06b 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -80,22 +80,24 @@ const char* GetFlushReasonString (FlushReason flush_reason) { } } -FlushJob::FlushJob( - const std::string& dbname, ColumnFamilyData* cfd, - const ImmutableDBOptions& db_options, - const MutableCFOptions& mutable_cf_options, const uint64_t* max_memtable_id, - const FileOptions& file_options, VersionSet* versions, - InstrumentedMutex* db_mutex, std::atomic* shutting_down, - std::vector existing_snapshots, - SequenceNumber earliest_write_conflict_snapshot, - SnapshotChecker* snapshot_checker, JobContext* job_context, - LogBuffer* log_buffer, FSDirectory* db_directory, - FSDirectory* output_file_directory, CompressionType output_compression, - Statistics* stats, EventLogger* event_logger, bool measure_io_stats, - const bool sync_output_directory, const bool write_manifest, - Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const std::string& db_id, const std::string& db_session_id, - std::string full_history_ts_low) +FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, + const MutableCFOptions& mutable_cf_options, + uint64_t max_memtable_id, const FileOptions& file_options, + VersionSet* versions, InstrumentedMutex* db_mutex, + std::atomic* shutting_down, + std::vector existing_snapshots, + SequenceNumber earliest_write_conflict_snapshot, + SnapshotChecker* snapshot_checker, JobContext* job_context, + LogBuffer* log_buffer, FSDirectory* db_directory, + FSDirectory* output_file_directory, + CompressionType output_compression, Statistics* stats, + EventLogger* event_logger, bool measure_io_stats, + const bool sync_output_directory, const bool write_manifest, + Env::Priority thread_pri, + const std::shared_ptr& io_tracer, + const std::string& db_id, const std::string& db_session_id, + std::string full_history_ts_low) : dbname_(dbname), db_id_(db_id), db_session_id_(db_session_id), diff --git a/db/flush_job.h b/db/flush_job.h index 785cfc9bc..e3623209f 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -60,10 +60,9 @@ class FlushJob { // IMPORTANT: mutable_cf_options needs to be alive while FlushJob is alive FlushJob(const std::string& dbname, ColumnFamilyData* cfd, const ImmutableDBOptions& db_options, - const MutableCFOptions& mutable_cf_options, - const uint64_t* max_memtable_id, const FileOptions& file_options, - VersionSet* versions, InstrumentedMutex* db_mutex, - std::atomic* shutting_down, + const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id, + const FileOptions& file_options, VersionSet* versions, + InstrumentedMutex* db_mutex, std::atomic* shutting_down, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, JobContext* job_context, @@ -110,12 +109,11 @@ class FlushJob { ColumnFamilyData* cfd_; const ImmutableDBOptions& db_options_; const MutableCFOptions& mutable_cf_options_; - // Pointer to a variable storing the largest memtable id to flush in this + // A variable storing the largest memtable id to flush in this // flush job. RocksDB uses this variable to select the memtables to flush in // this job. All memtables in this column family with an ID smaller than or - // equal to *max_memtable_id_ will be selected for flush. If null, then all - // memtables in the column family will be selected. - const uint64_t* max_memtable_id_; + // equal to max_memtable_id_ will be selected for flush. + uint64_t max_memtable_id_; const FileOptions file_options_; VersionSet* versions_; InstrumentedMutex* db_mutex_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 6ac6a2e80..2ac569f77 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -158,7 +158,7 @@ TEST_F(FlushJobTest, Empty) { SnapshotChecker* snapshot_checker = nullptr; // not relavant FlushJob flush_job( dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, + *cfd->GetLatestMutableCFOptions(), port::kMaxUint64 /* memtable_id */, env_options_, versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, false, @@ -240,7 +240,7 @@ TEST_F(FlushJobTest, NonEmpty) { SnapshotChecker* snapshot_checker = nullptr; // not relavant FlushJob flush_job( dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, + *cfd->GetLatestMutableCFOptions(), port::kMaxUint64 /* memtable_id */, env_options_, versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, @@ -302,7 +302,7 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; FlushJob flush_job( dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_, + *cfd->GetLatestMutableCFOptions(), flush_memtable_id, env_options_, versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, @@ -374,7 +374,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { std::vector snapshot_seqs; flush_jobs.emplace_back(new FlushJob( dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), - &memtable_ids[k], env_options_, versions_.get(), &mutex_, + memtable_ids[k], env_options_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, @@ -491,7 +491,7 @@ TEST_F(FlushJobTest, Snapshots) { SnapshotChecker* snapshot_checker = nullptr; // not relavant FlushJob flush_job( dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, + *cfd->GetLatestMutableCFOptions(), port::kMaxUint64 /* memtable_id */, env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, @@ -558,8 +558,8 @@ TEST_F(FlushJobTimestampTest, AllKeysExpired) { PutFixed64(&full_history_ts_low, std::numeric_limits::max()); FlushJob flush_job( dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), - nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_, - &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker, + port::kMaxUint64 /* memtable_id */, env_options_, versions_.get(), + &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, true /* sync_output_directory */, true /* write_manifest */, @@ -609,8 +609,8 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) { PutFixed64(&full_history_ts_low, 0); FlushJob flush_job( dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), - nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_, - &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker, + port::kMaxUint64 /* memtable_id */, env_options_, versions_.get(), + &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, true /* sync_output_directory */, true /* write_manifest */, diff --git a/db/memtable_list.cc b/db/memtable_list.cc index f2974ec04..ffb4d7502 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -334,7 +334,7 @@ bool MemTableList::IsFlushPending() const { } // Returns the memtables that need to be flushed. -void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id, +void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, autovector* ret) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); @@ -345,7 +345,7 @@ void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id, if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) { atomic_flush = true; } - if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) { + if (m->GetID() > max_memtable_id) { break; } if (!m->flush_in_progress_) { diff --git a/db/memtable_list.h b/db/memtable_list.h index 0af7a0cea..62e03cf53 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -251,7 +251,7 @@ class MemTableList { // Returns the earliest memtables that needs to be flushed. The returned // memtables are guaranteed to be in the ascending order of created time. - void PickMemtablesToFlush(const uint64_t* max_memtable_id, + void PickMemtablesToFlush(uint64_t max_memtable_id, autovector* mems); // Reset status of the given memtable list back to pending state so that diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 6f578e7c7..e3b7eb621 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -199,7 +199,7 @@ TEST_F(MemTableListTest, Empty) { ASSERT_FALSE(list.IsFlushPending()); autovector mems; - list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &mems); ASSERT_EQ(0, mems.size()); autovector to_delete; @@ -399,7 +399,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Flush this memtable from the list. // (It will then be a part of the memtable history). autovector to_flush; - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush); ASSERT_EQ(1, to_flush.size()); MutableCFOptions mutable_cf_options(options); @@ -451,7 +451,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { ASSERT_EQ(0, to_delete.size()); to_flush.clear(); - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush); ASSERT_EQ(1, to_flush.size()); // Flush second memtable @@ -567,7 +567,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_FALSE(list.IsFlushPending()); ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); autovector to_flush; - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush); ASSERT_EQ(0, to_flush.size()); // Request a flush even though there is nothing to flush @@ -576,7 +576,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); // Attempt to 'flush' to clear request for flush - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush); ASSERT_EQ(0, to_flush.size()); ASSERT_FALSE(list.IsFlushPending()); ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); @@ -600,7 +600,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); // Pick tables to flush - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush); ASSERT_EQ(2, to_flush.size()); ASSERT_EQ(2, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); @@ -621,7 +621,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_EQ(0, to_delete.size()); // Pick tables to flush - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush); ASSERT_EQ(3, to_flush.size()); ASSERT_EQ(3, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); @@ -629,7 +629,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { // Pick tables to flush again autovector to_flush2; - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush2); ASSERT_EQ(0, to_flush2.size()); ASSERT_EQ(3, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); @@ -647,7 +647,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); // Pick tables to flush again - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush2); ASSERT_EQ(1, to_flush2.size()); ASSERT_EQ(4, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); @@ -668,7 +668,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_EQ(0, to_delete.size()); // Pick tables to flush - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush); // Should pick 4 of 5 since 1 table has been picked in to_flush2 ASSERT_EQ(4, to_flush.size()); ASSERT_EQ(5, list.NumNotFlushed()); @@ -677,7 +677,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { // Pick tables to flush again autovector to_flush3; - list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3); + list.PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, &to_flush3); ASSERT_EQ(0, to_flush3.size()); // nothing not in progress of being flushed ASSERT_EQ(5, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); @@ -738,7 +738,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { autovector to_flush4; list.FlushRequested(); ASSERT_TRUE(list.HasFlushRequested()); - list.PickMemtablesToFlush(&memtable_id, &to_flush4); + list.PickMemtablesToFlush(memtable_id, &to_flush4); ASSERT_TRUE(to_flush4.empty()); ASSERT_EQ(1, list.NumNotFlushed()); ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); @@ -749,7 +749,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { // equal to 5. Therefore, only tables[5] will be selected. memtable_id = 5; list.FlushRequested(); - list.PickMemtablesToFlush(&memtable_id, &to_flush4); + list.PickMemtablesToFlush(memtable_id, &to_flush4); ASSERT_EQ(1, static_cast(to_flush4.size())); ASSERT_EQ(1, list.NumNotFlushed()); ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); @@ -841,7 +841,8 @@ TEST_F(MemTableListTest, AtomicFlusTest) { auto* list = lists[i]; ASSERT_FALSE(list->IsFlushPending()); ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire)); - list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]); + list->PickMemtablesToFlush(port::kMaxUint64 /* memtable_id */, + &flush_candidates[i]); ASSERT_EQ(0, flush_candidates[i].size()); } // Request flush even though there is nothing to flush @@ -871,8 +872,7 @@ TEST_F(MemTableListTest, AtomicFlusTest) { // Pick memtables to flush for (auto i = 0; i != num_cfs; ++i) { flush_candidates[i].clear(); - lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i], - &flush_candidates[i]); + lists[i]->PickMemtablesToFlush(flush_memtable_ids[i], &flush_candidates[i]); ASSERT_EQ(flush_memtable_ids[i] - 0 + 1, static_cast(flush_candidates[i].size())); }