From a40ce219b9d3009331f4984529d92b499b03d454 Mon Sep 17 00:00:00 2001 From: Vlad Balan Date: Tue, 16 Sep 2014 13:58:49 -0700 Subject: [PATCH] Adding merge functions to RocksDBJava Summary: Added support for the merge operation to RocksJava. You can specify a merge function to be used on the current database. The merge function can either be one of the functions defined in utilities/merge_operators.h, which can be specified through its corresponding name, or a user-created function that needs to be encapsulated in a JNI object in order to be used. Examples are provided for both use cases. Test Plan: There are unit test in MergeTest.java Reviewers: ankgup87 Subscribers: vladb38 Differential Revision: https://reviews.facebook.net/D24525 --- java/Makefile | 3 +- java/org/rocksdb/MergeOperator.java | 19 +++++ java/org/rocksdb/Options.java | 34 +++++++++ java/org/rocksdb/RocksDB.java | 33 +++++++- java/org/rocksdb/StringAppendOperator.java | 21 ++++++ java/org/rocksdb/test/MergeTest.java | 88 ++++++++++++++++++++++ java/rocksjni/merge_operator.cc | 35 +++++++++ java/rocksjni/options.cc | 26 +++++++ java/rocksjni/rocksjni.cc | 64 ++++++++++++++++ 9 files changed, 321 insertions(+), 2 deletions(-) create mode 100644 java/org/rocksdb/MergeOperator.java create mode 100644 java/org/rocksdb/StringAppendOperator.java create mode 100644 java/org/rocksdb/test/MergeTest.java create mode 100644 java/rocksjni/merge_operator.cc diff --git a/java/Makefile b/java/Makefile index 9c75c54ea..441238930 100644 --- a/java/Makefile +++ b/java/Makefile @@ -1,4 +1,4 @@ -NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle +NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) @@ -44,6 +44,7 @@ test: java java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.KeyMayExistTest #java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOnlyTest + java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.MergeTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.StatisticsCollectorTest @rm -rf /tmp/rocksdbjni_* diff --git a/java/org/rocksdb/MergeOperator.java b/java/org/rocksdb/MergeOperator.java new file mode 100644 index 000000000..310cf7a46 --- /dev/null +++ b/java/org/rocksdb/MergeOperator.java @@ -0,0 +1,19 @@ +// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +import java.util.*; + +/** + * MergeOperator holds an operator to be applied when compacting + * two values held under the same key in order to obtain a single + * value. + */ +public abstract class MergeOperator { + + abstract protected long newMergeOperatorHandle(); + +} diff --git a/java/org/rocksdb/Options.java b/java/org/rocksdb/Options.java index bb6f74e08..f34171ea9 100644 --- a/java/org/rocksdb/Options.java +++ b/java/org/rocksdb/Options.java @@ -2234,6 +2234,40 @@ public class Options extends RocksObject { private native void setMinPartialMergeOperands( long handle, int minPartialMergeOperands); + /** + * Set the merge operator to be used for merging two different key/value + * pairs that share the same key. The merge function is invoked during + * compaction and at lookup time, if multiple key/value pairs belonging + * to the same key are found in the database. + * + * @param name the name of the merge function, as defined by + * the MergeOperators factory (see utilities/MergeOperators.h) + * @return the reference to the current option. + */ + public Options setMergeOperatorName(String name) { + setMergeOperatorName(nativeHandle_, name); + return this; + } + private native void setMergeOperatorName( + long handle, String name); + + /** + * Set the merge operator to be used for merging two different key/value + * pairs that share the same key. The merge function is invoked during + * compaction and at lookup time, if multiple key/value pairs belonging + * to the same key are found in the database. + * + * @param name the name of the merge function, as defined by + * the MergeOperators factory (see utilities/MergeOperators.h) + * @return the reference to the current option. + */ + public Options setMergeOperator(MergeOperator mergeOperator) { + setMergeOperator(nativeHandle_, mergeOperator.newMergeOperatorHandle()); + return this; + } + private native void setMergeOperator( + long handle, long mergeOperatorHandle); + /** * Release the memory allocated for the current instance * in the c++ side. diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index d10c235dc..f54088da4 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -473,8 +473,32 @@ public class RocksDB extends RocksObject { } /** - * Get the value associated with the specified key within column family + * Set the database entry for "key" to "value". * + * @param key the specified key to be merged. + * @param value the value to be nerged with the current value for + * the specified key. + */ + public void merge(byte[] key, byte[] value) throws RocksDBException { + merge(nativeHandle_, key, key.length, value, value.length); + } + + /** + * Merge the database entry for "key" with "value". + * + * @param key the specified key to be merged. + * @param value the value to be merged with the current value for + * the specified key. + */ + public void merge(WriteOptions writeOpts, byte[] key, byte[] value) + throws RocksDBException { + merge(nativeHandle_, writeOpts.nativeHandle_, + key, key.length, value, value.length); + } + + + /** + * Get the value associated with the specified key within column family* * @param key the key to retrieve the value. * @param value the out-value to receive the retrieved value. * @return The size of the actual value that matches the specified @@ -1002,6 +1026,13 @@ public class RocksDB extends RocksObject { long cfHandle, StringBuffer stringBuffer); protected native boolean keyMayExist(long optionsHandle, byte[] key, int keyLen, long cfHandle, StringBuffer stringBuffer); + protected native void merge( + long handle, byte[] key, int keyLen, + byte[] value, int valueLen) throws RocksDBException; + protected native void merge( + long handle, long writeOptHandle, + byte[] key, int keyLen, + byte[] value, int valueLen) throws RocksDBException; protected native int get( long handle, byte[] key, int keyLen, byte[] value, int valueLen) throws RocksDBException; diff --git a/java/org/rocksdb/StringAppendOperator.java b/java/org/rocksdb/StringAppendOperator.java new file mode 100644 index 000000000..9b593204f --- /dev/null +++ b/java/org/rocksdb/StringAppendOperator.java @@ -0,0 +1,21 @@ +// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +/** + * MergeOperator holds an operator to be applied when compacting + * two values held under the same key in order to obtain a single + * value. + */ +public class StringAppendOperator extends MergeOperator { + + @Override protected long newMergeOperatorHandle() { + return newMergeOperatorHandleImpl(); + } + + private native long newMergeOperatorHandleImpl(); + +} diff --git a/java/org/rocksdb/test/MergeTest.java b/java/org/rocksdb/test/MergeTest.java new file mode 100644 index 000000000..0d3833715 --- /dev/null +++ b/java/org/rocksdb/test/MergeTest.java @@ -0,0 +1,88 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb.test; + +import java.util.Collections; +import org.rocksdb.*; + +public class MergeTest { + static final String db_path_string = "/tmp/mergestringjni_db"; + static final String db_path_function = "/tmp/mergefunctionjni_db"; + static { + RocksDB.loadLibrary(); + } + + public static void testStringOption() + throws InterruptedException, RocksDBException { + + System.out.println("Testing merge function string option ==="); + + Options opt = new Options(); + opt.setCreateIfMissing(true); + opt.setMergeOperatorName("stringappend"); + + RocksDB db = RocksDB.open(opt, db_path_string); + + System.out.println("Writing aa under key..."); + db.put("key".getBytes(), "aa".getBytes()); + + System.out.println("Writing bb under key..."); + db.merge("key".getBytes(), "bb".getBytes()); + + byte[] value = db.get("key".getBytes()); + String strValue = new String(value); + + System.out.println("Retrieved value: " + strValue); + + db.close(); + opt.dispose(); + + assert(strValue.equals("aa,bb")); + + System.out.println("Merge function string option passed!"); + + } + + public static void testOperatorOption() + throws InterruptedException, RocksDBException { + + System.out.println("Testing merge function operator option ==="); + + Options opt = new Options(); + opt.setCreateIfMissing(true); + + StringAppendOperator stringAppendOperator = new StringAppendOperator(); + opt.setMergeOperator(stringAppendOperator); + + RocksDB db = RocksDB.open(opt, db_path_string); + + System.out.println("Writing aa under key..."); + db.put("key".getBytes(), "aa".getBytes()); + + System.out.println("Writing bb under key..."); + db.merge("key".getBytes(), "bb".getBytes()); + + byte[] value = db.get("key".getBytes()); + String strValue = new String(value); + + System.out.println("Retrieved value: " + strValue); + + db.close(); + opt.dispose(); + + assert(strValue.equals("aa,bb")); + + System.out.println("Merge function operator option passed!"); + + } + + public static void main(String[] args) + throws InterruptedException, RocksDBException { + testStringOption(); + testOperatorOption(); + + } +} diff --git a/java/rocksjni/merge_operator.cc b/java/rocksjni/merge_operator.cc new file mode 100644 index 000000000..fc295e38c --- /dev/null +++ b/java/rocksjni/merge_operator.cc @@ -0,0 +1,35 @@ +// Copyright (c) 2014, Vlad Balan (vlad.gm@gmail.com). All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ for rocksdb::MergeOperator. + +#include +#include +#include +#include +#include + +#include "include/org_rocksdb_StringAppendOperator.h" +#include "rocksjni/portal.h" +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/statistics.h" +#include "rocksdb/memtablerep.h" +#include "rocksdb/table.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" + +/* + * Class: org_rocksdb_StringAppendOperator + * Method: newMergeOperatorHandle + * Signature: ()J + */ +jlong Java_org_rocksdb_StringAppendOperator_newMergeOperatorHandleImpl(JNIEnv* env, jobject jobj) { + std::shared_ptr *op = new std::shared_ptr(); + *op = rocksdb::MergeOperators::CreateFromStringId("stringappend"); + return reinterpret_cast(op); +} + diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index ef104d92b..a52e2da70 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -23,6 +23,8 @@ #include "rocksdb/slice_transform.h" #include "rocksdb/rate_limiter.h" #include "rocksdb/comparator.h" +#include "rocksdb/merge_operator.h" +#include "utilities/merge_operators.h" /* * Class: org_rocksdb_Options @@ -1603,6 +1605,29 @@ void Java_org_rocksdb_Options_setMinPartialMergeOperands( static_cast(jmin_partial_merge_operands); } +/* + * Class: org_rocksdb_Options + * Method: setMergeOperatorName + * Signature: (JJjava/lang/String)V + */ +void Java_org_rocksdb_Options_setMergeOperatorName( + JNIEnv* env, jobject jobj, jlong jhandle, jstring name) { + const char* op_name = env->GetStringUTFChars(name, 0); + reinterpret_cast(jhandle)->merge_operator = + rocksdb::MergeOperators::CreateFromStringId(op_name); +} + +/* + * Class: org_rocksdb_Options + * Method: setMergeOperator + * Signature: (JJjava/lang/String)V + */ +void Java_org_rocksdb_Options_setMergeOperator( + JNIEnv* env, jobject jobj, jlong jhandle, jlong mergeOperatorHandle) { + reinterpret_cast(jhandle)->merge_operator = + *(reinterpret_cast*> (mergeOperatorHandle)); +} + ////////////////////////////////////////////////////////////////////////////// // WriteOptions @@ -1759,3 +1784,4 @@ void Java_org_rocksdb_ReadOptions_setTailing( reinterpret_cast(jhandle)->tailing = static_cast(jtailing); } + diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index fa9a66a7d..f5e702520 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -914,6 +914,70 @@ void Java_org_rocksdb_RocksDB_remove__JJ_3BIJ( rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle.")); } } +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::Merge + +void rocksdb_merge_helper( + JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + + jbyte* key = env->GetByteArrayElements(jkey, 0); + jbyte* value = env->GetByteArrayElements(jvalue, 0); + rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice(reinterpret_cast(value), jvalue_len); + + rocksdb::Status s = db->Merge(write_options, key_slice, value_slice); + + // trigger java unref on key and value. + // by passing JNI_ABORT, it will simply release the reference without + // copying the result back to the java byte array. + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); + + if (s.ok()) { + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: merge + * Signature: (J[BI[BI)V + */ +void Java_org_rocksdb_RocksDB_merge__J_3BI_3BI( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + auto db = reinterpret_cast(jdb_handle); + static const rocksdb::WriteOptions default_write_options = + rocksdb::WriteOptions(); + + rocksdb_merge_helper(env, db, default_write_options, + jkey, jkey_len, + jvalue, jvalue_len); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: merge + * Signature: (JJ[BI[BI)V + */ +void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BI( + JNIEnv* env, jobject jdb, + jlong jdb_handle, jlong jwrite_options_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + auto db = reinterpret_cast(jdb_handle); + auto write_options = reinterpret_cast( + jwrite_options_handle); + + rocksdb_merge_helper(env, db, *write_options, + jkey, jkey_len, + jvalue, jvalue_len); +} + ////////////////////////////////////////////////////////////////////////////// // rocksdb::DB::~DB()