@ -7,18 +7,18 @@
// ROCKSDB_NAMESPACE::Comparator.
# include "rocksjni/comparatorjnicallback.h"
# include "rocksjni/portal.h"
namespace ROCKSDB_NAMESPACE {
ComparatorJniCallback : : ComparatorJniCallback (
JNIEnv * env , jobject jcomparator ,
const ComparatorJniCallbackOptions * options )
: JniCallback ( env , jcomparator ) ,
m_options ( options ) {
// cache the AbstractComparatorJniBridge class as we will reuse it many times for each callback
m_abstract_comparator_jni_bridge_clazz =
static_cast < jclass > ( env - > NewGlobalRef ( AbstractComparatorJniBridge : : getJClass ( env ) ) ) ;
: JniCallback ( env , jcomparator ) , m_options ( options ) {
// cache the AbstractComparatorJniBridge class as we will reuse it many times
// for each callback
m_abstract_comparator_jni_bridge_clazz = static_cast < jclass > (
env - > NewGlobalRef ( AbstractComparatorJniBridge : : getJClass ( env ) ) ) ;
// Note: The name of a Comparator will not change during it's lifetime,
// so we cache it in a global var
@ -60,8 +60,8 @@ ComparatorJniCallback::ComparatorJniCallback(
}
m_jshort_mid =
AbstractComparatorJniBridge : : getFindShortSuccessorInternalMethodId ( env ,
m_abstract_comparator_jni_bridge_clazz ) ;
AbstractComparatorJniBridge : : getFindShortSuccessorInternalMethodId (
env , m_abstract_comparator_jni_bridge_clazz ) ;
if ( m_jshort_mid = = nullptr ) {
// exception thrown: NoSuchMethodException or OutOfMemoryError
return ;
@ -69,9 +69,8 @@ ComparatorJniCallback::ComparatorJniCallback(
// do we need reusable buffers?
if ( m_options - > max_reused_buffer_size > - 1 ) {
if ( m_options - > reused_synchronisation_type
= = ReusedSynchronisationType : : THREAD_LOCAL ) {
if ( m_options - > reused_synchronisation_type = =
ReusedSynchronisationType : : THREAD_LOCAL ) {
// buffers reused per thread
UnrefHandler unref = [ ] ( void * ptr ) {
ThreadLocalBuf * tlb = reinterpret_cast < ThreadLocalBuf * > ( ptr ) ;
@ -98,8 +97,8 @@ ComparatorJniCallback::ComparatorJniCallback(
} else {
// buffers reused and shared across threads
const bool adaptive =
m_options - > reused_synchronisation_type = = ReusedSynchronisationType : : ADAPTIVE_MUTEX ;
const bool adaptive = m_options - > reused_synchronisation_type = =
ReusedSynchronisationType : : ADAPTIVE_MUTEX ;
mtx_compare = std : : unique_ptr < port : : Mutex > ( new port : : Mutex ( adaptive ) ) ;
mtx_shortest = std : : unique_ptr < port : : Mutex > ( new port : : Mutex ( adaptive ) ) ;
mtx_short = std : : unique_ptr < port : : Mutex > ( new port : : Mutex ( adaptive ) ) ;
@ -220,9 +219,7 @@ ComparatorJniCallback::~ComparatorJniCallback() {
releaseJniEnv ( attached_thread ) ;
}
const char * ComparatorJniCallback : : Name ( ) const {
return m_name . get ( ) ;
}
const char * ComparatorJniCallback : : Name ( ) const { return m_name . get ( ) ; }
int ComparatorJniCallback : : Compare ( const Slice & a , const Slice & b ) const {
jboolean attached_thread = JNI_FALSE ;
@ -236,7 +233,8 @@ int ComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
MaybeLockForReuse ( mtx_compare , reuse_jbuf_a | | reuse_jbuf_b ) ;
jobject jcompare_buf_a = GetBuffer ( env , a , reuse_jbuf_a , m_tl_buf_a , m_jcompare_buf_a ) ;
jobject jcompare_buf_a =
GetBuffer ( env , a , reuse_jbuf_a , m_tl_buf_a , m_jcompare_buf_a ) ;
if ( jcompare_buf_a = = nullptr ) {
// exception occurred
MaybeUnlockForReuse ( mtx_compare , reuse_jbuf_a | | reuse_jbuf_b ) ;
@ -245,7 +243,8 @@ int ComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
return 0 ;
}
jobject jcompare_buf_b = GetBuffer ( env , b , reuse_jbuf_b , m_tl_buf_b , m_jcompare_buf_b ) ;
jobject jcompare_buf_b =
GetBuffer ( env , b , reuse_jbuf_b , m_tl_buf_b , m_jcompare_buf_b ) ;
if ( jcompare_buf_b = = nullptr ) {
// exception occurred
if ( ! reuse_jbuf_a ) {
@ -257,12 +256,10 @@ int ComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
return 0 ;
}
jint result =
env - > CallStaticIntMethod (
m_abstract_comparator_jni_bridge_clazz , m_jcompare_mid ,
m_jcallback_obj ,
jcompare_buf_a , reuse_jbuf_a ? a . size ( ) : - 1 ,
jcompare_buf_b , reuse_jbuf_b ? b . size ( ) : - 1 ) ;
jint result = env - > CallStaticIntMethod (
m_abstract_comparator_jni_bridge_clazz , m_jcompare_mid , m_jcallback_obj ,
jcompare_buf_a , reuse_jbuf_a ? a . size ( ) : - 1 , jcompare_buf_b ,
reuse_jbuf_b ? b . size ( ) : - 1 ) ;
if ( env - > ExceptionCheck ( ) ) {
// exception thrown from CallIntMethod
@ -284,8 +281,8 @@ int ComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
return result ;
}
void ComparatorJniCallback : : FindShortestSeparator (
std : : string * start , const Slice & limit ) const {
void ComparatorJniCallback : : FindShortestSeparator ( std : : string * start ,
const Slice & limit ) const {
if ( start = = nullptr ) {
return ;
}
@ -294,15 +291,16 @@ void ComparatorJniCallback::FindShortestSeparator(
JNIEnv * env = getJniEnv ( & attached_thread ) ;
assert ( env ! = nullptr ) ;
const bool reuse_jbuf_start =
static_cast < int64_t > ( start - > length ( ) ) < = m_options - > max_reused_buffer_size ;
const bool reuse_jbuf_start = static_cast < int64_t > ( start - > length ( ) ) < =
m_options - > max_reused_buffer_size ;
const bool reuse_jbuf_limit =
static_cast < int64_t > ( limit . size ( ) ) < = m_options - > max_reused_buffer_size ;
MaybeLockForReuse ( mtx_shortest , reuse_jbuf_start | | reuse_jbuf_limit ) ;
Slice sstart ( start - > data ( ) , start - > length ( ) ) ;
jobject j_start_buf = GetBuffer ( env , sstart , reuse_jbuf_start , m_tl_buf_a , m_jshortest_buf_start ) ;
jobject j_start_buf = GetBuffer ( env , sstart , reuse_jbuf_start , m_tl_buf_a ,
m_jshortest_buf_start ) ;
if ( j_start_buf = = nullptr ) {
// exception occurred
MaybeUnlockForReuse ( mtx_shortest , reuse_jbuf_start | | reuse_jbuf_limit ) ;
@ -311,7 +309,8 @@ void ComparatorJniCallback::FindShortestSeparator(
return ;
}
jobject j_limit_buf = GetBuffer ( env , limit , reuse_jbuf_limit , m_tl_buf_b , m_jshortest_buf_limit ) ;
jobject j_limit_buf = GetBuffer ( env , limit , reuse_jbuf_limit , m_tl_buf_b ,
m_jshortest_buf_limit ) ;
if ( j_limit_buf = = nullptr ) {
// exception occurred
if ( ! reuse_jbuf_start ) {
@ -324,10 +323,9 @@ void ComparatorJniCallback::FindShortestSeparator(
}
jint jstart_len = env - > CallStaticIntMethod (
m_abstract_comparator_jni_bridge_clazz , m_jshortest_mid ,
m_jcallback_obj ,
j_start_buf , reuse_jbuf_start ? start - > length ( ) : - 1 ,
j_limit_buf , reuse_jbuf_limit ? limit . size ( ) : - 1 ) ;
m_abstract_comparator_jni_bridge_clazz , m_jshortest_mid , m_jcallback_obj ,
j_start_buf , reuse_jbuf_start ? start - > length ( ) : - 1 , j_limit_buf ,
reuse_jbuf_limit ? limit . size ( ) : - 1 ) ;
if ( env - > ExceptionCheck ( ) ) {
// exception thrown from CallIntMethod
@ -348,7 +346,8 @@ void ComparatorJniCallback::FindShortestSeparator(
if ( ! reuse_jbuf_limit ) {
DeleteBuffer ( env , j_limit_buf ) ;
}
MaybeUnlockForReuse ( mtx_shortest , reuse_jbuf_start | | reuse_jbuf_limit ) ;
MaybeUnlockForReuse ( mtx_shortest ,
reuse_jbuf_start | | reuse_jbuf_limit ) ;
ROCKSDB_NAMESPACE : : RocksDBExceptionJni : : ThrowNew (
env , " Unable to get Direct Buffer Address " ) ;
env - > ExceptionDescribe ( ) ; // print out exception to stderr
@ -358,14 +357,14 @@ void ComparatorJniCallback::FindShortestSeparator(
start - > assign ( static_cast < const char * > ( start_buf ) , jstart_len ) ;
} else {
// reused non-direct buffer
copy_from_non_direct = true ;
}
} else {
// there was a new buffer
if ( m_options - > direct_buffer ) {
// it was direct... don't forget to potentially truncate the `start` string
// it was direct... don't forget to potentially truncate the `start`
// string
start - > resize ( jstart_len ) ;
} else {
// it was non-direct
@ -374,8 +373,8 @@ void ComparatorJniCallback::FindShortestSeparator(
}
if ( copy_from_non_direct ) {
jbyteArray jarray = ByteBufferJni : : array ( env , j_start_buf ,
m_jbytebuffer_clazz ) ;
jbyteArray jarray =
ByteBufferJni : : array ( env , j_start_buf , m_jbytebuffer_clazz ) ;
if ( jarray = = nullptr ) {
if ( ! reuse_jbuf_start ) {
DeleteBuffer ( env , j_start_buf ) ;
@ -389,9 +388,12 @@ void ComparatorJniCallback::FindShortestSeparator(
return ;
}
jboolean has_exception = JNI_FALSE ;
JniUtil : : byteString < std : : string > ( env , jarray , [ start , jstart_len ] ( const char * data , const size_t ) {
JniUtil : : byteString < std : : string > (
env , jarray ,
[ start , jstart_len ] ( const char * data , const size_t ) {
return start - > assign ( data , static_cast < size_t > ( jstart_len ) ) ;
} , & has_exception ) ;
} ,
& has_exception ) ;
env - > DeleteLocalRef ( jarray ) ;
if ( has_exception = = JNI_TRUE ) {
if ( ! reuse_jbuf_start ) {
@ -420,8 +422,7 @@ void ComparatorJniCallback::FindShortestSeparator(
releaseJniEnv ( attached_thread ) ;
}
void ComparatorJniCallback : : FindShortSuccessor (
std : : string * key ) const {
void ComparatorJniCallback : : FindShortSuccessor ( std : : string * key ) const {
if ( key = = nullptr ) {
return ;
}
@ -436,7 +437,8 @@ void ComparatorJniCallback::FindShortSuccessor(
MaybeLockForReuse ( mtx_short , reuse_jbuf_key ) ;
Slice skey ( key - > data ( ) , key - > length ( ) ) ;
jobject j_key_buf = GetBuffer ( env , skey , reuse_jbuf_key , m_tl_buf_a , m_jshort_buf_key ) ;
jobject j_key_buf =
GetBuffer ( env , skey , reuse_jbuf_key , m_tl_buf_a , m_jshort_buf_key ) ;
if ( j_key_buf = = nullptr ) {
// exception occurred
MaybeUnlockForReuse ( mtx_short , reuse_jbuf_key ) ;
@ -446,8 +448,7 @@ void ComparatorJniCallback::FindShortSuccessor(
}
jint jkey_len = env - > CallStaticIntMethod (
m_abstract_comparator_jni_bridge_clazz , m_jshort_mid ,
m_jcallback_obj ,
m_abstract_comparator_jni_bridge_clazz , m_jshort_mid , m_jcallback_obj ,
j_key_buf , reuse_jbuf_key ? key - > length ( ) : - 1 ) ;
if ( env - > ExceptionCheck ( ) ) {
@ -459,7 +460,6 @@ void ComparatorJniCallback::FindShortSuccessor(
env - > ExceptionDescribe ( ) ; // print out exception to stderr
releaseJniEnv ( attached_thread ) ;
return ;
}
if ( static_cast < size_t > ( jkey_len ) ! = key - > length ( ) ) {
@ -489,7 +489,8 @@ void ComparatorJniCallback::FindShortSuccessor(
} else {
// there was a new buffer
if ( m_options - > direct_buffer ) {
// it was direct... don't forget to potentially truncate the `key` string
// it was direct... don't forget to potentially truncate the `key`
// string
key - > resize ( jkey_len ) ;
} else {
// it was non-direct
@ -498,10 +499,9 @@ void ComparatorJniCallback::FindShortSuccessor(
}
if ( copy_from_non_direct ) {
jbyteArray jarray = ByteBufferJni : : array ( env , j_key_buf ,
m_jbytebuffer_clazz ) ;
jbyteArray jarray =
ByteBufferJni : : array ( env , j_key_buf , m_jbytebuffer_clazz ) ;
if ( jarray = = nullptr ) {
if ( ! reuse_jbuf_key ) {
DeleteBuffer ( env , j_key_buf ) ;
}
@ -511,9 +511,12 @@ void ComparatorJniCallback::FindShortSuccessor(
return ;
}
jboolean has_exception = JNI_FALSE ;
JniUtil : : byteString < std : : string > ( env , jarray , [ key , jkey_len ] ( const char * data , const size_t ) {
JniUtil : : byteString < std : : string > (
env , jarray ,
[ key , jkey_len ] ( const char * data , const size_t ) {
return key - > assign ( data , static_cast < size_t > ( jkey_len ) ) ;
} , & has_exception ) ;
} ,
& has_exception ) ;
env - > DeleteLocalRef ( jarray ) ;
if ( has_exception = = JNI_TRUE ) {
if ( ! reuse_jbuf_key ) {
@ -539,8 +542,9 @@ void ComparatorJniCallback::FindShortSuccessor(
inline void ComparatorJniCallback : : MaybeLockForReuse (
const std : : unique_ptr < port : : Mutex > & mutex , const bool cond ) const {
// no need to lock if using thread_local
if ( m_options - > reused_synchronisation_type ! = ReusedSynchronisationType : : THREAD_LOCAL
& & cond ) {
if ( m_options - > reused_synchronisation_type ! =
ReusedSynchronisationType : : THREAD_LOCAL & &
cond ) {
mutex . get ( ) - > Lock ( ) ;
}
}
@ -548,18 +552,20 @@ inline void ComparatorJniCallback::MaybeLockForReuse(
inline void ComparatorJniCallback : : MaybeUnlockForReuse (
const std : : unique_ptr < port : : Mutex > & mutex , const bool cond ) const {
// no need to unlock if using thread_local
if ( m_options - > reused_synchronisation_type ! = ReusedSynchronisationType : : THREAD_LOCAL
& & cond ) {
if ( m_options - > reused_synchronisation_type ! =
ReusedSynchronisationType : : THREAD_LOCAL & &
cond ) {
mutex . get ( ) - > Unlock ( ) ;
}
}
jobject ComparatorJniCallback : : GetBuffer ( JNIEnv * env , const Slice & src ,
bool reuse_buffer , ThreadLocalPtr * tl_buf , jobject jreuse_buffer ) const {
bool reuse_buffer ,
ThreadLocalPtr * tl_buf ,
jobject jreuse_buffer ) const {
if ( reuse_buffer ) {
if ( m_options - > reused_synchronisation_type
= = ReusedSynchronisationType : : THREAD_LOCAL ) {
if ( m_options - > reused_synchronisation_type = =
ReusedSynchronisationType : : THREAD_LOCAL ) {
// reuse thread-local bufffer
ThreadLocalBuf * tlb = reinterpret_cast < ThreadLocalBuf * > ( tl_buf - > Get ( ) ) ;
if ( tlb = = nullptr ) {
@ -576,25 +582,25 @@ jobject ComparatorJniCallback::GetBuffer(JNIEnv* env, const Slice& src,
}
return ReuseBuffer ( env , src , tlb - > jbuf ) ;
} else {
// reuse class member buffer
return ReuseBuffer ( env , src , jreuse_buffer ) ;
}
} else {
// new buffer
return NewBuffer ( env , src ) ;
}
}
jobject ComparatorJniCallback : : ReuseBuffer (
JNIEnv * env , const Slice & src , jobject jreuse_buffer ) const {
jobject ComparatorJniCallback : : ReuseBuffer ( JNIEnv * env , const Slice & src ,
jobject jreuse_buffer ) const {
// we can reuse the buffer
if ( m_options - > direct_buffer ) {
// copy into direct buffer
void * buf = env - > GetDirectBufferAddress ( jreuse_buffer ) ;
if ( buf = = nullptr ) {
// either memory region is undefined, given object is not a direct java.nio.Buffer, or JNI access to direct buffers is not supported by this virtual machine.
// either memory region is undefined, given object is not a direct
// java.nio.Buffer, or JNI access to direct buffers is not supported by
// this virtual machine.
ROCKSDB_NAMESPACE : : RocksDBExceptionJni : : ThrowNew (
env , " Unable to get Direct Buffer Address " ) ;
return nullptr ;
@ -602,13 +608,14 @@ jobject ComparatorJniCallback::ReuseBuffer(
memcpy ( buf , src . data ( ) , src . size ( ) ) ;
} else {
// copy into non-direct buffer
const jbyteArray jarray = ByteBufferJni : : array ( env , jreuse_buffer ,
m_jbytebuffer_clazz ) ;
const jbyteArray jarray =
ByteBufferJni : : array ( env , jreuse_buffer , m_jbytebuffer_clazz ) ;
if ( jarray = = nullptr ) {
// exception occurred
return nullptr ;
}
env - > SetByteArrayRegion ( jarray , 0 , static_cast < jsize > ( src . size ( ) ) ,
env - > SetByteArrayRegion (
jarray , 0 , static_cast < jsize > ( src . size ( ) ) ,
const_cast < jbyte * > ( reinterpret_cast < const jbyte * > ( src . data ( ) ) ) ) ;
if ( env - > ExceptionCheck ( ) ) {
// exception occurred
@ -622,8 +629,9 @@ jobject ComparatorJniCallback::ReuseBuffer(
jobject ComparatorJniCallback : : NewBuffer ( JNIEnv * env , const Slice & src ) const {
// we need a new buffer
jobject jbuf = ByteBufferJni : : constructWith ( env , m_options - > direct_buffer ,
src . data ( ) , src . size ( ) , m_jbytebuffer_clazz ) ;
jobject jbuf =
ByteBufferJni : : constructWith ( env , m_options - > direct_buffer , src . data ( ) ,
src . size ( ) , m_jbytebuffer_clazz ) ;
if ( jbuf = = nullptr ) {
// exception occurred
return nullptr ;