diff --git a/HISTORY.md b/HISTORY.md index 842a9f3ec..92f823218 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,8 @@ * Introduce CheckOptionsCompatibility() in rocksdb/utilities/options_util.h. This function checks whether the input set of options is able to open the specified DB successfully. ### Public API Changes +* When options.db_write_buffer_size triggers a flush, only the column family with the largest active memtable will be flushed, not all the column families. + ## 4.2.0 (11/9/2015) ### New Features * Introduce CreateLoggerFromOptions(), this function create a Logger for provided DBOptions. diff --git a/db/db_impl.cc b/db/db_impl.cc index 30da7a934..56a96a67d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3920,25 +3920,36 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, MaybeScheduleFlushOrCompaction(); } else if (UNLIKELY(write_buffer_.ShouldFlush())) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "Flushing all column families. Write buffer is using %" PRIu64 - " bytes out of a total of %" PRIu64 ".", + "Flushing column family with largest mem table size. Write buffer is " + "using %" PRIu64 " bytes out of a total of %" PRIu64 ".", write_buffer_.memory_usage(), write_buffer_.buffer_size()); // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread + ColumnFamilyData* largest_cfd = nullptr; + size_t largest_cfd_size = 0; + for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; } if (!cfd->mem()->IsEmpty()) { - status = SwitchMemtable(cfd, &context); - if (!status.ok()) { - break; + // We only consider active mem table, hoping immutable memtable is + // already in the process of flushing. 
+ size_t cfd_size = cfd->mem()->ApproximateMemoryUsage(); + if (largest_cfd == nullptr || cfd_size > largest_cfd_size) { + largest_cfd = cfd; + largest_cfd_size = cfd_size; } - cfd->imm()->FlushRequested(); - SchedulePendingFlush(cfd); } } - MaybeScheduleFlushOrCompaction(); + if (largest_cfd != nullptr) { + status = SwitchMemtable(largest_cfd, &context); + if (status.ok()) { + largest_cfd->imm()->FlushRequested(); + SchedulePendingFlush(largest_cfd); + MaybeScheduleFlushOrCompaction(); + } + } } if (UNLIKELY(status.ok() && !bg_error_.ok())) { diff --git a/db/db_test.cc b/db/db_test.cc index 71037bf4f..e6ebd7054 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4936,7 +4936,7 @@ TEST_F(DBTest, SharedWriteBuffer) { options.write_buffer_size = 500000; // this is never hit CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); - // Trigger a flush on every CF + // Trigger a flush on CF "nikitich" ASSERT_OK(Put(0, Key(1), DummyString(1))); ASSERT_OK(Put(1, Key(1), DummyString(1))); ASSERT_OK(Put(3, Key(1), DummyString(90000))); @@ -4948,65 +4948,107 @@ TEST_F(DBTest, SharedWriteBuffer) { dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(1)); + } + + // "dobrynia": 20KB + // Flush 'dobrynia' + ASSERT_OK(Put(3, Key(2), DummyString(40000))); + ASSERT_OK(Put(2, Key(2), DummyString(70000))); + ASSERT_OK(Put(0, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + { + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(0)); + 
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(1)); } - // Flush 'dobrynia' and 'nikitich' - ASSERT_OK(Put(2, Key(2), DummyString(50000))); - ASSERT_OK(Put(3, Key(2), DummyString(40000))); - ASSERT_OK(Put(2, Key(3), DummyString(20000))); + // "nikitich" still has data of 80KB + // Inserting data in "dobrynia" triggers "nikitich" flushing. ASSERT_OK(Put(3, Key(2), DummyString(40000))); + ASSERT_OK(Put(2, Key(2), DummyString(40000))); + ASSERT_OK(Put(0, Key(1), DummyString(1))); dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(2)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } - // Make 'dobrynia' and 'nikitich' both take up 40% of space - // When 'pikachu' puts us over 100%, all 3 flush. 
- ASSERT_OK(Put(2, Key(2), DummyString(40000))); + // "dobrynia" still has 40KB ASSERT_OK(Put(1, Key(2), DummyString(20000))); + ASSERT_OK(Put(0, Key(1), DummyString(10000))); ASSERT_OK(Put(0, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); dbfull()->TEST_WaitForFlushMemTable(handles_[2]); dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + // This should trigger no flush { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(0)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), + static_cast(1)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); + } + + // "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB + ASSERT_OK(Put(1, Key(2), DummyString(40000))); + ASSERT_OK(Put(0, Key(1), DummyString(1))); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + dbfull()->TEST_WaitForFlushMemTable(handles_[3]); + // This should trigger a flush of "pikachu" { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(1)); + static_cast(0)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(3)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(3)); + static_cast(2)); } - // Some remaining writes so 'default' and 'nikitich' flush on closure. + // "default": 10KB, "dobrynia": 40KB + // Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on + // closure. 
ASSERT_OK(Put(3, Key(1), DummyString(1))); ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, options); { ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), - static_cast(2)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), - static_cast(2)); + static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), - static_cast(3)); + static_cast(2)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), - static_cast(4)); + static_cast(3)); } } #endif // ROCKSDB_LITE