From c2be2cba040c9983690b13033055461f88f760df Mon Sep 17 00:00:00 2001 From: shamdor Date: Wed, 6 Nov 2013 18:46:28 -0800 Subject: [PATCH] WAL log retention policy based on archive size. Summary: Archive cleaning will still happen every WAL_ttl seconds but archived logs will be deleted only if archive size is greater than a WAL_size_limit value. Empty archived logs will be deleted every WAL_ttl. Test Plan: 1. Unit tests pass. 2. Benchmark. Reviewers: emayanke, dhruba, haobo, sdong, kailiu, igor Reviewed By: emayanke CC: leveldb Differential Revision: https://reviews.facebook.net/D13869 --- db/c.cc | 5 ++ db/db_bench.cc | 7 +- db/db_impl.cc | 141 +++++++++++++++++++++++++----- db/db_impl.h | 9 ++ db/db_test.cc | 77 ++++++++++++---- db/deletefile_test.cc | 1 + include/rocksdb/db.h | 7 +- include/rocksdb/options.h | 21 +++-- include/rocksdb/transaction_log.h | 4 +- tools/db_repl_stress.cc | 10 ++- util/options.cc | 3 + 11 files changed, 227 insertions(+), 58 deletions(-) diff --git a/db/c.cc b/db/c.cc index 7903b7ca3..0d99c44dd 100644 --- a/db/c.cc +++ b/db/c.cc @@ -554,6 +554,11 @@ void leveldb_options_set_WAL_ttl_seconds(leveldb_options_t* opt, uint64_t ttl) { opt->rep.WAL_ttl_seconds = ttl; } +void leveldb_options_set_WAL_size_limit_MB( + leveldb_options_t* opt, uint64_t limit) { + opt->rep.WAL_size_limit_MB = limit; +} + leveldb_comparator_t* leveldb_comparator_create( void* state, void (*destructor)(void*), diff --git a/db/db_bench.cc b/db/db_bench.cc index e322fdef5..ba8e27fa7 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -397,7 +397,9 @@ DEFINE_int32(source_compaction_factor, 1, "Cap the size of data in level-K for" " a compaction run that compacts Level-K with Level-(K+1) (for" " K >= 1)"); -DEFINE_uint64(wal_ttl, 0, "Set the TTL for the WAL Files in seconds."); +DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds."); +DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files" + " in MB."); DEFINE_bool(bufferedio, 
rocksdb::EnvOptions().use_os_buffer, "Allow buffered io using OS buffers"); @@ -1352,7 +1354,8 @@ class Benchmark { options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type_e; - options.WAL_ttl_seconds = FLAGS_wal_ttl; + options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; + options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; if (FLAGS_min_level_to_compress >= 0) { assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); options.compression_per_level.resize(FLAGS_num_levels); diff --git a/db/db_impl.cc b/db/db_impl.cc index 32985a9ca..dd526392b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -264,6 +264,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) delete_obsolete_files_last_run_(0), purge_wal_files_last_run_(0), last_stats_dump_time_microsec_(0), + default_interval_to_delete_obsolete_WAL_(600), stall_level0_slowdown_(0), stall_memtable_compaction_(0), stall_level0_num_files_(0), @@ -407,7 +408,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const { } const Status DBImpl::CreateArchivalDirectory() { - if (options_.WAL_ttl_seconds > 0) { + if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) { std::string archivalPath = ArchivalDirectory(options_.wal_dir); return env_->CreateDirIfMissing(archivalPath); } @@ -494,7 +495,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) { Status DBImpl::DeleteLogFile(uint64_t number) { Status s; auto filename = LogFileName(options_.wal_dir, number); - if (options_.WAL_ttl_seconds > 0) { + if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) { s = env_->RenameFile(filename, ArchivedLogFileName(options_.wal_dir, number)); @@ -613,34 +614,128 @@ void DBImpl::DeleteObsoleteFiles() { EvictObsoleteFiles(deletion_state); } +// 1. Go through all archived files and +// a. if ttl is enabled, delete outdated files +// b. if archive size limit is enabled, delete empty files, +// compute file number and size. 
+// 2. If size limit is enabled: +// a. compute how many files should be deleted +// b. get sorted non-empty archived logs +// c. delete what should be deleted void DBImpl::PurgeObsoleteWALFiles() { + bool const ttl_enabled = options_.WAL_ttl_seconds > 0; + bool const size_limit_enabled = options_.WAL_size_limit_MB > 0; + if (!ttl_enabled && !size_limit_enabled) { + return; + } + int64_t current_time; Status s = env_->GetCurrentTime(¤t_time); - uint64_t now_seconds = static_cast(current_time); - assert(s.ok()); + if (!s.ok()) { + Log(options_.info_log, "Can't get current time: %s", s.ToString().c_str()); + assert(false); + return; + } + uint64_t const now_seconds = static_cast(current_time); + uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) ? + options_.WAL_ttl_seconds / 2 : default_interval_to_delete_obsolete_WAL_; - if (options_.WAL_ttl_seconds != ULONG_MAX && options_.WAL_ttl_seconds > 0) { - if (purge_wal_files_last_run_ + options_.WAL_ttl_seconds > now_seconds) { - return; - } - std::vector wal_files; - std::string archival_dir = ArchivalDirectory(options_.wal_dir); - env_->GetChildren(archival_dir, &wal_files); - for (const auto& f : wal_files) { - uint64_t file_m_time; - const std::string file_path = archival_dir + "/" + f; - const Status s = env_->GetFileModificationTime(file_path, &file_m_time); - if (s.ok() && (now_seconds - file_m_time > options_.WAL_ttl_seconds)) { - Status status = env_->DeleteFile(file_path); - if (!status.ok()) { - Log(options_.info_log, - "Failed Deleting a WAL file Error : i%s", - status.ToString().c_str()); + if (purge_wal_files_last_run_ + time_to_check > now_seconds) { + return; + } + + purge_wal_files_last_run_ = now_seconds; + + std::string archival_dir = ArchivalDirectory(options_.wal_dir); + std::vector files; + s = env_->GetChildren(archival_dir, &files); + if (!s.ok()) { + Log(options_.info_log, "Can't get archive files: %s", s.ToString().c_str()); + assert(false); + return; + } + + size_t log_files_num 
= 0; + uint64_t log_file_size = 0; + + for (auto& f : files) { + uint64_t number; + FileType type; + if (ParseFileName(f, &number, &type) && type == kLogFile) { + std::string const file_path = archival_dir + "/" + f; + if (ttl_enabled) { + uint64_t file_m_time; + Status const s = env_->GetFileModificationTime(file_path, + &file_m_time); + if (!s.ok()) { + Log(options_.info_log, "Can't get file mod time: %s: %s", + file_path.c_str(), s.ToString().c_str()); + continue; } - } // Ignore errors. + if (now_seconds - file_m_time > options_.WAL_ttl_seconds) { + Status const s = env_->DeleteFile(file_path); + if (!s.ok()) { + Log(options_.info_log, "Can't delete file: %s: %s", + file_path.c_str(), s.ToString().c_str()); + continue; + } + continue; + } + } + + if (size_limit_enabled) { + uint64_t file_size; + Status const s = env_->GetFileSize(file_path, &file_size); + if (!s.ok()) { + Log(options_.info_log, "Can't get file size: %s: %s", + file_path.c_str(), s.ToString().c_str()); + return; + } else { + if (file_size > 0) { + log_file_size = std::max(log_file_size, file_size); + ++log_files_num; + } else { + Status s = env_->DeleteFile(file_path); + if (!s.ok()) { + Log(options_.info_log, "Can't delete file: %s: %s", + file_path.c_str(), s.ToString().c_str()); + continue; + } + } + } + } + } + } + + if (0 == log_files_num || !size_limit_enabled) { + return; + } + + size_t const files_keep_num = options_.WAL_size_limit_MB * + 1024 * 1024 / log_file_size; + if (log_files_num <= files_keep_num) { + return; + } + + size_t files_del_num = log_files_num - files_keep_num; + VectorLogPtr archived_logs; + AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); + + if (files_del_num > archived_logs.size()) { + Log(options_.info_log, "Trying to delete more archived log files than " + "exist. 
Deleting all"); + files_del_num = archived_logs.size(); + } + + for (size_t i = 0; i < files_del_num; ++i) { + std::string const file_path = archived_logs[i]->PathName(); + Status const s = DeleteFile(file_path); + if (!s.ok()) { + Log(options_.info_log, "Can't delete file: %s: %s", + file_path.c_str(), s.ToString().c_str()); + continue; } } - purge_wal_files_last_run_ = now_seconds; } // If externalTable is set, then apply recovered transactions diff --git a/db/db_impl.h b/db/db_impl.h index 3b6be8373..c70ec23d4 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -116,6 +116,11 @@ class DBImpl : public DB { // get total level0 file size. Only for testing. uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);} + void TEST_SetDefaultTimeToCheck(uint64_t default_interval_to_delete_obsolete_WAL) + { + default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL; + } + protected: Env* const env_; const std::string dbname_; @@ -323,6 +328,10 @@ class DBImpl : public DB { // last time stats were dumped to LOG std::atomic last_stats_dump_time_microsec_; + // obsolete files will be deleted every this seconds if ttl deletion is + // enabled and archive size_limit is disabled. + uint64_t default_interval_to_delete_obsolete_WAL_; + // These count the number of microseconds for which MakeRoomForWrite stalls. uint64_t stall_level0_slowdown_; uint64_t stall_memtable_compaction_; diff --git a/db/db_test.cc b/db/db_test.cc index c089dcf43..35f30c0f5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3811,28 +3811,28 @@ std::vector ListLogFiles(Env* env, const std::string& path) { return std::move(log_files); } -TEST(DBTest, WALArchival) { +TEST(DBTest, WALArchivalTtl) { do { - std::string value(1024, '1'); Options options = CurrentOptions(); options.create_if_missing = true; options.WAL_ttl_seconds = 1000; DestroyAndReopen(&options); - - // TEST : Create DB with a ttl. + // TEST : Create DB with a ttl and no size limit. // Put some keys. 
Count the log files present in the DB just after insert. // Re-open db. Causes deletion/archival to take place. // Assert that the files moved under "/archive". + // Reopen db with small ttl. + // Assert that archive was removed. std::string archiveDir = ArchivalDirectory(dbname_); for (int i = 0; i < 10; ++i) { for (int j = 0; j < 10; ++j) { - ASSERT_OK(Put(Key(10*i+j), value)); + ASSERT_OK(Put(Key(10 * i + j), DummyString(1024))); } - std::vector logFiles = ListLogFiles(env_, dbname_); + std::vector log_files = ListLogFiles(env_, dbname_); options.create_if_missing = false; Reopen(&options); @@ -3840,37 +3840,78 @@ TEST(DBTest, WALArchival) { std::vector logs = ListLogFiles(env_, archiveDir); std::set archivedFiles(logs.begin(), logs.end()); - for (auto& log : logFiles) { + for (auto& log : log_files) { ASSERT_TRUE(archivedFiles.find(log) != archivedFiles.end()); } } - std::vector logFiles = ListLogFiles(env_, archiveDir); - ASSERT_TRUE(logFiles.size() > 0); + std::vector log_files = ListLogFiles(env_, archiveDir); + ASSERT_TRUE(log_files.size() > 0); + options.WAL_ttl_seconds = 1; - env_->SleepForMicroseconds(2*1000*1000); + env_->SleepForMicroseconds(2 * 1000 * 1000); Reopen(&options); - logFiles = ListLogFiles(env_, archiveDir); - ASSERT_TRUE(logFiles.size() == 0); + log_files = ListLogFiles(env_, archiveDir); + ASSERT_TRUE(log_files.empty()); } while (ChangeCompactOptions()); } -TEST(DBTest, WALClear) { +uint64_t GetLogDirSize(std::string dir_path, SpecialEnv* env) { + uint64_t dir_size = 0; + std::vector files; + env->GetChildren(dir_path, &files); + for (auto& f : files) { + uint64_t number; + FileType type; + if (ParseFileName(f, &number, &type) && type == kLogFile) { + std::string const file_path = dir_path + "/" + f; + uint64_t file_size; + env->GetFileSize(file_path, &file_size); + dir_size += file_size; + } + } + return dir_size; +} + +TEST(DBTest, WALArchivalSizeLimit) { do { Options options = CurrentOptions(); options.create_if_missing = true; - 
options.WAL_ttl_seconds = 1; + options.WAL_ttl_seconds = 0; + options.WAL_size_limit_MB = 1000; + + // TEST : Create DB with huge size limit and no ttl. + // Put some keys. Count the archived log files present in the DB + // just after insert. Assert that there are enough of them. + // Change size limit. Re-open db. + // Assert that archive is not greater than WAL_size_limit_MB. + // Set ttl and time_to_check_ to small values. Re-open db. + // Assert that there are no archived logs left. - for (int j = 0; j < 10; ++j) - for (int i = 0; i < 10; ++i) - ASSERT_OK(Put(Key(10*i+j), DummyString(1024))); + DestroyAndReopen(&options); + for (int i = 0; i < 128 * 128; ++i) { + ASSERT_OK(Put(Key(i), DummyString(1024))); + } Reopen(&options); + std::string archive_dir = ArchivalDirectory(dbname_); std::vector log_files = ListLogFiles(env_, archive_dir); - ASSERT_TRUE(!log_files.empty()); + ASSERT_TRUE(log_files.size() > 2); + + options.WAL_size_limit_MB = 8; + Reopen(&options); + dbfull()->TEST_PurgeObsoleteteWAL(); + + uint64_t archive_size = GetLogDirSize(archive_dir, env_); + ASSERT_TRUE(archive_size <= options.WAL_size_limit_MB * 1024 * 1024); + + options.WAL_ttl_seconds = 1; + dbfull()->TEST_SetDefaultTimeToCheck(1); env_->SleepForMicroseconds(2 * 1000 * 1000); + Reopen(&options); dbfull()->TEST_PurgeObsoleteteWAL(); + log_files = ListLogFiles(env_, archive_dir); ASSERT_TRUE(log_files.empty()); } while (ChangeCompactOptions()); diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc index a864716cc..324c8c69d 100644 --- a/db/deletefile_test.cc +++ b/db/deletefile_test.cc @@ -38,6 +38,7 @@ class DeleteFileTest { options_.target_file_size_base = 1024*1024*1000; options_.max_bytes_for_level_base = 1024*1024*1000; options_.WAL_ttl_seconds = 300; // Used to test log files + options_.WAL_size_limit_MB = 1024; // Used to test log files dbname_ = test::TmpDir() + "/deletefile_test"; DestroyDB(dbname_, options_); numlevels_ = 7; diff --git a/include/rocksdb/db.h 
b/include/rocksdb/db.h index 25614a91c..3d7bc3ab6 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -264,9 +264,10 @@ class DB { // seq_number. If the sequence number is non existent, it returns an iterator // at the first available seq_no after the requested seq_no // Returns Status::Ok if iterator is valid - // Must set WAL_ttl_seconds to a large value to use this api, else the WAL - // files will get cleared aggressively and the iterator might keep getting - // invalid before an update is read. + // Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to + // use this api, else the WAL files will get + // cleared aggressively and the iterator might keep getting invalid before + // an update is read. virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter) = 0; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 055019e76..08f6cf0c2 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -489,15 +489,20 @@ struct Options { // be issued on this database. bool disable_auto_compactions; - // The number of seconds a WAL(write ahead log) should be kept after it has - // been marked as Not Live. If the value is set. The WAL files are moved to - // the archive directory and deleted after the given TTL. - // If set to 0, WAL files are deleted as soon as they are not required by - // the database. - // If set to std::numeric_limits::max() the WAL files will never be - // deleted. - // Default : 0 + // The following two fields affect how archived logs will be deleted. + // 1. If both set to 0, logs will be deleted asap and will not get into + // the archive. + // 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0, + // WAL files will be checked every 10 min and if total size is greater + // than WAL_size_limit_MB, they will be deleted starting with the + // earliest until size_limit is met. All empty files will be deleted. + // 3. 
If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then + // WAL files will be checked every WAL_ttl_seconds / 2 and those which + // are older than WAL_ttl_seconds will be deleted. + // 4. If both are not 0, WAL files will be checked every 10 min and both + // checks will be performed with ttl being first. uint64_t WAL_ttl_seconds; + uint64_t WAL_size_limit_MB; // Number of bytes to preallocate (via fallocate) the manifest // files. Default is 4mb, which is reasonable to reduce random IO diff --git a/include/rocksdb/transaction_log.h b/include/rocksdb/transaction_log.h index e74980af6..4da83d021 100644 --- a/include/rocksdb/transaction_log.h +++ b/include/rocksdb/transaction_log.h @@ -16,7 +16,9 @@ typedef std::vector> VectorLogPtr; enum WalFileType { /* Indicates that WAL file is in archive directory. WAL files are moved from * the main db directory to archive directory once they are not live and stay - * there for a duration of WAL_ttl_seconds which can be set in Options + * there until cleaned up. Files are cleaned depending on archive size + * (Options::WAL_size_limit_MB) and time since last cleaning + * (Options::WAL_ttl_seconds). 
*/ kArchivedLogFile = 0, diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index 20fe40f3e..c22d31eb1 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -80,11 +80,14 @@ static void ReplicationThreadBody(void* arg) { DEFINE_uint64(num_inserts, 1000, "the num of inserts the first thread should" " perform."); -DEFINE_uint64(wal_ttl, 1000, "the wal ttl for the run(in seconds)"); +DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)"); +DEFINE_uint64(wal_size_limit_MB, 10, "the wal size limit for the run" + "(in MB)"); int main(int argc, const char** argv) { google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + - " --num_inserts= --wal_ttl="); + " --num_inserts= --wal_ttl_seconds=" + + " --wal_size_limit_MB="); google::ParseCommandLineFlags(&argc, const_cast(&argv), true); Env* env = Env::Default(); @@ -93,7 +96,8 @@ int main(int argc, const char** argv) { default_db_path += "db_repl_stress"; Options options; options.create_if_missing = true; - options.WAL_ttl_seconds = FLAGS_wal_ttl; + options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; + options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; DB* db; DestroyDB(default_db_path, options); diff --git a/util/options.cc b/util/options.cc index 170a8ae55..3952f45fa 100644 --- a/util/options.cc +++ b/util/options.cc @@ -77,6 +77,7 @@ Options::Options() arena_block_size(0), disable_auto_compactions(false), WAL_ttl_seconds(0), + WAL_size_limit_MB(0), manifest_preallocation_size(4 * 1024 * 1024), purge_redundant_kvs_while_flush(true), allow_os_buffer(true), @@ -237,6 +238,8 @@ Options::Dump(Logger* log) const disable_auto_compactions); Log(log," Options.WAL_ttl_seconds: %ld", WAL_ttl_seconds); + Log(log," Options.WAL_size_limit_MB: %ld", + WAL_size_limit_MB); Log(log," Options.manifest_preallocation_size: %ld", manifest_preallocation_size); Log(log," Options.purge_redundant_kvs_while_flush: %d",