Implement WriteBatchWithIndex in the Java API

main
Adam Retter 10 years ago
parent c6e5545612
commit ef5b34dee0
  1. 2
      java/Makefile
  2. 125
      java/org/rocksdb/WBWIRocksIterator.java
  3. 153
      java/org/rocksdb/WriteBatchWithIndex.java
  4. 32
      java/rocksjni/portal.h
  5. 299
      java/rocksjni/write_batch_with_index.cc

@ -36,6 +36,8 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.test.WriteBatchInternal\
org.rocksdb.test.WriteBatchTest\
org.rocksdb.WriteOptions\
org.rocksdb.WriteBatchWithIndex\
org.rocksdb.WBWIRocksIterator
ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)

@ -0,0 +1,125 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
 * An iterator over the entries of a {@link WriteBatchWithIndex}.
 *
 * <p>NOTE(review): most navigation methods below are currently stubs —
 * {@link #isValid()} always returns {@code false}, the seek/next/prev
 * methods do nothing, and {@link #entry()} throws — the native-backed
 * implementation appears to be pending.</p>
 */
public class WBWIRocksIterator extends RocksObject implements RocksIteratorInterface {
  //TODO(AR) abstract common code from WBWIRocksIterator and RocksIterator into AbstractRocksIterator

  // Strong reference to the owning WriteBatchWithIndex; see constructor comment.
  final WriteBatchWithIndex wbwi_;

  /**
   * Constructs an iterator over the given WriteBatchWithIndex.
   *
   * @param wbwi the WriteBatchWithIndex this iterator reads; must not be null
   * @param nativeHandle pointer to the underlying C++ WBWIIterator
   */
  protected WBWIRocksIterator(WriteBatchWithIndex wbwi, long nativeHandle) {
    super();
    nativeHandle_ = nativeHandle;
    // wbwi must point to a valid WriteBatchWithIndex instance.
    assert (wbwi != null);
    // WBWIRocksIterator must hold a reference to the related WriteBatchWithIndex instance
    // to guarantee that while a GC cycle starts WBWIRocksIterator instances
    // are freed prior to WriteBatchWithIndex instances.
    wbwi_ = wbwi;
  }

  // Stub: always reports the iterator as invalid (not yet implemented).
  @Override
  public boolean isValid() {
    return false;
  }

  // Stub: no-op (not yet implemented).
  @Override
  public void seekToFirst() {
  }

  // Stub: no-op (not yet implemented).
  @Override
  public void seekToLast() {
  }

  // Stub: no-op (not yet implemented).
  @Override
  public void seek(byte[] target) {
  }

  // Stub: no-op (not yet implemented).
  @Override
  public void next() {
  }

  // Stub: no-op (not yet implemented).
  @Override
  public void prev() {
  }

  /**
   * Get the current entry
   *
   * @return the write-batch entry the iterator is positioned at
   * @throws UnsupportedOperationException always, until implemented
   */
  public WriteEntry entry() {
    throw new UnsupportedOperationException("NOT YET IMPLEMENTED"); //TODO(AR) implement
  }

  // Stub: never throws (not yet implemented).
  @Override
  public void status() throws RocksDBException {
  }

  /**
   * <p>Deletes underlying C++ iterator pointer.</p>
   * <p/>
   * <p>Note: the underlying handle can only be safely deleted if the WriteBatchWithIndex
   * instance related to a certain WBWIRocksIterator is still valid and initialized.
   * Therefore {@code disposeInternal()} checks if the WriteBatchWithIndex is initialized
   * before freeing the native handle.</p>
   */
  @Override
  protected void disposeInternal() {
    // Synchronize on the owner so it cannot be disposed concurrently
    // while we check its state and free our own native handle.
    synchronized (wbwi_) {
      assert (isInitialized());
      if (wbwi_.isInitialized()) {
        disposeInternal(nativeHandle_);
      }
    }
  }

  private native void disposeInternal(long handle);

  /**
   * Enumeration of the Write operation
   * that created the record in the Write Batch
   */
  public enum WriteType {
    PutRecord,
    MergeRecord,
    DeleteRecord,
    LogDataRecord
  }

  /**
   * Represents the entry returned by a
   * WBWIRocksIterator
   */
  public static class WriteEntry {
    final WriteType type;
    final Slice key;
    final Slice value;

    /**
     * @param type the operation that created this record
     * @param key the record's key
     * @param value the record's value (semantics depend on {@code type})
     */
    public WriteEntry(final WriteType type, final Slice key, final Slice value) {
      this.type = type;
      this.key = key;
      this.value = value;
    }

    public WriteType getType() {
      return type;
    }

    public Slice getKey() {
      return key;
    }

    public Slice getValue() {
      return value;
    }
  }
}

@ -0,0 +1,153 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
 * Similar to {@link org.rocksdb.WriteBatch} but with a binary searchable
 * index built for all the keys inserted.
 *
 * Calling put, merge, remove or putLogData calls the same function
 * as with {@link org.rocksdb.WriteBatch} whilst also building an index.
 *
 * A user can call {@link org.rocksdb.WriteBatchWithIndex#newIterator()} to create an iterator
 * over the write batch or
 * {@link org.rocksdb.WriteBatchWithIndex#newIteratorWithBase(org.rocksdb.RocksIterator)} to
 * get an iterator for the database with Read-Your-Own-Writes like capability
 */
public class WriteBatchWithIndex extends AbstractWriteBatch {
  //TODO(AR) need to cover directly passing WriteBatchWithIndex to {@see org.rocksdb.RocksDB#write(WriteBatch)
  //this simplifies the Java API beyond the C++ API as you don't need to call
  //GetWriteBatch on the WriteBatchWithIndex

  /**
   * Creates a WriteBatchWithIndex where no bytes
   * are reserved up-front, bytewise comparison is
   * used for fallback key comparisons,
   * and duplicate keys operations are retained
   */
  public WriteBatchWithIndex() {
    super();
    newWriteBatchWithIndex();
  }

  /**
   * Creates a WriteBatchWithIndex where no bytes
   * are reserved up-front, bytewise comparison is
   * used for fallback key comparisons, and duplicate key
   * assignment is determined by the constructor argument
   *
   * @param overwriteKey if true, overwrite the key in the index when
   *   inserting a duplicate key, in this way an iterator will never
   *   show two entries with the same key.
   */
  public WriteBatchWithIndex(boolean overwriteKey) {
    super();
    newWriteBatchWithIndex(overwriteKey);
  }

  /**
   * Creates a WriteBatchWithIndex
   *
   * @param fallbackIndexComparator We fallback to this comparator
   *   to compare keys within a column family if we cannot determine
   *   the column family and so look up its comparator.
   * @param reservedBytes reserved bytes in underlying WriteBatch
   * @param overwriteKey if true, overwrite the key in the index when
   *   inserting a duplicate key, in this way an iterator will never
   *   show two entries with the same key.
   */
  public WriteBatchWithIndex(AbstractComparator fallbackIndexComparator, int reservedBytes,
      boolean overwriteKey) {
    super();
    newWriteBatchWithIndex(fallbackIndexComparator.nativeHandle_, reservedBytes, overwriteKey);
  }

  /**
   * Create an iterator of a column family. User can call
   * {@link org.rocksdb.RocksIteratorInterface#seek(byte[])} to
   * search to the next entry of or after a key. Keys will be iterated in the
   * order given by index_comparator. For multiple updates on the same key,
   * each update will be returned as a separate entry, in the order of update
   * time.
   *
   * @param columnFamilyHandle The column family to iterate over
   * @return An iterator for the Write Batch contents, restricted to the column family
   */
  public WBWIRocksIterator newIterator(ColumnFamilyHandle columnFamilyHandle) {
    return new WBWIRocksIterator(this, iterator1(columnFamilyHandle.nativeHandle_));
  }

  /**
   * Create an iterator of the default column family. User can call
   * {@link org.rocksdb.RocksIteratorInterface#seek(byte[])} to
   * search to the next entry of or after a key. Keys will be iterated in the
   * order given by index_comparator. For multiple updates on the same key,
   * each update will be returned as a separate entry, in the order of update
   * time.
   *
   * @return An iterator for the Write Batch contents
   */
  public WBWIRocksIterator newIterator() {
    return new WBWIRocksIterator(this, iterator0());
  }

  /**
   * Provides Read-Your-Own-Writes like functionality by
   * creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
   * as a delta and baseIterator as a base
   *
   * @param columnFamilyHandle The column family to iterate over
   * @param baseIterator The base iterator, e.g. {@link org.rocksdb.RocksDB#newIterator()}
   * @return An iterator which shows a view comprised of both the database point-in-time
   *   from baseIterator and modifications made in this write batch.
   */
  public RocksIterator newIteratorWithBase(ColumnFamilyHandle columnFamilyHandle,
      RocksIterator baseIterator) {
    RocksIterator iterator = new RocksIterator(
        baseIterator.rocksDB_,
        iteratorWithBase(columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_));
    //when the iterator is deleted it will also delete the baseIterator
    baseIterator.disOwnNativeHandle();
    return iterator;
  }

  /**
   * Provides Read-Your-Own-Writes like functionality by
   * creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
   * as a delta and baseIterator as a base. Operates on the default column family.
   *
   * @param baseIterator The base iterator, e.g. {@link org.rocksdb.RocksDB#newIterator()}
   * @return An iterator which shows a view comprised of both the database point-in-time
   *   from baseIterator and modifications made in this write batch.
   */
  public RocksIterator newIteratorWithBase(RocksIterator baseIterator) {
    return newIteratorWithBase(baseIterator.rocksDB_.getDefaultColumnFamily(), baseIterator);
  }

  @Override final native void disposeInternal(long handle);
  @Override final native int count0();
  @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen);
  @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen,
      long cfHandle);
  @Override final native void merge(byte[] key, int keyLen, byte[] value, int valueLen);
  @Override final native void merge(byte[] key, int keyLen, byte[] value, int valueLen,
      long cfHandle);
  @Override final native void remove(byte[] key, int keyLen);
  @Override final native void remove(byte[] key, int keyLen, long cfHandle);
  @Override final native void putLogData(byte[] blob, int blobLen);
  @Override final native void clear0();
  private native void newWriteBatchWithIndex();
  private native void newWriteBatchWithIndex(boolean overwriteKey);
  private native void newWriteBatchWithIndex(long fallbackIndexComparatorHandle, int reservedBytes,
      boolean overwriteKey);
  private native long iterator0();
  private native long iterator1(long cfHandle);
  // Parameter names fixed: the native implementation (and the caller above)
  // pass the column family handle FIRST and the base iterator SECOND; the
  // previous declaration had the names swapped, which was misleading.
  private native long iteratorWithBase(long cfHandle, long baseIteratorHandle);
}

@ -19,6 +19,7 @@
#include "rocksdb/filter_policy.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/writebatchhandlerjnicallback.h"
@ -390,6 +391,37 @@ class WriteBatchHandlerJni {
}
};
// JNI glue for org.rocksdb.WriteBatchWithIndex: resolves the Java class and
// reads/writes its nativeHandle_ field, which stores the pointer to the
// native rocksdb::WriteBatchWithIndex.
class WriteBatchWithIndexJni {
 public:
  // Look up the org.rocksdb.WriteBatchWithIndex class.
  static jclass getJClass(JNIEnv* env) {
    jclass clazz = env->FindClass("org/rocksdb/WriteBatchWithIndex");
    assert(clazz != nullptr);
    return clazz;
  }

  // Field id of nativeHandle_ (type long); cached after the first lookup.
  static jfieldID getHandleFieldID(JNIEnv* env) {
    static jfieldID fid = env->GetFieldID(
        getJClass(env), "nativeHandle_", "J");
    assert(fid != nullptr);
    return fid;
  }

  // Get the pointer to the rocksdb::WriteBatchWithIndex held by the given
  // org.rocksdb.WriteBatchWithIndex object.
  static rocksdb::WriteBatchWithIndex* getHandle(JNIEnv* env, jobject jwbwi) {
    jlong handle = env->GetLongField(jwbwi, getHandleFieldID(env));
    return reinterpret_cast<rocksdb::WriteBatchWithIndex*>(handle);
  }

  // Store the rocksdb::WriteBatchWithIndex pointer on the Java side.
  static void setHandle(JNIEnv* env, jobject jwbwi,
      rocksdb::WriteBatchWithIndex* wbwi) {
    env->SetLongField(jwbwi, getHandleFieldID(env),
        reinterpret_cast<jlong>(wbwi));
  }
};
class HistogramDataJni {
public:
static jmethodID getConstructorMethodId(JNIEnv* env, jclass jclazz) {

@ -0,0 +1,299 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the "bridge" between Java and C++ and enables
// calling c++ rocksdb::WriteBatchWithIndex methods from Java side.
#include "include/org_rocksdb_WriteBatchWithIndex.h"
#include "rocksjni/portal.h"
#include "rocksdb/comparator.h"
#include "rocksdb/utilities/write_batch_with_index.h"
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    newWriteBatchWithIndex
 * Signature: ()V
 *
 * Creates a default-configured native WriteBatchWithIndex and stores its
 * pointer in the Java object's nativeHandle_ field.
 */
void Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__(
    JNIEnv* env, jobject jobj) {
  auto* wbwi = new rocksdb::WriteBatchWithIndex();
  rocksdb::WriteBatchWithIndexJni::setHandle(env, jobj, wbwi);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    newWriteBatchWithIndex
 * Signature: (Z)V
 *
 * Creates a native WriteBatchWithIndex using the bytewise comparator, zero
 * reserved bytes, and the caller-supplied overwrite-key policy.
 */
void Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__Z(
    JNIEnv* env, jobject jobj, jboolean joverwrite_key) {
  const bool overwrite_key = static_cast<bool>(joverwrite_key);
  auto* wbwi = new rocksdb::WriteBatchWithIndex(
      rocksdb::BytewiseComparator(), 0, overwrite_key);
  rocksdb::WriteBatchWithIndexJni::setHandle(env, jobj, wbwi);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    newWriteBatchWithIndex
 * Signature: (JIZ)V
 *
 * Creates a native WriteBatchWithIndex with an explicit fallback index
 * comparator (passed as a handle), reserved byte count and overwrite policy.
 */
void Java_org_rocksdb_WriteBatchWithIndex_newWriteBatchWithIndex__JIZ(
    JNIEnv* env, jobject jobj, jlong jfallback_index_comparator_handle,
    jint jreserved_bytes, jboolean joverwrite_key) {
  auto* fallback_comparator =
      reinterpret_cast<rocksdb::Comparator*>(jfallback_index_comparator_handle);
  auto* wbwi = new rocksdb::WriteBatchWithIndex(
      fallback_comparator,
      static_cast<size_t>(jreserved_bytes),
      static_cast<bool>(joverwrite_key));
  rocksdb::WriteBatchWithIndexJni::setHandle(env, jobj, wbwi);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    count
 * Signature: ()I
 *
 * Returns the number of entries in the underlying WriteBatch.
 */
jint Java_org_rocksdb_WriteBatchWithIndex_count0(
    JNIEnv* env, jobject jobj) {
  auto* wbwi = rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
  assert(wbwi != nullptr);
  return static_cast<jint>(wbwi->GetWriteBatch()->Count());
}
//TODO(AR) make generic with WriteBatch equivalent
/*
 * Helper for WriteBatchWithIndex put operations.
 *
 * Copies the key/value bytes out of the Java arrays, performs the Put
 * (column-family-scoped when cf_handle is non-null), and releases the
 * arrays without copying back (JNI_ABORT — arrays are read-only here).
 */
void write_batch_with_index_put_helper(
    JNIEnv* env, jobject jobj,
    jbyteArray jkey, jint jkey_len,
    jbyteArray jentry_value, jint jentry_value_len,
    rocksdb::ColumnFamilyHandle* cf_handle) {
  rocksdb::WriteBatchWithIndex* wbwi =
      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
  assert(wbwi != nullptr);
  jbyte* key = env->GetByteArrayElements(jkey, nullptr);
  if (key == nullptr) {
    // GetByteArrayElements raised OutOfMemoryError; let it propagate.
    return;
  }
  jbyte* value = env->GetByteArrayElements(jentry_value, nullptr);
  if (value == nullptr) {
    // Exception pending; release what was already pinned.
    env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
    return;
  }
  rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
  rocksdb::Slice value_slice(reinterpret_cast<char*>(value),
      jentry_value_len);
  if (cf_handle != nullptr) {
    wbwi->Put(cf_handle, key_slice, value_slice);
  } else {
    // backwards compatibility
    wbwi->Put(key_slice, value_slice);
  }
  env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
  env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    put
 * Signature: ([BI[BI)V
 *
 * Put into the default column family: forwards to the shared helper with a
 * null column family handle.
 */
void Java_org_rocksdb_WriteBatchWithIndex_put___3BI_3BI(
    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len,
    jbyteArray jentry_value, jint jentry_value_len) {
  write_batch_with_index_put_helper(
      env, jobj, jkey, jkey_len, jentry_value, jentry_value_len, nullptr);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    put
 * Signature: ([BI[BIJ)V
 *
 * Put into an explicit column family, identified by its native handle.
 */
void Java_org_rocksdb_WriteBatchWithIndex_put___3BI_3BIJ(
    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len,
    jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
  auto* column_family =
      reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  write_batch_with_index_put_helper(
      env, jobj, jkey, jkey_len, jentry_value, jentry_value_len,
      column_family);
}
//TODO(AR) make generic with WriteBatch equivalent
/*
 * Helper for WriteBatchWithIndex merge operations.
 *
 * Copies the key/value bytes out of the Java arrays, performs the Merge
 * (column-family-scoped when cf_handle is non-null), and releases the
 * arrays without copying back (JNI_ABORT — arrays are read-only here).
 */
void write_batch_with_index_merge_helper(
    JNIEnv* env, jobject jobj,
    jbyteArray jkey, jint jkey_len,
    jbyteArray jentry_value, jint jentry_value_len,
    rocksdb::ColumnFamilyHandle* cf_handle) {
  rocksdb::WriteBatchWithIndex* wbwi =
      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
  assert(wbwi != nullptr);
  jbyte* key = env->GetByteArrayElements(jkey, nullptr);
  if (key == nullptr) {
    // GetByteArrayElements raised OutOfMemoryError; let it propagate.
    return;
  }
  jbyte* value = env->GetByteArrayElements(jentry_value, nullptr);
  if (value == nullptr) {
    // Exception pending; release what was already pinned.
    env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
    return;
  }
  rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
  rocksdb::Slice value_slice(reinterpret_cast<char*>(value),
      jentry_value_len);
  if (cf_handle != nullptr) {
    wbwi->Merge(cf_handle, key_slice, value_slice);
  } else {
    // backwards compatibility
    wbwi->Merge(key_slice, value_slice);
  }
  env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
  env->ReleaseByteArrayElements(jentry_value, value, JNI_ABORT);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    merge
 * Signature: ([BI[BI)V
 *
 * Merge into the default column family: forwards to the shared helper with
 * a null column family handle.
 */
void Java_org_rocksdb_WriteBatchWithIndex_merge___3BI_3BI(
    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len,
    jbyteArray jentry_value, jint jentry_value_len) {
  write_batch_with_index_merge_helper(
      env, jobj, jkey, jkey_len, jentry_value, jentry_value_len, nullptr);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    merge
 * Signature: ([BI[BIJ)V
 *
 * Merge into an explicit column family, identified by its native handle.
 */
void Java_org_rocksdb_WriteBatchWithIndex_merge___3BI_3BIJ(
    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len,
    jbyteArray jentry_value, jint jentry_value_len, jlong jcf_handle) {
  auto* column_family =
      reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  write_batch_with_index_merge_helper(
      env, jobj, jkey, jkey_len, jentry_value, jentry_value_len,
      column_family);
}
//TODO(AR) make generic with WriteBatch equivalent
/*
 * Helper for WriteBatchWithIndex remove operations.
 *
 * Copies the key bytes out of the Java array, performs the Delete
 * (column-family-scoped when cf_handle is non-null), and releases the
 * array without copying back (JNI_ABORT — the array is read-only here).
 */
void write_batch_with_index_remove_helper(
    JNIEnv* env, jobject jobj,
    jbyteArray jkey, jint jkey_len,
    rocksdb::ColumnFamilyHandle* cf_handle) {
  rocksdb::WriteBatchWithIndex* wbwi =
      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
  assert(wbwi != nullptr);
  jbyte* key = env->GetByteArrayElements(jkey, nullptr);
  if (key == nullptr) {
    // GetByteArrayElements raised OutOfMemoryError; let it propagate.
    return;
  }
  rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);
  if (cf_handle != nullptr) {
    wbwi->Delete(cf_handle, key_slice);
  } else {
    wbwi->Delete(key_slice);
  }
  env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    remove
 * Signature: ([BI)V
 *
 * Remove from the default column family: forwards to the shared helper with
 * a null column family handle.
 */
void Java_org_rocksdb_WriteBatchWithIndex_remove___3BI(
    JNIEnv* env, jobject jobj, jbyteArray jkey, jint jkey_len) {
  write_batch_with_index_remove_helper(env, jobj, jkey, jkey_len, nullptr);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    remove
 * Signature: ([BIJ)V
 *
 * Remove from an explicit column family, identified by its native handle.
 */
void Java_org_rocksdb_WriteBatchWithIndex_remove___3BIJ(
    JNIEnv* env, jobject jobj,
    jbyteArray jkey, jint jkey_len, jlong jcf_handle) {
  auto* column_family =
      reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  write_batch_with_index_remove_helper(env, jobj, jkey, jkey_len,
      column_family);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    putLogData
 * Signature: ([BI)V
 *
 * Appends a blob of log data to the batch. The array is released without
 * copy-back (JNI_ABORT) since it is only read.
 */
void Java_org_rocksdb_WriteBatchWithIndex_putLogData(
    JNIEnv* env, jobject jobj, jbyteArray jblob, jint jblob_len) {
  rocksdb::WriteBatchWithIndex* wbwi =
      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
  assert(wbwi != nullptr);
  jbyte* blob = env->GetByteArrayElements(jblob, nullptr);
  if (blob == nullptr) {
    // GetByteArrayElements raised OutOfMemoryError; let it propagate.
    return;
  }
  rocksdb::Slice blob_slice(reinterpret_cast<char*>(blob), jblob_len);
  wbwi->PutLogData(blob_slice);
  env->ReleaseByteArrayElements(jblob, blob, JNI_ABORT);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    clear
 * Signature: ()V
 *
 * Clears all entries from the underlying WriteBatch.
 */
void Java_org_rocksdb_WriteBatchWithIndex_clear0(
    JNIEnv* env, jobject jobj) {
  auto* wbwi = rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
  assert(wbwi != nullptr);
  wbwi->GetWriteBatch()->Clear();
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    iterator0
 * Signature: ()J
 *
 * Creates a WBWIIterator over the default column family and returns its
 * pointer as a jlong; ownership passes to the Java WBWIRocksIterator.
 */
jlong Java_org_rocksdb_WriteBatchWithIndex_iterator0(
    JNIEnv* env, jobject jobj) {
  rocksdb::WriteBatchWithIndex* wbwi =
      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
  assert(wbwi != nullptr);  // consistency with the other entry points
  rocksdb::WBWIIterator* wbwi_iterator = wbwi->NewIterator();
  return reinterpret_cast<jlong>(wbwi_iterator);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    iterator1
 * Signature: (J)J
 *
 * Creates a WBWIIterator restricted to the given column family and returns
 * its pointer as a jlong; ownership passes to the Java WBWIRocksIterator.
 */
jlong Java_org_rocksdb_WriteBatchWithIndex_iterator1(
    JNIEnv* env, jobject jobj, jlong jcf_handle) {
  rocksdb::WriteBatchWithIndex* wbwi =
      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
  assert(wbwi != nullptr);  // consistency with the other entry points
  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  rocksdb::WBWIIterator* wbwi_iterator = wbwi->NewIterator(cf_handle);
  return reinterpret_cast<jlong>(wbwi_iterator);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    iteratorWithBase
 * Signature: (JJ)J
 *
 * Creates an iterator that merges this write batch (as a delta) with the
 * supplied base database iterator and returns its pointer as a jlong.
 * NOTE: the Java side passes the column family handle FIRST and the base
 * iterator handle SECOND, matching the parameter order here.
 */
jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(
    JNIEnv* env, jobject jobj, jlong jcf_handle, jlong jbi_handle) {
  rocksdb::WriteBatchWithIndex* wbwi =
      rocksdb::WriteBatchWithIndexJni::getHandle(env, jobj);
  assert(wbwi != nullptr);  // consistency with the other entry points
  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
  auto* base_iterator = reinterpret_cast<rocksdb::Iterator*>(jbi_handle);
  auto* iterator = wbwi->NewIteratorWithBase(cf_handle, base_iterator);
  return reinterpret_cast<jlong>(iterator);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    disposeInternal
 * Signature: (J)V
 *
 * Reclaims the native WriteBatchWithIndex created by one of the
 * newWriteBatchWithIndex variants.
 */
void Java_org_rocksdb_WriteBatchWithIndex_disposeInternal(
    JNIEnv* env, jobject jobj, jlong handle) {
  delete reinterpret_cast<rocksdb::WriteBatchWithIndex*>(handle);
}
Loading…
Cancel
Save