From df7004766989ae00e1681c369b1de8d5d9f74107 Mon Sep 17 00:00:00 2001
From: Igor Canadi
Date: Wed, 30 Apr 2014 14:33:40 -0400
Subject: [PATCH] Flush stale column families

Summary:
Added a new option `max_total_wal_size`. Once the total WAL size goes over
that limit, we make an attempt to flush all column families that still have
data in the earliest WAL file.

By default, I calculate `max_total_wal_size` dynamically, which should be
good enough for non-advanced customers.

Test Plan: Added a test

Reviewers: dhruba, haobo, sdong, ljin, yhchiang

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D18345
---
 db/column_family_test.cc  | 38 ++++++++++++++++++++++++++++---
 db/db_impl.cc             | 48 ++++++++++++++++++++++++++++++++-------
 db/db_impl.h              | 14 +++++++++++-
 include/rocksdb/options.h |  8 +++++++
 util/options.cc           |  5 ++++
 5 files changed, 101 insertions(+), 12 deletions(-)

diff --git a/db/column_family_test.cc b/db/column_family_test.cc
index dc0323717..5f7ff48a8 100644
--- a/db/column_family_test.cc
+++ b/db/column_family_test.cc
@@ -238,7 +238,7 @@ class ColumnFamilyTest {
     return result;
   }
 
-  int CountLiveFiles(int cf) {
+  int CountLiveFiles() {
     std::vector<LiveFileMetaData> metadata;
     db_->GetLiveFilesMetaData(&metadata);
     return static_cast<int>(metadata.size());
   }
@@ -396,10 +396,10 @@ TEST(ColumnFamilyTest, DropTest) {
     }
     ASSERT_EQ("bar1", Get(1, "1"));
 
-    ASSERT_EQ(CountLiveFiles(1), 1);
+    ASSERT_EQ(CountLiveFiles(), 1);
     DropColumnFamilies({1});
     // make sure that all files are deleted when we drop the column family
-    ASSERT_EQ(CountLiveFiles(1), 0);
+    ASSERT_EQ(CountLiveFiles(), 0);
     Destroy();
   }
 }
@@ -545,6 +545,7 @@ TEST(ColumnFamilyTest, FlushTest) {
 
 // Makes sure that obsolete log files get deleted
 TEST(ColumnFamilyTest, LogDeletionTest) {
+  db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
   column_family_options_.write_buffer_size = 100000;  // 100KB
   Open();
   CreateColumnFamilies({"one", "two", "three", "four"});
@@ -611,6 +612,8 @@ TEST(ColumnFamilyTest, LogDeletionTest) {
 
 // Makes sure that obsolete log files get deleted
 TEST(ColumnFamilyTest, DifferentWriteBufferSizes) {
+  // disable flushing stale column families
+  db_options_.max_total_wal_size = std::numeric_limits<uint64_t>::max();
   Open();
   CreateColumnFamilies({"one", "two", "three"});
   ColumnFamilyOptions default_cf, one, two, three;
@@ -938,6 +941,35 @@ TEST(ColumnFamilyTest, DontRollEmptyLogs) {
   Close();
 }
 
+TEST(ColumnFamilyTest, FlushStaleColumnFamilies) {
+  Open();
+  CreateColumnFamilies({"one", "two"});
+  ColumnFamilyOptions default_cf, one, two;
+  default_cf.write_buffer_size = 100000;  // small write buffer size
+  default_cf.disable_auto_compactions = true;
+  one.disable_auto_compactions = true;
+  two.disable_auto_compactions = true;
+  db_options_.max_total_wal_size = 210000;
+
+  Reopen({default_cf, one, two});
+
+  PutRandomData(2, 1, 10);  // 10 bytes
+  for (int i = 0; i < 2; ++i) {
+    PutRandomData(0, 100, 1000);  // flush
+    WaitForFlush(0);
+    ASSERT_EQ(i + 1, CountLiveFiles());
+  }
+  // third flush. now, CF [two] should be detected as stale and flushed
+  // column family 1 should not be flushed since it's empty
+  PutRandomData(0, 100, 1000);  // flush
+  WaitForFlush(0);
+  WaitForFlush(2);
+  // 3 files for default column families, 1 file for column family [two], zero
+  // files for column family [one], because it's empty
+  ASSERT_EQ(4, CountLiveFiles());
+  Close();
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
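
Note on the arithmetic in FlushStaleColumnFamilies above: each PutRandomData(0, 100, 1000) batch writes roughly 100KB, so after two batches about 200KB of WAL is still alive; the third batch pushes the total past db_options_.max_total_wal_size = 210000. The 10 bytes written to column family [two] are what keep the oldest WAL file alive, so [two] gets force-flushed, giving 3 files from the default column family plus 1 from [two], i.e. 4 live files. For reference, a minimal stand-alone sketch of setting the new option through the public API follows; the database path and exact sizes are illustrative and not taken from the patch.

    // Hedged sketch: exercises DBOptions::max_total_wal_size from user code.
    // The path "/tmp/rocksdb_wal_demo" and the sizes are made up for the example.
    #include <cassert>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.write_buffer_size = 100000;   // 100KB memtables, as in the test
      options.max_total_wal_size = 210000;  // force stale-CF flushes past ~210KB of WAL
      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/rocksdb_wal_demo", &db);
      assert(s.ok());
      s = db->Put(rocksdb::WriteOptions(), "key", "value");
      assert(s.ok());
      delete db;
      return 0;
    }
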
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 818aa4bf5..12adea0b7 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -350,6 +350,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       logfile_number_(0),
       log_empty_(true),
       default_cf_handle_(nullptr),
+      total_log_size_(0),
+      max_total_in_memory_state_(0),
       tmp_batch_(),
       bg_schedule_needed_(false),
       bg_compaction_scheduled_(0),
@@ -1186,6 +1188,11 @@ Status DBImpl::Recover(
                              versions_->LastSequence());
   }
 
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    max_total_in_memory_state_ += cfd->options()->write_buffer_size *
+                                  cfd->options()->max_write_buffer_number;
+  }
+
   return s;
 }
 
@@ -1434,9 +1441,6 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
       cfd->GetName().c_str(), (unsigned long)meta.number,
       (unsigned long)meta.file_size, s.ToString().c_str());
-  Version::LevelSummaryStorage tmp;
-  Log(options_.info_log, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
-      cfd->current()->LevelSummary(&tmp));
   if (!options_.disableDataSync) {
     db_directory_->Fsync();
   }
@@ -1536,14 +1540,19 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
   if (madeProgress) {
     *madeProgress = 1;
   }
+  Version::LevelSummaryStorage tmp;
+  LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
+              cfd->current()->LevelSummary(&tmp));
   MaybeScheduleLogDBDeployStats();
 
   if (disable_delete_obsolete_files_ == 0) {
     // add to deletion state
     while (alive_log_files_.size() &&
-           *alive_log_files_.begin() < versions_->MinLogNumber()) {
-      deletion_state.log_delete_files.push_back(*alive_log_files_.begin());
+           alive_log_files_.begin()->number < versions_->MinLogNumber()) {
+      const auto& earliest = *alive_log_files_.begin();
+      deletion_state.log_delete_files.push_back(earliest.number);
+      total_log_size_ -= earliest.size;
       alive_log_files_.pop_front();
     }
   }
@@ -3420,6 +3429,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
     *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
     Log(options_.info_log, "Created column family [%s] (ID %u)",
         column_family_name.c_str(), (unsigned)cfd->GetID());
+    max_total_in_memory_state_ += cfd->options()->write_buffer_size *
+                                  cfd->options()->max_write_buffer_number;
   } else {
     Log(options_.info_log, "Creating column family [%s] FAILED -- %s",
         column_family_name.c_str(), s.ToString().c_str());
@@ -3451,6 +3462,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
 
   if (s.ok()) {
     assert(cfd->IsDropped());
+    max_total_in_memory_state_ -= cfd->options()->write_buffer_size *
+                                  cfd->options()->max_write_buffer_number;
     Log(options_.info_log, "Dropped column family with id %u\n", cfd->GetID());
     // Flush the memtables. This will make all WAL files referencing dropped
     // column family to be obsolete. They will be deleted once user deletes
@@ -3631,6 +3644,19 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
     RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1);
   }
 
+  int64_t flush_column_family_if_log_file = 0;
+  uint64_t max_total_wal_size = (options_.max_total_wal_size == 0)
+                                    ? 2 * max_total_in_memory_state_
+                                    : options_.max_total_wal_size;
+  if (alive_log_files_.begin()->getting_flushed == false &&
+      total_log_size_ > max_total_wal_size) {
+    flush_column_family_if_log_file = alive_log_files_.begin()->number;
+    alive_log_files_.begin()->getting_flushed = true;
+    Log(options_.info_log,
+        "Flushing all column families with data in WAL number %" PRIu64,
+        flush_column_family_if_log_file);
+  }
+
   Status status;
   // refcounting cfd in iteration
   bool dead_cfd = false;
@@ -3638,8 +3664,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
   autovector<log::Writer*> logs_to_free;
   for (auto cfd : *versions_->GetColumnFamilySet()) {
     cfd->Ref();
+    bool force_flush = my_batch == nullptr ||
+                       (flush_column_family_if_log_file != 0 &&
+                        cfd->GetLogNumber() <= flush_column_family_if_log_file);
     // May temporarily unlock and wait.
-    status = MakeRoomForWrite(cfd, my_batch == nullptr, &superversions_to_free,
+    status = MakeRoomForWrite(cfd, force_flush, &superversions_to_free,
                               &logs_to_free);
     if (cfd->Unref()) {
       dead_cfd = true;
@@ -3693,6 +3722,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
         PERF_TIMER_START(write_wal_time);
         Slice log_entry = WriteBatchInternal::Contents(updates);
         status = log_->AddRecord(log_entry);
+        total_log_size_ += log_entry.size();
+        alive_log_files_.back().AddSize(log_entry.size());
         log_empty_ = false;
         RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1);
         RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size());
@@ -4023,7 +4054,7 @@ Status DBImpl::MakeRoomForWrite(
       logs_to_free->push_back(log_.release());
      log_.reset(new_log);
      log_empty_ = true;
-      alive_log_files_.push_back(logfile_number_);
+      alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
       for (auto cfd : *versions_->GetColumnFamilySet()) {
         // all this is just optimization to delete logs that
         // are no longer needed -- if CF is empty, that means it
@@ -4415,7 +4446,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
       delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
     }
-    impl->alive_log_files_.push_back(impl->logfile_number_);
+    impl->alive_log_files_.push_back(
+        DBImpl::LogFileNumberSize(impl->logfile_number_));
     impl->DeleteObsoleteFiles();
     impl->MaybeScheduleFlushOrCompaction();
     impl->MaybeScheduleLogDBDeployStats();
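
The heart of the change is in DBImpl::Write: every write now adds the WAL record size to total_log_size_, and once that total exceeds the (possibly dynamically derived) limit, every column family whose earliest required WAL is the oldest alive WAL file gets a forced flush, which makes that WAL obsolete so it can later be deleted and its size subtracted. A self-contained sketch of that selection rule is below; the struct and function names are illustrative stand-ins, not the patch's actual classes.

    // Illustrative stand-alone model of the stale-CF selection rule added to
    // DBImpl::Write. CFState and MarkStaleColumnFamilies are hypothetical names.
    #include <cstdint>
    #include <vector>

    struct CFState {
      uint64_t log_number;   // earliest WAL this CF's un-flushed data still lives in
      bool needs_flush = false;
    };

    void MarkStaleColumnFamilies(uint64_t total_log_size, uint64_t max_total_wal_size,
                                 uint64_t oldest_alive_wal, std::vector<CFState>* cfs) {
      if (total_log_size <= max_total_wal_size) {
        return;  // WAL usage is still within budget, nothing to do
      }
      for (CFState& cf : *cfs) {
        // A CF whose earliest needed WAL is not newer than the oldest alive WAL is
        // the reason that file cannot be deleted; flushing it releases the WAL.
        if (cf.log_number <= oldest_alive_wal) {
          cf.needs_flush = true;
        }
      }
    }
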
diff --git a/db/db_impl.h b/db/db_impl.h
index 915c0997b..cc59cfd7c 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -455,7 +455,19 @@ class DBImpl : public DB {
   bool log_empty_;
   ColumnFamilyHandleImpl* default_cf_handle_;
   unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
-  std::deque<uint64_t> alive_log_files_;
+  struct LogFileNumberSize {
+    explicit LogFileNumberSize(uint64_t _number)
+        : number(_number), size(0), getting_flushed(false) {}
+    void AddSize(uint64_t new_size) { size += new_size; }
+    uint64_t number;
+    uint64_t size;
+    bool getting_flushed;
+  };
+  std::deque<LogFileNumberSize> alive_log_files_;
+  uint64_t total_log_size_;
+  // only used for dynamically adjusting max_total_wal_size. it is a sum of
+  // [write_buffer_size * max_write_buffer_number] over all column families
+  uint64_t max_total_in_memory_state_;
   std::string host_name_;
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index abcff6bb2..541e0d111 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -585,6 +585,14 @@ struct DBOptions {
   // Default: 5000
   int max_open_files;
 
+  // Once write-ahead logs exceed this size, we will start forcing the flush of
+  // column families whose memtables are backed by the oldest live WAL file
+  // (i.e. the ones that are causing all the space amplification). If set to 0
+  // (default), we will dynamically choose the WAL size limit to be
+  // [sum of all write_buffer_size * max_write_buffer_number] * 2
+  // Default: 0
+  uint64_t max_total_wal_size;
+
   // If non-null, then we should collect metrics about database operations
   // Statistics objects should not be shared between DB instances as
   // it does not use any locks to prevent concurrent updates.
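
To make the dynamic default above concrete: with, say, three column families that each use a 64MB write_buffer_size and max_write_buffer_number = 2, the summed in-memory state is 384MB, so the derived WAL limit is 768MB. The numbers in the sketch below are hypothetical and only spell out the formula.

    // Worked example of [sum of write_buffer_size * max_write_buffer_number] * 2.
    #include <cstdint>
    #include <cstdio>

    int main() {
      const uint64_t write_buffer_size = 64ull << 20;  // 64MB, hypothetical
      const uint64_t max_write_buffer_number = 2;
      const uint64_t num_column_families = 3;
      const uint64_t max_total_in_memory_state =
          num_column_families * write_buffer_size * max_write_buffer_number;  // 384MB
      const uint64_t dynamic_wal_limit = 2 * max_total_in_memory_state;       // 768MB
      std::printf("dynamic max_total_wal_size = %llu bytes\n",
                  static_cast<unsigned long long>(dynamic_wal_limit));
      return 0;
    }
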
diff --git a/util/options.cc b/util/options.cc
index 2ed82cd37..c8d1e3889 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -9,6 +9,8 @@
 
 #include "rocksdb/options.h"
 
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
 
 #include "rocksdb/cache.h"
@@ -160,6 +162,7 @@ DBOptions::DBOptions()
       info_log(nullptr),
       info_log_level(INFO_LEVEL),
       max_open_files(5000),
+      max_total_wal_size(0),
       statistics(nullptr),
       disableDataSync(false),
       use_fsync(false),
@@ -198,6 +201,7 @@ DBOptions::DBOptions(const Options& options)
      info_log(options.info_log),
      info_log_level(options.info_log_level),
      max_open_files(options.max_open_files),
+      max_total_wal_size(options.max_total_wal_size),
      statistics(options.statistics),
      disableDataSync(options.disableDataSync),
      use_fsync(options.use_fsync),
@@ -241,6 +245,7 @@ void DBOptions::Dump(Logger* log) const {
   Log(log, " Options.env: %p", env);
   Log(log, " Options.info_log: %p", info_log.get());
   Log(log, " Options.max_open_files: %d", max_open_files);
+  Log(log, " Options.max_total_wal_size: %" PRIu64, max_total_wal_size);
   Log(log, " Options.disableDataSync: %d", disableDataSync);
   Log(log, " Options.use_fsync: %d", use_fsync);
   Log(log, " Options.max_log_file_size: %zu", max_log_file_size);
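
A short note on the inttypes.h change above: PRIu64 expands to the correct printf conversion for uint64_t on each platform, and on older C++ toolchains the PRI* macros in <inttypes.h> are only exposed when __STDC_FORMAT_MACROS is defined before the header is included, which is why the patch adds both lines ahead of the new Dump() line. A minimal stand-alone illustration, independent of RocksDB:

    #define __STDC_FORMAT_MACROS  // expose PRIu64 and friends on older C++ toolchains
    #include <inttypes.h>
    #include <cstdint>
    #include <cstdio>

    int main() {
      uint64_t max_total_wal_size = 0;  // stand-in for the value Dump() prints
      std::printf("Options.max_total_wal_size: %" PRIu64 "\n", max_total_wal_size);
      return 0;
    }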