Add new db_bench --benchmarks options for controlling compaction (#8027)

Summary:
The new options are:
* compact0 - compact L0 into L1 using one thread
* compact1 - compact L1 into L2 using one thread
* flush - flush memtable
* waitforcompaction - wait for compaction to finish

These are useful for reproducible benchmarks to help get the LSM tree shape
into a deterministic state. I wrote about this at:
http://smalldatum.blogspot.com/2021/02/read-only-benchmarks-with-lsm-are.html

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8027

Reviewed By: riversand963

Differential Revision: D27053861

Pulled By: ajkr

fbshipit-source-id: 1646f35584a3db03740fbeb47d91c3f00fb35d6e
main
Mark Callaghan 4 years ago committed by Facebook GitHub Bot
parent 8d9088464b
commit 326670d265
  1. 1
      HISTORY.md
  2. 216
      tools/db_bench_tool.cc

@ -17,6 +17,7 @@
### New Features
* Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicate that further action is necessary for compaction filter to make a decision.
* Add support to extend retrieval of checksums for blob files from the MANIFEST when checkpointing. During backup, rocksdb can detect corruption in blob files during file copies.
* Add new options for db_bench --benchmarks: flush, waitforcompaction, compact0, compact1.
* Add an option to BackupEngine::GetBackupInfo to include the name and size of each backed-up file. Especially in the presence of file sharing among backups, this offers detailed insight into backup space usage.
* Enable backward iteration on keys with user-defined timestamps.

@ -107,6 +107,12 @@ DEFINE_string(
"readreverse,"
"compact,"
"compactall,"
"flush,"
#ifndef ROCKSDB_LITE
"compact0,"
"compact1,"
"waitforcompaction,"
#endif
"multireadrandom,"
"mixgraph,"
"readseq,"
@ -196,9 +202,16 @@ DEFINE_string(
"Meta operations:\n"
"\tcompact -- Compact the entire DB; If multiple, randomly choose one\n"
"\tcompactall -- Compact the entire DB\n"
#ifndef ROCKSDB_LITE
"\tcompact0 -- compact L0 into L1\n"
"\tcompact1 -- compact L1 into L2\n"
"\twaitforcompaction - pause until compaction is (probably) done\n"
#endif
"\tflush - flush the memtable\n"
"\tstats -- Print DB stats\n"
"\tresetstats -- Reset DB stats\n"
"\tlevelstats -- Print the number of files and bytes per level\n"
"\tmemstats -- Print memtable stats\n"
"\tsstables -- Print sstable info\n"
"\theapprofile -- Dump a heap profile (if supported by this port)\n"
"\treplay -- replay the trace file specified with trace_file\n"
@ -3210,6 +3223,16 @@ class Benchmark {
method = &Benchmark::Compact;
} else if (name == "compactall") {
CompactAll();
#ifndef ROCKSDB_LITE
} else if (name == "compact0") {
CompactLevel(0);
} else if (name == "compact1") {
CompactLevel(1);
} else if (name == "waitforcompaction") {
WaitForCompaction();
#endif
} else if (name == "flush") {
Flush();
} else if (name == "crc32c") {
method = &Benchmark::Crc32c;
} else if (name == "xxhash") {
@ -3245,6 +3268,14 @@ class Benchmark {
VerifyDBFromDB(FLAGS_truth_db);
} else if (name == "levelstats") {
PrintStats("rocksdb.levelstats");
} else if (name == "memstats") {
std::vector<std::string> keys{"rocksdb.num-immutable-mem-table",
"rocksdb.cur-size-active-mem-table",
"rocksdb.cur-size-all-mem-tables",
"rocksdb.size-all-mem-tables",
"rocksdb.num-entries-active-mem-table",
"rocksdb.num-entries-imm-mem-tables"};
PrintStats(keys);
} else if (name == "sstables") {
PrintStats("rocksdb.sstables");
} else if (name == "stats_history") {
@ -7259,6 +7290,167 @@ class Benchmark {
}
}
#ifndef ROCKSDB_LITE
// Polls a fixed set of flush/compaction integer properties on `db` and
// returns once a full pass over them finds every value equal to zero.
//
// This is an imperfect way of waiting for compaction: a background thread
// that just finished one job should get a chance to pick up the next one,
// hence the sleep-and-recheck loop rather than a single check.
//
// Exits the process with status 1 if any property cannot be read.
void WaitForCompactionHelper(DBWithColumnFamilies& db) {
  const std::vector<std::string> props = {
      DB::Properties::kMemTableFlushPending,
      DB::Properties::kNumRunningFlushes,
      DB::Properties::kCompactionPending,
      DB::Properties::kNumRunningCompactions};
  const std::string db_name = db.db->GetName();

  fprintf(stdout, "waitforcompaction(%s): started\n", db_name.c_str());

  for (;;) {
    // Points at the first property that still reports activity, if any.
    const std::string* busy = nullptr;

    for (const auto& prop : props) {
      uint64_t value = 0;
      if (!db.db->GetIntProperty(prop, &value)) {
        fprintf(stderr, "waitforcompaction(%s): GetIntProperty(%s) failed\n",
                db_name.c_str(), prop.c_str());
        exit(1);
      }
      if (value > 0) {
        busy = &prop;
        break;
      }
    }

    if (busy == nullptr) {
      fprintf(stdout, "waitforcompaction(%s): finished\n", db_name.c_str());
      return;
    }

    // Something is still pending or running: back off, then rescan all
    // properties from the start.
    fprintf(stdout, "waitforcompaction(%s): active(%s). Sleep 10 seconds\n",
            db_name.c_str(), busy->c_str());
    sleep(10);
  }
}
// Waits for background flush/compaction work to drain, on the single DB when
// one is open and otherwise on every DB in multi_dbs_.
void WaitForCompaction() {
  // Give background threads a chance to wake up before the first check.
  sleep(5);

  // The check is probably not race free, so each DB is checked twice in a
  // row to reduce the chance of returning while work is still being
  // scheduled.
  const int kPasses = 2;

  if (db_.db != nullptr) {
    for (int pass = 0; pass < kPasses; ++pass) {
      WaitForCompactionHelper(db_);
    }
    return;
  }

  for (auto& entry : multi_dbs_) {
    for (int pass = 0; pass < kPasses; ++pass) {
      WaitForCompactionHelper(entry);
    }
  }
}
// Compacts all live files found at the source level into the next level that
// currently holds data, using DB::CompactFiles.
//
// from_level must be 0 or 1. For 0 the source level is L0; for 1 the source
// level is the lowest-numbered level above L0 that has files, because with
// dynamic leveled compaction that might not be L1.
//
// Returns true when there is nothing to do (no files at the source level, or
// no data beyond it) or when CompactFiles succeeded. Returns false when
// CompactFiles reported an error; the caller is expected to retry.
//
// NOTE(review): the file list comes from GetLiveFilesMetaData and is not
// filtered by column family here, while CompactFiles is issued without an
// explicit column family -- confirm behavior when several column families
// are in use.
bool CompactLevelHelper(DBWithColumnFamilies& db_with_cfh, int from_level) {
  std::vector<LiveFileMetaData> files;
  db_with_cfh.db->GetLiveFilesMetaData(&files);

  assert(from_level == 0 || from_level == 1);

  int real_from_level = from_level;
  if (real_from_level > 0) {
    // With dynamic leveled compaction the first level with data beyond L0
    // might not be L1.
    real_from_level = std::numeric_limits<int>::max();
    for (const auto& f : files) {
      if (f.level > 0 && f.level < real_from_level) {
        real_from_level = f.level;
      }
    }
    if (real_from_level == std::numeric_limits<int>::max()) {
      fprintf(stdout, "compact%d found 0 files to compact\n", from_level);
      return true;
    }
  }

  // The goal is to compact from from_level to the level that follows it,
  // and with dynamic leveled compaction the next level might not be
  // real_from_level+1
  int next_level = std::numeric_limits<int>::max();
  std::vector<std::string> files_to_compact;
  for (const auto& f : files) {
    if (f.level == real_from_level) {
      files_to_compact.push_back(f.name);
    } else if (f.level > real_from_level && f.level < next_level) {
      next_level = f.level;
    }
  }

  if (files_to_compact.empty()) {
    fprintf(stdout, "compact%d found 0 files to compact\n", from_level);
    return true;
  } else if (next_level == std::numeric_limits<int>::max()) {
    // There is no data beyond real_from_level. So we are done.
    fprintf(stdout, "compact%d found no data beyond L%d\n", from_level,
            real_from_level);
    return true;
  }

  fprintf(stdout, "compact%d found %d files to compact from L%d to L%d\n",
          from_level, static_cast<int>(files_to_compact.size()),
          real_from_level, next_level);

  ROCKSDB_NAMESPACE::CompactionOptions options;
  // Lets RocksDB use the configured compression for this level
  options.compression = ROCKSDB_NAMESPACE::kDisableCompressionOption;

  // Size output files according to the default column family's configured
  // target_file_size_base.
  ROCKSDB_NAMESPACE::ColumnFamilyDescriptor cfDesc;
  Status desc_status =
      db_with_cfh.db->DefaultColumnFamily()->GetDescriptor(&cfDesc);
  if (!desc_status.ok()) {
    // Do not silently drop the error. The compaction still proceeds with the
    // target_file_size_base of the default-constructed descriptor, which is
    // what would have been used had the status gone unchecked.
    fprintf(stderr,
            "compact%d GetDescriptor failed: %s; using default "
            "target_file_size_base\n",
            from_level, desc_status.ToString().c_str());
  }
  options.output_file_size_limit = cfDesc.options.target_file_size_base;

  Status status =
      db_with_cfh.db->CompactFiles(options, files_to_compact, next_level);
  if (!status.ok()) {
    // This can fail for valid reasons including the operation was aborted
    // or a filename is invalid because background compaction removed it.
    // Rather than deciding here which errors are fatal, report the failure
    // and let the caller decide whether to retry.
    fprintf(stderr, "compact%d CompactFiles failed: %s\n", from_level,
            status.ToString().c_str());
    return false;
  }
  return true;
}
void CompactLevel(int from_level) {
if (db_.db != nullptr) {
while (!CompactLevelHelper(db_, from_level)) WaitForCompaction();
}
for (auto& db_with_cfh : multi_dbs_) {
while (!CompactLevelHelper(db_with_cfh, from_level)) WaitForCompaction();
}
}
#endif
void Flush() {
FlushOptions flush_opt;
flush_opt.wait = true;
if (db_.db != nullptr) {
Status s = db_.db->Flush(flush_opt, db_.cfh);
if (!s.ok()) {
fprintf(stderr, "Flush failed: %s\n", s.ToString().c_str());
exit(1);
}
} else {
for (const auto& db_with_cfh : multi_dbs_) {
Status s = db_with_cfh.db->Flush(flush_opt, db_with_cfh.cfh);
if (!s.ok()) {
fprintf(stderr, "Flush failed: %s\n", s.ToString().c_str());
exit(1);
}
}
}
fprintf(stdout, "flush memtable\n");
}
void ResetStats() {
if (db_.db != nullptr) {
db_.db->ResetStats();
@ -7321,6 +7513,30 @@ class Benchmark {
fprintf(stdout, "\n%s\n", stats.c_str());
}
void PrintStats(const std::vector<std::string>& keys) {
if (db_.db != nullptr) {
PrintStats(db_.db, keys);
}
for (const auto& db_with_cfh : multi_dbs_) {
PrintStats(db_with_cfh.db, keys, true);
}
}
// Writes one "<key>: <value>" line to stdout for every property in `keys`,
// read from `db`. A property that cannot be read is shown as "(failed)".
// When print_header is true, a banner with the DB name is printed first.
void PrintStats(DB* db, const std::vector<std::string>& keys,
                bool print_header = false) {
  if (print_header) {
    fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
  }

  for (const auto& key : keys) {
    std::string value;
    const bool found = db->GetProperty(key, &value);
    fprintf(stdout, "%s: %s\n", key.c_str(),
            found ? value.c_str() : "(failed)");
  }
}
void Replay(ThreadState* thread) {
if (db_.db != nullptr) {
Replay(thread, &db_);

Loading…
Cancel
Save