diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index b5437c495..e57768a74 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1292,6 +1292,11 @@ class DBImpl : public DB {
 
   Status ScheduleFlushes(WriteContext* context);
 
+  // Appends the persistent stats column family to `cfds` when it has
+  // unflushed data and its log number is smaller than that of every other
+  // column family, so that it stops holding old logs.
+  void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds);
+
   Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
 
   void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 8cb37484c..ff03e591d 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -1551,13 +1551,43 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
     if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
       s = SwitchMemtable(cfd, &context);
     }
-
     if (s.ok()) {
       if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
           !cached_recoverable_state_empty_.load()) {
         flush_memtable_id = cfd->imm()->GetLatestMemTableID();
         flush_req.emplace_back(cfd, flush_memtable_id);
       }
+      if (immutable_db_options_.persist_stats_to_disk) {
+        ColumnFamilyData* cfd_stats =
+            versions_->GetColumnFamilySet()->GetColumnFamily(
+                kPersistentStatsColumnFamilyName);
+        if (cfd_stats != nullptr && cfd_stats != cfd &&
+            !cfd_stats->mem()->IsEmpty()) {
+          // only force flush stats CF when it will be the only CF lagging
+          // behind after the current flush
+          bool stats_cf_flush_needed = true;
+          for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
+            if (loop_cfd == cfd_stats || loop_cfd == cfd) {
+              continue;
+            }
+            if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
+              stats_cf_flush_needed = false;
+              break;
+            }
+          }
+          if (stats_cf_flush_needed) {
+            ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                           "Force flushing stats CF with manual flush of %s "
+                           "to avoid holding old logs", cfd->GetName().c_str());
+            s = SwitchMemtable(cfd_stats, &context);
+            // skip enqueueing the stats CF if SwitchMemtable failed
+            if (s.ok()) {
+              flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
+              flush_req.emplace_back(cfd_stats, flush_memtable_id);
+            }
+          }
+        }
+      }
     }
 
     if (s.ok() && !flush_req.empty()) {
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 21b123c3a..c0d320013 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1228,6 +1228,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
         cfds.push_back(cfd);
       }
     }
+    MaybeFlushStatsCF(&cfds);
   }
   for (const auto cfd : cfds) {
     cfd->Ref();
@@ -1294,6 +1295,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
     if (cfd_picked != nullptr) {
       cfds.push_back(cfd_picked);
     }
+    MaybeFlushStatsCF(&cfds);
   }
 
   for (const auto cfd : cfds) {
@@ -1437,6 +1439,44 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
   return Status::OK();
 }
 
+// Appends the persistent stats CF to `cfds` (the caller's list of CFs) if it
+// has unflushed data and its log number is smaller than that of every other
+// CF. No-op when `cfds` is empty or persist_stats_to_disk is off.
+void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) {
+  assert(cfds != nullptr);
+  if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) {
+    ColumnFamilyData* cfd_stats =
+        versions_->GetColumnFamilySet()->GetColumnFamily(
+            kPersistentStatsColumnFamilyName);
+    if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) {
+      for (ColumnFamilyData* cfd : *cfds) {
+        if (cfd == cfd_stats) {
+          // stats CF already included in cfds
+          return;
+        }
+      }
+      // force flush stats CF when its log number is less than all other CF's
+      // log numbers
+      bool force_flush_stats_cf = true;
+      for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
+        if (loop_cfd == cfd_stats) {
+          continue;
+        }
+        if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
+          force_flush_stats_cf = false;
+          break;
+        }
+      }
+      if (force_flush_stats_cf) {
+        cfds->push_back(cfd_stats);
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "Force flushing stats CF with automated flush "
+                       "to avoid holding old logs");
+      }
+    }
+  }
+}
+
 Status DBImpl::ScheduleFlushes(WriteContext* context) {
   autovector<ColumnFamilyData*> cfds;
   if (immutable_db_options_.atomic_flush) {
@@ -1450,6 +1490,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
     while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
       cfds.push_back(tmp_cfd);
     }
+    MaybeFlushStatsCF(&cfds);
   }
   Status status;
   for (auto& cfd : cfds) {
diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc
index 16681fe05..bef928558 100644
--- a/monitoring/stats_history_test.cc
+++ b/monitoring/stats_history_test.cc
@@ -561,7 +561,7 @@ TEST_F(StatsHistoryTest, PersistentStatsReadOnly) {
   Close();
 
   // Reopen and flush memtable.
-  Reopen(options);
+  ASSERT_OK(TryReopen(options));
   Flush();
   Close();
   // Now check keys in read only mode.
@@ -569,6 +569,83 @@ TEST_F(StatsHistoryTest, PersistentStatsReadOnly) {
 }
 #endif  // !ROCKSDB_LITE
 
+// Checks the log numbers of the default, stats and "pikachu" CFs after each
+// manual flush, i.e. when a manual flush also drags the stats CF along.
+TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) {
+  Options options;
+  options.create_if_missing = true;
+  options.write_buffer_size = 1024 * 1024 * 10;  // 10 Mb
+  options.stats_persist_period_sec = 5;
+  options.statistics = rocksdb::CreateDBStatistics();
+  options.persist_stats_to_disk = true;
+  std::unique_ptr<rocksdb::MockTimeEnv> mock_env;
+  mock_env.reset(new rocksdb::MockTimeEnv(env_));
+  mock_env->set_current_time(0);  // in seconds
+  options.env = mock_env.get();
+  CreateColumnFamilies({"pikachu"}, options);
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+  ColumnFamilyData* cfd_default =
+      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
+          ->cfd();
+  ColumnFamilyData* cfd_stats = static_cast<ColumnFamilyHandleImpl*>(
+                                    dbfull()->PersistentStatsColumnFamily())
+                                    ->cfd();
+  ColumnFamilyData* cfd_test =
+      static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
+
+  ASSERT_OK(Put("foo", "v0"));
+  ASSERT_OK(Put("bar", "v0"));
+  ASSERT_EQ("v0", Get("bar"));
+  ASSERT_EQ("v0", Get("foo"));
+  ASSERT_OK(Put(1, "Eevee", "v0"));
+  ASSERT_EQ("v0", Get(1, "Eevee"));
+  dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
+  // writing to all three cf, flush default cf
+  // LogNumbers: default: 14, stats: 4, pikachu: 4
+  ASSERT_OK(Flush());
+  ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
+  ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
+
+  ASSERT_OK(Put("foo1", "v1"));
+  ASSERT_OK(Put("bar1", "v1"));
+  ASSERT_EQ("v1", Get("bar1"));
+  ASSERT_EQ("v1", Get("foo1"));
+  ASSERT_OK(Put(1, "Vaporeon", "v1"));
+  ASSERT_EQ("v1", Get(1, "Vaporeon"));
+  // writing to default and test cf, flush test cf
+  // LogNumbers: default: 14, stats: 16, pikachu: 16
+  ASSERT_OK(Flush(1));
+  ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
+  ASSERT_GT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
+
+  ASSERT_OK(Put("foo2", "v2"));
+  ASSERT_OK(Put("bar2", "v2"));
+  ASSERT_EQ("v2", Get("bar2"));
+  ASSERT_EQ("v2", Get("foo2"));
+  dbfull()->TEST_WaitForPersistStatsRun(
+      [&] { mock_env->set_current_time(10); });
+  // writing to default and stats cf, flushing default cf
+  // LogNumbers: default: 19, stats: 19, pikachu: 19
+  ASSERT_OK(Flush());
+  ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
+  ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
+
+  ASSERT_OK(Put("foo3", "v3"));
+  ASSERT_OK(Put("bar3", "v3"));
+  ASSERT_EQ("v3", Get("bar3"));
+  ASSERT_EQ("v3", Get("foo3"));
+  ASSERT_OK(Put(1, "Jolteon", "v3"));
+  ASSERT_EQ("v3", Get(1, "Jolteon"));
+  dbfull()->TEST_WaitForPersistStatsRun(
+      [&] { mock_env->set_current_time(15); });
+  // writing to all three cf, flushing test cf
+  // LogNumbers: default: 19, stats: 19, pikachu: 22
+  ASSERT_OK(Flush(1));
+  ASSERT_LT(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
+  ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
+  Close();
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {