diff --git a/java/Makefile b/java/Makefile index 37d704428..9fd714ee9 100644 --- a/java/Makefile +++ b/java/Makefile @@ -10,6 +10,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\ org.rocksdb.DBOptions\ org.rocksdb.DirectComparator\ org.rocksdb.DirectSlice\ + org.rocksdb.FlushOptions\ org.rocksdb.Filter\ org.rocksdb.GenericRateLimiterConfig\ org.rocksdb.HashLinkedListMemTableConfig\ diff --git a/java/org/rocksdb/FlushOptions.java b/java/org/rocksdb/FlushOptions.java new file mode 100644 index 000000000..e481c7664 --- /dev/null +++ b/java/org/rocksdb/FlushOptions.java @@ -0,0 +1,51 @@ +package org.rocksdb; + +/** + * FlushOptions to be passed to flush operations of + * {@link org.rocksdb.RocksDB}. + */ +public class FlushOptions extends RocksObject { + + /** + * Construct a new instance of FlushOptions. + */ + public FlushOptions(){ + super(); + newFlushOptions(); + } + + /** + * Set if the flush operation shall block until it terminates. + * + * @param waitForFlush boolean value indicating if the flush + * operations waits for termination of the flush process. + * + * @return instance of current FlushOptions. + */ + public FlushOptions setWaitForFlush(boolean waitForFlush) { + assert(isInitialized()); + waitForFlush(nativeHandle_); + return this; + } + + /** + * Wait for flush to finished. + * + * @return boolean value indicating if the flush operation + * waits for termination of the flush process. + */ + public boolean waitForFlush() { + assert(isInitialized()); + return waitForFlush(nativeHandle_); + } + + @Override protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + private native void newFlushOptions(); + private native void disposeInternal(long handle); + private native void setWaitForFlush(long handle, + boolean wait); + private native boolean waitForFlush(long handle); +} diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index 40680e438..8efdaea1f 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -1085,6 +1085,40 @@ public class RocksDB extends RocksObject { dropColumnFamily(nativeHandle_, columnFamilyHandle.nativeHandle_); } + /** + *

Flush all memory table data.

+ * + *

Note: it must be ensured that the FlushOptions instance + * is not GC'ed before this method finishes. If the wait parameter is + * set to false, flush processing is asynchronous.

+ * + * @param flushOptions {@link org.rocksdb.FlushOptions} instance. + * @throws RocksDBException thrown if an error occurs within the native + * part of the library. + */ + public void flush(FlushOptions flushOptions) + throws RocksDBException { + flush(nativeHandle_, flushOptions.nativeHandle_); + } + + /** + *

Flush all memory table data.

+ * + *

Note: it must be ensured that the FlushOptions instance + * is not GC'ed before this method finishes. If the wait parameter is + * set to false, flush processing is asynchronous.

+ * + * @param flushOptions {@link org.rocksdb.FlushOptions} instance. + * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle} instance. + * @throws RocksDBException thrown if an error occurs within the native + * part of the library. + */ + public void flush(FlushOptions flushOptions, + ColumnFamilyHandle columnFamilyHandle) throws RocksDBException { + flush(nativeHandle_, flushOptions.nativeHandle_, + columnFamilyHandle.nativeHandle_); + } + /** * Private constructor. */ @@ -1197,10 +1231,13 @@ public class RocksDB extends RocksObject { protected native void releaseSnapshot( long nativeHandle, long snapshotHandle); private native void disposeInternal(long handle); - private native long createColumnFamily(long handle, long opt_handle, String name) throws RocksDBException; private native void dropColumnFamily(long handle, long cfHandle) throws RocksDBException; + private native void flush(long handle, long flushOptHandle) + throws RocksDBException; + private native void flush(long handle, long flushOptHandle, + long cfHandle) throws RocksDBException; protected Options options_; } diff --git a/java/org/rocksdb/test/FlushTest.java b/java/org/rocksdb/test/FlushTest.java new file mode 100644 index 000000000..1742be67f --- /dev/null +++ b/java/org/rocksdb/test/FlushTest.java @@ -0,0 +1,47 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +package org.rocksdb.test; + +import org.rocksdb.*; + +public class FlushTest { + + static final String db_path = "/tmp/rocksdbjni_flush_test"; + static { + RocksDB.loadLibrary(); + } + + public static void main(String[] args) { + RocksDB db = null; + Options options = new Options(); + WriteOptions wOpt = new WriteOptions(); + FlushOptions flushOptions = new FlushOptions(); + + try { + // Setup options + options.setCreateIfMissing(true); + options.setMaxWriteBufferNumber(10); + options.setMinWriteBufferNumberToMerge(10); + flushOptions.setWaitForFlush(true); + wOpt.setDisableWAL(true); + db = RocksDB.open(options, db_path); + + db.put(wOpt, "key1".getBytes(), "value1".getBytes()); + db.put(wOpt, "key2".getBytes(), "value2".getBytes()); + db.put(wOpt, "key3".getBytes(), "value3".getBytes()); + db.put(wOpt, "key4".getBytes(), "value4".getBytes()); + assert(db.getProperty("rocksdb.num-entries-active-mem-table").equals("4")); + db.flush(flushOptions); + assert(db.getProperty("rocksdb.num-entries-active-mem-table").equals("0")); + } catch (RocksDBException e) { + assert(false); + } + + db.close(); + options.dispose(); + wOpt.dispose(); + flushOptions.dispose(); + } +} diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index ee0255d80..de614594f 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -18,9 +18,11 @@ #include "include/org_rocksdb_WriteOptions.h" #include "include/org_rocksdb_ReadOptions.h" #include "include/org_rocksdb_ComparatorOptions.h" +#include "include/org_rocksdb_FlushOptions.h" #include "rocksjni/comparatorjnicallback.h" #include "rocksjni/portal.h" + #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/statistics.h" @@ -3607,6 +3609,32 @@ jboolean Java_org_rocksdb_ReadOptions_tailing( return reinterpret_cast(jhandle)->tailing; } +/* + * Class: org_rocksdb_ReadOptions + * Method: setSnapshot + * Signature: (JJ)V + */ +void Java_org_rocksdb_ReadOptions_setSnapshot( + JNIEnv* env, jobject jobj, jlong jhandle, jlong jsnapshot) { + reinterpret_cast(jhandle)->snapshot = + reinterpret_cast(jsnapshot); +} + +/* + * Class: org_rocksdb_ReadOptions + * Method: snapshot + * Signature: (J)J + */ +jlong Java_org_rocksdb_ReadOptions_snapshot( + JNIEnv* env, jobject jobj, jlong jhandle) { + auto& snapshot = + reinterpret_cast(jhandle)->snapshot; + return reinterpret_cast(snapshot); +} + +///////////////////////////////////////////////////////////////////// +// rocksdb::ComparatorOptions + /* * Class: org_rocksdb_ComparatorOptions * Method: newComparatorOptions @@ -3651,25 +3679,49 @@ void Java_org_rocksdb_ComparatorOptions_disposeInternal( rocksdb::ComparatorOptionsJni::setHandle(env, jobj, nullptr); } +///////////////////////////////////////////////////////////////////// +// rocksdb::FlushOptions + /* - * Class: org_rocksdb_ReadOptions - * Method: setSnapshot - * Signature: (JJ)V + * Class: org_rocksdb_FlushOptions + * Method: newFlushOptions + * Signature: ()V */ -void Java_org_rocksdb_ReadOptions_setSnapshot( - JNIEnv* env, jobject jobj, jlong jhandle, jlong jsnapshot) { - reinterpret_cast(jhandle)->snapshot = - reinterpret_cast(jsnapshot); +void Java_org_rocksdb_FlushOptions_newFlushOptions( + JNIEnv* env, jobject jobj) { + auto flush_opt = new rocksdb::FlushOptions(); + rocksdb::FlushOptionsJni::setHandle(env, jobj, flush_opt); } /* - * Class: org_rocksdb_ReadOptions - * Method: snapshot - * Signature: (J)J + * Class: org_rocksdb_FlushOptions + * Method: setWaitForFlush + * Signature: (JZ)V */ -jlong Java_org_rocksdb_ReadOptions_snapshot( - JNIEnv* env, jobject jobj, jlong jhandle) { - auto& snapshot = - reinterpret_cast(jhandle)->snapshot; - return reinterpret_cast(snapshot); +void Java_org_rocksdb_FlushOptions_setWaitForFlush( + JNIEnv * env, jobject jobj, jlong jhandle, jboolean jwait) { + reinterpret_cast(jhandle) + ->wait = static_cast(jwait); +} + +/* + * Class: org_rocksdb_FlushOptions + * Method: waitForFlush + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_FlushOptions_waitForFlush( + JNIEnv * env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle) + ->wait; +} + +/* + * Class: org_rocksdb_FlushOptions + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_FlushOptions_disposeInternal( + JNIEnv * env, jobject jobj, jlong jhandle) { + delete reinterpret_cast(jhandle); + rocksdb::FlushOptionsJni::setHandle(env, jobj, nullptr); } diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 54b3b2766..3a5641d46 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -505,6 +505,34 @@ class ColumnFamilyHandleJni { } }; +class FlushOptionsJni { + public: + // Get the java class id of org.rocksdb.FlushOptions. + static jclass getJClass(JNIEnv* env) { + jclass jclazz = env->FindClass("org/rocksdb/FlushOptions"); + assert(jclazz != nullptr); + return jclazz; + } + + // Get the field id of the member variable of org.rocksdb.FlushOptions + // that stores the pointer to rocksdb::FlushOptions. + static jfieldID getHandleFieldID(JNIEnv* env) { + static jfieldID fid = env->GetFieldID( + getJClass(env), "nativeHandle_", "J"); + assert(fid != nullptr); + return fid; + } + + // Pass the FlushOptions pointer to the java side. + static void setHandle( + JNIEnv* env, jobject jobj, + const rocksdb::FlushOptions* op) { + env->SetLongField( + jobj, getHandleFieldID(env), + reinterpret_cast(op)); + } +}; + class ComparatorOptionsJni { public: // Get the java class id of org.rocksdb.ComparatorOptions. diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index b17f9bab7..3b00cbe42 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -1255,3 +1255,48 @@ jstring Java_org_rocksdb_RocksDB_getProperty0__JJLjava_lang_String_2I( return env->NewStringUTF(property_value.data()); } + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Flush + +void rocksdb_flush_helper( + JNIEnv* env, rocksdb::DB* db, const rocksdb::FlushOptions& flush_options, + rocksdb::ColumnFamilyHandle* column_family_handle) { + rocksdb::Status s; + if (column_family_handle != nullptr) { + s = db->Flush(flush_options, column_family_handle); + } else { + s = db->Flush(flush_options); + } + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +/* + * Class: org_rocksdb_RocksDB + * Method: flush + * Signature: (JJ)V + */ +void Java_org_rocksdb_RocksDB_flush__JJ( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jlong jflush_options) { + auto db = reinterpret_cast(jdb_handle); + auto flush_options = reinterpret_cast(jflush_options); + rocksdb_flush_helper(env, db, *flush_options, nullptr); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: flush + * Signature: (JJJ)V + */ +void Java_org_rocksdb_RocksDB_flush__JJJ( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jlong jflush_options, jlong jcf_handle) { + auto db = reinterpret_cast(jdb_handle); + auto flush_options = reinterpret_cast(jflush_options); + auto cf_handle = reinterpret_cast(jcf_handle); + rocksdb_flush_helper(env, db, *flush_options, cf_handle); +} +