Merge pull request #372 from fyrz/RocksJava-CF-Merge-Hardening

[RocksJava] Merge with ColumnFamilies & Hardening CFHandle
main
Yueh-Hsuan Chiang 11 years ago
commit ccaf1aa7cb
  1. 22
      java/org/rocksdb/ColumnFamilyHandle.java
  2. 52
      java/org/rocksdb/RocksDB.java
  3. 249
      java/org/rocksdb/test/MergeTest.java
  4. 72
      java/rocksjni/rocksjni.cc

@ -10,23 +10,33 @@ package org.rocksdb;
* ColumnFamily Pointers. * ColumnFamily Pointers.
*/ */
public class ColumnFamilyHandle extends RocksObject { public class ColumnFamilyHandle extends RocksObject {
ColumnFamilyHandle(long nativeHandle) { ColumnFamilyHandle(RocksDB rocksDB, long nativeHandle) {
super(); super();
nativeHandle_ = nativeHandle; nativeHandle_ = nativeHandle;
// rocksDB must point to a valid RocksDB instance;
assert(rocksDB != null);
// ColumnFamilyHandle must hold a reference to the related RocksDB instance
// to guarantee that while a GC cycle starts ColumnFamilyHandle instances
// are freed prior to RocksDB instances.
rocksDB_ = rocksDB;
} }
/** /**
* Deletes underlying C++ filter pointer. * <p>Deletes underlying C++ iterator pointer.</p>
* *
* Note that this function should be called only after all * <p>Note: the underlying handle can only be safely deleted if the RocksDB
* RocksDB instances referencing the filter are closed. * instance related to a certain ColumnFamilyHandle is still valid and initialized.
* Otherwise an undefined behavior will occur. * Therefore {@code disposeInternal()} checks if the RocksDB is initialized
* before freeing the native handle.</p>
*/ */
@Override protected void disposeInternal() { @Override protected void disposeInternal() {
assert(isInitialized()); assert(isInitialized());
disposeInternal(nativeHandle_); if (rocksDB_.isInitialized()) {
disposeInternal(nativeHandle_);
}
} }
private native void disposeInternal(long handle); private native void disposeInternal(long handle);
private RocksDB rocksDB_;
} }

@ -214,7 +214,7 @@ public class RocksDB extends RocksObject {
List<Long> cfReferences = db.open(options.nativeHandle_, path, List<Long> cfReferences = db.open(options.nativeHandle_, path,
columnFamilyNames, columnFamilyNames.size()); columnFamilyNames, columnFamilyNames.size());
for (int i=0; i<columnFamilyNames.size(); i++) { for (int i=0; i<columnFamilyNames.size(); i++) {
columnFamilyHandles.add(new ColumnFamilyHandle(cfReferences.get(i))); columnFamilyHandles.add(new ColumnFamilyHandle(db, cfReferences.get(i)));
} }
db.storeOptionsInstance(options); db.storeOptionsInstance(options);
return db; return db;
@ -316,7 +316,7 @@ public class RocksDB extends RocksObject {
List<Long> cfReferences = db.openROnly(options.nativeHandle_, path, List<Long> cfReferences = db.openROnly(options.nativeHandle_, path,
columnFamilyNames, columnFamilyNames.size()); columnFamilyNames, columnFamilyNames.size());
for (int i=0; i<columnFamilyNames.size(); i++) { for (int i=0; i<columnFamilyNames.size(); i++) {
columnFamilyHandles.add(new ColumnFamilyHandle(cfReferences.get(i))); columnFamilyHandles.add(new ColumnFamilyHandle(db, cfReferences.get(i)));
} }
db.storeOptionsInstance(options); db.storeOptionsInstance(options);
@ -426,7 +426,7 @@ public class RocksDB extends RocksObject {
* This check is potentially lighter-weight than invoking DB::Get(). One way * This check is potentially lighter-weight than invoking DB::Get(). One way
* to make this lighter weight is to avoid doing any IOs. * to make this lighter weight is to avoid doing any IOs.
* *
* @param columnFamilyHandle {@link ColumnFamilyHandle} instnace * @param columnFamilyHandle {@link ColumnFamilyHandle} instance
* @param key byte array of a key to search for * @param key byte array of a key to search for
* @param value StringBuffer instance which is a out parameter if a value is * @param value StringBuffer instance which is a out parameter if a value is
* found in block-cache. * found in block-cache.
@ -446,7 +446,7 @@ public class RocksDB extends RocksObject {
* to make this lighter weight is to avoid doing any IOs. * to make this lighter weight is to avoid doing any IOs.
* *
* @param readOptions {@link ReadOptions} instance * @param readOptions {@link ReadOptions} instance
* @param columnFamilyHandle {@link ColumnFamilyHandle} instnace * @param columnFamilyHandle {@link ColumnFamilyHandle} instance
* @param key byte array of a key to search for * @param key byte array of a key to search for
* @param value StringBuffer instance which is a out parameter if a value is * @param value StringBuffer instance which is a out parameter if a value is
* found in block-cache. * found in block-cache.
@ -483,6 +483,20 @@ public class RocksDB extends RocksObject {
merge(nativeHandle_, key, key.length, value, value.length); merge(nativeHandle_, key, key.length, value, value.length);
} }
/**
* Add merge operand for key/value pair in a ColumnFamily.
*
* @param columnFamilyHandle {@link ColumnFamilyHandle} instance
* @param key the specified key to be merged.
* @param value the value to be nerged with the current value for
* the specified key.
*/
public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key,
byte[] value) throws RocksDBException {
merge(nativeHandle_, key, key.length, value, value.length,
columnFamilyHandle.nativeHandle_);
}
/** /**
* Add merge operand for key/value pair. * Add merge operand for key/value pair.
* *
@ -497,6 +511,22 @@ public class RocksDB extends RocksObject {
key, key.length, value, value.length); key, key.length, value, value.length);
} }
/**
* Add merge operand for key/value pair.
*
* @param columnFamilyHandle {@link ColumnFamilyHandle} instance
* @param writeOpts {@link WriteOptions} for this write.
* @param key the specified key to be merged.
* @param value the value to be merged with the current value for
* the specified key.
*/
public void merge(ColumnFamilyHandle columnFamilyHandle,
WriteOptions writeOpts, byte[] key, byte[] value)
throws RocksDBException {
merge(nativeHandle_, writeOpts.nativeHandle_,
key, key.length, value, value.length,
columnFamilyHandle.nativeHandle_);
}
/** /**
* Get the value associated with the specified key within column family* * Get the value associated with the specified key within column family*
@ -1000,8 +1030,8 @@ public class RocksDB extends RocksObject {
*/ */
public ColumnFamilyHandle createColumnFamily(String columnFamilyName) public ColumnFamilyHandle createColumnFamily(String columnFamilyName)
throws RocksDBException { throws RocksDBException {
return new ColumnFamilyHandle(createColumnFamily(nativeHandle_, return new ColumnFamilyHandle(this, createColumnFamily(nativeHandle_,
columnFamilyName)); options_.nativeHandle_, columnFamilyName));
} }
/** /**
@ -1063,10 +1093,17 @@ public class RocksDB extends RocksObject {
protected native void merge( protected native void merge(
long handle, byte[] key, int keyLen, long handle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException; byte[] value, int valueLen) throws RocksDBException;
protected native void merge(
long handle, byte[] key, int keyLen,
byte[] value, int valueLen, long cfHandle) throws RocksDBException;
protected native void merge( protected native void merge(
long handle, long writeOptHandle, long handle, long writeOptHandle,
byte[] key, int keyLen, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException; byte[] value, int valueLen) throws RocksDBException;
protected native void merge(
long handle, long writeOptHandle,
byte[] key, int keyLen,
byte[] value, int valueLen, long cfHandle) throws RocksDBException;
protected native int get( protected native int get(
long handle, byte[] key, int keyLen, long handle, byte[] key, int keyLen,
byte[] value, int valueLen) throws RocksDBException; byte[] value, int valueLen) throws RocksDBException;
@ -1122,7 +1159,8 @@ public class RocksDB extends RocksObject {
long nativeHandle, long snapshotHandle); long nativeHandle, long snapshotHandle);
private native void disposeInternal(long handle); private native void disposeInternal(long handle);
private native long createColumnFamily(long handle, String name) throws RocksDBException; private native long createColumnFamily(long handle, long opt_handle,
String name) throws RocksDBException;
private native void dropColumnFamily(long handle, long cfHandle) throws RocksDBException; private native void dropColumnFamily(long handle, long cfHandle) throws RocksDBException;
protected Options options_; protected Options options_;

@ -5,81 +5,184 @@
package org.rocksdb.test; package org.rocksdb.test;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import org.rocksdb.*; import org.rocksdb.*;
public class MergeTest { public class MergeTest {
static final String db_path_string = "/tmp/mergestringjni_db"; static final String db_path_string = "/tmp/rocksdbjni_mergestring_db";
static final String db_path_function = "/tmp/mergefunctionjni_db"; static final String db_cf_path_string = "/tmp/rocksdbjni_mergecfstring_db";
static { static final String db_path_operator = "/tmp/rocksdbjni_mergeoperator_db";
RocksDB.loadLibrary();
} static {
RocksDB.loadLibrary();
public static void testStringOption() }
throws InterruptedException, RocksDBException {
public static void testStringOption()
System.out.println("Testing merge function string option ==="); throws InterruptedException, RocksDBException {
Options opt = new Options();
Options opt = new Options(); opt.setCreateIfMissing(true);
opt.setCreateIfMissing(true); opt.setMergeOperatorName("stringappend");
opt.setMergeOperatorName("stringappend");
RocksDB db = RocksDB.open(opt, db_path_string);
RocksDB db = RocksDB.open(opt, db_path_string); // writing aa under key
db.put("key".getBytes(), "aa".getBytes());
System.out.println("Writing aa under key..."); // merge bb under key
db.put("key".getBytes(), "aa".getBytes()); db.merge("key".getBytes(), "bb".getBytes());
System.out.println("Writing bb under key..."); byte[] value = db.get("key".getBytes());
db.merge("key".getBytes(), "bb".getBytes()); String strValue = new String(value);
byte[] value = db.get("key".getBytes()); db.close();
String strValue = new String(value); opt.dispose();
assert(strValue.equals("aa,bb"));
System.out.println("Retrieved value: " + strValue); }
db.close(); public static void testCFStringOption()
opt.dispose(); throws InterruptedException, RocksDBException {
Options opt = new Options();
assert(strValue.equals("aa,bb")); opt.setCreateIfMissing(true);
opt.setCreateMissingColumnFamilies(true);
System.out.println("Merge function string option passed!"); opt.setMergeOperatorName("stringappend");
}
List<String> cfNames = new ArrayList<String>();
public static void testOperatorOption() List<ColumnFamilyHandle> columnFamilyHandleList =
throws InterruptedException, RocksDBException { new ArrayList<ColumnFamilyHandle>();
cfNames.add("default");
System.out.println("Testing merge function operator option ==="); cfNames.add("new_cf");
RocksDB db = RocksDB.open(opt, db_cf_path_string,
Options opt = new Options(); cfNames, columnFamilyHandleList);
opt.setCreateIfMissing(true);
// writing aa under key
StringAppendOperator stringAppendOperator = new StringAppendOperator(); db.put(columnFamilyHandleList.get(1),
opt.setMergeOperator(stringAppendOperator); "cfkey".getBytes(), "aa".getBytes());
// merge bb under key
RocksDB db = RocksDB.open(opt, db_path_string); db.merge(columnFamilyHandleList.get(1),
"cfkey".getBytes(), "bb".getBytes());
System.out.println("Writing aa under key...");
db.put("key".getBytes(), "aa".getBytes()); byte[] value = db.get(columnFamilyHandleList.get(1), "cfkey".getBytes());
String strValue = new String(value);
System.out.println("Writing bb under key...");
db.merge("key".getBytes(), "bb".getBytes()); for (ColumnFamilyHandle handle : columnFamilyHandleList) {
handle.dispose();
byte[] value = db.get("key".getBytes()); }
String strValue = new String(value); db.close();
opt.dispose();
System.out.println("Retrieved value: " + strValue); assert(strValue.equals("aa,bb"));
}
db.close();
opt.dispose(); public static void testOperatorOption()
throws InterruptedException, RocksDBException {
assert(strValue.equals("aa,bb")); Options opt = new Options();
opt.setCreateIfMissing(true);
System.out.println("Merge function operator option passed!");
} StringAppendOperator stringAppendOperator = new StringAppendOperator();
opt.setMergeOperator(stringAppendOperator);
public static void main(String[] args)
throws InterruptedException, RocksDBException { RocksDB db = RocksDB.open(opt, db_path_string);
testStringOption(); // Writing aa under key
testOperatorOption(); db.put("key".getBytes(), "aa".getBytes());
}
// Writing bb under key
db.merge("key".getBytes(), "bb".getBytes());
byte[] value = db.get("key".getBytes());
String strValue = new String(value);
db.close();
opt.dispose();
assert(strValue.equals("aa,bb"));
}
public static void testCFOperatorOption()
throws InterruptedException, RocksDBException {
Options opt = new Options();
opt.setCreateIfMissing(true);
opt.setCreateMissingColumnFamilies(true);
StringAppendOperator stringAppendOperator = new StringAppendOperator();
opt.setMergeOperator(stringAppendOperator);
List<String> cfNames = new ArrayList<String>();
List<ColumnFamilyHandle> columnFamilyHandleList =
new ArrayList<ColumnFamilyHandle>();
cfNames.add("default");
cfNames.add("new_cf");
RocksDB db = RocksDB.open(opt, db_path_operator,
cfNames, columnFamilyHandleList);
// writing aa under key
db.put(columnFamilyHandleList.get(1),
"cfkey".getBytes(), "aa".getBytes());
// merge bb under key
db.merge(columnFamilyHandleList.get(1),
"cfkey".getBytes(), "bb".getBytes());
byte[] value = db.get(columnFamilyHandleList.get(1), "cfkey".getBytes());
String strValue = new String(value);
// Test also with createColumnFamily
ColumnFamilyHandle columnFamilyHandle = db.createColumnFamily("new_cf2");
// writing xx under cfkey2
db.put(columnFamilyHandle, "cfkey2".getBytes(), "xx".getBytes());
// merge yy under cfkey2
db.merge(columnFamilyHandle, "cfkey2".getBytes(), "yy".getBytes());
value = db.get(columnFamilyHandle, "cfkey2".getBytes());
String strValueTmpCf = new String(value);
db.close();
opt.dispose();
assert(strValue.equals("aa,bb"));
assert(strValueTmpCf.equals("xx,yy"));
}
public static void testOperatorGcBehaviour()
throws RocksDBException {
Options opt = new Options();
opt.setCreateIfMissing(true);
StringAppendOperator stringAppendOperator = new StringAppendOperator();
opt.setMergeOperator(stringAppendOperator);
RocksDB db = RocksDB.open(opt, db_path_string);
db.close();
opt.dispose();
System.gc();
System.runFinalization();
// test reuse
opt = new Options();
opt.setMergeOperator(stringAppendOperator);
db = RocksDB.open(opt, db_path_string);
db.close();
opt.dispose();
System.gc();
System.runFinalization();
// test param init
opt = new Options();
opt.setMergeOperator(new StringAppendOperator());
db = RocksDB.open(opt, db_path_string);
db.close();
opt.dispose();
System.gc();
System.runFinalization();
// test replace one with another merge operator instance
opt = new Options();
opt.setMergeOperator(stringAppendOperator);
StringAppendOperator newStringAppendOperator = new StringAppendOperator();
opt.setMergeOperator(newStringAppendOperator);
db = RocksDB.open(opt, db_path_string);
db.close();
opt.dispose();
stringAppendOperator = null;
newStringAppendOperator = null;
System.gc();
System.runFinalization();
}
public static void main(String[] args)
throws InterruptedException, RocksDBException {
testStringOption();
testCFStringOption();
testOperatorOption();
testCFOperatorOption();
testOperatorGcBehaviour();
System.out.println("Passed MergeTest.");
}
} }

@ -95,7 +95,7 @@ jobject
cfnames_to_free.push_back(cfname); cfnames_to_free.push_back(cfname);
jcfnames_for_free.push_back(jstr); jcfnames_for_free.push_back(jstr);
column_families.push_back(rocksdb::ColumnFamilyDescriptor(cfname, column_families.push_back(rocksdb::ColumnFamilyDescriptor(cfname,
rocksdb::ColumnFamilyOptions())); *static_cast<rocksdb::ColumnFamilyOptions*>(opt)));
} }
rocksdb::Status s = rocksdb::DB::OpenForReadOnly(*opt, rocksdb::Status s = rocksdb::DB::OpenForReadOnly(*opt,
@ -167,7 +167,7 @@ jobject Java_org_rocksdb_RocksDB_open__JLjava_lang_String_2Ljava_util_List_2I(
cfnames_to_free.push_back(cfname); cfnames_to_free.push_back(cfname);
jcfnames_for_free.push_back(jstr); jcfnames_for_free.push_back(jstr);
column_families.push_back(rocksdb::ColumnFamilyDescriptor(cfname, column_families.push_back(rocksdb::ColumnFamilyDescriptor(cfname,
rocksdb::ColumnFamilyOptions())); *static_cast<rocksdb::ColumnFamilyOptions*>(opt)));
} }
rocksdb::Status s = rocksdb::DB::Open(*opt, db_path, column_families, rocksdb::Status s = rocksdb::DB::Open(*opt, db_path, column_families,
@ -919,7 +919,7 @@ void Java_org_rocksdb_RocksDB_remove__JJ_3BIJ(
void rocksdb_merge_helper( void rocksdb_merge_helper(
JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options, JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options,
jbyteArray jkey, jint jkey_len, rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len) { jbyteArray jvalue, jint jvalue_len) {
jbyte* key = env->GetByteArrayElements(jkey, 0); jbyte* key = env->GetByteArrayElements(jkey, 0);
@ -927,7 +927,12 @@ void rocksdb_merge_helper(
rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len); rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
rocksdb::Slice value_slice(reinterpret_cast<char*>(value), jvalue_len); rocksdb::Slice value_slice(reinterpret_cast<char*>(value), jvalue_len);
rocksdb::Status s = db->Merge(write_options, key_slice, value_slice); rocksdb::Status s;
if (cf_handle != nullptr) {
s = db->Merge(write_options, cf_handle, key_slice, value_slice);
} else {
s = db->Merge(write_options, key_slice, value_slice);
}
// trigger java unref on key and value. // trigger java unref on key and value.
// by passing JNI_ABORT, it will simply release the reference without // by passing JNI_ABORT, it will simply release the reference without
@ -955,8 +960,29 @@ void Java_org_rocksdb_RocksDB_merge__J_3BI_3BI(
rocksdb::WriteOptions(); rocksdb::WriteOptions();
rocksdb_merge_helper(env, db, default_write_options, rocksdb_merge_helper(env, db, default_write_options,
jkey, jkey_len, nullptr, jkey, jkey_len, jvalue, jvalue_len);
jvalue, jvalue_len); }
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (J[BI[BIJ)V
*/
void Java_org_rocksdb_RocksDB_merge__J_3BI_3BIJ(
JNIEnv* env, jobject jdb, jlong jdb_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len, jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
static const rocksdb::WriteOptions default_write_options =
rocksdb::WriteOptions();
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
rocksdb_merge_helper(env, db, default_write_options,
cf_handle, jkey, jkey_len, jvalue, jvalue_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
}
} }
/* /*
@ -974,8 +1000,30 @@ void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BI(
jwrite_options_handle); jwrite_options_handle);
rocksdb_merge_helper(env, db, *write_options, rocksdb_merge_helper(env, db, *write_options,
jkey, jkey_len, nullptr, jkey, jkey_len, jvalue, jvalue_len);
jvalue, jvalue_len); }
/*
* Class: org_rocksdb_RocksDB
* Method: merge
* Signature: (JJ[BI[BIJ)V
*/
void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BIJ(
JNIEnv* env, jobject jdb,
jlong jdb_handle, jlong jwrite_options_handle,
jbyteArray jkey, jint jkey_len,
jbyteArray jvalue, jint jvalue_len, jlong jcf_handle) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto write_options = reinterpret_cast<rocksdb::WriteOptions*>(
jwrite_options_handle);
auto cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
if (cf_handle != nullptr) {
rocksdb_merge_helper(env, db, *write_options,
cf_handle, jkey, jkey_len, jvalue, jvalue_len);
} else {
rocksdb::RocksDBExceptionJni::ThrowNew(env,
rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle."));
}
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -1062,15 +1110,17 @@ jlongArray Java_org_rocksdb_RocksDB_iterators(
/* /*
* Class: org_rocksdb_RocksDB * Class: org_rocksdb_RocksDB
* Method: createColumnFamily * Method: createColumnFamily
* Signature: (JLjava/lang/String;)J; * Signature: (JJLjava/lang/String;)J;
*/ */
jlong Java_org_rocksdb_RocksDB_createColumnFamily( jlong Java_org_rocksdb_RocksDB_createColumnFamily(
JNIEnv* env, jobject jdb, jlong jdb_handle, jstring jcfname) { JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jopt_handle,
jstring jcfname) {
rocksdb::ColumnFamilyHandle* handle; rocksdb::ColumnFamilyHandle* handle;
const char* cfname = env->GetStringUTFChars(jcfname, 0); const char* cfname = env->GetStringUTFChars(jcfname, 0);
auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle); auto db_handle = reinterpret_cast<rocksdb::DB*>(jdb_handle);
auto opt = reinterpret_cast<rocksdb::Options*>(jopt_handle);
rocksdb::Status s = db_handle->CreateColumnFamily( rocksdb::Status s = db_handle->CreateColumnFamily(
rocksdb::ColumnFamilyOptions(), cfname, &handle); *static_cast<rocksdb::ColumnFamilyOptions*>(opt), cfname, &handle);
env->ReleaseStringUTFChars(jcfname, cfname); env->ReleaseStringUTFChars(jcfname, cfname);
if (s.ok()) { if (s.ok()) {

Loading…
Cancel
Save