diff --git a/java/org/rocksdb/Options.java b/java/org/rocksdb/Options.java index c74369515..e0e7e3e59 100644 --- a/java/org/rocksdb/Options.java +++ b/java/org/rocksdb/Options.java @@ -9,8 +9,8 @@ package org.rocksdb; * Options to control the behavior of a database. It will be used * during the creation of a RocksDB (i.e., RocksDB.open()). * - * Note that dispose() must be called before an Options instance - * become out-of-scope to release the allocated memory in c++. + * If dispose() function is not called, then it will be GC'd automatically and + * native resources will be released as part of the process. */ public class Options extends RocksObject { static final long DEFAULT_CACHE_SIZE = 8 << 20; @@ -168,8 +168,11 @@ public class Options extends RocksObject { /** * Use the specified filter policy to reduce disk reads. * - * Note that the caller should not dispose the input filter as - * Options.dispose() will dispose this filter. + * Filter should not be disposed before options instances using this filter is + * disposed. If dispose() function is not called, then filter object will be + * GC'd automatically. + * + * Filter instance can be re-used in multiple options instances. * * @param Filter policy java instance. * @return the instance of the current Options. diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index 7c4b13a25..9fca2df43 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -99,6 +99,15 @@ public class RocksDB extends RocksObject { /** * The factory constructor of RocksDB that opens a RocksDB instance given * the path to the database using the specified options and db path. + * + * Options instance *should* not be disposed before all DBs using this options + * instance have been closed. If user doesn't call options dispose explicitly, + * then this options instance will be GC'd automatically. + * + * Options instance can be re-used to open multiple DBs if DB statistics is + * not used. If DB statistics are required, then its recommended to open DB + * with new Options instance as underlying native statistics instance does not + * use any locks to prevent concurrent updates. */ public static RocksDB open(Options options, String path) throws RocksDBException { @@ -108,9 +117,15 @@ public class RocksDB extends RocksObject { RocksDB db = new RocksDB(); db.open(options.nativeHandle_, options.cacheSize_, options.numShardBits_, path); - db.transferCppRawPointersOwnershipFrom(options); + + db.storeOptionsInstance(options); + return db; } + + private void storeOptionsInstance(Options options) { + options_ = options; + } @Override protected void disposeInternal() { assert(isInitialized()); @@ -318,17 +333,6 @@ public class RocksDB extends RocksObject { super(); } - /** - * Transfer the ownership of all c++ raw-pointers from Options - * to RocksDB to ensure the life-time of those raw-pointers - * will be at least as long as the life-time of any RocksDB - * that uses these raw-pointers. - */ - protected void transferCppRawPointersOwnershipFrom(Options opt) { - filter_ = opt.filter_; - opt.filter_ = null; - } - // native methods protected native void open( long optionsHandle, long cacheSize, int numShardBits, @@ -365,5 +369,5 @@ public class RocksDB extends RocksObject { protected native long iterator0(long optHandle); private native void disposeInternal(long handle); - protected Filter filter_; + protected Options options_; } diff --git a/java/org/rocksdb/StatisticsCollector.java b/java/org/rocksdb/StatisticsCollector.java index 059e63cb5..0aa03736d 100644 --- a/java/org/rocksdb/StatisticsCollector.java +++ b/java/org/rocksdb/StatisticsCollector.java @@ -5,8 +5,10 @@ package org.rocksdb; +import java.util.List; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -20,41 +22,37 @@ import java.util.concurrent.atomic.AtomicBoolean; * provided in constructor) reference has been disposed. */ public class StatisticsCollector { - private final Statistics _statistics; - private final ThreadPoolExecutor _threadPoolExecutor; + private final List _statsCollectorInputList; + private final ExecutorService _executorService; private final int _statsCollectionInterval; - private final StatisticsCollectorCallback _statsCallback; private volatile boolean _isRunning = true; /** * Constructor for statistics collector. - * @param statistics Reference of DB statistics. + * + * @param statsCollectorInputList List of statistics collector input. * @param statsCollectionIntervalInMilliSeconds Statistics collection time - * period (specified in milliseconds) - * @param statsCallback Reference of statistics callback interface. + * period (specified in milliseconds). */ - public StatisticsCollector(Statistics statistics, - int statsCollectionIntervalInMilliSeconds, - StatisticsCollectorCallback statsCallback) { - _statistics = statistics; + public StatisticsCollector(List statsCollectorInputList, + int statsCollectionIntervalInMilliSeconds) { + _statsCollectorInputList = statsCollectorInputList; _statsCollectionInterval = statsCollectionIntervalInMilliSeconds; - _statsCallback = statsCallback; - _threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, - new ArrayBlockingQueue(1)); + _executorService = Executors.newSingleThreadExecutor(); } public void start() { - _threadPoolExecutor.submit(collectStatistics()); + _executorService.submit(collectStatistics()); } public void shutDown() throws InterruptedException { _isRunning = false; - _threadPoolExecutor.shutdown(); + _executorService.shutdown(); // Wait for collectStatistics runnable to finish so that disposal of // statistics does not cause any exceptions to be thrown. - _threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); + _executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } private Runnable collectStatistics() { @@ -64,20 +62,27 @@ public class StatisticsCollector { public void run() { while (_isRunning) { try { - // Collect ticker data - for(TickerType ticker : TickerType.values()) { - long tickerValue = _statistics.getTickerCount(ticker); - _statsCallback.tickerCallback(ticker, tickerValue); - } + for(StatsCollectorInput statsCollectorInput : + _statsCollectorInputList) { + Statistics statistics = statsCollectorInput.getStatistics(); + StatisticsCollectorCallback statsCallback = + statsCollectorInput.getCallback(); + + // Collect ticker data + for(TickerType ticker : TickerType.values()) { + long tickerValue = statistics.getTickerCount(ticker); + statsCallback.tickerCallback(ticker, tickerValue); + } - // Collect histogram data - for(HistogramType histogramType : HistogramType.values()) { - HistogramData histogramData = - _statistics.geHistogramData(histogramType); - _statsCallback.histogramCallback(histogramType, histogramData); - } + // Collect histogram data + for(HistogramType histogramType : HistogramType.values()) { + HistogramData histogramData = + statistics.geHistogramData(histogramType); + statsCallback.histogramCallback(histogramType, histogramData); + } - Thread.sleep(_statsCollectionInterval); + Thread.sleep(_statsCollectionInterval); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/java/org/rocksdb/StatisticsCollectorCallback.java b/java/org/rocksdb/StatisticsCollectorCallback.java index d50b109d2..a955ec216 100644 --- a/java/org/rocksdb/StatisticsCollectorCallback.java +++ b/java/org/rocksdb/StatisticsCollectorCallback.java @@ -12,7 +12,7 @@ package org.rocksdb; * StatisticsCollector doesn't make any guarantees about thread safety. * If the same reference of StatisticsCollectorCallback is passed to multiple * StatisticsCollector references, then its the responsibility of the - * user to make StatisticsCollectorCallback' implementation thread-safe. + * user to make StatisticsCollectorCallback's implementation thread-safe. * * @param tickerType * @param tickerCount diff --git a/java/org/rocksdb/StatsCollectorInput.java b/java/org/rocksdb/StatsCollectorInput.java new file mode 100644 index 000000000..a1aa928d3 --- /dev/null +++ b/java/org/rocksdb/StatsCollectorInput.java @@ -0,0 +1,35 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +/** + * Contains all information necessary to collect statistics from one instance + * of DB statistics. + */ +public class StatsCollectorInput { + private final Statistics _statistics; + private final StatisticsCollectorCallback _statsCallback; + + /** + * Constructor for StatsCollectorInput. + * + * @param statistics Reference of DB statistics. + * @param statsCallback Reference of statistics callback interface. + */ + public StatsCollectorInput(Statistics statistics, + StatisticsCollectorCallback statsCallback) { + _statistics = statistics; + _statsCallback = statsCallback; + } + + public Statistics getStatistics() { + return _statistics; + } + + public StatisticsCollectorCallback getCallback() { + return _statsCallback; + } +} diff --git a/java/org/rocksdb/test/StatisticsCollectorTest.java b/java/org/rocksdb/test/StatisticsCollectorTest.java index 973a5d383..20ccf8f87 100644 --- a/java/org/rocksdb/test/StatisticsCollectorTest.java +++ b/java/org/rocksdb/test/StatisticsCollectorTest.java @@ -5,6 +5,7 @@ package org.rocksdb.test; +import java.util.Collections; import org.rocksdb.*; public class StatisticsCollectorTest { @@ -21,8 +22,10 @@ public class StatisticsCollectorTest { RocksDB db = RocksDB.open(db_path); StatsCallbackMock callback = new StatsCallbackMock(); - StatisticsCollector statsCollector = new StatisticsCollector(stats, 100, - callback); + StatsCollectorInput statsInput = new StatsCollectorInput(stats, callback); + + StatisticsCollector statsCollector = new StatisticsCollector( + Collections.singletonList(statsInput), 100); statsCollector.start(); Thread.sleep(1000); diff --git a/table/cuckoo_table_builder_test.cc b/table/cuckoo_table_builder_test.cc index f20463e12..9e76e16ee 100644 --- a/table/cuckoo_table_builder_test.cc +++ b/table/cuckoo_table_builder_test.cc @@ -67,6 +67,7 @@ class CuckooBuilderTest { CuckooTablePropertyNames::kMaxNumBuckets]); GetVarint32(&max_buckets_slice, &max_buckets); ASSERT_EQ(expected_max_buckets, max_buckets); + delete props; // Check contents of the bucket. std::string read_data; read_data.resize(expected_data.size()); @@ -92,7 +93,6 @@ class CuckooBuilderTest { unsigned int expected_max_buckets; }; - TEST(CuckooBuilderTest, NoCollision) { hash_map.clear(); num_items = 20; @@ -122,25 +122,25 @@ TEST(CuckooBuilderTest, NoCollision) { unique_ptr writable_file; fname = test::TmpDir() + "/BasicTest_writable_file"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder( + CuckooTableBuilder cuckoo_builder( writable_file.get(), ikey_length, value_length, hash_table_ratio, file_size, num_hash_fun, 100, GetSliceHash); - ASSERT_OK(cuckoo_builder->status()); + ASSERT_OK(cuckoo_builder.status()); unsigned int key_idx = 0; std::string expected_file_data = ""; for (unsigned int i = 0; i < expected_max_buckets; i++) { if (key_idx * num_hash_fun == i && key_idx < num_items) { - cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx])); - ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1); - ASSERT_OK(cuckoo_builder->status()); + cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx])); + ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1); + ASSERT_OK(cuckoo_builder.status()); expected_file_data.append(keys[key_idx] + values[key_idx]); ++key_idx; } else { expected_file_data.append(expected_unused_bucket); } } - ASSERT_OK(cuckoo_builder->Finish()); + ASSERT_OK(cuckoo_builder.Finish()); writable_file->Close(); CheckFileContents(expected_file_data); } @@ -171,25 +171,25 @@ TEST(CuckooBuilderTest, NoCollisionLastLevel) { unique_ptr writable_file; fname = test::TmpDir() + "/NoCollisionLastLevel_writable_file"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder( + CuckooTableBuilder cuckoo_builder( writable_file.get(), key_length, value_length, hash_table_ratio, file_size, num_hash_fun, 100, GetSliceHash); - ASSERT_OK(cuckoo_builder->status()); + ASSERT_OK(cuckoo_builder.status()); unsigned int key_idx = 0; std::string expected_file_data = ""; for (unsigned int i = 0; i < expected_max_buckets; i++) { if (key_idx * num_hash_fun == i && key_idx < num_items) { - cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx])); - ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1); - ASSERT_OK(cuckoo_builder->status()); + cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx])); + ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1); + ASSERT_OK(cuckoo_builder.status()); expected_file_data.append(user_keys[key_idx] + values[key_idx]); ++key_idx; } else { expected_file_data.append(expected_unused_bucket); } } - ASSERT_OK(cuckoo_builder->Finish()); + ASSERT_OK(cuckoo_builder.Finish()); writable_file->Close(); CheckFileContents(expected_file_data); } @@ -222,24 +222,24 @@ TEST(CuckooBuilderTest, WithCollision) { unique_ptr writable_file; fname = test::TmpDir() + "/WithCollision_writable_file"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder( + CuckooTableBuilder cuckoo_builder( writable_file.get(), key_length, value_length, hash_table_ratio, file_size, num_hash_fun, 100, GetSliceHash); - ASSERT_OK(cuckoo_builder->status()); + ASSERT_OK(cuckoo_builder.status()); unsigned int key_idx = 0; std::string expected_file_data = ""; for (unsigned int i = 0; i < expected_max_buckets; i++) { if (key_idx == i && key_idx < num_items) { - cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx])); - ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1); - ASSERT_OK(cuckoo_builder->status()); + cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx])); + ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1); + ASSERT_OK(cuckoo_builder.status()); expected_file_data.append(keys[key_idx] + values[key_idx]); ++key_idx; } else { expected_file_data.append(expected_unused_bucket); } } - ASSERT_OK(cuckoo_builder->Finish()); + ASSERT_OK(cuckoo_builder.Finish()); writable_file->Close(); CheckFileContents(expected_file_data); } @@ -266,19 +266,19 @@ TEST(CuckooBuilderTest, FailWithTooManyCollisions) { unique_ptr writable_file; fname = test::TmpDir() + "/FailWithTooManyCollisions_writable"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder( + CuckooTableBuilder cuckoo_builder( writable_file.get(), ikey_length, value_length, hash_table_ratio, file_size, num_hash_fun, 100, GetSliceHash); - ASSERT_OK(cuckoo_builder->status()); + ASSERT_OK(cuckoo_builder.status()); for (unsigned int key_idx = 0; key_idx < num_items-1; key_idx++) { - cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx])); - ASSERT_OK(cuckoo_builder->status()); - ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1); + cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx])); + ASSERT_OK(cuckoo_builder.status()); + ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1); } - cuckoo_builder->Add(Slice(keys.back()), Slice(values.back())); - ASSERT_TRUE(cuckoo_builder->status().IsCorruption()); - cuckoo_builder->Abandon(); + cuckoo_builder.Add(Slice(keys.back()), Slice(values.back())); + ASSERT_TRUE(cuckoo_builder.status().IsCorruption()); + cuckoo_builder.Abandon(); writable_file->Close(); } @@ -297,17 +297,17 @@ TEST(CuckooBuilderTest, FailWhenSameKeyInserted) { unique_ptr writable_file; fname = test::TmpDir() + "/FailWhenSameKeyInserted_writable_file"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder( + CuckooTableBuilder cuckoo_builder( writable_file.get(), ikey_length, value_length, hash_table_ratio, file_size, num_hash_fun, 100, GetSliceHash); - ASSERT_OK(cuckoo_builder->status()); - cuckoo_builder->Add(Slice(key_to_reuse1), Slice(value)); - ASSERT_OK(cuckoo_builder->status()); - ASSERT_EQ(cuckoo_builder->NumEntries(), 1); - cuckoo_builder->Add(Slice(key_to_reuse2), Slice(value)); - ASSERT_TRUE(cuckoo_builder->status().IsCorruption()); - cuckoo_builder->Abandon(); + ASSERT_OK(cuckoo_builder.status()); + cuckoo_builder.Add(Slice(key_to_reuse1), Slice(value)); + ASSERT_OK(cuckoo_builder.status()); + ASSERT_EQ(cuckoo_builder.NumEntries(), 1U); + cuckoo_builder.Add(Slice(key_to_reuse2), Slice(value)); + ASSERT_TRUE(cuckoo_builder.status().IsCorruption()); + cuckoo_builder.Abandon(); writable_file->Close(); } @@ -359,19 +359,19 @@ TEST(CuckooBuilderTest, WithACollisionPath) { unique_ptr writable_file; fname = test::TmpDir() + "/WithCollisionPath_writable_file"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder( + CuckooTableBuilder cuckoo_builder( writable_file.get(), key_length, value_length, hash_table_ratio, file_size, num_hash_fun, max_search_depth, GetSliceHash); - ASSERT_OK(cuckoo_builder->status()); + ASSERT_OK(cuckoo_builder.status()); for (unsigned int key_idx = 0; key_idx < num_items; key_idx++) { - cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx])); - ASSERT_OK(cuckoo_builder->status()); - ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1); + cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx])); + ASSERT_OK(cuckoo_builder.status()); + ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1); expected_file_data.replace(expected_bucket_id[key_idx]*bucket_length, bucket_length, keys[key_idx] + values[key_idx]); } - ASSERT_OK(cuckoo_builder->Finish()); + ASSERT_OK(cuckoo_builder.Finish()); writable_file->Close(); CheckFileContents(expected_file_data); } @@ -407,19 +407,19 @@ TEST(CuckooBuilderTest, FailWhenCollisionPathTooLong) { unique_ptr writable_file; fname = test::TmpDir() + "/FailWhenCollisionPathTooLong_writable"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder( + CuckooTableBuilder cuckoo_builder( writable_file.get(), ikey_length, value_length, hash_table_ratio, file_size, num_hash_fun, max_search_depth, GetSliceHash); - ASSERT_OK(cuckoo_builder->status()); + ASSERT_OK(cuckoo_builder.status()); for (unsigned int key_idx = 0; key_idx < num_items-1; key_idx++) { - cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx])); - ASSERT_OK(cuckoo_builder->status()); - ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1); + cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx])); + ASSERT_OK(cuckoo_builder.status()); + ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1); } - cuckoo_builder->Add(Slice(keys.back()), Slice(values.back())); - ASSERT_TRUE(cuckoo_builder->status().IsCorruption()); - cuckoo_builder->Abandon(); + cuckoo_builder.Add(Slice(keys.back()), Slice(values.back())); + ASSERT_TRUE(cuckoo_builder.status().IsCorruption()); + cuckoo_builder.Abandon(); writable_file->Close(); } @@ -448,19 +448,19 @@ TEST(CuckooBuilderTest, FailWhenTableIsFull) { unique_ptr writable_file; fname = test::TmpDir() + "/FailWhenTabelIsFull_writable"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); - CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder( + CuckooTableBuilder cuckoo_builder( writable_file.get(), ikey_length, value_length, hash_table_ratio, file_size, num_hash_fun, 100, GetSliceHash); - ASSERT_OK(cuckoo_builder->status()); + ASSERT_OK(cuckoo_builder.status()); for (unsigned int key_idx = 0; key_idx < num_items-1; key_idx++) { - cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx])); - ASSERT_OK(cuckoo_builder->status()); - ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1); + cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx])); + ASSERT_OK(cuckoo_builder.status()); + ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1); } - cuckoo_builder->Add(Slice(keys.back()), Slice(values.back())); - ASSERT_TRUE(cuckoo_builder->status().IsCorruption()); - cuckoo_builder->Abandon(); + cuckoo_builder.Add(Slice(keys.back()), Slice(values.back())); + ASSERT_TRUE(cuckoo_builder.status().IsCorruption()); + cuckoo_builder.Abandon(); writable_file->Close(); } } // namespace rocksdb