Fix a memory leak of Slice objects from org.rocksdb.WBWIRocksIterator#entry1

main
Adam Retter 9 years ago
parent e84137c8ae
commit c5af85ecad
  1. 6
      java/rocksjni/comparatorjnicallback.cc
  2. 84
      java/rocksjni/portal.h
  3. 46
      java/rocksjni/write_batch_with_index.cc
  4. 26
      java/src/main/java/org/rocksdb/AbstractSlice.java
  5. 21
      java/src/main/java/org/rocksdb/DirectSlice.java
  6. 43
      java/src/main/java/org/rocksdb/RocksMutableObject.java
  7. 5
      java/src/main/java/org/rocksdb/Slice.java
  8. 43
      java/src/main/java/org/rocksdb/WBWIRocksIterator.java

@ -60,8 +60,8 @@ int BaseComparatorJniCallback::Compare(const Slice& a, const Slice& b) const {
// performance. // performance.
mtx_compare->Lock(); mtx_compare->Lock();
AbstractSliceJni::setHandle(m_env, m_jSliceA, &a); AbstractSliceJni::setHandle(m_env, m_jSliceA, &a, JNI_FALSE);
AbstractSliceJni::setHandle(m_env, m_jSliceB, &b); AbstractSliceJni::setHandle(m_env, m_jSliceB, &b, JNI_FALSE);
jint result = jint result =
m_env->CallIntMethod(m_jComparator, m_jCompareMethodId, m_jSliceA, m_env->CallIntMethod(m_jComparator, m_jCompareMethodId, m_jSliceA,
m_jSliceB); m_jSliceB);
@ -89,7 +89,7 @@ void BaseComparatorJniCallback::FindShortestSeparator(
// performance. // performance.
mtx_findShortestSeparator->Lock(); mtx_findShortestSeparator->Lock();
AbstractSliceJni::setHandle(m_env, m_jSliceLimit, &limit); AbstractSliceJni::setHandle(m_env, m_jSliceLimit, &limit, JNI_FALSE);
jstring jsResultStart = jstring jsResultStart =
(jstring)m_env->CallObjectMethod(m_jComparator, (jstring)m_env->CallObjectMethod(m_jComparator,
m_jFindShortestSeparatorMethodId, jsStart, m_jSliceLimit); m_jFindShortestSeparatorMethodId, jsStart, m_jSliceLimit);

@ -49,25 +49,22 @@ template<class PTR, class DERIVED> class RocksDBNativeClass {
assert(jclazz != nullptr); assert(jclazz != nullptr);
return jclazz; return jclazz;
} }
// Get the field id of the member variable to store
// the ptr
static jfieldID getHandleFieldID(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
DERIVED::getJClass(env), "nativeHandle_", "J");
assert(fid != nullptr);
return fid;
}
}; };
// Native class template for sub-classes of RocksMutableObject // Native class template for sub-classes of RocksMutableObject
template<class PTR, class DERIVED> class NativeRocksMutableObject : public RocksDBNativeClass<PTR, DERIVED> { template<class PTR, class DERIVED> class NativeRocksMutableObject : public RocksDBNativeClass<PTR, DERIVED> {
public: public:
static jmethodID getSetNativeHandleMethod(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
DERIVED::getJClass(env), "setNativeHandle", "(JZ)V");
assert(mid != nullptr);
return mid;
}
// Pass the pointer to the java side. // Pass the pointer to the java side.
static void setHandle(JNIEnv* env, jobject jdb, PTR ptr) { static void setHandle(JNIEnv* env, jobject jobj, PTR ptr, jboolean java_owns_handle) {
env->SetLongField( env->CallVoidMethod(jobj, getSetNativeHandleMethod(env), reinterpret_cast<jlong>(ptr), java_owns_handle);
jdb, RocksDBNativeClass<PTR, DERIVED>::getHandleFieldID(env),
reinterpret_cast<jlong>(ptr));
} }
}; };
@ -647,67 +644,6 @@ class WriteEntryJni {
assert(jclazz != nullptr); assert(jclazz != nullptr);
return jclazz; return jclazz;
} }
static void setWriteType(JNIEnv* env, jobject jwrite_entry,
WriteType write_type) {
jobject jwrite_type;
switch (write_type) {
case kPutRecord:
jwrite_type = WriteTypeJni::PUT(env);
break;
case kMergeRecord:
jwrite_type = WriteTypeJni::MERGE(env);
break;
case kDeleteRecord:
jwrite_type = WriteTypeJni::DELETE(env);
break;
case kLogDataRecord:
jwrite_type = WriteTypeJni::LOG(env);
break;
default:
jwrite_type = nullptr;
}
assert(jwrite_type != nullptr);
env->SetObjectField(jwrite_entry, getWriteTypeField(env), jwrite_type);
}
static void setKey(JNIEnv* env, jobject jwrite_entry,
const rocksdb::Slice* slice) {
jobject jkey = env->GetObjectField(jwrite_entry, getKeyField(env));
AbstractSliceJni::setHandle(env, jkey, slice);
}
static void setValue(JNIEnv* env, jobject jwrite_entry,
const rocksdb::Slice* slice) {
jobject jvalue = env->GetObjectField(jwrite_entry, getValueField(env));
AbstractSliceJni::setHandle(env, jvalue, slice);
}
private:
static jfieldID getWriteTypeField(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "type", "Lorg/rocksdb/WBWIRocksIterator$WriteType;");
assert(fid != nullptr);
return fid;
}
static jfieldID getKeyField(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "key", "Lorg/rocksdb/DirectSlice;");
assert(fid != nullptr);
return fid;
}
static jfieldID getValueField(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "value", "Lorg/rocksdb/DirectSlice;");
assert(fid != nullptr);
return fid;
}
}; };
class InfoLogLevelJni { class InfoLogLevelJni {

@ -353,27 +353,57 @@ void Java_org_rocksdb_WBWIRocksIterator_status0(
/* /*
* Class: org_rocksdb_WBWIRocksIterator * Class: org_rocksdb_WBWIRocksIterator
* Method: entry1 * Method: entry1
* Signature: (JLorg/rocksdb/WBWIRocksIterator/WriteEntry;)V * Signature: (J)[J
*/ */
void Java_org_rocksdb_WBWIRocksIterator_entry1( jlongArray Java_org_rocksdb_WBWIRocksIterator_entry1(
JNIEnv* env, jobject jobj, jlong handle, jobject jwrite_entry) { JNIEnv* env, jobject jobj, jlong handle) {
auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle); auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle);
const rocksdb::WriteEntry& we = it->Entry(); const rocksdb::WriteEntry& we = it->Entry();
jobject jwe = rocksdb::WBWIRocksIteratorJni::getWriteEntry(env, jobj);
rocksdb::WriteEntryJni::setWriteType(env, jwe, we.type);
jlong results[3];
//set the type of the write entry
switch (we.type) {
case rocksdb::kPutRecord:
results[0] = 0x1;
break;
case rocksdb::kMergeRecord:
results[0] = 0x2;
break;
case rocksdb::kDeleteRecord:
results[0] = 0x4;
break;
case rocksdb::kLogDataRecord:
results[0] = 0x8;
break;
default:
results[0] = 0x0;
}
//TODO(AR) do we leak buf and value_buf?
//set the pointer to the key slice
char* buf = new char[we.key.size()]; char* buf = new char[we.key.size()];
memcpy(buf, we.key.data(), we.key.size()); memcpy(buf, we.key.data(), we.key.size());
auto* key_slice = new rocksdb::Slice(buf, we.key.size()); auto* key_slice = new rocksdb::Slice(buf, we.key.size());
rocksdb::WriteEntryJni::setKey(env, jwe, key_slice); results[1] = reinterpret_cast<jlong>(key_slice);
//set the pointer to the value slice
if (we.type == rocksdb::kDeleteRecord || we.type == rocksdb::kLogDataRecord) { if (we.type == rocksdb::kDeleteRecord || we.type == rocksdb::kLogDataRecord) {
// set native handle of value slice to null if no value available // set native handle of value slice to null if no value available
rocksdb::WriteEntryJni::setValue(env, jwe, nullptr); results[2] = 0;
} else { } else {
char* value_buf = new char[we.value.size()]; char* value_buf = new char[we.value.size()];
memcpy(value_buf, we.value.data(), we.value.size()); memcpy(value_buf, we.value.data(), we.value.size());
auto* value_slice = new rocksdb::Slice(value_buf, we.value.size()); auto* value_slice = new rocksdb::Slice(value_buf, we.value.size());
rocksdb::WriteEntryJni::setValue(env, jwe, value_slice); results[2] = reinterpret_cast<jlong>(value_slice);
} }
jlongArray jresults = env->NewLongArray(3);
env->SetLongArrayRegion(jresults, 0, 3, results);
return jresults;
} }

@ -42,8 +42,7 @@ abstract class AbstractSlice<T> extends RocksMutableObject {
* @see org.rocksdb.AbstractSlice#data0(long) * @see org.rocksdb.AbstractSlice#data0(long)
*/ */
public T data() { public T data() {
assert (isOwningHandle()); return data0(getNativeHandle());
return data0(nativeHandle_);
} }
/** /**
@ -64,8 +63,7 @@ abstract class AbstractSlice<T> extends RocksMutableObject {
* @return The length in bytes. * @return The length in bytes.
*/ */
public int size() { public int size() {
assert (isOwningHandle()); return size0(getNativeHandle());
return size0(nativeHandle_);
} }
/** /**
@ -75,8 +73,7 @@ abstract class AbstractSlice<T> extends RocksMutableObject {
* @return true if there is no data, false otherwise. * @return true if there is no data, false otherwise.
*/ */
public boolean empty() { public boolean empty() {
assert (isOwningHandle()); return empty0(getNativeHandle());
return empty0(nativeHandle_);
} }
/** /**
@ -88,8 +85,7 @@ abstract class AbstractSlice<T> extends RocksMutableObject {
* @return The string representation of the data. * @return The string representation of the data.
*/ */
public String toString(final boolean hex) { public String toString(final boolean hex) {
assert (isOwningHandle()); return toString0(getNativeHandle(), hex);
return toString0(nativeHandle_, hex);
} }
@Override @Override
@ -109,8 +105,15 @@ abstract class AbstractSlice<T> extends RocksMutableObject {
*/ */
public int compare(final AbstractSlice<?> other) { public int compare(final AbstractSlice<?> other) {
assert (other != null); assert (other != null);
assert (isOwningHandle()); if(!isOwningHandle()) {
return compare0(nativeHandle_, other.nativeHandle_); return other.isOwningHandle() ? -1 : 0;
} else {
if(!other.isOwningHandle()) {
return 1;
} else {
return compare0(getNativeHandle(), other.getNativeHandle());
}
}
} }
@Override @Override
@ -149,8 +152,7 @@ abstract class AbstractSlice<T> extends RocksMutableObject {
*/ */
public boolean startsWith(final AbstractSlice<?> prefix) { public boolean startsWith(final AbstractSlice<?> prefix) {
if (prefix != null) { if (prefix != null) {
assert (isOwningHandle()); return startsWith0(getNativeHandle(), prefix.getNativeHandle());
return startsWith0(nativeHandle_, prefix.nativeHandle_);
} else { } else {
return false; return false;
} }

@ -24,10 +24,11 @@ public class DirectSlice extends AbstractSlice<ByteBuffer> {
* at creation time. * at creation time.
* *
* Note: You should be aware that it is intentionally marked as * Note: You should be aware that it is intentionally marked as
* package-private. This is so that developers cannot construct their own default * package-private. This is so that developers cannot construct their own
* DirectSlice objects (at present). As developers cannot construct their own * default DirectSlice objects (at present). As developers cannot construct
* DirectSlice objects through this, they are not creating underlying C++ * their own DirectSlice objects through this, they are not creating
* DirectSlice objects, and so there is nothing to free (dispose) from Java. * underlying C++ DirectSlice objects, and so there is nothing to free
* (dispose) from Java.
*/ */
DirectSlice() { DirectSlice() {
super(); super();
@ -68,7 +69,8 @@ public class DirectSlice extends AbstractSlice<ByteBuffer> {
} }
private static ByteBuffer ensureDirect(final ByteBuffer data) { private static ByteBuffer ensureDirect(final ByteBuffer data) {
//TODO(AR) consider throwing a checked exception, as if it's not direct this can SIGSEGV // TODO(AR) consider throwing a checked exception, as if it's not direct
// this can SIGSEGV
assert(data.isDirect()); assert(data.isDirect());
return data; return data;
} }
@ -82,16 +84,14 @@ public class DirectSlice extends AbstractSlice<ByteBuffer> {
* @return the requested byte * @return the requested byte
*/ */
public byte get(int offset) { public byte get(int offset) {
assert (isOwningHandle()); return get0(getNativeHandle(), offset);
return get0(nativeHandle_, offset);
} }
/** /**
* Clears the backing slice * Clears the backing slice
*/ */
public void clear() { public void clear() {
assert (isOwningHandle()); clear0(getNativeHandle());
clear0(nativeHandle_);
} }
/** /**
@ -102,8 +102,7 @@ public class DirectSlice extends AbstractSlice<ByteBuffer> {
* @param n The number of bytes to drop * @param n The number of bytes to drop
*/ */
public void removePrefix(final int n) { public void removePrefix(final int n) {
assert (isOwningHandle()); removePrefix0(getNativeHandle(), n);
removePrefix0(nativeHandle_, n);
} }
private native static long createNewDirectSlice0(final ByteBuffer data, private native static long createNewDirectSlice0(final ByteBuffer data,

@ -1,30 +1,47 @@
package org.rocksdb; package org.rocksdb;
public abstract class RocksMutableObject extends NativeReference { public abstract class RocksMutableObject /*extends NativeReference*/ {
private final boolean shouldOwnHandle; private long nativeHandle_;
protected volatile long nativeHandle_; private boolean owningHandle_;
protected RocksMutableObject() { protected RocksMutableObject() {
super(false);
this.shouldOwnHandle = false;
} }
protected RocksMutableObject(final long nativeHandle) { protected RocksMutableObject(final long nativeHandle) {
super(true);
this.shouldOwnHandle = true;
this.nativeHandle_ = nativeHandle; this.nativeHandle_ = nativeHandle;
this.owningHandle_ = true;
} }
@Override public synchronized void setNativeHandle(final long nativeHandle, final boolean owningNativeHandle) {
public boolean isOwningHandle() { this.nativeHandle_ = nativeHandle;
return ((!shouldOwnHandle) || super.isOwningHandle()) && nativeHandle_ != 0; this.owningHandle_ = owningNativeHandle;
}
//@Override
protected synchronized boolean isOwningHandle() {
return this.owningHandle_;
}
protected synchronized long getNativeHandle() {
assert(this.nativeHandle_ != 0);
return this.nativeHandle_;
}
public synchronized final void dispose() {
if(isOwningHandle()) {
disposeInternal();
this.owningHandle_ = false;
this.nativeHandle_ = 0;
}
} }
/**
* Deletes underlying C++ object pointer.
*/
@Override @Override
protected void finalize() throws Throwable {
dispose();
super.finalize();
}
protected void disposeInternal() { protected void disposeInternal() {
disposeInternal(nativeHandle_); disposeInternal(nativeHandle_);
} }

@ -73,8 +73,9 @@ public class Slice extends AbstractSlice<byte[]> {
*/ */
@Override @Override
protected void disposeInternal() { protected void disposeInternal() {
disposeInternalBuf(nativeHandle_); final long nativeHandle = getNativeHandle();
super.disposeInternal(); disposeInternalBuf(nativeHandle);
super.disposeInternal(nativeHandle);
} }
@Override protected final native byte[] data0(long handle); @Override protected final native byte[] data0(long handle);

@ -5,10 +5,12 @@
package org.rocksdb; package org.rocksdb;
public class WBWIRocksIterator extends AbstractRocksIterator<WriteBatchWithIndex> { public class WBWIRocksIterator
extends AbstractRocksIterator<WriteBatchWithIndex> {
private final WriteEntry entry = new WriteEntry(); private final WriteEntry entry = new WriteEntry();
protected WBWIRocksIterator(final WriteBatchWithIndex wbwi, final long nativeHandle) { protected WBWIRocksIterator(final WriteBatchWithIndex wbwi,
final long nativeHandle) {
super(wbwi, nativeHandle); super(wbwi, nativeHandle);
} }
@ -20,12 +22,20 @@ public class WBWIRocksIterator extends AbstractRocksIterator<WriteBatchWithIndex
* If you want to keep the WriteEntry across iterator * If you want to keep the WriteEntry across iterator
* movements, you must make a copy of its data! * movements, you must make a copy of its data!
* *
* Note - This method is not thread-safe with respect to the WriteEntry
* as it performs a non-atomic update across the fields of the WriteEntry
*
* @return The WriteEntry of the current entry * @return The WriteEntry of the current entry
*/ */
public WriteEntry entry() { public WriteEntry entry() {
assert(isOwningHandle()); assert(isOwningHandle());
assert(entry != null); assert(entry != null);
entry1(nativeHandle_, entry); final long ptrs[] = entry1(nativeHandle_);
entry.type = WriteType.fromId((byte)ptrs[0]);
entry.key.setNativeHandle(ptrs[1], true);
entry.value.setNativeHandle(ptrs[2], ptrs[2] != 0);
return entry; return entry;
} }
@ -38,17 +48,31 @@ public class WBWIRocksIterator extends AbstractRocksIterator<WriteBatchWithIndex
@Override final native void seek0(long handle, byte[] target, int targetLen); @Override final native void seek0(long handle, byte[] target, int targetLen);
@Override final native void status0(long handle) throws RocksDBException; @Override final native void status0(long handle) throws RocksDBException;
private native void entry1(long handle, WriteEntry entry); private native long[] entry1(final long handle);
/** /**
* Enumeration of the Write operation * Enumeration of the Write operation
* that created the record in the Write Batch * that created the record in the Write Batch
*/ */
public enum WriteType { public enum WriteType {
PUT, PUT((byte)0x1),
MERGE, MERGE((byte)0x2),
DELETE, DELETE((byte)0x4),
LOG LOG((byte)0x8);
final byte id;
WriteType(final byte id) {
this.id = id;
}
public static WriteType fromId(final byte id) {
for(final WriteType wt : WriteType.values()) {
if(id == wt.id) {
return wt;
}
}
throw new IllegalArgumentException("No WriteType with id=" + id);
}
} }
/** /**
@ -139,8 +163,7 @@ public class WBWIRocksIterator extends AbstractRocksIterator<WriteBatchWithIndex
final WriteEntry otherWriteEntry = (WriteEntry)other; final WriteEntry otherWriteEntry = (WriteEntry)other;
return type.equals(otherWriteEntry.type) return type.equals(otherWriteEntry.type)
&& key.equals(otherWriteEntry.key) && key.equals(otherWriteEntry.key)
&& (value.isOwningHandle() ? value.equals(otherWriteEntry.value) && value.equals(otherWriteEntry.value);
: !otherWriteEntry.value.isOwningHandle());
} else { } else {
return false; return false;
} }

Loading…
Cancel
Save