diff --git a/db/db_impl.cc b/db/db_impl.cc index 48983bf03..96c3db8c9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4758,8 +4758,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, write_buffer_manager_->buffer_size()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread - ColumnFamilyData* largest_cfd = nullptr; - size_t largest_cfd_size = 0; + ColumnFamilyData* cfd_picked = nullptr; + SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { @@ -4768,18 +4768,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (!cfd->mem()->IsEmpty()) { // We only consider active mem table, hoping immutable memtable is // already in the process of flushing. - size_t cfd_size = cfd->mem()->ApproximateMemoryUsage(); - if (largest_cfd == nullptr || cfd_size > largest_cfd_size) { - largest_cfd = cfd; - largest_cfd_size = cfd_size; + uint64_t seq = cfd->mem()->GetCreationSeq(); + if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { + cfd_picked = cfd; + seq_num_for_cf_picked = seq; } } } - if (largest_cfd != nullptr) { - status = SwitchMemtable(largest_cfd, &context); + if (cfd_picked != nullptr) { + status = SwitchMemtable(cfd_picked, &context); if (status.ok()) { - largest_cfd->imm()->FlushRequested(); - SchedulePendingFlush(largest_cfd); + cfd_picked->imm()->FlushRequested(); + SchedulePendingFlush(cfd_picked); MaybeScheduleFlushOrCompaction(); } } @@ -5317,17 +5317,21 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log_dir_synced_ = false; logs_.emplace_back(logfile_number_, new_log); alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); - for (auto loop_cfd : *versions_->GetColumnFamilySet()) { - // all this is just optimization to delete logs that - // are no longer needed -- if CF is empty, that means it - // doesn't need that particular log to stay alive, so we just - // advance the log number. no need to persist this in the manifest - if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 && - loop_cfd->imm()->NumNotFlushed() == 0) { + } + for (auto loop_cfd : *versions_->GetColumnFamilySet()) { + // all this is just optimization to delete logs that + // are no longer needed -- if CF is empty, that means it + // doesn't need that particular log to stay alive, so we just + // advance the log number. no need to persist this in the manifest + if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 && + loop_cfd->imm()->NumNotFlushed() == 0) { + if (creating_new_log) { loop_cfd->SetLogNumber(logfile_number_); } + loop_cfd->mem()->SetCreationSeq(versions_->LastSequence()); } } + cfd->mem()->SetNextLogNumber(logfile_number_); cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); new_mem->Ref(); diff --git a/db/db_test2.cc b/db/db_test2.cc index e9110772e..109a550ac 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -183,19 +183,32 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { options.write_buffer_size = 500000; // this is never hit CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); - // Trigger a flush on CF "nikitich" - ASSERT_OK(Put(0, Key(1), DummyString(1))); - ASSERT_OK(Put(1, Key(1), DummyString(1))); - ASSERT_OK(Put(3, Key(1), DummyString(90000))); - ASSERT_OK(Put(2, Key(2), DummyString(20000))); - ASSERT_OK(Put(2, Key(1), DummyString(1))); + WriteOptions wo; + wo.disableWAL = true; + + // Create some data and flush "default" and "nikitich" so that they + // are newer CFs created. + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + Flush(3); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + Flush(0); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), + static_cast(1)); + + ASSERT_OK(Put(3, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(60000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + // No flush should trigger dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), @@ -204,99 +217,85 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { static_cast(1)); } - // "dobrynia": 20KB - // Flush 'dobrynia' - ASSERT_OK(Put(3, Key(2), DummyString(40000))); - ASSERT_OK(Put(2, Key(2), DummyString(70000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); + // Trigger a flush. Flushing "nikitich". + ASSERT_OK(Put(3, Key(2), DummyString(30000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(1)); + static_cast(2)); } - // "nikitich" still has data of 80KB - // Inserting Data in "dobrynia" triggers "nikitich" flushing. - ASSERT_OK(Put(3, Key(2), DummyString(40000))); - ASSERT_OK(Put(2, Key(2), DummyString(40000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); + // Without hitting the threshold, no flush should trigger. + ASSERT_OK(Put(2, Key(1), DummyString(30000), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } - // "dobrynia" still has 40KB - ASSERT_OK(Put(1, Key(2), DummyString(20000))); - ASSERT_OK(Put(0, Key(1), DummyString(10000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); + // Hit the write buffer limit again. "default" + // will have been flushed. + ASSERT_OK(Put(2, Key(2), DummyString(10000), wo)); + ASSERT_OK(Put(3, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(1), wo)); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - // This should triggers no flush { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(2)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } - // "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB - ASSERT_OK(Put(1, Key(2), DummyString(40000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); + // Trigger another flush. This time "dobrynia". "pikachu" should not + // be flushed, althrough it was never flushed. + ASSERT_OK(Put(1, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(80000), wo)); + ASSERT_OK(Put(1, Key(1), DummyString(1), wo)); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); - // This should triggers flush of "pikachu" { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(2)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } - - // "default": 10KB, "dobrynia": 40KB - // Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on - // closure. - ASSERT_OK(Put(3, Key(1), DummyString(1))); - ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, - options); - { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(1)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(2)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(3)); - } } INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs, @@ -314,66 +313,60 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { ASSERT_OK(DB::Open(options, dbname2, &db2)); WriteOptions wo; + wo.disableWAL = true; // Trigger a flush on cf2 - ASSERT_OK(Put(0, Key(1), DummyString(1))); - ASSERT_OK(Put(1, Key(1), DummyString(1))); - ASSERT_OK(Put(2, Key(1), DummyString(90000))); + ASSERT_OK(Put(2, Key(1), DummyString(70000), wo)); + ASSERT_OK(Put(0, Key(1), DummyString(20000), wo)); // Insert to DB2 ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000))); - ASSERT_OK(Put(2, Key(1), DummyString(1))); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); static_cast(db2)->TEST_WaitForFlushMemTable(); { - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), - static_cast(0)); - ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") + + GetNumberOfSstFilesForColumnFamily(db_, "cf1") + + GetNumberOfSstFilesForColumnFamily(db_, "cf2"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), static_cast(0)); } - // db2: 20KB - ASSERT_OK(db2->Put(wo, Key(2), DummyString(40000))); - ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000))); - ASSERT_OK(db2->Put(wo, Key(1), DummyString(1))); + // Triggering to flush another CF in DB1 + ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000))); + ASSERT_OK(Put(2, Key(1), DummyString(1), wo)); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); - static_cast(db2)->TEST_WaitForFlushMemTable(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), - static_cast(1)); + static_cast(0)); } - // - // Inserting Data in db2 and db_ triggers flushing in db_. - ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000))); - ASSERT_OK(Put(2, Key(2), DummyString(45000))); - ASSERT_OK(Put(0, Key(1), DummyString(1))); + // Triggering flush in DB2. + ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000))); + ASSERT_OK(db2->Put(wo, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[0]); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); static_cast(db2)->TEST_WaitForFlushMemTable(); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(0)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"), static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"), - static_cast(2)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"), static_cast(1)); } diff --git a/db/memtable.cc b/db/memtable.cc index a8ece01ce..0ecd36f7e 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -61,7 +61,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, WriteBufferManager* write_buffer_manager, - SequenceNumber earliest_seq) + SequenceNumber latest_seq) : comparator_(cmp), moptions_(ioptions, mutable_cf_options), refs_(0), @@ -83,7 +83,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, flush_completed_(false), file_number_(0), first_seqno_(0), - earliest_seqno_(earliest_seq), + earliest_seqno_(latest_seq), + creation_seq_(latest_seq), mem_next_logfile_number_(0), min_prep_log_referenced_(0), locks_(moptions_.inplace_update_support diff --git a/db/memtable.h b/db/memtable.h index 12d7be61d..2417a1130 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -286,6 +286,12 @@ class MemTable { return earliest_seqno_.load(std::memory_order_relaxed); } + // DB's latest sequence ID when the memtable is created. This number + // may be updated to a more recent one before any key is inserted. + SequenceNumber GetCreationSeq() const { return creation_seq_; } + + void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; } + // Returns the next active logfile number when this memtable is about to // be flushed to storage // REQUIRES: external synchronization to prevent simultaneous @@ -381,6 +387,8 @@ class MemTable { // if not set. std::atomic earliest_seqno_; + SequenceNumber creation_seq_; + // The log files earlier than this number can be deleted. uint64_t mem_next_logfile_number_;