[Java] Optimize statistics collector, improve object dependency in RocksObjects

Summary:
This diff merges pull request 208. Contributor: ankgup87

[Java] Optimize statistics collector
* Optimize the statistics collector by collecting statistics for multiple DBs in a single thread rather than starting a new thread per DB (see the sketch below).
* Also fix packaging of the jnilib file on the OS_X platform.
* Diff review: https://reviews.facebook.net/D20265
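
A minimal sketch of the new single-thread usage, based on the signatures in this diff; the two Statistics references and the callback implementation are placeholders:

    import java.util.Arrays;
    import org.rocksdb.*;

    // Sketch: one background thread polling the statistics of two DBs.
    public class MultiDbStatsExample {
      static void collect(Statistics db1Stats, Statistics db2Stats,
          StatisticsCollectorCallback callback) throws InterruptedException {
        StatisticsCollector collector = new StatisticsCollector(
            Arrays.asList(
                new StatsCollectorInput(db1Stats, callback),
                new StatsCollectorInput(db2Stats, callback)),
            100);  // collection interval in milliseconds
        collector.start();
        Thread.sleep(1000);    // let a few collection rounds run
        collector.shutDown();  // waits for the collection runnable to finish
      }
    }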

[Java] Add documentation on the interdependency of dispose() calls on RocksObjects
* Remove the transferCppRawPointersOwnershipFrom function.
  - This function set opt.filter_ (and thus filter_) to null, leaving no one
    holding a reference to the Filter object, so it could be GC'd, which was
    not the intention. Replaced it with storeOptionsInstance, which stores the
    Options instance. Options internally holds the Filter instance, so the
    Filter reference only becomes collectable once the Options is GC'd (see
    the sketch after this list).
* Added documentation explaining the interdependency of Filter, Options, and DB.
* Diff review: https://reviews.facebook.net/D20379
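
A minimal sketch of the dispose ordering this implies, assuming the BloomFilter and setCreateIfMissing APIs of the Java binding at the time; the path is a placeholder:

    import org.rocksdb.*;

    // The DB stores the Options and the Options holds the Filter, so each
    // object is disposed only after everything that depends on it is closed.
    public class DisposeOrderExample {
      public static void main(String[] args) throws RocksDBException {
        Filter filter = new BloomFilter();  // BloomFilter assumed as concrete Filter
        Options options = new Options().setCreateIfMissing(true)
                                       .setFilter(filter);  // Options now references filter
        RocksDB db = RocksDB.open(options, "/tmp/dispose-order-db");
        // ... use db ...
        db.dispose();       // close the DB first
        options.dispose();  // then the Options ...
        filter.dispose();   // ... and only then the Filter
      }
    }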

Test Plan:
Described in the respective diff reviews.

Reviewers: haobo, sdong, swapnilghike, zzbennett, rsumbaly, yhchiang

Reviewed by: yhchiang
Branch: main
Committed by: Yueh-Hsuan Chiang, 11 years ago
Commit: 250f035782
Files changed:
  1. java/org/rocksdb/Options.java (11)
  2. java/org/rocksdb/RocksDB.java (30)
  3. java/org/rocksdb/StatisticsCollector.java (47)
  4. java/org/rocksdb/StatisticsCollectorCallback.java (2)
  5. java/org/rocksdb/StatsCollectorInput.java (35)
  6. java/org/rocksdb/test/StatisticsCollectorTest.java (7)
  7. table/cuckoo_table_builder_test.cc (114)

@@ -9,8 +9,8 @@ package org.rocksdb;
* Options to control the behavior of a database. It will be used
* during the creation of a RocksDB (i.e., RocksDB.open()).
*
* Note that dispose() must be called before an Options instance
* become out-of-scope to release the allocated memory in c++.
* If the dispose() function is not called, the instance will be GC'd
* automatically and its native resources will be released as part of that process.
*/
public class Options extends RocksObject {
static final long DEFAULT_CACHE_SIZE = 8 << 20;
@@ -168,8 +168,11 @@ public class Options extends RocksObject {
/**
* Use the specified filter policy to reduce disk reads.
*
* Note that the caller should not dispose the input filter as
* Options.dispose() will dispose this filter.
* A Filter should not be disposed before the Options instances using it are
* disposed. If the dispose() function is not called, the filter object will be
* GC'd automatically.
*
* Filter instance can be re-used in multiple options instances.
*
* @param Filter policy java instance.
* @return the instance of the current Options.
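
Per the re-use note above, a hedged sketch of sharing one Filter across two Options instances (BloomFilter is assumed as the concrete Filter type):

    import org.rocksdb.*;

    // One Filter shared by two Options instances; setFilter returns the
    // current Options, per the doc above, so calls can be chained.
    public class SharedFilterExample {
      static void demo() {
        Filter shared = new BloomFilter();
        Options optsA = new Options().setFilter(shared);
        Options optsB = new Options().setFilter(shared);
        // ... open and close DBs with optsA / optsB ...
        optsA.dispose();
        optsB.dispose();
        shared.dispose();  // dispose the Filter last, after every Options using it
      }
    }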

@@ -99,6 +99,15 @@ public class RocksDB extends RocksObject {
/**
* The factory constructor of RocksDB that opens a RocksDB instance given
* the path to the database using the specified options and db path.
*
* An Options instance *should* not be disposed before all DBs using it have
* been closed. If the user does not dispose the options explicitly, the
* instance will be GC'd automatically.
*
* An Options instance can be re-used to open multiple DBs if DB statistics are
* not used. If DB statistics are required, it is recommended to open each DB
* with a new Options instance, since the underlying native statistics instance
* does not use any locks to prevent concurrent updates.
*/
public static RocksDB open(Options options, String path)
throws RocksDBException {
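
Given the statistics caveat above, a sketch of the recommended one-Options-per-DB pattern; createStatistics() and statisticsPtr() are assumed from the binding's Options API, and the paths are placeholders:

    import org.rocksdb.*;

    // Each DB gets its own Options, hence its own native statistics
    // instance, since that instance is not protected by locks.
    public class PerDbOptionsExample {
      static void demo() throws RocksDBException {
        Options opts1 = new Options().setCreateIfMissing(true).createStatistics();
        Options opts2 = new Options().setCreateIfMissing(true).createStatistics();
        RocksDB db1 = RocksDB.open(opts1, "/tmp/stats-db-1");
        RocksDB db2 = RocksDB.open(opts2, "/tmp/stats-db-2");
        Statistics s1 = opts1.statisticsPtr();
        Statistics s2 = opts2.statisticsPtr();
        // ... feed s1/s2 to a StatisticsCollector as sketched earlier ...
      }
    }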
@@ -108,10 +117,16 @@ public class RocksDB extends RocksObject {
RocksDB db = new RocksDB();
db.open(options.nativeHandle_, options.cacheSize_,
options.numShardBits_, path);
db.transferCppRawPointersOwnershipFrom(options);
db.storeOptionsInstance(options);
return db;
}
private void storeOptionsInstance(Options options) {
options_ = options;
}
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
@@ -318,17 +333,6 @@ public class RocksDB extends RocksObject {
super();
}
/**
* Transfer the ownership of all c++ raw-pointers from Options
* to RocksDB to ensure the life-time of those raw-pointers
* will be at least as long as the life-time of any RocksDB
* that uses these raw-pointers.
*/
protected void transferCppRawPointersOwnershipFrom(Options opt) {
filter_ = opt.filter_;
opt.filter_ = null;
}
// native methods
protected native void open(
long optionsHandle, long cacheSize, int numShardBits,
@@ -365,5 +369,5 @@ public class RocksDB extends RocksObject {
protected native long iterator0(long optHandle);
private native void disposeInternal(long handle);
protected Filter filter_;
protected Options options_;
}

@@ -5,8 +5,10 @@
package org.rocksdb;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -20,41 +22,37 @@ import java.util.concurrent.atomic.AtomicBoolean;
* provided in constructor) reference has been disposed.
*/
public class StatisticsCollector {
private final Statistics _statistics;
private final ThreadPoolExecutor _threadPoolExecutor;
private final List<StatsCollectorInput> _statsCollectorInputList;
private final ExecutorService _executorService;
private final int _statsCollectionInterval;
private final StatisticsCollectorCallback _statsCallback;
private volatile boolean _isRunning = true;
/**
* Constructor for statistics collector.
* @param statistics Reference of DB statistics.
*
* @param statsCollectorInputList List of statistics collector input.
* @param statsCollectionIntervalInMilliSeconds Statistics collection time
* period (specified in milliseconds)
* @param statsCallback Reference of statistics callback interface.
* period (specified in milliseconds).
*/
public StatisticsCollector(Statistics statistics,
int statsCollectionIntervalInMilliSeconds,
StatisticsCollectorCallback statsCallback) {
_statistics = statistics;
public StatisticsCollector(List<StatsCollectorInput> statsCollectorInputList,
int statsCollectionIntervalInMilliSeconds) {
_statsCollectorInputList = statsCollectorInputList;
_statsCollectionInterval = statsCollectionIntervalInMilliSeconds;
_statsCallback = statsCallback;
_threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1));
_executorService = Executors.newSingleThreadExecutor();
}
public void start() {
_threadPoolExecutor.submit(collectStatistics());
_executorService.submit(collectStatistics());
}
public void shutDown() throws InterruptedException {
_isRunning = false;
_threadPoolExecutor.shutdown();
_executorService.shutdown();
// Wait for collectStatistics runnable to finish so that disposal of
// statistics does not cause any exceptions to be thrown.
_threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
_executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
private Runnable collectStatistics() {
@@ -64,21 +62,28 @@ public class StatisticsCollector {
public void run() {
while (_isRunning) {
try {
for(StatsCollectorInput statsCollectorInput :
_statsCollectorInputList) {
Statistics statistics = statsCollectorInput.getStatistics();
StatisticsCollectorCallback statsCallback =
statsCollectorInput.getCallback();
// Collect ticker data
for(TickerType ticker : TickerType.values()) {
long tickerValue = _statistics.getTickerCount(ticker);
_statsCallback.tickerCallback(ticker, tickerValue);
long tickerValue = statistics.getTickerCount(ticker);
statsCallback.tickerCallback(ticker, tickerValue);
}
// Collect histogram data
for(HistogramType histogramType : HistogramType.values()) {
HistogramData histogramData =
_statistics.geHistogramData(histogramType);
_statsCallback.histogramCallback(histogramType, histogramData);
statistics.geHistogramData(histogramType);
statsCallback.histogramCallback(histogramType, histogramData);
}
Thread.sleep(_statsCollectionInterval);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread got interrupted!", e);

@@ -12,7 +12,7 @@ package org.rocksdb;
* StatisticsCollector doesn't make any guarantees about thread safety.
* If the same reference of StatisticsCollectorCallback is passed to multiple
* StatisticsCollector references, then its the responsibility of the
* user to make StatisticsCollectorCallback' implementation thread-safe.
* user to make StatisticsCollectorCallback's implementation thread-safe.
*
* @param tickerType
* @param tickerCount

@@ -0,0 +1,35 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* Contains all information necessary to collect statistics from one instance
* of DB statistics.
*/
public class StatsCollectorInput {
private final Statistics _statistics;
private final StatisticsCollectorCallback _statsCallback;
/**
* Constructor for StatsCollectorInput.
*
* @param statistics Reference of DB statistics.
* @param statsCallback Reference of statistics callback interface.
*/
public StatsCollectorInput(Statistics statistics,
StatisticsCollectorCallback statsCallback) {
_statistics = statistics;
_statsCallback = statsCallback;
}
public Statistics getStatistics() {
return _statistics;
}
public StatisticsCollectorCallback getCallback() {
return _statsCallback;
}
}

@@ -5,6 +5,7 @@
package org.rocksdb.test;
import java.util.Collections;
import org.rocksdb.*;
public class StatisticsCollectorTest {
@@ -21,8 +22,10 @@ public class StatisticsCollectorTest {
RocksDB db = RocksDB.open(db_path);
StatsCallbackMock callback = new StatsCallbackMock();
StatisticsCollector statsCollector = new StatisticsCollector(stats, 100,
callback);
StatsCollectorInput statsInput = new StatsCollectorInput(stats, callback);
StatisticsCollector statsCollector = new StatisticsCollector(
Collections.singletonList(statsInput), 100);
statsCollector.start();
Thread.sleep(1000);

@@ -67,6 +67,7 @@ class CuckooBuilderTest {
CuckooTablePropertyNames::kMaxNumBuckets]);
GetVarint32(&max_buckets_slice, &max_buckets);
ASSERT_EQ(expected_max_buckets, max_buckets);
delete props;
// Check contents of the bucket.
std::string read_data;
read_data.resize(expected_data.size());
@@ -92,7 +93,6 @@ class CuckooBuilderTest {
unsigned int expected_max_buckets;
};
TEST(CuckooBuilderTest, NoCollision) {
hash_map.clear();
num_items = 20;
@@ -122,25 +122,25 @@ TEST(CuckooBuilderTest, NoCollision) {
unique_ptr<WritableFile> writable_file;
fname = test::TmpDir() + "/BasicTest_writable_file";
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_));
CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder(
CuckooTableBuilder cuckoo_builder(
writable_file.get(), ikey_length,
value_length, hash_table_ratio,
file_size, num_hash_fun, 100, GetSliceHash);
ASSERT_OK(cuckoo_builder->status());
ASSERT_OK(cuckoo_builder.status());
unsigned int key_idx = 0;
std::string expected_file_data = "";
for (unsigned int i = 0; i < expected_max_buckets; i++) {
if (key_idx * num_hash_fun == i && key_idx < num_items) {
cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1);
ASSERT_OK(cuckoo_builder->status());
cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1);
ASSERT_OK(cuckoo_builder.status());
expected_file_data.append(keys[key_idx] + values[key_idx]);
++key_idx;
} else {
expected_file_data.append(expected_unused_bucket);
}
}
ASSERT_OK(cuckoo_builder->Finish());
ASSERT_OK(cuckoo_builder.Finish());
writable_file->Close();
CheckFileContents(expected_file_data);
}
@@ -171,25 +171,25 @@ TEST(CuckooBuilderTest, NoCollisionLastLevel) {
unique_ptr<WritableFile> writable_file;
fname = test::TmpDir() + "/NoCollisionLastLevel_writable_file";
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_));
CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder(
CuckooTableBuilder cuckoo_builder(
writable_file.get(), key_length,
value_length, hash_table_ratio,
file_size, num_hash_fun, 100, GetSliceHash);
ASSERT_OK(cuckoo_builder->status());
ASSERT_OK(cuckoo_builder.status());
unsigned int key_idx = 0;
std::string expected_file_data = "";
for (unsigned int i = 0; i < expected_max_buckets; i++) {
if (key_idx * num_hash_fun == i && key_idx < num_items) {
cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1);
ASSERT_OK(cuckoo_builder->status());
cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1);
ASSERT_OK(cuckoo_builder.status());
expected_file_data.append(user_keys[key_idx] + values[key_idx]);
++key_idx;
} else {
expected_file_data.append(expected_unused_bucket);
}
}
ASSERT_OK(cuckoo_builder->Finish());
ASSERT_OK(cuckoo_builder.Finish());
writable_file->Close();
CheckFileContents(expected_file_data);
}
@@ -222,24 +222,24 @@ TEST(CuckooBuilderTest, WithCollision) {
unique_ptr<WritableFile> writable_file;
fname = test::TmpDir() + "/WithCollision_writable_file";
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_));
CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder(
CuckooTableBuilder cuckoo_builder(
writable_file.get(), key_length, value_length, hash_table_ratio,
file_size, num_hash_fun, 100, GetSliceHash);
ASSERT_OK(cuckoo_builder->status());
ASSERT_OK(cuckoo_builder.status());
unsigned int key_idx = 0;
std::string expected_file_data = "";
for (unsigned int i = 0; i < expected_max_buckets; i++) {
if (key_idx == i && key_idx < num_items) {
cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1);
ASSERT_OK(cuckoo_builder->status());
cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1);
ASSERT_OK(cuckoo_builder.status());
expected_file_data.append(keys[key_idx] + values[key_idx]);
++key_idx;
} else {
expected_file_data.append(expected_unused_bucket);
}
}
ASSERT_OK(cuckoo_builder->Finish());
ASSERT_OK(cuckoo_builder.Finish());
writable_file->Close();
CheckFileContents(expected_file_data);
}
@@ -266,19 +266,19 @@ TEST(CuckooBuilderTest, FailWithTooManyCollisions) {
unique_ptr<WritableFile> writable_file;
fname = test::TmpDir() + "/FailWithTooManyCollisions_writable";
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_));
CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder(
CuckooTableBuilder cuckoo_builder(
writable_file.get(), ikey_length,
value_length, hash_table_ratio, file_size, num_hash_fun,
100, GetSliceHash);
ASSERT_OK(cuckoo_builder->status());
ASSERT_OK(cuckoo_builder.status());
for (unsigned int key_idx = 0; key_idx < num_items-1; key_idx++) {
cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_OK(cuckoo_builder->status());
ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1);
cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_OK(cuckoo_builder.status());
ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1);
}
cuckoo_builder->Add(Slice(keys.back()), Slice(values.back()));
ASSERT_TRUE(cuckoo_builder->status().IsCorruption());
cuckoo_builder->Abandon();
cuckoo_builder.Add(Slice(keys.back()), Slice(values.back()));
ASSERT_TRUE(cuckoo_builder.status().IsCorruption());
cuckoo_builder.Abandon();
writable_file->Close();
}
@@ -297,17 +297,17 @@ TEST(CuckooBuilderTest, FailWhenSameKeyInserted) {
unique_ptr<WritableFile> writable_file;
fname = test::TmpDir() + "/FailWhenSameKeyInserted_writable_file";
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_));
CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder(
CuckooTableBuilder cuckoo_builder(
writable_file.get(), ikey_length,
value_length, hash_table_ratio, file_size, num_hash_fun,
100, GetSliceHash);
ASSERT_OK(cuckoo_builder->status());
cuckoo_builder->Add(Slice(key_to_reuse1), Slice(value));
ASSERT_OK(cuckoo_builder->status());
ASSERT_EQ(cuckoo_builder->NumEntries(), 1);
cuckoo_builder->Add(Slice(key_to_reuse2), Slice(value));
ASSERT_TRUE(cuckoo_builder->status().IsCorruption());
cuckoo_builder->Abandon();
ASSERT_OK(cuckoo_builder.status());
cuckoo_builder.Add(Slice(key_to_reuse1), Slice(value));
ASSERT_OK(cuckoo_builder.status());
ASSERT_EQ(cuckoo_builder.NumEntries(), 1U);
cuckoo_builder.Add(Slice(key_to_reuse2), Slice(value));
ASSERT_TRUE(cuckoo_builder.status().IsCorruption());
cuckoo_builder.Abandon();
writable_file->Close();
}
@@ -359,19 +359,19 @@ TEST(CuckooBuilderTest, WithACollisionPath) {
unique_ptr<WritableFile> writable_file;
fname = test::TmpDir() + "/WithCollisionPath_writable_file";
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_));
CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder(
CuckooTableBuilder cuckoo_builder(
writable_file.get(), key_length,
value_length, hash_table_ratio, file_size,
num_hash_fun, max_search_depth, GetSliceHash);
ASSERT_OK(cuckoo_builder->status());
ASSERT_OK(cuckoo_builder.status());
for (unsigned int key_idx = 0; key_idx < num_items; key_idx++) {
cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_OK(cuckoo_builder->status());
ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1);
cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_OK(cuckoo_builder.status());
ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1);
expected_file_data.replace(expected_bucket_id[key_idx]*bucket_length,
bucket_length, keys[key_idx] + values[key_idx]);
}
ASSERT_OK(cuckoo_builder->Finish());
ASSERT_OK(cuckoo_builder.Finish());
writable_file->Close();
CheckFileContents(expected_file_data);
}
@@ -407,19 +407,19 @@ TEST(CuckooBuilderTest, FailWhenCollisionPathTooLong) {
unique_ptr<WritableFile> writable_file;
fname = test::TmpDir() + "/FailWhenCollisionPathTooLong_writable";
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_));
CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder(
CuckooTableBuilder cuckoo_builder(
writable_file.get(), ikey_length,
value_length, hash_table_ratio, file_size, num_hash_fun,
max_search_depth, GetSliceHash);
ASSERT_OK(cuckoo_builder->status());
ASSERT_OK(cuckoo_builder.status());
for (unsigned int key_idx = 0; key_idx < num_items-1; key_idx++) {
cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_OK(cuckoo_builder->status());
ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1);
cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_OK(cuckoo_builder.status());
ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1);
}
cuckoo_builder->Add(Slice(keys.back()), Slice(values.back()));
ASSERT_TRUE(cuckoo_builder->status().IsCorruption());
cuckoo_builder->Abandon();
cuckoo_builder.Add(Slice(keys.back()), Slice(values.back()));
ASSERT_TRUE(cuckoo_builder.status().IsCorruption());
cuckoo_builder.Abandon();
writable_file->Close();
}
@@ -448,19 +448,19 @@ TEST(CuckooBuilderTest, FailWhenTableIsFull) {
unique_ptr<WritableFile> writable_file;
fname = test::TmpDir() + "/FailWhenTabelIsFull_writable";
ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_));
CuckooTableBuilder* cuckoo_builder = new CuckooTableBuilder(
CuckooTableBuilder cuckoo_builder(
writable_file.get(), ikey_length,
value_length, hash_table_ratio, file_size, num_hash_fun,
100, GetSliceHash);
ASSERT_OK(cuckoo_builder->status());
ASSERT_OK(cuckoo_builder.status());
for (unsigned int key_idx = 0; key_idx < num_items-1; key_idx++) {
cuckoo_builder->Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_OK(cuckoo_builder->status());
ASSERT_EQ(cuckoo_builder->NumEntries(), key_idx + 1);
cuckoo_builder.Add(Slice(keys[key_idx]), Slice(values[key_idx]));
ASSERT_OK(cuckoo_builder.status());
ASSERT_EQ(cuckoo_builder.NumEntries(), key_idx + 1);
}
cuckoo_builder->Add(Slice(keys.back()), Slice(values.back()));
ASSERT_TRUE(cuckoo_builder->status().IsCorruption());
cuckoo_builder->Abandon();
cuckoo_builder.Add(Slice(keys.back()), Slice(values.back()));
ASSERT_TRUE(cuckoo_builder.status().IsCorruption());
cuckoo_builder.Abandon();
writable_file->Close();
}
} // namespace rocksdb
