diff --git a/java/Makefile b/java/Makefile index 98c7d740c..d0092b42f 100644 --- a/java/Makefile +++ b/java/Makefile @@ -26,9 +26,14 @@ test: java db_bench: java javac org/rocksdb/benchmark/*.java rm -rf /tmp/rocksdbjni-bench - java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=1 --benchmarks=fillseq,readrandom - java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=2 --benchmarks=fillseq,readrandom - java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=4 --benchmarks=fillseq,readrandom - java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=8 --benchmarks=fillseq,readrandom - java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=16 --benchmarks=fillseq,readrandom - java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=32 --benchmarks=fillseq,readrandom + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=1 --benchmarks=fillseq,readrandom,readwhilewriting + rm -rf /tmp/rocksdbjni-bench + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=2 --benchmarks=fillseq,readrandom,readwhilewriting + rm -rf /tmp/rocksdbjni-bench + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=4 --benchmarks=fillseq,readrandom,readwhilewriting + rm -rf /tmp/rocksdbjni-bench + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=8 --benchmarks=fillseq,readrandom,readwhilewriting + rm -rf /tmp/rocksdbjni-bench + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=16 --benchmarks=fillseq,readrandom,readwhilewriting + rm -rf /tmp/rocksdbjni-bench + java -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.benchmark.DbBenchmark --threads=32 --benchmarks=fillseq,readrandom,readwhilewriting diff --git a/java/org/rocksdb/benchmark/DbBenchmark.java b/java/org/rocksdb/benchmark/DbBenchmark.java index 55f7b1189..63ac70aaf 100644 --- a/java/org/rocksdb/benchmark/DbBenchmark.java +++ b/java/org/rocksdb/benchmark/DbBenchmark.java @@ -47,7 +47,7 @@ class Stats { double seconds_; long done_; long found_; - long lastReportDone_; + long lastOpTime_; long nextReport_; long bytes_; StringBuilder message_; @@ -57,10 +57,10 @@ class Stats { id_ = id; nextReport_ = 100; done_ = 0; - lastReportDone_ = 0; bytes_ = 0; seconds_ = 0; start_ = System.nanoTime(); + lastOpTime_ = start_; finish_ = start_; found_ = 0; message_ = new StringBuilder(""); @@ -102,7 +102,7 @@ class Stats { void finishedSingleOp(int bytes) { done_++; - lastReportDone_ = System.nanoTime(); + lastOpTime_ = System.nanoTime(); bytes_ += bytes; if (done_ >= nextReport_) { if (nextReport_ < 1000) { @@ -202,6 +202,16 @@ public class DbBenchmark { super(tid, randSeed, numEntries, keyRange); writeOpt_ = writeOpt; entriesPerBatch_ = entriesPerBatch; + maxWritesPerSecond_ = -1; + } + + public WriteTask( + int tid, long randSeed, long numEntries, long keyRange, + WriteOptions writeOpt, long entriesPerBatch, long maxWritesPerSecond) { + super(tid, randSeed, numEntries, keyRange); + writeOpt_ = writeOpt; + entriesPerBatch_ = entriesPerBatch; + maxWritesPerSecond_ = maxWritesPerSecond; } @Override public void runTask() throws RocksDBException { @@ -211,29 +221,53 @@ public class DbBenchmark { byte[] key = new byte[keySize_]; byte[] value = new byte[valueSize_]; - if (entriesPerBatch_ == 1) { - for (long i = 0; i < numEntries_; ++i) { - getKey(key, i, keyRange_); - db_.put(writeOpt_, key, DbBenchmark.this.gen_.generate(valueSize_)); - stats_.finishedSingleOp(keySize_ + valueSize_); - } - } else { - for (long i = 0; i < numEntries_; i += entriesPerBatch_) { - WriteBatch batch = new WriteBatch(); - for (long j = 0; j < entriesPerBatch_; j++) { - getKey(key, i + j, keyRange_); - batch.put(key, DbBenchmark.this.gen_.generate(valueSize_)); + try { + if (entriesPerBatch_ == 1) { + for (long i = 0; i < numEntries_; ++i) { + getKey(key, i, keyRange_); + db_.put(writeOpt_, key, DbBenchmark.this.gen_.generate(valueSize_)); stats_.finishedSingleOp(keySize_ + valueSize_); + writeRateControl(i); + if (isFinished()) { + return; + } + } + } else { + for (long i = 0; i < numEntries_; i += entriesPerBatch_) { + WriteBatch batch = new WriteBatch(); + for (long j = 0; j < entriesPerBatch_; j++) { + getKey(key, i + j, keyRange_); + batch.put(key, DbBenchmark.this.gen_.generate(valueSize_)); + stats_.finishedSingleOp(keySize_ + valueSize_); + } + db_.write(writeOpt_, batch); + batch.dispose(); + writeRateControl(i); + if (isFinished()) { + return; + } } - db_.write(writeOpt_, batch); - batch.dispose(); } + } catch (InterruptedException e) { + // thread has been terminated. + } + } + + protected void writeRateControl(long writeCount) + throws InterruptedException { + if (maxWritesPerSecond_ <= 0) return; + long minInterval = + writeCount * TimeUnit.SECONDS.toNanos(1) / maxWritesPerSecond_; + long interval = System.nanoTime() - stats_.start_; + if (minInterval - interval > TimeUnit.MILLISECONDS.toNanos(1)) { + TimeUnit.NANOSECONDS.sleep(minInterval - interval); } } abstract protected void getKey(byte[] key, long id, long range); protected WriteOptions writeOpt_; protected long entriesPerBatch_; + protected long maxWritesPerSecond_; } class WriteSequentialTask extends WriteTask { @@ -243,6 +277,14 @@ public class DbBenchmark { super(tid, randSeed, numEntries, keyRange, writeOpt, entriesPerBatch); } + public WriteSequentialTask( + int tid, long randSeed, long numEntries, long keyRange, + WriteOptions writeOpt, long entriesPerBatch, + long maxWritesPerSecond) { + super(tid, randSeed, numEntries, keyRange, + writeOpt, entriesPerBatch, + maxWritesPerSecond); + } @Override protected void getKey(byte[] key, long id, long range) { getFixedKey(key, id); } @@ -255,6 +297,14 @@ public class DbBenchmark { super(tid, randSeed, numEntries, keyRange, writeOpt, entriesPerBatch); } + public WriteRandomTask( + int tid, long randSeed, long numEntries, long keyRange, + WriteOptions writeOpt, long entriesPerBatch, + long maxWritesPerSecond) { + super(tid, randSeed, numEntries, keyRange, + writeOpt, entriesPerBatch, + maxWritesPerSecond); + } @Override protected void getKey(byte[] key, long id, long range) { getRandomKey(key, range); } @@ -278,6 +328,9 @@ public class DbBenchmark { } else { stats_.finishedSingleOp(keySize_); } + if (isFinished()) { + return; + } } } } @@ -312,7 +365,9 @@ public class DbBenchmark { useExisting_ = (boolean) flags.get(Flag.use_existing_db); randSeed_ = (long) flags.get(Flag.seed); databaseDir_ = (String) flags.get(Flag.db); + writesPerSeconds_ = (int) flags.get(Flag.writes_per_second); gen_ = new RandomGenerator(compressionRatio_); + finishLock_ = new Object(); } private void run() throws RocksDBException { @@ -325,29 +380,25 @@ public class DbBenchmark { for (String benchmark : benchmarks_) { List> tasks = new ArrayList>(); + List> bgTasks = new ArrayList>(); WriteOptions writeOpt = new WriteOptions(); int currentTaskId = 0; - int concurrentThreads = threadNum_; boolean known = true; if (benchmark.equals("fillseq")) { tasks.add(new WriteSequentialTask( - currentTaskId, randSeed_, num_, num_, writeOpt, 1)); - concurrentThreads = 1; + currentTaskId++, randSeed_, num_, num_, writeOpt, 1)); } else if (benchmark.equals("fillbatch")) { tasks.add(new WriteRandomTask( - currentTaskId, randSeed_, num_ / 1000, num_, writeOpt, 1000)); - concurrentThreads = 1; + currentTaskId++, randSeed_, num_ / 1000, num_, writeOpt, 1000)); } else if (benchmark.equals("fillrandom")) { tasks.add(new WriteRandomTask( - currentTaskId, randSeed_, num_, num_, writeOpt, 1)); - concurrentThreads = 1; + currentTaskId++, randSeed_, num_, num_, writeOpt, 1)); } else if (benchmark.equals("fillsync")) { writeOpt.setSync(true); tasks.add(new WriteRandomTask( - currentTaskId, randSeed_, num_ / 1000, num_ / 1000, + currentTaskId++, randSeed_, num_ / 1000, num_ / 1000, writeOpt, 1)); - concurrentThreads = 1; } else if (benchmark.equals("readseq")) { for (int t = 0; t < threadNum_; ++t) { tasks.add(new ReadSequentialTask( @@ -359,6 +410,15 @@ public class DbBenchmark { tasks.add(new ReadRandomTask( currentTaskId++, randSeed_, reads_ / threadNum_, num_)); } + } else if (benchmark.equals("readwhilewriting")) { + WriteTask writeTask = new WriteRandomTask( + -1, randSeed_, Long.MAX_VALUE, num_, writeOpt, 1, writesPerSeconds_); + writeTask.stats_.setExcludeFromMerge(); + bgTasks.add(writeTask); + for (int t = 0; t < threadNum_; ++t) { + tasks.add(new ReadRandomTask( + currentTaskId++, randSeed_, reads_ / threadNum_, num_)); + } } else if (benchmark.equals("readhot")) { for (int t = 0; t < threadNum_; ++t) { tasks.add(new ReadRandomTask( @@ -373,17 +433,34 @@ public class DbBenchmark { } if (known) { ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService bgExecutor = Executors.newCachedThreadPool(); try { + // measure only the main executor time + List> bgResults = new ArrayList>(); + for (Callable bgTask : bgTasks) { + bgResults.add(bgExecutor.submit(bgTask)); + } start(); List> results = executor.invokeAll(tasks); executor.shutdown(); - boolean finished = executor.awaitTermination(3, TimeUnit.DAYS); - // do something - stop(benchmark, results, concurrentThreads); + boolean finished = executor.awaitTermination(10, TimeUnit.SECONDS); if (!finished) { - // do something else - System.out.format("Benchmark %s was not finished before timeout."); + System.out.format( + "Benchmark %s was not finished before timeout.", + benchmark); + executor.shutdownNow(); } + setFinished(true); + bgExecutor.shutdown(); + finished = bgExecutor.awaitTermination(10, TimeUnit.SECONDS); + if (!finished) { + System.out.format( + "Benchmark %s was not finished before timeout.", + benchmark); + bgExecutor.shutdownNow(); + } + + stop(benchmark, results, currentTaskId); } catch (InterruptedException e) { System.err.println(e); } @@ -423,6 +500,7 @@ public class DbBenchmark { } private void start() { + setFinished(false); startTime_ = System.nanoTime(); } @@ -449,7 +527,7 @@ public class DbBenchmark { } System.out.printf( - "%-12s : %11.5f micros/op; %6.1f MB/s; %d / %d task(s) finished.\n", + "%-16s : %11.5f micros/op; %6.1f MB/s; %d / %d task(s) finished.\n", benchmark, elapsedSeconds * 1e6 / num_, (stats.bytes_ / 1048576.0) / elapsedSeconds, taskFinishedCount, concurrentThreads); @@ -536,17 +614,20 @@ public class DbBenchmark { "fillrandom"), "Comma-separated list of operations to run in the specified order\n" + "\tActual benchmarks:\n" + - "\t\tfillseq -- write N values in sequential key order in async mode\n" + - "\t\tfillrandom -- write N values in random key order in async mode\n" + - "\t\tfillbatch -- write N/1000 batch where each batch has 1000 values\n" + - "\t\t in random key order in sync mode\n" + - "\t\tfillsync -- write N/100 values in random key order in sync mode\n" + - "\t\tfill100K -- write N/1000 100K values in random order in async mode\n" + - "\t\treadseq -- read N times sequentially\n" + - "\t\treadrandom -- read N times in random order\n" + - "\t\treadhot -- read N times in random order from 1% section of DB\n" + + "\t\tfillseq -- write N values in sequential key order in async mode.\n" + + "\t\tfillrandom -- write N values in random key order in async mode.\n" + + "\t\tfillbatch -- write N/1000 batch where each batch has 1000 values\n" + + "\t\t in random key order in sync mode.\n" + + "\t\tfillsync -- write N/100 values in random key order in sync mode.\n" + + "\t\tfill100K -- write N/1000 100K values in random order in async mode.\n" + + "\t\treadseq -- read N times sequentially.\n" + + "\t\treadrandom -- read N times in random order.\n" + + "\t\treadhot -- read N times in random order from 1% section of DB.\n" + + "\t\treadwhilewriting -- measure the read performance of multiple readers\n" + + "\t\t with a bg single writer. The write rate of the bg\n" + + "\t\t is capped by --writes_per_second.\n" + "\tMeta Operations:\n" + - "\t\tdelete -- delete DB") { + "\t\tdelete -- delete DB") { @Override public Object parseValue(String value) { return new ArrayList(Arrays.asList(value.split(","))); } @@ -613,6 +694,15 @@ public class DbBenchmark { } }, + writes_per_second(10000, + "The write-rate of the background writer used in the\n" + + "`readwhilewriting` benchmark. Non-positive number indicates\n" + + "using an unbounded write-rate in `readwhilewriting` benchmark.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + cache_size(-1, "Number of bytes to use as a cache of uncompressed data.\n" + "\tNegative means use default settings.") { @@ -682,6 +772,18 @@ public class DbBenchmark { } } + boolean isFinished() { + synchronized(finishLock_) { + return isFinished_; + } + } + + void setFinished(boolean flag) { + synchronized(finishLock_) { + isFinished_ = flag; + } + } + RocksDB db_; final List benchmarks_; final int num_; @@ -690,10 +792,13 @@ public class DbBenchmark { final int valueSize_; final int writeBufferSize_; final int threadNum_; + final int writesPerSeconds_; final long randSeed_; final boolean useExisting_; final String databaseDir_; final double compressionRatio_; RandomGenerator gen_; long startTime_; + Object finishLock_; + boolean isFinished_; }