diff --git a/java/org/rocksdb/Options.java b/java/org/rocksdb/Options.java index c74369515..e0e7e3e59 100644 --- a/java/org/rocksdb/Options.java +++ b/java/org/rocksdb/Options.java @@ -9,8 +9,8 @@ package org.rocksdb; * Options to control the behavior of a database. It will be used * during the creation of a RocksDB (i.e., RocksDB.open()). * - * Note that dispose() must be called before an Options instance - * become out-of-scope to release the allocated memory in c++. + * If dispose() function is not called, then it will be GC'd automatically and + * native resources will be released as part of the process. */ public class Options extends RocksObject { static final long DEFAULT_CACHE_SIZE = 8 << 20; @@ -168,8 +168,11 @@ public class Options extends RocksObject { /** * Use the specified filter policy to reduce disk reads. * - * Note that the caller should not dispose the input filter as - * Options.dispose() will dispose this filter. + * Filter should not be disposed before options instances using this filter is + * disposed. If dispose() function is not called, then filter object will be + * GC'd automatically. + * + * Filter instance can be re-used in multiple options instances. * * @param Filter policy java instance. * @return the instance of the current Options. diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index 7c4b13a25..9fca2df43 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -99,6 +99,15 @@ public class RocksDB extends RocksObject { /** * The factory constructor of RocksDB that opens a RocksDB instance given * the path to the database using the specified options and db path. + * + * Options instance *should* not be disposed before all DBs using this options + * instance have been closed. If user doesn't call options dispose explicitly, + * then this options instance will be GC'd automatically. + * + * Options instance can be re-used to open multiple DBs if DB statistics is + * not used. If DB statistics are required, then its recommended to open DB + * with new Options instance as underlying native statistics instance does not + * use any locks to prevent concurrent updates. */ public static RocksDB open(Options options, String path) throws RocksDBException { @@ -108,9 +117,15 @@ public class RocksDB extends RocksObject { RocksDB db = new RocksDB(); db.open(options.nativeHandle_, options.cacheSize_, options.numShardBits_, path); - db.transferCppRawPointersOwnershipFrom(options); + + db.storeOptionsInstance(options); + return db; } + + private void storeOptionsInstance(Options options) { + options_ = options; + } @Override protected void disposeInternal() { assert(isInitialized()); @@ -318,17 +333,6 @@ public class RocksDB extends RocksObject { super(); } - /** - * Transfer the ownership of all c++ raw-pointers from Options - * to RocksDB to ensure the life-time of those raw-pointers - * will be at least as long as the life-time of any RocksDB - * that uses these raw-pointers. - */ - protected void transferCppRawPointersOwnershipFrom(Options opt) { - filter_ = opt.filter_; - opt.filter_ = null; - } - // native methods protected native void open( long optionsHandle, long cacheSize, int numShardBits, @@ -365,5 +369,5 @@ public class RocksDB extends RocksObject { protected native long iterator0(long optHandle); private native void disposeInternal(long handle); - protected Filter filter_; + protected Options options_; } diff --git a/java/org/rocksdb/StatisticsCollector.java b/java/org/rocksdb/StatisticsCollector.java index 059e63cb5..0aa03736d 100644 --- a/java/org/rocksdb/StatisticsCollector.java +++ b/java/org/rocksdb/StatisticsCollector.java @@ -5,8 +5,10 @@ package org.rocksdb; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -20,41 +22,37 @@ import java.util.concurrent.atomic.AtomicBoolean; * provided in constructor) reference has been disposed. */ public class StatisticsCollector { - private final Statistics _statistics; - private final ThreadPoolExecutor _threadPoolExecutor; + private final List _statsCollectorInputList; + private final ExecutorService _executorService; private final int _statsCollectionInterval; - private final StatisticsCollectorCallback _statsCallback; private volatile boolean _isRunning = true; /** * Constructor for statistics collector. - * @param statistics Reference of DB statistics. + * + * @param statsCollectorInputList List of statistics collector input. * @param statsCollectionIntervalInMilliSeconds Statistics collection time - * period (specified in milliseconds) - * @param statsCallback Reference of statistics callback interface. + * period (specified in milliseconds). */ - public StatisticsCollector(Statistics statistics, - int statsCollectionIntervalInMilliSeconds, - StatisticsCollectorCallback statsCallback) { - _statistics = statistics; + public StatisticsCollector(List statsCollectorInputList, + int statsCollectionIntervalInMilliSeconds) { + _statsCollectorInputList = statsCollectorInputList; _statsCollectionInterval = statsCollectionIntervalInMilliSeconds; - _statsCallback = statsCallback; - _threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, - new ArrayBlockingQueue(1)); + _executorService = Executors.newSingleThreadExecutor(); } public void start() { - _threadPoolExecutor.submit(collectStatistics()); + _executorService.submit(collectStatistics()); } public void shutDown() throws InterruptedException { _isRunning = false; - _threadPoolExecutor.shutdown(); + _executorService.shutdown(); // Wait for collectStatistics runnable to finish so that disposal of // statistics does not cause any exceptions to be thrown. - _threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); + _executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } private Runnable collectStatistics() { @@ -64,20 +62,27 @@ public class StatisticsCollector { public void run() { while (_isRunning) { try { - // Collect ticker data - for(TickerType ticker : TickerType.values()) { - long tickerValue = _statistics.getTickerCount(ticker); - _statsCallback.tickerCallback(ticker, tickerValue); - } + for(StatsCollectorInput statsCollectorInput : + _statsCollectorInputList) { + Statistics statistics = statsCollectorInput.getStatistics(); + StatisticsCollectorCallback statsCallback = + statsCollectorInput.getCallback(); + + // Collect ticker data + for(TickerType ticker : TickerType.values()) { + long tickerValue = statistics.getTickerCount(ticker); + statsCallback.tickerCallback(ticker, tickerValue); + } - // Collect histogram data - for(HistogramType histogramType : HistogramType.values()) { - HistogramData histogramData = - _statistics.geHistogramData(histogramType); - _statsCallback.histogramCallback(histogramType, histogramData); - } + // Collect histogram data + for(HistogramType histogramType : HistogramType.values()) { + HistogramData histogramData = + statistics.geHistogramData(histogramType); + statsCallback.histogramCallback(histogramType, histogramData); + } - Thread.sleep(_statsCollectionInterval); + Thread.sleep(_statsCollectionInterval); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/java/org/rocksdb/StatisticsCollectorCallback.java b/java/org/rocksdb/StatisticsCollectorCallback.java index d50b109d2..a955ec216 100644 --- a/java/org/rocksdb/StatisticsCollectorCallback.java +++ b/java/org/rocksdb/StatisticsCollectorCallback.java @@ -12,7 +12,7 @@ package org.rocksdb; * StatisticsCollector doesn't make any guarantees about thread safety. * If the same reference of StatisticsCollectorCallback is passed to multiple * StatisticsCollector references, then its the responsibility of the - * user to make StatisticsCollectorCallback' implementation thread-safe. + * user to make StatisticsCollectorCallback's implementation thread-safe. * * @param tickerType * @param tickerCount diff --git a/java/org/rocksdb/StatsCollectorInput.java b/java/org/rocksdb/StatsCollectorInput.java new file mode 100644 index 000000000..a1aa928d3 --- /dev/null +++ b/java/org/rocksdb/StatsCollectorInput.java @@ -0,0 +1,35 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +/** + * Contains all information necessary to collect statistics from one instance + * of DB statistics. + */ +public class StatsCollectorInput { + private final Statistics _statistics; + private final StatisticsCollectorCallback _statsCallback; + + /** + * Constructor for StatsCollectorInput. + * + * @param statistics Reference of DB statistics. + * @param statsCallback Reference of statistics callback interface. + */ + public StatsCollectorInput(Statistics statistics, + StatisticsCollectorCallback statsCallback) { + _statistics = statistics; + _statsCallback = statsCallback; + } + + public Statistics getStatistics() { + return _statistics; + } + + public StatisticsCollectorCallback getCallback() { + return _statsCallback; + } +} diff --git a/java/org/rocksdb/test/StatisticsCollectorTest.java b/java/org/rocksdb/test/StatisticsCollectorTest.java index 973a5d383..20ccf8f87 100644 --- a/java/org/rocksdb/test/StatisticsCollectorTest.java +++ b/java/org/rocksdb/test/StatisticsCollectorTest.java @@ -5,6 +5,7 @@ package org.rocksdb.test; +import java.util.Collections; import org.rocksdb.*; public class StatisticsCollectorTest { @@ -21,8 +22,10 @@ public class StatisticsCollectorTest { RocksDB db = RocksDB.open(db_path); StatsCallbackMock callback = new StatsCallbackMock(); - StatisticsCollector statsCollector = new StatisticsCollector(stats, 100, - callback); + StatsCollectorInput statsInput = new StatsCollectorInput(stats, callback); + + StatisticsCollector statsCollector = new StatisticsCollector( + Collections.singletonList(statsInput), 100); statsCollector.start(); Thread.sleep(1000);