Merge pull request #183 from ankgup87/master

[Java] Merge pull request #183 --- Add statistics collector utility

Summary:
1. Add statistics collector which collects statistics periodically at a period specified by caller.
2. Added unit-test for StatsCollector

make rocksdbjava
make test

Reviewers: haobo, yhchiang, sdong, dhruba, rsumbaly, zzbennett, swapnilghike
Reviewed By: yhchiang
CC: leveldb

Differential Revision: https://reviews.facebook.net/19215
main
Yueh-Hsuan Chiang 11 years ago
commit 918a355e00
  1. 2
      Makefile
  2. 1
      java/Makefile
  3. 4
      java/org/rocksdb/BackupableDBOptions.java
  4. 5
      java/org/rocksdb/RocksDB.java
  5. 93
      java/org/rocksdb/StatisticsCollector.java
  6. 34
      java/org/rocksdb/StatisticsCollectorCallback.java
  7. 40
      java/org/rocksdb/test/StatisticsCollectorTest.java
  8. 22
      java/org/rocksdb/test/StatsCallbackMock.java

@ -439,7 +439,7 @@ ROCKSDBJNILIB = ./java/librocksdbjni.jnilib
JAVA_INCLUDE = -I/System/Library/Frameworks/JavaVM.framework/Headers/ JAVA_INCLUDE = -I/System/Library/Frameworks/JavaVM.framework/Headers/
endif endif
rocksdbjava: clean rocksdbjava:
OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32 OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32
cd java;$(MAKE) java; cd java;$(MAKE) java;
rm -f $(ROCKSDBJNILIB) rm -f $(ROCKSDBJNILIB)

@ -29,6 +29,7 @@ test: java
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.BackupableDBTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.BackupableDBTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.StatisticsCollectorTest
db_bench: java db_bench: java
javac org/rocksdb/benchmark/*.java javac org/rocksdb/benchmark/*.java

@ -38,10 +38,10 @@ public class BackupableDBOptions extends RocksObject {
boolean destroyOldData, boolean backupLogFiles, long backupRateLimit, boolean destroyOldData, boolean backupLogFiles, long backupRateLimit,
long restoreRateLimit) { long restoreRateLimit) {
super(); super();
backupRateLimit = (backupRateLimit <= 0) ? 0 : backupRateLimit; backupRateLimit = (backupRateLimit <= 0) ? 0 : backupRateLimit;
restoreRateLimit = (restoreRateLimit <= 0) ? 0 : restoreRateLimit; restoreRateLimit = (restoreRateLimit <= 0) ? 0 : restoreRateLimit;
newBackupableDBOptions(path, shareTableFiles, sync, destroyOldData, newBackupableDBOptions(path, shareTableFiles, sync, destroyOldData,
backupLogFiles, backupRateLimit, restoreRateLimit); backupLogFiles, backupRateLimit, restoreRateLimit);
} }

@ -93,10 +93,7 @@ public class RocksDB extends RocksObject {
// This allows to use the rocksjni default Options instead of // This allows to use the rocksjni default Options instead of
// the c++ one. // the c++ one.
Options options = new Options(); Options options = new Options();
db.open(options.nativeHandle_, options.cacheSize_, path); return open(options, path);
db.transferCppRawPointersOwnershipFrom(options);
options.dispose();
return db;
} }
/** /**

@ -0,0 +1,93 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Helper class to collect DB statistics periodically at a period specified in
* constructor. Callback function (provided in constructor) is called with
* every statistics collection.
*
* Caller should call start() to start statistics collection. Shutdown() should
* be called to stop stats collection and should be called before statistics (
* provided in constructor) reference has been disposed.
*/
public class StatisticsCollector {
private final Statistics _statistics;
private final ThreadPoolExecutor _threadPoolExecutor;
private final int _statsCollectionInterval;
private final StatisticsCollectorCallback _statsCallback;
private volatile boolean _isRunning = true;
/**
* Constructor for statistics collector.
* @param statistics Reference of DB statistics.
* @param statsCollectionIntervalInMilliSeconds Statistics collection time
* period (specified in milliseconds)
* @param statsCallback Reference of statistics callback interface.
*/
public StatisticsCollector(Statistics statistics,
int statsCollectionIntervalInMilliSeconds,
StatisticsCollectorCallback statsCallback) {
_statistics = statistics;
_statsCollectionInterval = statsCollectionIntervalInMilliSeconds;
_statsCallback = statsCallback;
_threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1));
}
public void start() {
_threadPoolExecutor.submit(collectStatistics());
}
public void shutDown() throws InterruptedException {
_isRunning = false;
_threadPoolExecutor.shutdown();
// Wait for collectStatistics runnable to finish so that disposal of
// statistics does not cause any exceptions to be thrown.
_threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
private Runnable collectStatistics() {
return new Runnable() {
@Override
public void run() {
while (_isRunning) {
try {
// Collect ticker data
for(TickerType ticker : TickerType.values()) {
long tickerValue = _statistics.getTickerCount(ticker);
_statsCallback.tickerCallback(ticker, tickerValue);
}
// Collect histogram data
for(HistogramType histogramType : HistogramType.values()) {
HistogramData histogramData =
_statistics.geHistogramData(histogramType);
_statsCallback.histogramCallback(histogramType, histogramData);
}
Thread.sleep(_statsCollectionInterval);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread got interrupted!", e);
}
catch (Exception e) {
throw new RuntimeException("Error while calculating statistics", e);
}
}
}
};
}
}

@ -0,0 +1,34 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* Callback interface provided to StatisticsCollector.
*
* Thread safety:
* StatisticsCollector doesn't make any guarantees about thread safety.
* If the same reference of StatisticsCollectorCallback is passed to multiple
* StatisticsCollector references, then its the responsibility of the
* user to make StatisticsCollectorCallback' implementation thread-safe.
*
* @param tickerType
* @param tickerCount
*/
public interface StatisticsCollectorCallback {
/**
* Callback function to get ticker values.
* @param tickerType Ticker type.
* @param tickerCount Value of ticker type.
*/
void tickerCallback(TickerType tickerType, long tickerCount);
/**
* Callback function to get histogram values.
* @param histType Histogram type.
* @param histData Histogram data.
*/
void histogramCallback(HistogramType histType, HistogramData histData);
}

@ -0,0 +1,40 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import org.rocksdb.*;
public class StatisticsCollectorTest {
static final String db_path = "/tmp/backupablejni_db";
static {
RocksDB.loadLibrary();
}
public static void main(String[] args)
throws InterruptedException, RocksDBException {
Options opt = new Options().createStatistics().setCreateIfMissing(true);
Statistics stats = opt.statisticsPtr();
RocksDB db = RocksDB.open(db_path);
StatsCallbackMock callback = new StatsCallbackMock();
StatisticsCollector statsCollector = new StatisticsCollector(stats, 100,
callback);
statsCollector.start();
Thread.sleep(1000);
assert(callback.tickerCallbackCount > 0);
assert(callback.histCallbackCount > 0);
statsCollector.shutDown();
db.close();
opt.dispose();
System.out.println("Stats collector test passed.!");
}
}

@ -0,0 +1,22 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import org.rocksdb.*;
public class StatsCallbackMock implements StatisticsCollectorCallback {
public int tickerCallbackCount = 0;
public int histCallbackCount = 0;
public void tickerCallback(TickerType tickerType, long tickerCount) {
tickerCallbackCount++;
}
public void histogramCallback(HistogramType histType,
HistogramData histData) {
histCallbackCount++;
}
}
Loading…
Cancel
Save