From 171be0ed55b5d0664a4c95ec8a4b021526c8fa7a Mon Sep 17 00:00:00 2001 From: fyrz Date: Wed, 29 Oct 2014 18:40:44 +0100 Subject: [PATCH] Merge with ColumnFamilies & Hardening CFHandle Summary: ColumnFamilyHandles face the same problem as RocksIterator previously so used methods were also applied for ColumnFamilyHandles. Another problem with CF was that Options passed to CFs were always filled with default values. To enable Merge, all parts of the database must share the same merge functionality which is not possible using default values. So from now on every CF will inherit from db options. Changes to RocksDB: - merge can now take also a cfhandle Changes to MergeTest: - Corrected formatting - Included also GC tests - Extended tests to cover CF related parts - Corrected paths to cleanup properly within the test process - Reduced verbosity of the test Test Plan: make rocksdbjava make jtest Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D27999 --- java/org/rocksdb/ColumnFamilyHandle.java | 22 +- java/org/rocksdb/RocksDB.java | 52 ++++- java/org/rocksdb/test/MergeTest.java | 243 ++++++++++++++++------- java/rocksjni/rocksjni.cc | 72 ++++++- 4 files changed, 292 insertions(+), 97 deletions(-) diff --git a/java/org/rocksdb/ColumnFamilyHandle.java b/java/org/rocksdb/ColumnFamilyHandle.java index 334abd96d..92a4d7cef 100644 --- a/java/org/rocksdb/ColumnFamilyHandle.java +++ b/java/org/rocksdb/ColumnFamilyHandle.java @@ -10,23 +10,33 @@ package org.rocksdb; * ColumnFamily Pointers. */ public class ColumnFamilyHandle extends RocksObject { - ColumnFamilyHandle(long nativeHandle) { + ColumnFamilyHandle(RocksDB rocksDB, long nativeHandle) { super(); nativeHandle_ = nativeHandle; + // rocksDB must point to a valid RocksDB instance; + assert(rocksDB != null); + // ColumnFamilyHandle must hold a reference to the related RocksDB instance + // to guarantee that while a GC cycle starts ColumnFamilyHandle instances + // are freed prior to RocksDB instances. + rocksDB_ = rocksDB; } /** - * Deletes underlying C++ filter pointer. + *

Deletes underlying C++ iterator pointer.

* - * Note that this function should be called only after all - * RocksDB instances referencing the filter are closed. - * Otherwise an undefined behavior will occur. + *

Note: the underlying handle can only be safely deleted if the RocksDB + * instance related to a certain ColumnFamilyHandle is still valid and initialized. + * Therefore {@code disposeInternal()} checks if the RocksDB is initialized + * before freeing the native handle.

*/ @Override protected void disposeInternal() { assert(isInitialized()); - disposeInternal(nativeHandle_); + if (rocksDB_.isInitialized()) { + disposeInternal(nativeHandle_); + } } private native void disposeInternal(long handle); + private RocksDB rocksDB_; } diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index 291c505c7..2a90c7370 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -214,7 +214,7 @@ public class RocksDB extends RocksObject { List cfReferences = db.open(options.nativeHandle_, path, columnFamilyNames, columnFamilyNames.size()); for (int i=0; i cfReferences = db.openROnly(options.nativeHandle_, path, columnFamilyNames, columnFamilyNames.size()); for (int i=0; i cfNames = new ArrayList(); + List columnFamilyHandleList = + new ArrayList(); + cfNames.add("default"); + cfNames.add("new_cf"); + RocksDB db = RocksDB.open(opt, db_cf_path_string, cfNames, columnFamilyHandleList); + + // writing aa under key + db.put(columnFamilyHandleList.get(1), "cfkey".getBytes(), "aa".getBytes()); + // merge bb under key + db.merge(columnFamilyHandleList.get(1), "cfkey".getBytes(), "bb".getBytes()); + + byte[] value = db.get(columnFamilyHandleList.get(1), "cfkey".getBytes()); + String strValue = new String(value); + + for (ColumnFamilyHandle handle : columnFamilyHandleList) { + handle.dispose(); + } + db.close(); + opt.dispose(); + assert(strValue.equals("aa,bb")); + } + + public static void testOperatorOption() + throws InterruptedException, RocksDBException { + Options opt = new Options(); + opt.setCreateIfMissing(true); + + StringAppendOperator stringAppendOperator = new StringAppendOperator(); + opt.setMergeOperator(stringAppendOperator); + + RocksDB db = RocksDB.open(opt, db_path_string); + // Writing aa under key + db.put("key".getBytes(), "aa".getBytes()); + + // Writing bb under key + db.merge("key".getBytes(), "bb".getBytes()); + + byte[] value = db.get("key".getBytes()); + String strValue = new String(value); + + db.close(); + opt.dispose(); + assert(strValue.equals("aa,bb")); + } + + public static void testCFOperatorOption() + throws InterruptedException, RocksDBException { + Options opt = new Options(); + opt.setCreateIfMissing(true); + opt.setCreateMissingColumnFamilies(true); + StringAppendOperator stringAppendOperator = new StringAppendOperator(); + opt.setMergeOperator(stringAppendOperator); + + List cfNames = new ArrayList(); + List columnFamilyHandleList = + new ArrayList(); + cfNames.add("default"); + cfNames.add("new_cf"); + RocksDB db = RocksDB.open(opt, db_path_operator, cfNames, columnFamilyHandleList); + + // writing aa under key + db.put(columnFamilyHandleList.get(1), "cfkey".getBytes(), "aa".getBytes()); + // merge bb under key + db.merge(columnFamilyHandleList.get(1), "cfkey".getBytes(), "bb".getBytes()); + byte[] value = db.get(columnFamilyHandleList.get(1), "cfkey".getBytes()); + String strValue = new String(value); + + // Test also with createColumnFamily + ColumnFamilyHandle columnFamilyHandle = db.createColumnFamily("new_cf2"); + // writing xx under cfkey2 + db.put(columnFamilyHandle, "cfkey2".getBytes(), "xx".getBytes()); + // merge yy under cfkey2 + db.merge(columnFamilyHandle, "cfkey2".getBytes(), "yy".getBytes()); + value = db.get(columnFamilyHandle, "cfkey2".getBytes()); + String strValueTmpCf = new String(value); + + db.close(); + opt.dispose(); + assert(strValue.equals("aa,bb")); + assert(strValueTmpCf.equals("xx,yy")); + } + + public static void testOperatorGcBehaviour() + throws RocksDBException { + Options opt = new Options(); + opt.setCreateIfMissing(true); + StringAppendOperator stringAppendOperator = new StringAppendOperator(); + opt.setMergeOperator(stringAppendOperator); + RocksDB db = RocksDB.open(opt, db_path_string); + db.close(); + opt.dispose(); + System.gc(); + System.runFinalization(); + // test reuse + opt = new Options(); + opt.setMergeOperator(stringAppendOperator); + db = RocksDB.open(opt, db_path_string); + db.close(); + opt.dispose(); + System.gc(); + System.runFinalization(); + // test param init + opt = new Options(); + opt.setMergeOperator(new StringAppendOperator()); + db = RocksDB.open(opt, db_path_string); + db.close(); + opt.dispose(); + System.gc(); + System.runFinalization(); + // test replace one with another merge operator instance + opt = new Options(); + opt.setMergeOperator(stringAppendOperator); + StringAppendOperator newStringAppendOperator = new StringAppendOperator(); + opt.setMergeOperator(newStringAppendOperator); + db = RocksDB.open(opt, db_path_string); + db.close(); + opt.dispose(); + stringAppendOperator = null; + newStringAppendOperator = null; + System.gc(); + System.runFinalization(); + } + + public static void main(String[] args) + throws InterruptedException, RocksDBException { + testStringOption(); + testCFStringOption(); + testOperatorOption(); + testCFOperatorOption(); + testOperatorGcBehaviour(); + System.out.println("Passed MergeTest."); + } } diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 1e886e2e2..50cd8a359 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -95,7 +95,7 @@ jobject cfnames_to_free.push_back(cfname); jcfnames_for_free.push_back(jstr); column_families.push_back(rocksdb::ColumnFamilyDescriptor(cfname, - rocksdb::ColumnFamilyOptions())); + *static_cast(opt))); } rocksdb::Status s = rocksdb::DB::OpenForReadOnly(*opt, @@ -167,7 +167,7 @@ jobject Java_org_rocksdb_RocksDB_open__JLjava_lang_String_2Ljava_util_List_2I( cfnames_to_free.push_back(cfname); jcfnames_for_free.push_back(jstr); column_families.push_back(rocksdb::ColumnFamilyDescriptor(cfname, - rocksdb::ColumnFamilyOptions())); + *static_cast(opt))); } rocksdb::Status s = rocksdb::DB::Open(*opt, db_path, column_families, @@ -919,7 +919,7 @@ void Java_org_rocksdb_RocksDB_remove__JJ_3BIJ( void rocksdb_merge_helper( JNIEnv* env, rocksdb::DB* db, const rocksdb::WriteOptions& write_options, - jbyteArray jkey, jint jkey_len, + rocksdb::ColumnFamilyHandle* cf_handle, jbyteArray jkey, jint jkey_len, jbyteArray jvalue, jint jvalue_len) { jbyte* key = env->GetByteArrayElements(jkey, 0); @@ -927,7 +927,12 @@ void rocksdb_merge_helper( rocksdb::Slice key_slice(reinterpret_cast(key), jkey_len); rocksdb::Slice value_slice(reinterpret_cast(value), jvalue_len); - rocksdb::Status s = db->Merge(write_options, key_slice, value_slice); + rocksdb::Status s; + if (cf_handle != nullptr) { + s = db->Merge(write_options, cf_handle, key_slice, value_slice); + } else { + s = db->Merge(write_options, key_slice, value_slice); + } // trigger java unref on key and value. // by passing JNI_ABORT, it will simply release the reference without @@ -955,8 +960,29 @@ void Java_org_rocksdb_RocksDB_merge__J_3BI_3BI( rocksdb::WriteOptions(); rocksdb_merge_helper(env, db, default_write_options, - jkey, jkey_len, - jvalue, jvalue_len); + nullptr, jkey, jkey_len, jvalue, jvalue_len); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: merge + * Signature: (J[BI[BIJ)V + */ +void Java_org_rocksdb_RocksDB_merge__J_3BI_3BIJ( + JNIEnv* env, jobject jdb, jlong jdb_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len, jlong jcf_handle) { + auto db = reinterpret_cast(jdb_handle); + static const rocksdb::WriteOptions default_write_options = + rocksdb::WriteOptions(); + auto cf_handle = reinterpret_cast(jcf_handle); + if (cf_handle != nullptr) { + rocksdb_merge_helper(env, db, default_write_options, + cf_handle, jkey, jkey_len, jvalue, jvalue_len); + } else { + rocksdb::RocksDBExceptionJni::ThrowNew(env, + rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle.")); + } } /* @@ -974,8 +1000,30 @@ void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BI( jwrite_options_handle); rocksdb_merge_helper(env, db, *write_options, - jkey, jkey_len, - jvalue, jvalue_len); + nullptr, jkey, jkey_len, jvalue, jvalue_len); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: merge + * Signature: (JJ[BI[BIJ)V + */ +void Java_org_rocksdb_RocksDB_merge__JJ_3BI_3BIJ( + JNIEnv* env, jobject jdb, + jlong jdb_handle, jlong jwrite_options_handle, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len, jlong jcf_handle) { + auto db = reinterpret_cast(jdb_handle); + auto write_options = reinterpret_cast( + jwrite_options_handle); + auto cf_handle = reinterpret_cast(jcf_handle); + if (cf_handle != nullptr) { + rocksdb_merge_helper(env, db, *write_options, + cf_handle, jkey, jkey_len, jvalue, jvalue_len); + } else { + rocksdb::RocksDBExceptionJni::ThrowNew(env, + rocksdb::Status::InvalidArgument("Invalid ColumnFamilyHandle.")); + } } ////////////////////////////////////////////////////////////////////////////// @@ -1062,15 +1110,17 @@ jlongArray Java_org_rocksdb_RocksDB_iterators( /* * Class: org_rocksdb_RocksDB * Method: createColumnFamily - * Signature: (JLjava/lang/String;)J; + * Signature: (JJLjava/lang/String;)J; */ jlong Java_org_rocksdb_RocksDB_createColumnFamily( - JNIEnv* env, jobject jdb, jlong jdb_handle, jstring jcfname) { + JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jopt_handle, + jstring jcfname) { rocksdb::ColumnFamilyHandle* handle; const char* cfname = env->GetStringUTFChars(jcfname, 0); auto db_handle = reinterpret_cast(jdb_handle); + auto opt = reinterpret_cast(jopt_handle); rocksdb::Status s = db_handle->CreateColumnFamily( - rocksdb::ColumnFamilyOptions(), cfname, &handle); + *static_cast(opt), cfname, &handle); env->ReleaseStringUTFChars(jcfname, cfname); if (s.ok()) {