[RocksJava] Expose MemEnv in RocksJava

Summary:
In 3.10 the C++ code was extended with a MemEnv implementation. This
is now also available in RocksJava.

Changes:
- Extraced abstract super class Env
- Introduced RocksMemEnv
- Remove unnecessary disposeInternal method. The disposal of the default environment is managed by C++ so there needs to be no disposeInternal method in Java.
- Introduced a RocksMemEnvTest, which is aligned with the C++ equivalent.

Test Plan:
make rocksdbjava
make jtest

Reviewers: adamretter, yhchiang, ankgup87

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D35619
main
fyrz 10 years ago
parent 51da3aab4a
commit fd8804f979
  1. 3
      java/Makefile
  2. 33
      java/rocksjni/env.cc
  3. 92
      java/src/main/java/org/rocksdb/Env.java
  4. 14
      java/src/main/java/org/rocksdb/Options.java
  5. 87
      java/src/main/java/org/rocksdb/RocksEnv.java
  6. 33
      java/src/main/java/org/rocksdb/RocksMemEnv.java
  7. 8
      java/src/test/java/org/rocksdb/OptionsTest.java
  8. 2
      java/src/test/java/org/rocksdb/RocksEnvTest.java
  9. 196
      java/src/test/java/org/rocksdb/RocksMemEnvTest.java

@ -12,6 +12,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.DBOptions\
org.rocksdb.DirectComparator\
org.rocksdb.DirectSlice\
org.rocksdb.Env\
org.rocksdb.FlushOptions\
org.rocksdb.Filter\
org.rocksdb.GenericRateLimiterConfig\
@ -27,6 +28,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.RocksDB\
org.rocksdb.RocksEnv\
org.rocksdb.RocksIterator\
org.rocksdb.RocksMemEnv\
org.rocksdb.SkipListMemTableConfig\
org.rocksdb.Slice\
org.rocksdb.Statistics\
@ -84,6 +86,7 @@ JAVA_TESTS = org.rocksdb.BackupableDBOptionsTest\
org.rocksdb.RocksDBTest\
org.rocksdb.RocksEnvTest\
org.rocksdb.RocksIteratorTest\
org.rocksdb.RocksMemEnvTest\
org.rocksdb.util.SizeUnitTest\
org.rocksdb.SliceTest\
org.rocksdb.SnapshotTest\

@ -6,44 +6,46 @@
// This file implements the "bridge" between Java and C++ and enables
// calling c++ rocksdb::Env methods from Java side.
#include "include/org_rocksdb_Env.h"
#include "include/org_rocksdb_RocksEnv.h"
#include "include/org_rocksdb_RocksMemEnv.h"
#include "rocksdb/env.h"
/*
* Class: org_rocksdb_RocksEnv
* Class: org_rocksdb_Env
* Method: getDefaultEnvInternal
* Signature: ()J
*/
jlong Java_org_rocksdb_RocksEnv_getDefaultEnvInternal(
jlong Java_org_rocksdb_Env_getDefaultEnvInternal(
JNIEnv* env, jclass jclazz) {
return reinterpret_cast<jlong>(rocksdb::Env::Default());
}
/*
* Class: org_rocksdb_RocksEnv
* Class: org_rocksdb_Env
* Method: setBackgroundThreads
* Signature: (JII)V
*/
void Java_org_rocksdb_RocksEnv_setBackgroundThreads(
void Java_org_rocksdb_Env_setBackgroundThreads(
JNIEnv* env, jobject jobj, jlong jhandle,
jint num, jint priority) {
auto* rocks_env = reinterpret_cast<rocksdb::Env*>(jhandle);
switch (priority) {
case org_rocksdb_RocksEnv_FLUSH_POOL:
case org_rocksdb_Env_FLUSH_POOL:
rocks_env->SetBackgroundThreads(num, rocksdb::Env::Priority::LOW);
break;
case org_rocksdb_RocksEnv_COMPACTION_POOL:
case org_rocksdb_Env_COMPACTION_POOL:
rocks_env->SetBackgroundThreads(num, rocksdb::Env::Priority::HIGH);
break;
}
}
/*
* Class: org_rocksdb_RocksEnv
* Class: org_rocksdb_sEnv
* Method: getThreadPoolQueueLen
* Signature: (JI)I
*/
jint Java_org_rocksdb_RocksEnv_getThreadPoolQueueLen(
jint Java_org_rocksdb_Env_getThreadPoolQueueLen(
JNIEnv* env, jobject jobj, jlong jhandle, jint pool_id) {
auto* rocks_env = reinterpret_cast<rocksdb::Env*>(jhandle);
switch (pool_id) {
@ -56,11 +58,22 @@ jint Java_org_rocksdb_RocksEnv_getThreadPoolQueueLen(
}
/*
* Class: org_rocksdb_RocksEnv
* Class: org_rocksdb_RocksMemEnv
* Method: createMemEnv
* Signature: ()J
*/
jlong Java_org_rocksdb_RocksMemEnv_createMemEnv(
JNIEnv* env, jclass jclazz) {
return reinterpret_cast<jlong>(rocksdb::NewMemEnv(
rocksdb::Env::Default()));
}
/*
* Class: org_rocksdb_RocksMemEnv
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_RocksEnv_disposeInternal(
void Java_org_rocksdb_RocksMemEnv_disposeInternal(
JNIEnv* env, jobject jobj, jlong jhandle) {
delete reinterpret_cast<rocksdb::Env*>(jhandle);
}

@ -0,0 +1,92 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* Base class for all Env implementations in RocksDB.
*/
public abstract class Env extends RocksObject {
public static final int FLUSH_POOL = 0;
public static final int COMPACTION_POOL = 1;
/**
* <p>Returns the default environment suitable for the current operating
* system.</p>
*
* <p>The result of {@code getDefault()} is a singleton whose ownership
* belongs to rocksdb c++. As a result, the returned RocksEnv will not
* have the ownership of its c++ resource, and calling its dispose()
* will be no-op.</p>
*
* @return the default {@link org.rocksdb.RocksEnv} instance.
*/
public static Env getDefault() {
return default_env_;
}
/**
* <p>Sets the number of background worker threads of the flush pool
* for this environment.</p>
* <p>Default number: 1</p>
*
* @param num the number of threads
*
* @return current {@link RocksEnv} instance.
*/
public Env setBackgroundThreads(final int num) {
return setBackgroundThreads(num, FLUSH_POOL);
}
/**
* <p>Sets the number of background worker threads of the specified thread
* pool for this environment.</p>
*
* @param num the number of threads
* @param poolID the id to specified a thread pool. Should be either
* FLUSH_POOL or COMPACTION_POOL.
*
* <p>Default number: 1</p>
* @return current {@link RocksEnv} instance.
*/
public Env setBackgroundThreads(final int num, final int poolID) {
setBackgroundThreads(nativeHandle_, num, poolID);
return this;
}
/**
* <p>Returns the length of the queue associated with the specified
* thread pool.</p>
*
* @param poolID the id to specified a thread pool. Should be either
* FLUSH_POOL or COMPACTION_POOL.
*
* @return the thread pool queue length.
*/
public int getThreadPoolQueueLen(final int poolID) {
return getThreadPoolQueueLen(nativeHandle_, poolID);
}
protected Env() {
super();
}
static {
default_env_ = new RocksEnv(getDefaultEnvInternal());
}
/**
* <p>The static default Env. The ownership of its native handle
* belongs to rocksdb c++ and is not able to be released on the Java
* side.</p>
*/
static Env default_env_;
private static native long getDefaultEnvInternal();
private native void setBackgroundThreads(
long handle, int num, int priority);
private native int getThreadPoolQueueLen(long handle, int poolID);
}

@ -26,7 +26,7 @@ public class Options extends RocksObject
public Options() {
super();
newOptions();
env_ = RocksEnv.getDefault();
env_ = Env.getDefault();
}
/**
@ -41,7 +41,7 @@ public class Options extends RocksObject
final ColumnFamilyOptions columnFamilyOptions) {
super();
newOptions(dbOptions.nativeHandle_, columnFamilyOptions.nativeHandle_);
env_ = RocksEnv.getDefault();
env_ = Env.getDefault();
}
@Override
@ -68,12 +68,12 @@ public class Options extends RocksObject
/**
* Use the specified object to interact with the environment,
* e.g. to read/write files, schedule background work, etc.
* Default: {@link RocksEnv#getDefault()}
* Default: {@link Env#getDefault()}
*
* @param env {@link RocksEnv} instance.
* @param env {@link Env} instance.
* @return the instance of the current Options.
*/
public Options setEnv(final RocksEnv env) {
public Options setEnv(final Env env) {
assert(isInitialized());
setEnv(nativeHandle_, env.nativeHandle_);
env_ = env;
@ -85,7 +85,7 @@ public class Options extends RocksObject
*
* @return {@link RocksEnv} instance set in the Options.
*/
public RocksEnv getEnv() {
public Env getEnv() {
return env_;
}
@ -1304,7 +1304,7 @@ public class Options extends RocksObject
boolean optimizeFiltersForHits);
private native boolean optimizeFiltersForHits(long handle);
// instance variables
RocksEnv env_;
Env env_;
MemTableConfig memTableConfig_;
TableFormatConfig tableFormatConfig_;
RateLimiterConfig rateLimiterConfig_;

@ -12,75 +12,7 @@ package org.rocksdb;
* <p>All Env implementations are safe for concurrent access from
* multiple threads without any external synchronization.</p>
*/
public class RocksEnv extends RocksObject {
public static final int FLUSH_POOL = 0;
public static final int COMPACTION_POOL = 1;
static {
default_env_ = new RocksEnv(getDefaultEnvInternal());
}
private static native long getDefaultEnvInternal();
/**
* <p>Returns the default environment suitable for the current operating
* system.</p>
*
* <p>The result of {@code getDefault()} is a singleton whose ownership
* belongs to rocksdb c++. As a result, the returned RocksEnv will not
* have the ownership of its c++ resource, and calling its dispose()
* will be no-op.</p>
*
* @return the default {@link org.rocksdb.RocksEnv} instance.
*/
public static RocksEnv getDefault() {
return default_env_;
}
/**
* <p>Sets the number of background worker threads of the flush pool
* for this environment.</p>
* <p>Default number: 1</p>
*
* @param num the number of threads
*
* @return current {@link org.rocksdb.RocksEnv} instance.
*/
public RocksEnv setBackgroundThreads(final int num) {
return setBackgroundThreads(num, FLUSH_POOL);
}
/**
* <p>Sets the number of background worker threads of the specified thread
* pool for this environment.</p>
*
* @param num the number of threads
* @param poolID the id to specified a thread pool. Should be either
* FLUSH_POOL or COMPACTION_POOL.
*
* <p>Default number: 1</p>
* @return current {@link org.rocksdb.RocksEnv} instance.
*/
public RocksEnv setBackgroundThreads(final int num, final int poolID) {
setBackgroundThreads(nativeHandle_, num, poolID);
return this;
}
private native void setBackgroundThreads(
long handle, int num, int priority);
/**
* <p>Returns the length of the queue associated with the specified
* thread pool.</p>
*
* @param poolID the id to specified a thread pool. Should be either
* FLUSH_POOL or COMPACTION_POOL.
*
* @return the thread pool queue length.
*/
public int getThreadPoolQueueLen(final int poolID) {
return getThreadPoolQueueLen(nativeHandle_, poolID);
}
private native int getThreadPoolQueueLen(long handle, int poolID);
public class RocksEnv extends Env {
/**
* <p>Package-private constructor that uses the specified native handle
@ -98,19 +30,14 @@ public class RocksEnv extends RocksObject {
}
/**
* The helper function of {@link #dispose()} which all subclasses of
* <p>The helper function of {@link #dispose()} which all subclasses of
* {@link RocksObject} must implement to release their associated C++
* resource.
* resource.</p>
*
* <p><strong>Note:</strong> this class is used to use the default
* RocksEnv with RocksJava. The default env allocation is managed
* by C++.</p>
*/
@Override protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private native void disposeInternal(long handle);
/**
* <p>The static default RocksEnv. The ownership of its native handle
* belongs to rocksdb c++ and is not able to be released on the Java
* side.</p>
*/
static RocksEnv default_env_;
}

@ -0,0 +1,33 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* RocksDB memory environment.
*/
public class RocksMemEnv extends Env {
/**
* <p>Creates a new RocksDB environment that stores its data
* in memory and delegates all non-file-storage tasks to
* base_env. The caller must delete the result when it is
* no longer needed.</p>
*
* <p>{@code *base_env} must remain live while the result is in use.</p>
*/
public RocksMemEnv() {
super();
nativeHandle_ = createMemEnv();
}
@Override
protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
private static native long createMemEnv();
private native void disposeInternal(long handle);
}

@ -994,13 +994,13 @@ public class OptionsTest {
}
@Test
public void rocksEnv() {
public void env() {
Options options = null;
try {
options = new Options();
RocksEnv rocksEnv = RocksEnv.getDefault();
options.setEnv(rocksEnv);
assertThat(options.getEnv()).isSameAs(rocksEnv);
Env env = Env.getDefault();
options.setEnv(env);
assertThat(options.getEnv()).isSameAs(env);
} finally {
if (options != null) {
options.dispose();

@ -18,7 +18,7 @@ public class RocksEnvTest {
@Test
public void rocksEnv(){
RocksEnv rocksEnv = RocksEnv.getDefault();
Env rocksEnv = RocksEnv.getDefault();
rocksEnv.setBackgroundThreads(5);
// default rocksenv will always return zero for flush pool
// no matter what was set via setBackgroundThreads

@ -0,0 +1,196 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.assertj.core.api.Assertions.assertThat;
public class RocksMemEnvTest {
@ClassRule
public static final RocksMemoryResource rocksMemoryResource =
new RocksMemoryResource();
@Test
public void memEnvFillAndReopen() throws RocksDBException {
final byte[][] keys = {
"aaa".getBytes(),
"bbb".getBytes(),
"ccc".getBytes()
};
final byte[][] values = {
"foo".getBytes(),
"bar".getBytes(),
"baz".getBytes()
};
Env env = null;
Options options = null;
RocksDB db = null;
FlushOptions flushOptions = null;
try {
env = new RocksMemEnv();
options = new Options().
setCreateIfMissing(true).
setEnv(env);
flushOptions = new FlushOptions().
setWaitForFlush(true);
db = RocksDB.open(options, "dir/db");
// write key/value pairs using MemEnv
for (int i=0; i < keys.length; i++) {
db.put(keys[i], values[i]);
}
// read key/value pairs using MemEnv
for (int i=0; i < keys.length; i++) {
assertThat(db.get(keys[i])).isEqualTo(values[i]);
}
// Check iterator access
RocksIterator iterator = db.newIterator();
iterator.seekToFirst();
for (int i=0; i < keys.length; i++) {
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo(keys[i]);
assertThat(iterator.value()).isEqualTo(values[i]);
iterator.next();
}
// reached end of database
assertThat(iterator.isValid()).isFalse();
iterator.dispose();
// flush
db.flush(flushOptions);
// read key/value pairs after flush using MemEnv
for (int i=0; i < keys.length; i++) {
assertThat(db.get(keys[i])).isEqualTo(values[i]);
}
db.close();
options.setCreateIfMissing(false);
// After reopen the values shall still be in the mem env.
// as long as the env is not freed.
db = RocksDB.open(options, "dir/db");
// read key/value pairs using MemEnv
for (int i=0; i < keys.length; i++) {
assertThat(db.get(keys[i])).isEqualTo(values[i]);
}
} finally {
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
if (flushOptions != null) {
flushOptions.dispose();
}
if (env != null) {
env.dispose();
}
}
}
@Test
public void multipleDatabaseInstances() throws RocksDBException {
// db - keys
final byte[][] keys = {
"aaa".getBytes(),
"bbb".getBytes(),
"ccc".getBytes()
};
// otherDb - keys
final byte[][] otherKeys = {
"111".getBytes(),
"222".getBytes(),
"333".getBytes()
};
// values
final byte[][] values = {
"foo".getBytes(),
"bar".getBytes(),
"baz".getBytes()
};
Env env = null;
Options options = null;
RocksDB db = null, otherDb = null;
try {
env = new RocksMemEnv();
options = new Options().
setCreateIfMissing(true).
setEnv(env);
db = RocksDB.open(options, "dir/db");
otherDb = RocksDB.open(options, "dir/otherDb");
// write key/value pairs using MemEnv
// to db and to otherDb.
for (int i=0; i < keys.length; i++) {
db.put(keys[i], values[i]);
otherDb.put(otherKeys[i], values[i]);
}
// verify key/value pairs after flush using MemEnv
for (int i=0; i < keys.length; i++) {
// verify db
assertThat(db.get(otherKeys[i])).isNull();
assertThat(db.get(keys[i])).isEqualTo(values[i]);
// verify otherDb
assertThat(otherDb.get(keys[i])).isNull();
assertThat(otherDb.get(otherKeys[i])).isEqualTo(values[i]);
}
} finally {
if (db != null) {
db.close();
}
if (otherDb != null) {
otherDb.close();
}
if (options != null) {
options.dispose();
}
if (env != null) {
env.dispose();
}
}
}
@Test(expected = RocksDBException.class)
public void createIfMissingFalse() throws RocksDBException {
Env env = null;
Options options = null;
RocksDB db = null;
try {
env = new RocksMemEnv();
options = new Options().
setCreateIfMissing(false).
setEnv(env);
// shall throw an exception because db dir does not
// exist.
db = RocksDB.open(options, "db/dir");
} finally {
if (options != null) {
options.dispose();
}
if (env != null) {
env.dispose();
}
}
}
}
Loading…
Cancel
Save