Implement WBWIRocksIterator for WriteBatchWithIndex in the Java API

main
Adam Retter 10 years ago
parent de678b288e
commit 2d0dd8db3b
  1. 4
      java/org/rocksdb/DirectSlice.java
  2. 172
      java/org/rocksdb/WBWIRocksIterator.java
  3. 25
      java/org/rocksdb/WriteBatchWithIndex.java
  4. 144
      java/rocksjni/portal.h
  5. 121
      java/rocksjni/write_batch_with_index.cc

@ -27,12 +27,12 @@ public class DirectSlice extends AbstractSlice<ByteBuffer> {
* Note: You should be aware that * Note: You should be aware that
* {@see org.rocksdb.RocksObject#disOwnNativeHandle()} is intentionally * {@see org.rocksdb.RocksObject#disOwnNativeHandle()} is intentionally
* called from the default DirectSlice constructor, and that it is marked as * called from the default DirectSlice constructor, and that it is marked as
* private. This is so that developers cannot construct their own default * package-private. This is so that developers cannot construct their own default
* DirectSlice objects (at present). As developers cannot construct their own * DirectSlice objects (at present). As developers cannot construct their own
* DirectSlice objects through this, they are not creating underlying C++ * DirectSlice objects through this, they are not creating underlying C++
* DirectSlice objects, and so there is nothing to free (dispose) from Java. * DirectSlice objects, and so there is nothing to free (dispose) from Java.
*/ */
private DirectSlice() { DirectSlice() {
super(); super();
disOwnNativeHandle(); disOwnNativeHandle();
} }

@ -5,121 +5,133 @@
package org.rocksdb; package org.rocksdb;
public class WBWIRocksIterator extends RocksObject implements RocksIteratorInterface { public class WBWIRocksIterator extends AbstractRocksIterator<WriteBatchWithIndex> {
private final WriteEntry entry = new WriteEntry();
//TODO(AR) abstract common code from WBWIRocksIterator and RocksIterator into AbstractRocksIterator
final WriteBatchWithIndex wbwi_;
protected WBWIRocksIterator(WriteBatchWithIndex wbwi, long nativeHandle) { protected WBWIRocksIterator(WriteBatchWithIndex wbwi, long nativeHandle) {
super(); super(wbwi, nativeHandle);
nativeHandle_ = nativeHandle;
// rocksDB must point to a valid RocksDB instance.
assert (wbwi != null);
// WBWIRocksIterator must hold a reference to the related WriteBatchWithIndex instance
// to guarantee that while a GC cycle starts WBWIRocksIterator instances
// are freed prior to WriteBatchWithIndex instances.
wbwi_ = wbwi;
}
@Override
public boolean isValid() {
return false;
}
@Override
public void seekToFirst() {
}
@Override
public void seekToLast() {
}
@Override
public void seek(byte[] target) {
}
@Override
public void next() {
}
@Override
public void prev() {
} }
/** /**
* Get the current entry * Get the current entry
*
* The WriteEntry is only valid
* until the iterator is repositioned.
* If you want to keep the WriteEntry across iterator
* movements, you must make a copy of its data!
*
* @return The WriteEntry of the current entry
*/ */
public WriteEntry entry() { public WriteEntry entry() {
throw new UnsupportedOperationException("NOT YET IMPLEMENTED"); //TODO(AR) implement assert(isInitialized());
} assert(entry != null);
entry1(nativeHandle_, entry);
@Override return entry;
public void status() throws RocksDBException {
} }
/** @Override final native void disposeInternal(long handle);
* <p>Deletes underlying C++ iterator pointer.</p> @Override final native boolean isValid0(long handle);
* <p/> @Override final native void seekToFirst0(long handle);
* <p>Note: the underlying handle can only be safely deleted if the WriteBatchWithIndex @Override final native void seekToLast0(long handle);
* instance related to a certain WBWIRocksIterator is still valid and initialized. @Override final native void next0(long handle);
* Therefore {@code disposeInternal()} checks if the WriteBatchWithIndex is initialized @Override final native void prev0(long handle);
* before freeing the native handle.</p> @Override final native void seek0(long handle, byte[] target, int targetLen);
*/ @Override final native void status0(long handle) throws RocksDBException;
@Override
protected void disposeInternal() {
synchronized (wbwi_) {
assert (isInitialized());
if (wbwi_.isInitialized()) {
disposeInternal(nativeHandle_);
}
}
}
private native void disposeInternal(long handle); private native void entry1(long handle, WriteEntry entry);
/** /**
* Enumeration of the Write operation * Enumeration of the Write operation
* that created the record in the Write Batch * that created the record in the Write Batch
*/ */
public enum WriteType { public enum WriteType {
PutRecord, PUT,
MergeRecord, MERGE,
DeleteRecord, DELETE,
LogDataRecord LOG
} }
/** /**
* Represents the entry returned by a * Represents an entry returned by
* WBWIRocksIterator * {@link org.rocksdb.WBWIRocksIterator#entry()}
*
* It is worth noting that a WriteEntry with
* the type {@link org.rocksdb.WBWIRocksIterator.WriteType#DELETE}
* or {@link org.rocksdb.WBWIRocksIterator.WriteType#LOG}
* will not have a value.
*/ */
public static class WriteEntry { public static class WriteEntry {
final WriteType type; WriteType type = null;
final Slice key; final DirectSlice key;
final Slice value; final DirectSlice value;
/**
* Intentionally private as this
* should only be instantiated in
* this manner by the outer WBWIRocksIterator
* class; The class members are then modified
* by calling {@link org.rocksdb.WBWIRocksIterator#entry()}
*/
private WriteEntry() {
key = new DirectSlice();
value = new DirectSlice();
}
public WriteEntry(final WriteType type, final Slice key, final Slice value) { public WriteEntry(WriteType type, DirectSlice key, DirectSlice value) {
this.type = type; this.type = type;
this.key = key; this.key = key;
this.value = value; this.value = value;
} }
/**
* Returns the type of the Write Entry
*
* @return the WriteType of the WriteEntry
*/
public WriteType getType() { public WriteType getType() {
return type; return type;
} }
public Slice getKey() { /**
* Returns the key of the Write Entry
*
* @return The slice containing the key
* of the WriteEntry
*/
public DirectSlice getKey() {
return key; return key;
} }
public Slice getValue() { /**
return value; * Returns the value of the Write Entry
*
* @return The slice containing the value of
* the WriteEntry or null if the WriteEntry has
* no value
*/
public DirectSlice getValue() {
if(!value.isInitialized()) {
return null; //TODO(AR) migrate to JDK8 java.util.Optional#empty()
} else {
return value;
}
}
@Override
public boolean equals(Object other) {
if(other == null) {
return false;
} else if (this == other) {
return true;
} else if(other instanceof WriteEntry) {
final WriteEntry otherWriteEntry = (WriteEntry)other;
return type.equals(otherWriteEntry.type)
&& key.equals(otherWriteEntry.key)
&& (value.isInitialized() ? value.equals(otherWriteEntry.value)
: !otherWriteEntry.value.isInitialized());
} else {
return false;
}
} }
} }
} }

@ -37,8 +37,8 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* assignment is determined by the constructor argument * assignment is determined by the constructor argument
* *
* @param overwriteKey if true, overwrite the key in the index when * @param overwriteKey if true, overwrite the key in the index when
* inserting a duplicate key, in this way an iterator will never * inserting a duplicate key, in this way an iterator will never
* show two entries with the same key. * show two entries with the same key.
*/ */
public WriteBatchWithIndex(boolean overwriteKey) { public WriteBatchWithIndex(boolean overwriteKey) {
super(); super();
@ -49,12 +49,14 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* Creates a WriteBatchWithIndex * Creates a WriteBatchWithIndex
* *
* @param fallbackIndexComparator We fallback to this comparator * @param fallbackIndexComparator We fallback to this comparator
* to compare keys within a column family if we cannot determine * to compare keys within a column family if we cannot determine
* the column family and so look up it's comparator. * the column family and so look up it's comparator.
* @param reservedBytes reserved bytes in underlying WriteBatch *
* @param overwriteKey if true, overwrite the key in the index when * @param reservedBytes reserved bytes in underlying WriteBatch
* inserting a duplicate key, in this way an iterator will never *
* show two entries with the same key. * @param overwriteKey if true, overwrite the key in the index when
* inserting a duplicate key, in this way an iterator will never
* show two entries with the same key.
*/ */
public WriteBatchWithIndex(AbstractComparator fallbackIndexComparator, int reservedBytes, public WriteBatchWithIndex(AbstractComparator fallbackIndexComparator, int reservedBytes,
boolean overwriteKey) { boolean overwriteKey) {
@ -97,16 +99,15 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* as a delta and baseIterator as a base * as a delta and baseIterator as a base
* *
* @param columnFamilyHandle The column family to iterate over * @param columnFamilyHandle The column family to iterate over
* @param baseIterator The base iterator, e.g. {@link org.rocksdb.RocksDB#newIterator()} * @param baseIterator The base iterator, e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @return An iterator which shows a view comprised of both the database point-in-time * @return An iterator which shows a view comprised of both the database point-in-time
* from baseIterator and modifications made in this write batch. * from baseIterator and modifications made in this write batch.
*/ */
public RocksIterator newIteratorWithBase(ColumnFamilyHandle columnFamilyHandle, public RocksIterator newIteratorWithBase(ColumnFamilyHandle columnFamilyHandle,
RocksIterator baseIterator) { RocksIterator baseIterator) {
RocksIterator iterator = new RocksIterator( RocksIterator iterator = new RocksIterator(
baseIterator.rocksDB_, baseIterator.parent_,
iteratorWithBase(columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_)); iteratorWithBase(columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_));
//when the iterator is deleted it will also delete the baseIterator //when the iterator is deleted it will also delete the baseIterator
baseIterator.disOwnNativeHandle(); baseIterator.disOwnNativeHandle();
return iterator; return iterator;
@ -122,7 +123,7 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* from baseIterator and modifications made in this write batch. * from baseIterator and modifications made in this write batch.
*/ */
public RocksIterator newIteratorWithBase(RocksIterator baseIterator) { public RocksIterator newIteratorWithBase(RocksIterator baseIterator) {
return newIteratorWithBase(baseIterator.rocksDB_.getDefaultColumnFamily(), baseIterator); return newIteratorWithBase(baseIterator.parent_.getDefaultColumnFamily(), baseIterator);
} }
@Override final native void disposeInternal(long handle); @Override final native void disposeInternal(long handle);

@ -863,6 +863,150 @@ class BackupInfoListJni {
} }
}; };
class WBWIRocksIteratorJni {
public:
// Get the java class id of org.rocksdb.WBWIRocksIterator.
static jclass getJClass(JNIEnv* env) {
static jclass jclazz = env->FindClass("org/rocksdb/WBWIRocksIterator");
assert(jclazz != nullptr);
return jclazz;
}
static jfieldID getWriteEntryField(JNIEnv* env) {
static jfieldID fid =
env->GetFieldID(getJClass(env), "entry",
"Lorg/rocksdb/WBWIRocksIterator$WriteEntry;");
assert(fid != nullptr);
return fid;
}
static jobject getWriteEntry(JNIEnv* env, jobject jwbwi_rocks_iterator) {
jobject jwe =
env->GetObjectField(jwbwi_rocks_iterator, getWriteEntryField(env));
assert(jwe != nullptr);
return jwe;
}
};
class WriteTypeJni {
public:
// Get the PUT enum field of org.rocksdb.WBWIRocksIterator.WriteType
static jobject PUT(JNIEnv* env) {
return getEnum(env, "PUT");
}
// Get the MERGE enum field of org.rocksdb.WBWIRocksIterator.WriteType
static jobject MERGE(JNIEnv* env) {
return getEnum(env, "MERGE");
}
// Get the DELETE enum field of org.rocksdb.WBWIRocksIterator.WriteType
static jobject DELETE(JNIEnv* env) {
return getEnum(env, "DELETE");
}
// Get the LOG enum field of org.rocksdb.WBWIRocksIterator.WriteType
static jobject LOG(JNIEnv* env) {
return getEnum(env, "LOG");
}
private:
// Get the java class id of org.rocksdb.WBWIRocksIterator.WriteType.
static jclass getJClass(JNIEnv* env) {
// TODO(AR) setting the jclazz var to static causes getEnum to fail
// occasionally (e.g. in WriteBatchWithIndex#iterator() test) with
// SIGSEGV but I have no idea why...
jclass jclazz = env->FindClass("org/rocksdb/WBWIRocksIterator$WriteType");
assert(jclazz != nullptr);
return jclazz;
}
// Get an enum field of org.rocksdb.WBWIRocksIterator.WriteType
static jobject getEnum(JNIEnv* env, const char name[]) {
// TODO(AR) setting the jclazz var to static causes getEnum to fail
// occasionally (e.g. in WriteBatchWithIndex#iterator() test) with
// SIGSEGV but I have no idea why...
jclass jclazz = getJClass(env);
jfieldID jfid =
env->GetStaticFieldID(jclazz, name,
"Lorg/rocksdb/WBWIRocksIterator$WriteType;");
assert(jfid != nullptr);
return env->GetStaticObjectField(jclazz, jfid);
}
};
class WriteEntryJni {
public:
// Get the java class id of org.rocksdb.WBWIRocksIterator.WriteEntry.
static jclass getJClass(JNIEnv* env) {
static jclass jclazz =
env->FindClass("org/rocksdb/WBWIRocksIterator$WriteEntry");
assert(jclazz != nullptr);
return jclazz;
}
static void setWriteType(JNIEnv* env, jobject jwrite_entry,
WriteType write_type) {
jobject jwrite_type;
switch (write_type) {
case kPutRecord:
jwrite_type = WriteTypeJni::PUT(env);
break;
case kMergeRecord:
jwrite_type = WriteTypeJni::MERGE(env);
break;
case kDeleteRecord:
jwrite_type = WriteTypeJni::DELETE(env);
break;
case kLogDataRecord:
jwrite_type = WriteTypeJni::LOG(env);
break;
default:
jwrite_type = nullptr;
}
assert(jwrite_type != nullptr);
env->SetObjectField(jwrite_entry, getWriteTypeField(env), jwrite_type);
}
static void setKey(JNIEnv* env, jobject jwrite_entry,
const rocksdb::Slice* slice) {
jobject jkey = env->GetObjectField(jwrite_entry, getKeyField(env));
AbstractSliceJni::setHandle(env, jkey, slice);
}
static void setValue(JNIEnv* env, jobject jwrite_entry,
const rocksdb::Slice* slice) {
jobject jvalue = env->GetObjectField(jwrite_entry, getValueField(env));
AbstractSliceJni::setHandle(env, jvalue, slice);
}
private:
static jfieldID getWriteTypeField(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "type", "Lorg/rocksdb/WBWIRocksIterator$WriteType;");
assert(fid != nullptr);
return fid;
}
static jfieldID getKeyField(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "key", "Lorg/rocksdb/DirectSlice;");
assert(fid != nullptr);
return fid;
}
static jfieldID getValueField(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "value", "Lorg/rocksdb/DirectSlice;");
assert(fid != nullptr);
return fid;
}
};
class JniUtil { class JniUtil {
public: public:
/** /**

@ -6,10 +6,11 @@
// This file implements the "bridge" between Java and C++ and enables // This file implements the "bridge" between Java and C++ and enables
// calling c++ rocksdb::WriteBatchWithIndex methods from Java side. // calling c++ rocksdb::WriteBatchWithIndex methods from Java side.
#include "include/org_rocksdb_WBWIRocksIterator.h"
#include "include/org_rocksdb_WriteBatchWithIndex.h" #include "include/org_rocksdb_WriteBatchWithIndex.h"
#include "rocksjni/portal.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksjni/portal.h"
/* /*
* Class: org_rocksdb_WriteBatchWithIndex * Class: org_rocksdb_WriteBatchWithIndex
@ -297,3 +298,121 @@ void Java_org_rocksdb_WriteBatchWithIndex_disposeInternal(
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(handle); auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(handle);
delete wbwi; delete wbwi;
} }
/* WBWIRocksIterator below */
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_WBWIRocksIterator_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle);
delete it;
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: isValid0
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_WBWIRocksIterator_isValid0(
JNIEnv* env, jobject jobj, jlong handle) {
return reinterpret_cast<rocksdb::WBWIIterator*>(handle)->Valid();
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: seekToFirst0
* Signature: (J)V
*/
void Java_org_rocksdb_WBWIRocksIterator_seekToFirst0(
JNIEnv* env, jobject jobj, jlong handle) {
reinterpret_cast<rocksdb::WBWIIterator*>(handle)->SeekToFirst();
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: seekToLast0
* Signature: (J)V
*/
void Java_org_rocksdb_WBWIRocksIterator_seekToLast0(
JNIEnv* env, jobject jobj, jlong handle) {
reinterpret_cast<rocksdb::WBWIIterator*>(handle)->SeekToLast();
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: next0
* Signature: (J)V
*/
void Java_org_rocksdb_WBWIRocksIterator_next0(
JNIEnv* env, jobject jobj, jlong handle) {
reinterpret_cast<rocksdb::WBWIIterator*>(handle)->Next();
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: prev0
* Signature: (J)V
*/
void Java_org_rocksdb_WBWIRocksIterator_prev0(
JNIEnv* env, jobject jobj, jlong handle) {
reinterpret_cast<rocksdb::WBWIIterator*>(handle)->Prev();
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: seek0
* Signature: (J[BI)V
*/
void Java_org_rocksdb_WBWIRocksIterator_seek0(
JNIEnv* env, jobject jobj, jlong handle, jbyteArray jtarget,
jint jtarget_len) {
auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle);
jbyte* target = env->GetByteArrayElements(jtarget, 0);
rocksdb::Slice target_slice(
reinterpret_cast<char*>(target), jtarget_len);
it->Seek(target_slice);
env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT);
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: status0
* Signature: (J)V
*/
void Java_org_rocksdb_WBWIRocksIterator_status0(
JNIEnv* env, jobject jobj, jlong handle) {
auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle);
rocksdb::Status s = it->status();
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: entry1
* Signature: (JLorg/rocksdb/WBWIRocksIterator/WriteEntry;)V
*/
void Java_org_rocksdb_WBWIRocksIterator_entry1(
JNIEnv* env, jobject jobj, jlong handle, jobject jwrite_entry) {
auto* it = reinterpret_cast<rocksdb::WBWIIterator*>(handle);
const rocksdb::WriteEntry& we = it->Entry();
jobject jwe = rocksdb::WBWIRocksIteratorJni::getWriteEntry(env, jobj);
rocksdb::WriteEntryJni::setWriteType(env, jwe, we.type);
rocksdb::WriteEntryJni::setKey(env, jwe, &we.key);
if (we.type == rocksdb::kDeleteRecord || we.type == rocksdb::kLogDataRecord) {
// set native handle of value slice to null if no value available
rocksdb::WriteEntryJni::setValue(env, jwe, NULL);
} else {
rocksdb::WriteEntryJni::setValue(env, jwe, &we.value);
}
}

Loading…
Cancel
Save