Add locking to comparator jni callback methods which must be thread-safe

main
Adam Retter 11 years ago
parent d6fe8dacc8
commit fc12cb83f2
  1. 2
      java/Makefile
  2. 6
      java/org/rocksdb/Comparator.java
  3. 49
      java/org/rocksdb/ComparatorOptions.java
  4. 6
      java/org/rocksdb/DirectComparator.java
  5. 32
      java/rocksjni/comparator.cc
  6. 27
      java/rocksjni/comparatorjnicallback.cc
  7. 35
      java/rocksjni/comparatorjnicallback.h
  8. 48
      java/rocksjni/options.cc
  9. 27
      java/rocksjni/portal.h

@ -1,4 +1,4 @@
NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle
ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)

@ -16,10 +16,10 @@ package org.rocksdb;
*/ */
public abstract class Comparator extends AbstractComparator<Slice> { public abstract class Comparator extends AbstractComparator<Slice> {
public Comparator() { public Comparator(final ComparatorOptions copt) {
super(); super();
createNewComparator0(); createNewComparator0(copt.nativeHandle_);
} }
private native void createNewComparator0(); private native void createNewComparator0(final long comparatorOptionsHandle);
} }

@ -0,0 +1,49 @@
package org.rocksdb;
public class ComparatorOptions extends RocksObject {
public ComparatorOptions() {
super();
newComparatorOptions();
}
/**
* Use adaptive mutex, which spins in the user space before resorting
* to kernel. This could reduce context switch when the mutex is not
* heavily contended. However, if the mutex is hot, we could end up
* wasting spin time.
* Default: false
*
* @return true if adaptive mutex is used.
*/
public boolean useAdaptiveMutex() {
assert(isInitialized());
return useAdaptiveMutex(nativeHandle_);
}
/**
* Use adaptive mutex, which spins in the user space before resorting
* to kernel. This could reduce context switch when the mutex is not
* heavily contended. However, if the mutex is hot, we could end up
* wasting spin time.
* Default: false
*
* @param useAdaptiveMutex true if adaptive mutex is used.
* @return the reference to the current comparator options.
*/
public ComparatorOptions setUseAdaptiveMutex(final boolean useAdaptiveMutex) {
assert (isInitialized());
setUseAdaptiveMutex(nativeHandle_, useAdaptiveMutex);
return this;
}
@Override protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
private native void newComparatorOptions();
private native boolean useAdaptiveMutex(final long handle);
private native void setUseAdaptiveMutex(final long handle, final boolean useAdaptiveMutex);
private native void disposeInternal(long handle);
}

@ -16,10 +16,10 @@ package org.rocksdb;
*/ */
public abstract class DirectComparator extends AbstractComparator<DirectSlice> { public abstract class DirectComparator extends AbstractComparator<DirectSlice> {
public DirectComparator() { public DirectComparator(final ComparatorOptions copt) {
super(); super();
createNewDirectComparator0(); createNewDirectComparator0(copt.nativeHandle_);
} }
private native void createNewDirectComparator0(); private native void createNewDirectComparator0(final long comparatorOptionsHandle);
} }

@ -18,6 +18,28 @@
#include "rocksjni/comparatorjnicallback.h" #include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/portal.h" #include "rocksjni/portal.h"
//<editor-fold desc="org.rocksdb.ComparatorOptions">
void Java_org_rocksdb_ComparatorOptions_newComparatorOptions(
JNIEnv* env, jobject jobj, jstring jpath, jboolean jshare_table_files,
jboolean jsync, jboolean jdestroy_old_data, jboolean jbackup_log_files,
jlong jbackup_rate_limit, jlong jrestore_rate_limit) {
jbackup_rate_limit = (jbackup_rate_limit <= 0) ? 0 : jbackup_rate_limit;
jrestore_rate_limit = (jrestore_rate_limit <= 0) ? 0 : jrestore_rate_limit;
const char* cpath = env->GetStringUTFChars(jpath, 0);
auto bopt = new rocksdb::BackupableDBOptions(cpath, nullptr,
jshare_table_files, nullptr, jsync, jdestroy_old_data, jbackup_log_files,
jbackup_rate_limit, jrestore_rate_limit);
env->ReleaseStringUTFChars(jpath, cpath);
rocksdb::BackupableDBOptionsJni::setHandle(env, jobj, bopt);
}
//</editor-fold>
//<editor-fold desc="org.rocksdb.AbstractComparator> //<editor-fold desc="org.rocksdb.AbstractComparator>
/* /*
@ -40,8 +62,9 @@ void Java_org_rocksdb_AbstractComparator_disposeInternal(
* Signature: ()V * Signature: ()V
*/ */
void Java_org_rocksdb_Comparator_createNewComparator0( void Java_org_rocksdb_Comparator_createNewComparator0(
JNIEnv* env, jobject jobj) { JNIEnv* env, jobject jobj, jlong copt_handle) {
const rocksdb::ComparatorJniCallback* c = new rocksdb::ComparatorJniCallback(env, jobj); const rocksdb::ComparatorJniCallbackOptions* copt = reinterpret_cast<rocksdb::ComparatorJniCallbackOptions*>(copt_handle);
const rocksdb::ComparatorJniCallback* c = new rocksdb::ComparatorJniCallback(env, jobj, copt);
rocksdb::AbstractComparatorJni::setHandle(env, jobj, c); rocksdb::AbstractComparatorJni::setHandle(env, jobj, c);
} }
@ -55,8 +78,9 @@ void Java_org_rocksdb_Comparator_createNewComparator0(
* Signature: ()V * Signature: ()V
*/ */
void Java_org_rocksdb_DirectComparator_createNewDirectComparator0( void Java_org_rocksdb_DirectComparator_createNewDirectComparator0(
JNIEnv* env, jobject jobj) { JNIEnv* env, jobject jobj, jlong copt_handle) {
const rocksdb::DirectComparatorJniCallback* c = new rocksdb::DirectComparatorJniCallback(env, jobj); const rocksdb::ComparatorJniCallbackOptions* copt = reinterpret_cast<rocksdb::ComparatorJniCallbackOptions*>(copt_handle);
const rocksdb::DirectComparatorJniCallback* c = new rocksdb::DirectComparatorJniCallback(env, jobj, copt);
rocksdb::AbstractComparatorJni::setHandle(env, jobj, c); rocksdb::AbstractComparatorJni::setHandle(env, jobj, c);
} }

@ -11,7 +11,11 @@
namespace rocksdb { namespace rocksdb {
BaseComparatorJniCallback::BaseComparatorJniCallback( BaseComparatorJniCallback::BaseComparatorJniCallback(
JNIEnv* env, jobject jComparator) { JNIEnv* env, jobject jComparator, const ComparatorJniCallbackOptions* copt) {
//mutex is used for synchronisation when we are re-using
//the global java slice objects
mutex_ = new port::Mutex(copt->use_adaptive_mutex);
// Note: Comparator methods may be accessed by multiple threads, // Note: Comparator methods may be accessed by multiple threads,
// so we ref the jvm not the env // so we ref the jvm not the env
@ -51,11 +55,14 @@ int BaseComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
JNIEnv* m_env = getJniEnv(); JNIEnv* m_env = getJniEnv();
mutex_->Lock();
AbstractSliceJni::setHandle(m_env, m_jSliceA, &a); AbstractSliceJni::setHandle(m_env, m_jSliceA, &a);
AbstractSliceJni::setHandle(m_env, m_jSliceB, &b); AbstractSliceJni::setHandle(m_env, m_jSliceB, &b);
jint result = m_env->CallIntMethod(m_jComparator, m_jCompareMethodId, m_jSliceA, m_jSliceB); jint result = m_env->CallIntMethod(m_jComparator, m_jCompareMethodId, m_jSliceA, m_jSliceB);
mutex_->Unlock();
m_jvm->DetachCurrentThread(); m_jvm->DetachCurrentThread();
return result; return result;
@ -72,10 +79,13 @@ void BaseComparatorJniCallback::FindShortestSeparator(std::string* start, const
const char* startUtf = start->c_str(); const char* startUtf = start->c_str();
jstring jsStart = m_env->NewStringUTF(startUtf); jstring jsStart = m_env->NewStringUTF(startUtf);
AbstractSliceJni::setHandle(m_env, m_jSliceLimit, &limit); mutex_->Lock();
AbstractSliceJni::setHandle(m_env, m_jSliceLimit, &limit);
jstring jsResultStart = (jstring)m_env->CallObjectMethod(m_jComparator, m_jFindShortestSeparatorMethodId, jsStart, m_jSliceLimit); jstring jsResultStart = (jstring)m_env->CallObjectMethod(m_jComparator, m_jFindShortestSeparatorMethodId, jsStart, m_jSliceLimit);
mutex_->Unlock();
m_env->DeleteLocalRef(jsStart); m_env->DeleteLocalRef(jsStart);
if(jsResultStart != nullptr) { if(jsResultStart != nullptr) {
@ -110,13 +120,6 @@ void BaseComparatorJniCallback::FindShortSuccessor(std::string* key) const {
} }
BaseComparatorJniCallback::~BaseComparatorJniCallback() { BaseComparatorJniCallback::~BaseComparatorJniCallback() {
// NOTE: we do not need to delete m_name here,
// I am not yet sure why, but doing so causes the error:
// java(13051,0x109f54000) malloc: *** error for object 0x109f52fa9: pointer being freed was not allocated
// *** set a breakpoint in malloc_error_break to debug
//delete[] m_name;
JNIEnv* m_env = getJniEnv(); JNIEnv* m_env = getJniEnv();
m_env->DeleteGlobalRef(m_jComparator); m_env->DeleteGlobalRef(m_jComparator);
@ -131,7 +134,7 @@ BaseComparatorJniCallback::~BaseComparatorJniCallback() {
} }
ComparatorJniCallback::ComparatorJniCallback( ComparatorJniCallback::ComparatorJniCallback(
JNIEnv* env, jobject jComparator) : BaseComparatorJniCallback(env, jComparator) { JNIEnv* env, jobject jComparator, const ComparatorJniCallbackOptions* copt) : BaseComparatorJniCallback(env, jComparator, copt) {
m_jSliceA = env->NewGlobalRef(SliceJni::construct0(env)); m_jSliceA = env->NewGlobalRef(SliceJni::construct0(env));
m_jSliceB = env->NewGlobalRef(SliceJni::construct0(env)); m_jSliceB = env->NewGlobalRef(SliceJni::construct0(env));
@ -139,7 +142,7 @@ ComparatorJniCallback::ComparatorJniCallback(
} }
DirectComparatorJniCallback::DirectComparatorJniCallback( DirectComparatorJniCallback::DirectComparatorJniCallback(
JNIEnv* env, jobject jComparator) : BaseComparatorJniCallback(env, jComparator) { JNIEnv* env, jobject jComparator, const ComparatorJniCallbackOptions* copt) : BaseComparatorJniCallback(env, jComparator, copt) {
m_jSliceA = env->NewGlobalRef(DirectSliceJni::construct0(env)); m_jSliceA = env->NewGlobalRef(DirectSliceJni::construct0(env));
m_jSliceB = env->NewGlobalRef(DirectSliceJni::construct0(env)); m_jSliceB = env->NewGlobalRef(DirectSliceJni::construct0(env));

@ -12,11 +12,39 @@
#include <jni.h> #include <jni.h>
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "port/port.h"
namespace rocksdb { namespace rocksdb {
struct ComparatorJniCallbackOptions {
// Use adaptive mutex, which spins in the user space before resorting
// to kernel. This could reduce context switch when the mutex is not
// heavily contended. However, if the mutex is hot, we could end up
// wasting spin time.
// Default: false
bool use_adaptive_mutex;
ComparatorJniCallbackOptions() : use_adaptive_mutex(false) {
}
};
/**
* This class acts as a bridge between C++
* and Java. The methods in this class will be
* called back from the RocksDB storage engine (C++)
* we then callback to the appropriate Java method
* this enables Comparators to be implemented in Java.
*
* The design of this Comparator caches the Java Slice
* objects that are used in the compare and findShortestSeparator
* method callbacks. Instead of creating new objects for each callback
* of those functions, by reuse via setHandle we are a lot
* faster; Unfortunately this means that we have to
* introduce locking in regions of those methods via mutex_.
*/
class BaseComparatorJniCallback : public Comparator { class BaseComparatorJniCallback : public Comparator {
public: public:
BaseComparatorJniCallback(JNIEnv* env, jobject jComparator); BaseComparatorJniCallback(JNIEnv* env, jobject jComparator, const ComparatorJniCallbackOptions* copt);
virtual ~BaseComparatorJniCallback(); virtual ~BaseComparatorJniCallback();
virtual const char* Name() const; virtual const char* Name() const;
virtual int Compare(const Slice& a, const Slice& b) const; virtual int Compare(const Slice& a, const Slice& b) const;
@ -24,6 +52,7 @@ class BaseComparatorJniCallback : public Comparator {
virtual void FindShortSuccessor(std::string* key) const; virtual void FindShortSuccessor(std::string* key) const;
private: private:
port::Mutex* mutex_;
JavaVM* m_jvm; JavaVM* m_jvm;
jobject m_jComparator; jobject m_jComparator;
std::string m_name; std::string m_name;
@ -40,12 +69,12 @@ class BaseComparatorJniCallback : public Comparator {
class ComparatorJniCallback : public BaseComparatorJniCallback { class ComparatorJniCallback : public BaseComparatorJniCallback {
public: public:
ComparatorJniCallback(JNIEnv* env, jobject jComparator); ComparatorJniCallback(JNIEnv* env, jobject jComparator, const ComparatorJniCallbackOptions* copt);
}; };
class DirectComparatorJniCallback : public BaseComparatorJniCallback { class DirectComparatorJniCallback : public BaseComparatorJniCallback {
public: public:
DirectComparatorJniCallback(JNIEnv* env, jobject jComparator); DirectComparatorJniCallback(JNIEnv* env, jobject jComparator, const ComparatorJniCallbackOptions* copt);
}; };
} // namespace rocksdb } // namespace rocksdb

@ -14,6 +14,8 @@
#include "include/org_rocksdb_Options.h" #include "include/org_rocksdb_Options.h"
#include "include/org_rocksdb_WriteOptions.h" #include "include/org_rocksdb_WriteOptions.h"
#include "include/org_rocksdb_ReadOptions.h" #include "include/org_rocksdb_ReadOptions.h"
#include "include/org_rocksdb_ComparatorOptions.h"
#include "rocksjni/portal.h" #include "rocksjni/portal.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
@ -23,6 +25,7 @@
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/rate_limiter.h" #include "rocksdb/rate_limiter.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "comparatorjnicallback.h"
/* /*
* Class: org_rocksdb_Options * Class: org_rocksdb_Options
@ -1770,3 +1773,48 @@ void Java_org_rocksdb_ReadOptions_setTailing(
reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->tailing = reinterpret_cast<rocksdb::ReadOptions*>(jhandle)->tailing =
static_cast<bool>(jtailing); static_cast<bool>(jtailing);
} }
/////////////////////////////////////////////////////////////////////
// rocksdb::ComparatorOptions
/*
* Class: org_rocksdb_ComparatorOptions
* Method: newComparatorOptions
* Signature: ()V
*/
void Java_org_rocksdb_ComparatorOptions_newComparatorOptions(
JNIEnv* env, jobject jobj) {
auto comparator_opt = new rocksdb::ComparatorJniCallbackOptions();
rocksdb::ComparatorOptionsJni::setHandle(env, jobj, comparator_opt);
}
/*
* Class: org_rocksdb_ComparatorOptions
* Method: useAdaptiveMutex
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_ComparatorOptions_useAdaptiveMutex(
JNIEnv * env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::ComparatorJniCallbackOptions*>(jhandle)->use_adaptive_mutex;
}
/*
* Class: org_rocksdb_ComparatorOptions
* Method: setUseAdaptiveMutex
* Signature: (JZ)V
*/
void Java_org_rocksdb_ComparatorOptions_setUseAdaptiveMutex(
JNIEnv * env, jobject jobj, jlong jhandle, jboolean juse_adaptive_mutex) {
reinterpret_cast<rocksdb::ComparatorJniCallbackOptions*>(jhandle)->use_adaptive_mutex =
static_cast<bool>(juse_adaptive_mutex);
}
/*
* Class: org_rocksdb_ComparatorOptions
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_ComparatorOptions_disposeInternal(
JNIEnv * env, jobject jobj, jlong jhandle) {
delete reinterpret_cast<rocksdb::ComparatorJniCallbackOptions*>(jhandle);
rocksdb::ComparatorOptionsJni::setHandle(env, jobj, nullptr);
}

@ -363,6 +363,33 @@ class ColumnFamilyHandleJni {
} }
}; };
class ComparatorOptionsJni {
public:
// Get the java class id of org.rocksdb.ComparatorOptions.
static jclass getJClass(JNIEnv* env) {
jclass jclazz = env->FindClass("org/rocksdb/ComparatorOptions");
assert(jclazz != nullptr);
return jclazz;
}
// Get the field id of the member variable of org.rocksdb.ComparatorOptions
// that stores the pointer to rocksdb::ComparatorJniCallbackOptions.
static jfieldID getHandleFieldID(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "nativeHandle_", "J");
assert(fid != nullptr);
return fid;
}
// Pass the ComparatorJniCallbackOptions pointer to the java side.
static void setHandle(
JNIEnv* env, jobject jobj, const rocksdb::ComparatorJniCallbackOptions* op) {
env->SetLongField(
jobj, getHandleFieldID(env),
reinterpret_cast<jlong>(op));
}
};
class AbstractComparatorJni { class AbstractComparatorJni {
public: public:
// Get the java class id of org.rocksdb.Comparator. // Get the java class id of org.rocksdb.Comparator.

Loading…
Cancel
Save