From 3886dddc3b44bf5061c0f93eab578c51e8bad7bd Mon Sep 17 00:00:00 2001 From: Zhongyi Xie Date: Mon, 1 Jul 2019 11:53:25 -0700 Subject: [PATCH] force flushing stats CF to avoid holding old logs (#5509) Summary: WAL records RocksDB writes to all column families. When a user flushes a column family, the old WAL will not accept new writes but cannot be deleted yet because it may still contain live data for other column families. (See https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log#life-cycle-of-a-wal for a detailed explanation.) Because of this, if there is a column family that receives very infrequent writes and no manual flush is called for it, it could prevent a lot of WALs from being deleted. PR https://github.com/facebook/rocksdb/pull/5046 introduced the persistent stats column family, which is a good example of such a column family. Depending on the config, it may have long intervals between writes, and the user is unaware of it, which makes it difficult to call manual flush for it. This PR addresses the problem for the persistent stats column family by forcing a flush for it when 1) another column family is flushed and 2) the persistent stats column family's log number is the smallest among all column families. This way the persistent stats column family will keep advancing its log number when necessary, allowing RocksDB to delete old WAL files. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5509 Differential Revision: D16045896 Pulled By: miasantreble fbshipit-source-id: 286837b633e988417f0096ff38384742d3b40ef4 --- db/db_impl/db_impl.h | 2 + db/db_impl/db_impl_compaction_flush.cc | 28 +++++++++- db/db_impl/db_impl_write.cc | 37 +++++++++++++ monitoring/stats_history_test.cc | 77 +++++++++++++++++++++++++- 4 files changed, 142 insertions(+), 2 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index b5437c495..e57768a74 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1292,6 +1292,8 @@ class DBImpl : public DB { Status ScheduleFlushes(WriteContext* context); + void MaybeFlushStatsCF(autovector* cfds); + Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); void SelectColumnFamiliesForAtomicFlush(autovector* cfds); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 8cb37484c..ff03e591d 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1551,13 +1551,39 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) { s = SwitchMemtable(cfd, &context); } - if (s.ok()) { if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) { flush_memtable_id = cfd->imm()->GetLatestMemTableID(); flush_req.emplace_back(cfd, flush_memtable_id); } + if (immutable_db_options_.persist_stats_to_disk) { + ColumnFamilyData* cfd_stats = + versions_->GetColumnFamilySet()->GetColumnFamily( + kPersistentStatsColumnFamilyName); + if (cfd_stats != nullptr && cfd_stats != cfd && + !cfd_stats->mem()->IsEmpty()) { + // only force flush stats CF when it will be the only CF lagging + // behind after the current flush + bool stats_cf_flush_needed = true; + for (auto* loop_cfd : *versions_->GetColumnFamilySet()) { + if (loop_cfd == cfd_stats || loop_cfd == cfd) { + 
continue; + } + if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) { + stats_cf_flush_needed = false; + } + } + if (stats_cf_flush_needed) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Force flushing stats CF with manual flush of %s " + "to avoid holding old logs", cfd->GetName().c_str()); + s = SwitchMemtable(cfd_stats, &context); + flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID(); + flush_req.emplace_back(cfd_stats, flush_memtable_id); + } + } + } } if (s.ok() && !flush_req.empty()) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 21b123c3a..c0d320013 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1228,6 +1228,7 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { cfds.push_back(cfd); } } + MaybeFlushStatsCF(&cfds); } for (const auto cfd : cfds) { cfd->Ref(); @@ -1294,6 +1295,7 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) { if (cfd_picked != nullptr) { cfds.push_back(cfd_picked); } + MaybeFlushStatsCF(&cfds); } for (const auto cfd : cfds) { @@ -1437,6 +1439,40 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, return Status::OK(); } +void DBImpl::MaybeFlushStatsCF(autovector* cfds) { + assert(cfds != nullptr); + if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) { + ColumnFamilyData* cfd_stats = + versions_->GetColumnFamilySet()->GetColumnFamily( + kPersistentStatsColumnFamilyName); + if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) { + for (ColumnFamilyData* cfd : *cfds) { + if (cfd == cfd_stats) { + // stats CF already included in cfds + return; + } + } + // force flush stats CF when its log number is less than all other CF's + // log numbers + bool force_flush_stats_cf = true; + for (auto* loop_cfd : *versions_->GetColumnFamilySet()) { + if (loop_cfd == cfd_stats) { + continue; + } + if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) { + force_flush_stats_cf = false; + } + } + if 
(force_flush_stats_cf) { + cfds->push_back(cfd_stats); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Force flushing stats CF with automated flush " + "to avoid holding old logs"); + } + } + } +} + Status DBImpl::ScheduleFlushes(WriteContext* context) { autovector cfds; if (immutable_db_options_.atomic_flush) { @@ -1450,6 +1486,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { cfds.push_back(tmp_cfd); } + MaybeFlushStatsCF(&cfds); } Status status; for (auto& cfd : cfds) { diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index 16681fe05..bef928558 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -561,7 +561,7 @@ TEST_F(StatsHistoryTest, PersistentStatsReadOnly) { Close(); // Reopen and flush memtable. - Reopen(options); + ASSERT_OK(TryReopen(options)); Flush(); Close(); // Now check keys in read only mode. @@ -569,6 +569,81 @@ TEST_F(StatsHistoryTest, PersistentStatsReadOnly) { } #endif // !ROCKSDB_LITE +TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { + Options options; + options.create_if_missing = true; + options.write_buffer_size = 1024 * 1024 * 10; // 10 Mb + options.stats_persist_period_sec = 5; + options.statistics = rocksdb::CreateDBStatistics(); + options.persist_stats_to_disk = true; + std::unique_ptr mock_env; + mock_env.reset(new rocksdb::MockTimeEnv(env_)); + mock_env->set_current_time(0); // in seconds + options.env = mock_env.get(); + CreateColumnFamilies({"pikachu"}, options); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ColumnFamilyData* cfd_default = + static_cast(dbfull()->DefaultColumnFamily()) + ->cfd(); + ColumnFamilyData* cfd_stats = static_cast( + dbfull()->PersistentStatsColumnFamily()) + ->cfd(); + ColumnFamilyData* cfd_test = + static_cast(handles_[1])->cfd(); + + ASSERT_OK(Put("foo", "v0")); + ASSERT_OK(Put("bar", "v0")); + ASSERT_EQ("v0", Get("bar")); + 
ASSERT_EQ("v0", Get("foo")); + ASSERT_OK(Put(1, "Eevee", "v0")); + ASSERT_EQ("v0", Get(1, "Eevee")); + dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); + // writing to all three cf, flush default cf + // LogNumbers: default: 14, stats: 4, pikachu: 4 + ASSERT_OK(Flush()); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); + ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + + ASSERT_OK(Put("foo1", "v1")); + ASSERT_OK(Put("bar1", "v1")); + ASSERT_EQ("v1", Get("bar1")); + ASSERT_EQ("v1", Get("foo1")); + ASSERT_OK(Put(1, "Vaporeon", "v1")); + ASSERT_EQ("v1", Get(1, "Vaporeon")); + // writing to default and test cf, flush test cf + // LogNumbers: default: 14, stats: 16, pikachu: 16 + ASSERT_OK(Flush(1)); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); + ASSERT_GT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + + ASSERT_OK(Put("foo2", "v2")); + ASSERT_OK(Put("bar2", "v2")); + ASSERT_EQ("v2", Get("bar2")); + ASSERT_EQ("v2", Get("foo2")); + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(10); }); + // writing to default and stats cf, flushing default cf + // LogNumbers: default: 19, stats: 19, pikachu: 19 + ASSERT_OK(Flush()); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + + ASSERT_OK(Put("foo3", "v3")); + ASSERT_OK(Put("bar3", "v3")); + ASSERT_EQ("v3", Get("bar3")); + ASSERT_EQ("v3", Get("foo3")); + ASSERT_OK(Put(1, "Jolteon", "v3")); + ASSERT_EQ("v3", Get(1, "Jolteon")); + dbfull()->TEST_WaitForPersistStatsRun( + [&] { mock_env->set_current_time(15); }); + // writing to all three cf, flushing test cf + // LogNumbers: default: 19, stats: 19, pikachu: 22 + ASSERT_OK(Flush(1)); + ASSERT_LT(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); + Close(); +} + } // namespace rocksdb int main(int argc, 
char** argv) {