From 326670d265f7f161878da2931c2a54fa497d8a32 Mon Sep 17 00:00:00 2001
From: Mark Callaghan
Date: Wed, 17 Mar 2021 09:10:24 -0700
Subject: [PATCH] Add new db_bench --benchmarks options for controlling
 compaction (#8027)

Summary:
The new options are:
* compact0 - compact L0 into L1 using one thread
* compact1 - compact L1 into L2 using one thread
* flush - flush the memtable
* waitforcompaction - wait for compaction to finish

These are useful for reproducible benchmarks because they help get the
LSM tree shape into a deterministic state. I wrote about this at:
http://smalldatum.blogspot.com/2021/02/read-only-benchmarks-with-lsm-are.html

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8027

Reviewed By: riversand963

Differential Revision: D27053861

Pulled By: ajkr

fbshipit-source-id: 1646f35584a3db03740fbeb47d91c3f00fb35d6e
---
 HISTORY.md             |   1 +
 tools/db_bench_tool.cc | 216 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 217 insertions(+)

diff --git a/HISTORY.md b/HISTORY.md
index f769af68b..9f2d840d4 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -17,6 +17,7 @@
 ### New Features
 * Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicate that further action is necessary for the compaction filter to make a decision.
 * Add support to extend retrieval of checksums for blob files from the MANIFEST when checkpointing. During backup, rocksdb can detect corruption in blob files during file copies.
+* Add new options for db_bench --benchmarks: flush, waitforcompaction, compact0, compact1.
 * Add an option to BackupEngine::GetBackupInfo to include the name and size of each backed-up file. Especially in the presence of file sharing among backups, this offers detailed insight into backup space usage.
 * Enable backward iteration on keys with user-defined timestamps.
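For context before the code change, here is a sketch of the workflow the
summary describes: benchmark names compose on the command line, so a load
phase can be followed by the new meta operations to settle the LSM tree
before a read-only measurement. The --num value and binary path are
illustrative, not part of this patch:

    # Load, then force the LSM tree into a deterministic shape
    ./db_bench --benchmarks=fillrandom,flush,waitforcompaction,compact0,waitforcompaction,compact1,waitforcompaction --num=1000000
    # Measure reads against the now-deterministic tree
    ./db_bench --benchmarks=readrandom --use_existing_db=1 --num=1000000
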
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 9630b779c..aa8d4754e 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -107,6 +107,12 @@ DEFINE_string(
     "readreverse,"
     "compact,"
     "compactall,"
+    "flush,"
+#ifndef ROCKSDB_LITE
+    "compact0,"
+    "compact1,"
+    "waitforcompaction,"
+#endif
     "multireadrandom,"
     "mixgraph,"
     "readseq,"
@@ -196,9 +202,16 @@ DEFINE_string(
     "Meta operations:\n"
     "\tcompact -- Compact the entire DB; If multiple, randomly choose one\n"
     "\tcompactall -- Compact the entire DB\n"
+#ifndef ROCKSDB_LITE
+    "\tcompact0 -- compact L0 into L1\n"
+    "\tcompact1 -- compact L1 into L2\n"
+    "\twaitforcompaction -- pause until compaction is (probably) done\n"
+#endif
+    "\tflush -- flush the memtable\n"
     "\tstats -- Print DB stats\n"
     "\tresetstats -- Reset DB stats\n"
     "\tlevelstats -- Print the number of files and bytes per level\n"
+    "\tmemstats -- Print memtable stats\n"
     "\tsstables -- Print sstable info\n"
     "\theapprofile -- Dump a heap profile (if supported by this port)\n"
     "\treplay -- replay the trace file specified with trace_file\n"
@@ -3210,6 +3223,16 @@ class Benchmark {
         method = &Benchmark::Compact;
       } else if (name == "compactall") {
         CompactAll();
+#ifndef ROCKSDB_LITE
+      } else if (name == "compact0") {
+        CompactLevel(0);
+      } else if (name == "compact1") {
+        CompactLevel(1);
+      } else if (name == "waitforcompaction") {
+        WaitForCompaction();
+#endif
+      } else if (name == "flush") {
+        Flush();
       } else if (name == "crc32c") {
         method = &Benchmark::Crc32c;
       } else if (name == "xxhash") {
@@ -3245,6 +3268,14 @@ class Benchmark {
         VerifyDBFromDB(FLAGS_truth_db);
       } else if (name == "levelstats") {
         PrintStats("rocksdb.levelstats");
+      } else if (name == "memstats") {
+        std::vector<std::string> keys{"rocksdb.num-immutable-mem-table",
+                                      "rocksdb.cur-size-active-mem-table",
+                                      "rocksdb.cur-size-all-mem-tables",
+                                      "rocksdb.size-all-mem-tables",
+                                      "rocksdb.num-entries-active-mem-table",
+                                      "rocksdb.num-entries-imm-mem-tables"};
+        PrintStats(keys);
       } else if (name == "sstables") {
         PrintStats("rocksdb.sstables");
       } else if (name == "stats_history") {
@@ -7259,6 +7290,167 @@ class Benchmark {
     }
   }
 
+#ifndef ROCKSDB_LITE
+  void WaitForCompactionHelper(DBWithColumnFamilies& db) {
+    // This is an imperfect way of waiting for compaction. The loop and sleep
+    // are done because a thread that finishes a compaction job should get a
+    // chance to pick up a new compaction job.
+
+    std::vector<std::string> keys = {DB::Properties::kMemTableFlushPending,
+                                     DB::Properties::kNumRunningFlushes,
+                                     DB::Properties::kCompactionPending,
+                                     DB::Properties::kNumRunningCompactions};
+
+    fprintf(stdout, "waitforcompaction(%s): started\n",
+            db.db->GetName().c_str());
+
+    while (true) {
+      bool retry = false;
+
+      for (const auto& k : keys) {
+        uint64_t v;
+        if (!db.db->GetIntProperty(k, &v)) {
+          fprintf(stderr, "waitforcompaction(%s): GetIntProperty(%s) failed\n",
+                  db.db->GetName().c_str(), k.c_str());
+          exit(1);
+        } else if (v > 0) {
+          fprintf(stdout,
+                  "waitforcompaction(%s): active(%s). Sleep 10 seconds\n",
+                  db.db->GetName().c_str(), k.c_str());
+          sleep(10);
+          retry = true;
+          break;
+        }
+      }
+
+      if (!retry) {
+        fprintf(stdout, "waitforcompaction(%s): finished\n",
+                db.db->GetName().c_str());
+        return;
+      }
+    }
+  }
+
+  void WaitForCompaction() {
+    // Give background threads a chance to wake
+    sleep(5);
+
+    // I am skeptical that this check is race free. I hope that checking twice
+    // reduces the chance.
+    if (db_.db != nullptr) {
+      WaitForCompactionHelper(db_);
+      WaitForCompactionHelper(db_);
+    } else {
+      for (auto& db_with_cfh : multi_dbs_) {
+        WaitForCompactionHelper(db_with_cfh);
+        WaitForCompactionHelper(db_with_cfh);
+      }
+    }
+  }
+
+  bool CompactLevelHelper(DBWithColumnFamilies& db_with_cfh, int from_level) {
+    std::vector<LiveFileMetaData> files;
+    db_with_cfh.db->GetLiveFilesMetaData(&files);
+
+    assert(from_level == 0 || from_level == 1);
+
+    int real_from_level = from_level;
+    if (real_from_level > 0) {
+      // With dynamic leveled compaction the first level with data beyond L0
+      // might not be L1.
+      real_from_level = std::numeric_limits<int>::max();
+
+      for (auto& f : files) {
+        if (f.level > 0 && f.level < real_from_level) real_from_level = f.level;
+      }
+
+      if (real_from_level == std::numeric_limits<int>::max()) {
+        fprintf(stdout, "compact%d found 0 files to compact\n", from_level);
+        return true;
+      }
+    }
+
+    // The goal is to compact from from_level to the level that follows it,
+    // and with dynamic leveled compaction the next level might not be
+    // real_from_level+1.
+    int next_level = std::numeric_limits<int>::max();
+
+    std::vector<std::string> files_to_compact;
+    for (auto& f : files) {
+      if (f.level == real_from_level)
+        files_to_compact.push_back(f.name);
+      else if (f.level > real_from_level && f.level < next_level)
+        next_level = f.level;
+    }
+
+    if (files_to_compact.empty()) {
+      fprintf(stdout, "compact%d found 0 files to compact\n", from_level);
+      return true;
+    } else if (next_level == std::numeric_limits<int>::max()) {
+      // There is no data beyond real_from_level, so we are done.
+      fprintf(stdout, "compact%d found no data beyond L%d\n", from_level,
+              real_from_level);
+      return true;
+    }
+
+    fprintf(stdout, "compact%d found %d files to compact from L%d to L%d\n",
+            from_level, static_cast<int>(files_to_compact.size()),
+            real_from_level, next_level);
+
+    ROCKSDB_NAMESPACE::CompactionOptions options;
+    // Lets RocksDB use the configured compression for this level
+    options.compression = ROCKSDB_NAMESPACE::kDisableCompressionOption;
+
+    ROCKSDB_NAMESPACE::ColumnFamilyDescriptor cfDesc;
+    db_with_cfh.db->DefaultColumnFamily()->GetDescriptor(&cfDesc);
+    options.output_file_size_limit = cfDesc.options.target_file_size_base;
+
+    Status status =
+        db_with_cfh.db->CompactFiles(options, files_to_compact, next_level);
+    if (!status.ok()) {
+      // This can fail for valid reasons, including when the operation was
+      // aborted or a filename is invalid because background compaction
+      // removed it. Having read the current cases for which an error is
+      // raised, I prefer not to figure out here which of them should be
+      // fatal; the caller retries after waiting for compaction.
+      fprintf(stderr, "compact%d CompactFiles failed: %s\n", from_level,
+              status.ToString().c_str());
+      return false;
+    }
+    return true;
+  }
+
+  void CompactLevel(int from_level) {
+    if (db_.db != nullptr) {
+      while (!CompactLevelHelper(db_, from_level)) WaitForCompaction();
+    }
+    for (auto& db_with_cfh : multi_dbs_) {
+      while (!CompactLevelHelper(db_with_cfh, from_level)) WaitForCompaction();
+    }
+  }
+#endif
+
+  void Flush() {
+    FlushOptions flush_opt;
+    flush_opt.wait = true;
+
+    if (db_.db != nullptr) {
+      Status s = db_.db->Flush(flush_opt, db_.cfh);
+      if (!s.ok()) {
+        fprintf(stderr, "Flush failed: %s\n", s.ToString().c_str());
+        exit(1);
+      }
+    } else {
+      for (const auto& db_with_cfh : multi_dbs_) {
+        Status s = db_with_cfh.db->Flush(flush_opt, db_with_cfh.cfh);
+        if (!s.ok()) {
+          fprintf(stderr, "Flush failed: %s\n", s.ToString().c_str());
+          exit(1);
+        }
+      }
+    }
+    fprintf(stdout, "flush memtable\n");
+  }
+
   void ResetStats() {
     if (db_.db != nullptr) {
       db_.db->ResetStats();
@@ -7321,6 +7513,30 @@ class Benchmark {
     fprintf(stdout, "\n%s\n", stats.c_str());
   }
 
+  void PrintStats(const std::vector<std::string>& keys) {
+    if (db_.db != nullptr) {
+      PrintStats(db_.db, keys);
+    }
+    for (const auto& db_with_cfh : multi_dbs_) {
+      PrintStats(db_with_cfh.db, keys, true);
+    }
+  }
+
+  void PrintStats(DB* db, const std::vector<std::string>& keys,
+                  bool print_header = false) {
+    if (print_header) {
+      fprintf(stdout, "\n==== DB: %s ====\n", db->GetName().c_str());
+    }
+
+    for (const auto& key : keys) {
+      std::string stats;
+      if (!db->GetProperty(key, &stats)) {
+        stats = "(failed)";
+      }
+      fprintf(stdout, "%s: %s\n", key.c_str(), stats.c_str());
+    }
+  }
+
   void Replay(ThreadState* thread) {
     if (db_.db != nullptr) {
       Replay(thread, &db_);
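
The new memstats meta operation pairs naturally with flush when verifying
that the memtable was written out. A sketch, with an illustrative --num
value:

    ./db_bench --benchmarks=fillrandom,flush,memstats --num=100000

Per the PrintStats code above, each listed property is printed as
"name: value", and when multiple DBs are in use a "==== DB: <name> ===="
header separates the per-DB sections.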