diff --git a/java/Makefile b/java/Makefile index d124689f6..95667a2e8 100644 --- a/java/Makefile +++ b/java/Makefile @@ -1,4 +1,4 @@ -NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB +NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv NATIVE_INCLUDE = ./include ROCKSDB_JAR = rocksdbjni.jar diff --git a/java/org/rocksdb/Options.java b/java/org/rocksdb/Options.java index af4b82fab..0bf7a9aa9 100644 --- a/java/org/rocksdb/Options.java +++ b/java/org/rocksdb/Options.java @@ -24,6 +24,7 @@ public class Options extends RocksObject { super(); cacheSize_ = DEFAULT_CACHE_SIZE; newOptions(); + env_ = RocksEnv.getDefault(); } /** @@ -42,6 +43,24 @@ public class Options extends RocksObject { return this; } + /** + * Use the specified object to interact with the environment, + * e.g. to read/write files, schedule background work, etc. + * Default: RocksEnv.getDefault() + */ + public Options setEnv(RocksEnv env) { + assert(isInitialized()); + setEnv(nativeHandle_, env.nativeHandle_); + env_ = env; + return this; + } + private native void setEnv(long optHandle, long envHandle); + + public RocksEnv getEnv() { + return env_; + } + private native long getEnvHandle(long handle); + /** * Return true if the create_if_missing flag is set to true. * If true, the database will be created if it is missing. @@ -2351,4 +2370,5 @@ public class Options extends RocksObject { long cacheSize_; Filter filter_; + RocksEnv env_; } diff --git a/java/org/rocksdb/RocksEnv.java b/java/org/rocksdb/RocksEnv.java new file mode 100644 index 000000000..a9b1add4c --- /dev/null +++ b/java/org/rocksdb/RocksEnv.java @@ -0,0 +1,98 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +/** + * A RocksEnv is an interface used by the rocksdb implementation to access + * operating system functionality like the filesystem etc. + * + * All Env implementations are safe for concurrent access from + * multiple threads without any external synchronization. + */ +public class RocksEnv extends RocksObject { + public static final int FLUSH_POOL = 0; + public static final int COMPACTION_POOL = 1; + + static { + default_env_ = new RocksEnv(getDefaultEnvInternal()); + } + private static native long getDefaultEnvInternal(); + + /** + * Returns a default environment suitable for the current operating + * system. + * + * The result of getDefault() belongs to rocksdb c++, and calling its + * dispose() will be no-op. + */ + public static RocksEnv getDefault() { + return default_env_; + } + + /** + * Sets the number of background worker threads of the flush pool + * for this environment. + * default number: 1 + */ + public RocksEnv setBackgroundThreads(int num) { + return setBackgroundThreads(num, FLUSH_POOL); + } + + /** + * Sets the number of background worker threads of the specified thread + * pool for this environment. + * + * @param num the number of threads + * @param poolID the id to specified a thread pool. Should be either + * FLUSH_POOL or COMPACTION_POOL. + * Default number: 1 + */ + public RocksEnv setBackgroundThreads(int num, int poolID) { + setBackgroundThreads(nativeHandle_, num, poolID); + return this; + } + private native void setBackgroundThreads( + long handle, int num, int priority); + + /** + * Returns the length of the queue associated with the specified + * thread pool. + * + * @param poolID the id to specified a thread pool. Should be either + * FLUSH_POOL or COMPACTION_POOL. + */ + public int getThreadPoolQueueLen(int poolID) { + return getThreadPoolQueueLen(nativeHandle_, poolID); + } + private native int getThreadPoolQueueLen(long handle, int poolID); + + /** + * Package-private constructor that uses the specified native handle + * to construct a RocksEnv. Note that the newly created RocksEnv + * will not take the ownership of the input handle. + */ + RocksEnv(long handle) { + super(); + nativeHandle_ = handle; + disOwnNativeHandle(); + } + + /** + * The helper function of dispose() which all subclasses of RocksObject + * must implement to release their associated C++ resource. + */ + protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + private native void disposeInternal(long handle); + + /** + * The static default RocksEnv. The ownership of its native handle + * belongs to rocksdb c++ and is not able to be released on the Java + * side. + */ + static RocksEnv default_env_; +} diff --git a/java/org/rocksdb/benchmark/DbBenchmark.java b/java/org/rocksdb/benchmark/DbBenchmark.java index c34ae9b0a..5ad35a98a 100644 --- a/java/org/rocksdb/benchmark/DbBenchmark.java +++ b/java/org/rocksdb/benchmark/DbBenchmark.java @@ -526,6 +526,8 @@ public class DbBenchmark { (Integer)flags_.get(Flag.max_write_buffer_number)); options.setMaxBackgroundCompactions( (Integer)flags_.get(Flag.max_background_compactions)); + options.getEnv().setBackgroundThreads( + (Integer)flags_.get(Flag.max_background_compactions)); options.setMaxBackgroundFlushes( (Integer)flags_.get(Flag.max_background_flushes)); options.setCacheSize( diff --git a/java/rocksjni/env.cc b/java/rocksjni/env.cc new file mode 100644 index 000000000..3aed9f5a0 --- /dev/null +++ b/java/rocksjni/env.cc @@ -0,0 +1,66 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ rocksdb::Env methods from Java side. + +#include "include/org_rocksdb_RocksEnv.h" +#include "rocksdb/env.h" + +/* + * Class: org_rocksdb_RocksEnv + * Method: getDefaultEnvInternal + * Signature: ()J + */ +jlong Java_org_rocksdb_RocksEnv_getDefaultEnvInternal( + JNIEnv* env, jclass jclass) { + return reinterpret_cast(rocksdb::Env::Default()); +} + +/* + * Class: org_rocksdb_RocksEnv + * Method: setBackgroundThreads + * Signature: (JII)V + */ +void Java_org_rocksdb_RocksEnv_setBackgroundThreads( + JNIEnv* env, jobject jobj, jlong jhandle, + jint num, jint priority) { + auto* rocks_env = reinterpret_cast(jhandle); + switch (priority) { + case org_rocksdb_RocksEnv_FLUSH_POOL: + rocks_env->SetBackgroundThreads(num, rocksdb::Env::Priority::LOW); + break; + case org_rocksdb_RocksEnv_COMPACTION_POOL: + rocks_env->SetBackgroundThreads(num, rocksdb::Env::Priority::HIGH); + break; + } +} + +/* + * Class: org_rocksdb_RocksEnv + * Method: getThreadPoolQueueLen + * Signature: (JI)I + */ +jint Java_org_rocksdb_RocksEnv_getThreadPoolQueueLen( + JNIEnv* env, jobject jobj, jlong jhandle, jint pool_id) { + auto* rocks_env = reinterpret_cast(jhandle); + switch (pool_id) { + case org_rocksdb_RocksEnv_FLUSH_POOL: + return rocks_env->GetThreadPoolQueueLen(rocksdb::Env::Priority::LOW); + case org_rocksdb_RocksEnv_COMPACTION_POOL: + return rocks_env->GetThreadPoolQueueLen(rocksdb::Env::Priority::HIGH); + } + return 0; +} + +/* + * Class: org_rocksdb_RocksEnv + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_RocksEnv_disposeInternal( + JNIEnv* env, jobject jobj, jlong jhandle) { + delete reinterpret_cast(jhandle); +} diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 697bd0cef..3e303091f 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -29,10 +29,6 @@ void Java_org_rocksdb_RocksDB_open( JNIEnv* env, jobject jdb, jlong jopt_handle, jlong jcache_size, jstring jdb_path) { auto opt = reinterpret_cast(jopt_handle); - // TODO(yhchiang): should be removed once Java binding for Env is ready. - if (opt->max_background_compactions > 1) { - opt->env->SetBackgroundThreads(opt->max_background_compactions); - } if (jcache_size > 0) { opt->no_block_cache = false; opt->block_cache = rocksdb::NewLRUCache(jcache_size);