@@ -16,20 +16,18 @@
#include <numa.h>
#include <numaif.h>
#endif

#ifndef OS_WIN
#include <unistd.h>
#endif
#include <fcntl.h>
#include <gflags/gflags.h>
#include <inttypes.h>
#include <cstddef>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <gflags/gflags.h>

#include <sys/types.h>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <unordered_map>

@@ -79,101 +77,108 @@ using GFLAGS::ParseCommandLineFlags;
using GFLAGS::RegisterFlagValidator;
using GFLAGS::SetUsageMessage;

DEFINE_string(benchmarks,
              "fillseq,"
              "fillsync,"
              "fillrandom,"
              "overwrite,"
              "readrandom,"
              "newiterator,"
              "newiteratorwhilewriting,"
              "seekrandom,"
              "seekrandomwhilewriting,"
              "seekrandomwhilemerging,"
              "readseq,"
              "readreverse,"
              "compact,"
              "readrandom,"
              "multireadrandom,"
              "readseq,"
              "readtocache,"
              "readreverse,"
              "readwhilewriting,"
              "readwhilemerging,"
              "readrandomwriterandom,"
              "updaterandom,"
              "randomwithverify,"
              "fill100K,"
              "crc32c,"
              "xxhash,"
              "compress,"
              "uncompress,"
              "acquireload,"
              "fillseekseq,"
              "randomtransaction,"
              "randomreplacekeys,"
              "timeseries",

              "Comma-separated list of operations to run in the specified"
              " order. Available benchmarks:\n"
              "\tfillseq -- write N values in sequential key"
              " order in async mode\n"
              "\tfillrandom -- write N values in random key order in async"
              " mode\n"
              "\toverwrite -- overwrite N values in random key order in"
              " async mode\n"
              "\tfillsync -- write N/100 values in random key order in "
              "sync mode\n"
              "\tfill100K -- write N/1000 100K values in random order in"
              " async mode\n"
              "\tdeleteseq -- delete N keys in sequential order\n"
              "\tdeleterandom -- delete N keys in random order\n"
              "\treadseq -- read N times sequentially\n"
              "\treadtocache -- 1 thread reading database sequentially\n"
              "\treadreverse -- read N times in reverse order\n"
              "\treadrandom -- read N times in random order\n"
              "\treadmissing -- read N missing keys in random order\n"
              "\treadwhilewriting -- 1 writer, N threads doing random "
              "reads\n"
              "\treadwhilemerging -- 1 merger, N threads doing random "
              "reads\n"
              "\treadrandomwriterandom -- N threads doing random-read, "
              "random-write\n"
              "\tprefixscanrandom -- prefix scan N times in random order\n"
              "\tupdaterandom -- N threads doing read-modify-write for random "
              "keys\n"
              "\tappendrandom -- N threads doing read-modify-write with "
              "growing values\n"
              "\tmergerandom -- same as updaterandom/appendrandom using merge"
              " operator. "
              "Must be used with merge_operator\n"
              "\treadrandommergerandom -- perform N random read-or-merge "
              "operations. Must be used with merge_operator\n"
              "\tnewiterator -- repeated iterator creation\n"
              "\tseekrandom -- N random seeks, call Next seek_nexts times "
              "per seek\n"
              "\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
              "overwrite\n"
              "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
              "merge\n"
              "\tcrc32c -- repeated crc32c of 4K of data\n"
              "\txxhash -- repeated xxHash of 4K of data\n"
              "\tacquireload -- load N*1000 times\n"
              "\tfillseekseq -- write N values in sequential key, then read "
              "them by seeking to each key\n"
              "\trandomtransaction -- execute N random transactions and "
              "verify correctness\n"
              "\trandomreplacekeys -- randomly replaces N keys by deleting "
              "the old version and putting the new version\n\n"
              "\ttimeseries -- 1 writer generates time series data "
              "and multiple readers doing random reads on id\n\n"
              "Meta operations:\n"
              "\tcompact -- Compact the entire DB\n"
              "\tstats -- Print DB stats\n"
              "\tlevelstats -- Print the number of files and bytes per level\n"
              "\tsstables -- Print sstable info\n"
              "\theapprofile -- Dump a heap profile (if supported by this"
              " port)\n");
DEFINE_string(
    benchmarks,
    "fillseq,"
    "fillseqdeterministic,"
    "fillsync,"
    "fillrandom,"
    "filluniquerandomdeterministic,"
    "overwrite,"
    "readrandom,"
    "newiterator,"
    "newiteratorwhilewriting,"
    "seekrandom,"
    "seekrandomwhilewriting,"
    "seekrandomwhilemerging,"
    "readseq,"
    "readreverse,"
    "compact,"
    "readrandom,"
    "multireadrandom,"
    "readseq,"
    "readtocache,"
    "readreverse,"
    "readwhilewriting,"
    "readwhilemerging,"
    "readrandomwriterandom,"
    "updaterandom,"
    "randomwithverify,"
    "fill100K,"
    "crc32c,"
    "xxhash,"
    "compress,"
    "uncompress,"
    "acquireload,"
    "fillseekseq,"
    "randomtransaction,"
    "randomreplacekeys,"
    "timeseries",

    "Comma-separated list of operations to run in the specified"
    " order. Available benchmarks:\n"
    "\tfillseq -- write N values in sequential key"
    " order in async mode\n"
    "\tfillseqdeterministic -- write N values in the specified"
    " key order and keep the shape of the LSM tree\n"
    "\tfillrandom -- write N values in random key order in async"
    " mode\n"
    "\tfilluniquerandomdeterministic -- write N values in a random"
    " key order and keep the shape of the LSM tree\n"
    "\toverwrite -- overwrite N values in random key order in"
    " async mode\n"
    "\tfillsync -- write N/100 values in random key order in "
    "sync mode\n"
    "\tfill100K -- write N/1000 100K values in random order in"
    " async mode\n"
    "\tdeleteseq -- delete N keys in sequential order\n"
    "\tdeleterandom -- delete N keys in random order\n"
    "\treadseq -- read N times sequentially\n"
    "\treadtocache -- 1 thread reading database sequentially\n"
    "\treadreverse -- read N times in reverse order\n"
    "\treadrandom -- read N times in random order\n"
    "\treadmissing -- read N missing keys in random order\n"
    "\treadwhilewriting -- 1 writer, N threads doing random "
    "reads\n"
    "\treadwhilemerging -- 1 merger, N threads doing random "
    "reads\n"
    "\treadrandomwriterandom -- N threads doing random-read, "
    "random-write\n"
    "\tprefixscanrandom -- prefix scan N times in random order\n"
    "\tupdaterandom -- N threads doing read-modify-write for random "
    "keys\n"
    "\tappendrandom -- N threads doing read-modify-write with "
    "growing values\n"
    "\tmergerandom -- same as updaterandom/appendrandom using merge"
    " operator. "
    "Must be used with merge_operator\n"
    "\treadrandommergerandom -- perform N random read-or-merge "
    "operations. Must be used with merge_operator\n"
    "\tnewiterator -- repeated iterator creation\n"
    "\tseekrandom -- N random seeks, call Next seek_nexts times "
    "per seek\n"
    "\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
    "overwrite\n"
    "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
    "merge\n"
    "\tcrc32c -- repeated crc32c of 4K of data\n"
    "\txxhash -- repeated xxHash of 4K of data\n"
    "\tacquireload -- load N*1000 times\n"
    "\tfillseekseq -- write N values in sequential key, then read "
    "them by seeking to each key\n"
    "\trandomtransaction -- execute N random transactions and "
    "verify correctness\n"
    "\trandomreplacekeys -- randomly replaces N keys by deleting "
    "the old version and putting the new version\n\n"
    "\ttimeseries -- 1 writer generates time series data "
    "and multiple readers doing random reads on id\n\n"
    "Meta operations:\n"
    "\tcompact -- Compact the entire DB\n"
    "\tstats -- Print DB stats\n"
    "\tlevelstats -- Print the number of files and bytes per level\n"
    "\tsstables -- Print sstable info\n"
    "\theapprofile -- Dump a heap profile (if supported by this"
    " port)\n");

DEFINE_int64(num, 1000000, "Number of key/values to place in database");

@@ -2030,6 +2035,16 @@
        options.wal_dir = FLAGS_wal_dir;
      }
      DestroyDB(FLAGS_db, options);
      if (!FLAGS_wal_dir.empty()) {
        FLAGS_env->DeleteDir(FLAGS_wal_dir);
      }

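      // With FLAGS_num_multi_db > 1, FLAGS_db and FLAGS_wal_dir act as
      // parent directories for the per-DB paths, so create them up front.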
      if (FLAGS_num_multi_db > 1) {
        FLAGS_env->CreateDir(FLAGS_db);
        if (!FLAGS_wal_dir.empty()) {
          FLAGS_env->CreateDir(FLAGS_wal_dir);
        }
      }
    }
  }

@@ -2041,7 +2056,7 @@
      cache_->DisownData();
    }
    if (FLAGS_disable_flashcache_for_background_threads && cachedev_fd_ != -1) {
      // Dtor for this env should run before cachedev_fd_ is closed
      flashcache_aware_env_ = nullptr;
      close(cachedev_fd_);
    }

@@ -2101,7 +2116,18 @@
    }
  }

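  // Build the path for DB number `id` from `base_name` by appending the
  // platform path separator (if missing) and the index, e.g.
  // ("/tmp/dbbench", 2) yields "/tmp/dbbench/2" on POSIX.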
  std::string GetDbNameForMultiple(std::string base_name, size_t id) {
  std::string GetPathForMultiple(std::string base_name, size_t id) {
    if (!base_name.empty()) {
#ifndef OS_WIN
      if (base_name.back() != '/') {
        base_name += '/';
      }
#else
      if (base_name.back() != '\\') {
        base_name += '\\';
      }
#endif
    }
    return base_name + ToString(id);
  }

@@ -2166,7 +2192,30 @@
        }
      }

      if (name == "fillseq") {
      // Both fillseqdeterministic and filluniquerandomdeterministic
      // fill the levels except the max level with UNIQUE_RANDOM
      // and fill the max level with fillseq and filluniquerandom, respectively
      if (name == "fillseqdeterministic" ||
          name == "filluniquerandomdeterministic") {
        if (!FLAGS_disable_auto_compactions) {
          fprintf(stderr,
                  "Please disable_auto_compactions in FillDeterministic "
                  "benchmark\n");
          exit(1);
        }
        if (num_threads > 1) {
          fprintf(stderr,
                  "filldeterministic multithreaded not supported"
                  ", use 1 thread\n");
          num_threads = 1;
        }
        fresh_db = true;
        if (name == "fillseqdeterministic") {
          method = &Benchmark::WriteSeqDeterministic;
        } else {
          method = &Benchmark::WriteUniqueRandomDeterministic;
        }
      } else if (name == "fillseq") {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == "fillbatch") {

@@ -2316,9 +2365,13 @@
      db_.DeleteDBs();
      DestroyDB(FLAGS_db, open_options_);
    }
    Options options = open_options_;
    for (size_t i = 0; i < multi_dbs_.size(); i++) {
      delete multi_dbs_[i].db;
      DestroyDB(GetDbNameForMultiple(FLAGS_db, i), open_options_);
      if (!open_options_.wal_dir.empty()) {
        options.wal_dir = GetPathForMultiple(open_options_.wal_dir, i);
      }
      DestroyDB(GetPathForMultiple(FLAGS_db, i), options);
    }
    multi_dbs_.clear();
  }

@@ -2964,9 +3017,14 @@
    } else {
      multi_dbs_.clear();
      multi_dbs_.resize(FLAGS_num_multi_db);
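      // Each DB gets its own WAL directory derived from the configured base;
      // the base value is restored after all DBs are opened.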
      auto wal_dir = options.wal_dir;
      for (int i = 0; i < FLAGS_num_multi_db; i++) {
        OpenDb(options, GetDbNameForMultiple(FLAGS_db, i), &multi_dbs_[i]);
        if (!wal_dir.empty()) {
          options.wal_dir = GetPathForMultiple(wal_dir, i);
        }
        OpenDb(options, GetPathForMultiple(FLAGS_db, i), &multi_dbs_[i]);
      }
      options.wal_dir = wal_dir;
    }
    options.dump_malloc_stats = FLAGS_dump_malloc_stats;
  }

@@ -3054,6 +3112,15 @@
    RANDOM, SEQUENTIAL, UNIQUE_RANDOM
  };

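  // The deterministic fill variants write through DoDeterministicCompact(),
  // which shapes the LSM tree with manual CompactFiles() calls instead of
  // relying on background compaction.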
  void WriteSeqDeterministic(ThreadState* thread) {
    DoDeterministicCompact(thread, open_options_.compaction_style, SEQUENTIAL);
  }

  void WriteUniqueRandomDeterministic(ThreadState* thread) {
    DoDeterministicCompact(thread, open_options_.compaction_style,
                           UNIQUE_RANDOM);
  }

  void WriteSeq(ThreadState* thread) {
    DoWrite(thread, SEQUENTIAL);
  }

@@ -3096,6 +3163,7 @@
      case RANDOM:
        return rand_->Next() % num_;
      case UNIQUE_RANDOM:
        assert(next_ < num_);
        return values_[next_++];
    }
    assert(false);

@@ -3219,6 +3287,268 @@
    thread->stats.AddBytes(bytes);
  }

  Status DoDeterministicCompact(ThreadState* thread,
                                CompactionStyle compaction_style,
                                WriteMode write_mode) {
#ifndef ROCKSDB_LITE
    ColumnFamilyMetaData meta;
    std::vector<DB*> db_list;
    if (db_.db != nullptr) {
      db_list.push_back(db_.db);
    } else {
      for (auto& db : multi_dbs_) {
        db_list.push_back(db.db);
      }
    }
    std::vector<Options> options_list;
    for (auto db : db_list) {
      options_list.push_back(db->GetOptions());
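      // Disable auto compaction and raise the L0 trigger thresholds so the
      // manual fill is never slowed or stopped; the saved options are
      // restored at the end.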
db->SetOptions({{"disable_auto_compactions", "1"}, |
|
|
|
|
{"level0_slowdown_writes_trigger", "400000000"}, |
|
|
|
|
{"level0_stop_writes_trigger", "400000000"}}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
assert(!db_list.empty()); |
|
|
|
|
auto num_db = db_list.size(); |
|
|
|
|
size_t num_levels = static_cast<size_t>(open_options_.num_levels); |
|
|
|
|
size_t output_level = open_options_.num_levels - 1; |
|
|
|
|
std::vector<std::vector<std::vector<SstFileMetaData>>> sorted_runs(num_db); |
|
|
|
|
std::vector<size_t> num_files_at_level0(num_db, 0); |
|
|
|
|
|
|
|
|
|
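    // Each pass below bulk-loads one sorted run: write keys, flush, and
    // collect the newly flushed level-0 files as a run; the runs are later
    // placed onto their target levels with manual CompactFiles() calls.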
    if (compaction_style == kCompactionStyleLevel) {
      if (num_levels == 0) {
        return Status::InvalidArgument("num_levels should be larger than 0");
      }
      bool should_stop = false;
      while (!should_stop) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        for (size_t i = 0; i < num_db; i++) {
          auto db = db_list[i];
          db->Flush(FlushOptions());
          db->GetColumnFamilyMetaData(&meta);
          if (num_files_at_level0[i] == meta.levels[0].files.size() ||
              writes_ == 0) {
            should_stop = true;
            continue;
          }
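          // Level-0 metadata lists the newest files first, so the files
          // flushed by this pass sit at the front of the list; collect just
          // those as the next sorted run.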
          sorted_runs[i].emplace_back(
              meta.levels[0].files.begin(),
              meta.levels[0].files.end() - num_files_at_level0[i]);
          num_files_at_level0[i] = meta.levels[0].files.size();
          if (sorted_runs[i].back().size() == 1) {
            should_stop = true;
            continue;
          }
          if (sorted_runs[i].size() == output_level) {
            auto& L1 = sorted_runs[i].back();
            L1.erase(L1.begin(), L1.begin() + L1.size() / 3);
            should_stop = true;
            continue;
          }
        }
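        // Shrink the next pass by the level size multiplier so each
        // subsequent sorted run is smaller by that factor, matching the
        // target LSM shape.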
        writes_ /= open_options_.max_bytes_for_level_multiplier;
      }
      for (size_t i = 0; i < num_db; i++) {
        if (sorted_runs[i].size() < num_levels - 1) {
          fprintf(stderr, "n is too small to fill %zu levels\n", num_levels);
          exit(1);
        }
      }
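      // Place each sorted run on its target level: the oldest and largest
      // run at the bottommost level, each newer and smaller run one level
      // higher.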
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        auto compactionOptions = CompactionOptions();
        auto options = db->GetOptions();
        MutableCFOptions mutable_cf_options(options);
        for (size_t j = 0; j < sorted_runs[i].size(); j++) {
          compactionOptions.output_file_size_limit =
              mutable_cf_options.MaxFileSizeForLevel(
                  static_cast<int>(output_level));
          std::cout << sorted_runs[i][j].size() << std::endl;
          db->CompactFiles(compactionOptions, {sorted_runs[i][j].back().name,
                                               sorted_runs[i][j].front().name},
                           static_cast<int>(output_level - j) /*level*/);
        }
      }
    } else if (compaction_style == kCompactionStyleUniversal) {
      auto ratio = open_options_.compaction_options_universal.size_ratio;
      bool should_stop = false;
      while (!should_stop) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        for (size_t i = 0; i < num_db; i++) {
          auto db = db_list[i];
          db->Flush(FlushOptions());
          db->GetColumnFamilyMetaData(&meta);
          if (num_files_at_level0[i] == meta.levels[0].files.size() ||
              writes_ == 0) {
            should_stop = true;
            continue;
          }
          sorted_runs[i].emplace_back(
              meta.levels[0].files.begin(),
              meta.levels[0].files.end() - num_files_at_level0[i]);
          num_files_at_level0[i] = meta.levels[0].files.size();
          if (sorted_runs[i].back().size() == 1) {
            should_stop = true;
            continue;
          }
        }
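        // Scale writes for the next pass by 100/(size_ratio + 200) so each
        // new sorted run is less than half the size of the previous one.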
        writes_ *= static_cast<double>(100) / (ratio + 200);
      }
      for (size_t i = 0; i < num_db; i++) {
        if (sorted_runs[i].size() < num_levels) {
          fprintf(stderr, "n is too small to fill %zu levels\n", num_levels);
          exit(1);
        }
      }
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        auto compactionOptions = CompactionOptions();
        auto options = db->GetOptions();
        MutableCFOptions mutable_cf_options(options);
        for (size_t j = 0; j < sorted_runs[i].size(); j++) {
          compactionOptions.output_file_size_limit =
              mutable_cf_options.MaxFileSizeForLevel(
                  static_cast<int>(output_level));
          db->CompactFiles(
              compactionOptions,
              {sorted_runs[i][j].back().name, sorted_runs[i][j].front().name},
              (output_level > j ? static_cast<int>(output_level - j)
                                : 0) /*level*/);
        }
      }
    } else if (compaction_style == kCompactionStyleFIFO) {
      return Status::InvalidArgument("FIFO compaction is not supported");
    } else {
      fprintf(stdout,
              "%-12s : skipped (-compaction_style=kCompactionStyleNone)\n",
              "filldeterministic");
      return Status::InvalidArgument("None compaction is not supported");
    }

    // Verify seqno and key range
    // Note: the seqno gets changed at the max level by an implementation
    // optimization, so skip the check for the max level.
#ifndef NDEBUG
    for (size_t k = 0; k < num_db; k++) {
      auto db = db_list[k];
      db->GetColumnFamilyMetaData(&meta);
      // verify the number of sorted runs
      if (compaction_style == kCompactionStyleLevel) {
        assert(num_levels - 1 == sorted_runs[k].size());
      } else if (compaction_style == kCompactionStyleUniversal) {
        assert(meta.levels[0].files.size() + num_levels - 1 ==
               sorted_runs[k].size());
      } else if (compaction_style == kCompactionStyleFIFO) {
        // TODO(gzh): FIFO compaction
      }

      // verify smallest/largest seqno and key range of each sorted run
      auto max_level = num_levels - 1;
      int level;
      for (size_t i = 0; i < sorted_runs[k].size(); i++) {
        level = static_cast<int>(max_level - i);
        SequenceNumber sorted_run_smallest_seqno = kMaxSequenceNumber;
        SequenceNumber sorted_run_largest_seqno = 0;
        std::string sorted_run_smallest_key, sorted_run_largest_key;
        bool first_key = true;
        for (auto fileMeta : sorted_runs[k][i]) {
          sorted_run_smallest_seqno =
              std::min(sorted_run_smallest_seqno, fileMeta.smallest_seqno);
          sorted_run_largest_seqno =
              std::max(sorted_run_largest_seqno, fileMeta.largest_seqno);
          if (first_key ||
              db->DefaultColumnFamily()->GetComparator()->Compare(
                  fileMeta.smallestkey, sorted_run_smallest_key) < 0) {
            sorted_run_smallest_key = fileMeta.smallestkey;
          }
          if (first_key ||
              db->DefaultColumnFamily()->GetComparator()->Compare(
                  fileMeta.largestkey, sorted_run_largest_key) > 0) {
            sorted_run_largest_key = fileMeta.largestkey;
          }
          first_key = false;
        }
        if (compaction_style == kCompactionStyleLevel ||
            (compaction_style == kCompactionStyleUniversal && level > 0)) {
          SequenceNumber level_smallest_seqno = kMaxSequenceNumber;
          SequenceNumber level_largest_seqno = 0;
          for (auto fileMeta : meta.levels[level].files) {
            level_smallest_seqno =
                std::min(level_smallest_seqno, fileMeta.smallest_seqno);
            level_largest_seqno =
                std::max(level_largest_seqno, fileMeta.largest_seqno);
          }
          assert(sorted_run_smallest_key ==
                 meta.levels[level].files.front().smallestkey);
          assert(sorted_run_largest_key ==
                 meta.levels[level].files.back().largestkey);
          if (level != static_cast<int>(max_level)) {
            // compaction at max_level would change sequence number
            assert(sorted_run_smallest_seqno == level_smallest_seqno);
            assert(sorted_run_largest_seqno == level_largest_seqno);
          }
        } else if (compaction_style == kCompactionStyleUniversal) {
          // level <= 0 means sorted runs on level 0
          auto level0_file =
              meta.levels[0].files[sorted_runs[k].size() - 1 - i];
          assert(sorted_run_smallest_key == level0_file.smallestkey);
          assert(sorted_run_largest_key == level0_file.largestkey);
          if (level != static_cast<int>(max_level)) {
            assert(sorted_run_smallest_seqno == level0_file.smallest_seqno);
            assert(sorted_run_largest_seqno == level0_file.largest_seqno);
          }
        }
      }
    }
#endif
    // print the size of each sorted_run
    for (size_t k = 0; k < num_db; k++) {
      auto db = db_list[k];
      fprintf(stdout,
              "---------------------- DB %zu LSM ---------------------\n", k);
      db->GetColumnFamilyMetaData(&meta);
      for (auto& levelMeta : meta.levels) {
        if (levelMeta.files.empty()) {
          continue;
        }
        if (levelMeta.level == 0) {
          for (auto& fileMeta : levelMeta.files) {
            fprintf(stdout, "Level[%d]: %s(size: %" PRIu64 " bytes)\n",
                    levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
          }
        } else {
          fprintf(stdout, "Level[%d]: %s - %s(total size: %" PRIu64 " bytes)\n",
                  levelMeta.level, levelMeta.files.front().name.c_str(),
                  levelMeta.files.back().name.c_str(), levelMeta.size);
        }
      }
    }
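    // Restore the options that were overridden at the start of the fill.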
    for (size_t i = 0; i < num_db; i++) {
      db_list[i]->SetOptions(
          {{"disable_auto_compactions",
            std::to_string(options_list[i].disable_auto_compactions)},
           {"level0_slowdown_writes_trigger",
            std::to_string(options_list[i].level0_slowdown_writes_trigger)},
           {"level0_stop_writes_trigger",
            std::to_string(options_list[i].level0_stop_writes_trigger)}});
    }
    return Status::OK();
#else
    fprintf(stderr, "Rocksdb Lite doesn't support filldeterministic\n");
    return Status::NotSupported(
        "Rocksdb Lite doesn't support filldeterministic");
#endif  // ROCKSDB_LITE
  }

  void ReadSequential(ThreadState* thread) {
    if (db_.db != nullptr) {
      ReadSequential(thread, db_.db);