Iterator support for Write Batches

main
Adam Retter 10 years ago
parent 00211f9c5b
commit 8fb4751d50
  1. 2
      java/Makefile
  2. 48
      java/org/rocksdb/WriteBatch.java
  3. 74
      java/rocksjni/portal.h
  4. 44
      java/rocksjni/write_batch.cc
  5. 98
      java/rocksjni/writebatchhandlerjnicallback.cc
  6. 47
      java/rocksjni/writebatchhandlerjnicallback.h

@ -1,4 +1,4 @@
NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.DBOptions org.rocksdb.WriteBatch org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB org.rocksdb.Options org.rocksdb.DBOptions org.rocksdb.WriteBatch org.rocksdb.WriteBatch.Handler org.rocksdb.WriteBatchInternal org.rocksdb.WriteBatchTest org.rocksdb.WriteOptions org.rocksdb.BackupableDB org.rocksdb.BackupableDBOptions org.rocksdb.Statistics org.rocksdb.RocksIterator org.rocksdb.VectorMemTableConfig org.rocksdb.SkipListMemTableConfig org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig org.rocksdb.PlainTableConfig org.rocksdb.BlockBasedTableConfig org.rocksdb.ReadOptions org.rocksdb.Filter org.rocksdb.BloomFilter org.rocksdb.ComparatorOptions org.rocksdb.AbstractComparator org.rocksdb.Comparator org.rocksdb.DirectComparator org.rocksdb.AbstractSlice org.rocksdb.Slice org.rocksdb.DirectSlice org.rocksdb.RestoreOptions org.rocksdb.RestoreBackupableDB org.rocksdb.RocksEnv org.rocksdb.GenericRateLimiterConfig org.rocksdb.ColumnFamilyHandle org.rocksdb.MergeOperator org.rocksdb.StringAppendOperator
ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) ROCKSDB_MAJOR = $(shell egrep "ROCKSDB_MAJOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)
ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3) ROCKSDB_MINOR = $(shell egrep "ROCKSDB_MINOR.[0-9]" ../include/rocksdb/version.h | cut -d ' ' -f 3)

@ -5,8 +5,6 @@
package org.rocksdb; package org.rocksdb;
import java.util.*;
/** /**
* WriteBatch holds a collection of updates to apply atomically to a DB. * WriteBatch holds a collection of updates to apply atomically to a DB.
* *
@ -105,6 +103,13 @@ public class WriteBatch extends RocksObject {
putLogData(blob, blob.length); putLogData(blob, blob.length);
} }
/**
* Support for iterating over the contents of a batch.
*/
public void iterate(Handler handler) {
iterate(handler.nativeHandle_);
}
/** /**
* Clear all updates buffered in this batch * Clear all updates buffered in this batch
*/ */
@ -133,7 +138,46 @@ public class WriteBatch extends RocksObject {
private native void remove(byte[] key, int keyLen, private native void remove(byte[] key, int keyLen,
long cfHandle); long cfHandle);
private native void putLogData(byte[] blob, int blobLen); private native void putLogData(byte[] blob, int blobLen);
private native void iterate(long handlerHandle);
private native void disposeInternal(long handle);
/**
* Handler callback for iterating over the contents of a batch.
*/
public static abstract class Handler extends RocksObject {
public Handler() {
super();
createNewHandler0();
}
public abstract void put(byte[] key, byte[] value);
public abstract void merge(byte[] key, byte[] value);
public abstract void delete(byte[] key);
public abstract void logData(byte[] blob);
/**
* shouldContinue is called by the underlying iterator
* (WriteBatch::Iterate.If it returns false,
* iteration is halted. Otherwise, it continues
* iterating. The default implementation always
* returns true.
*/
public boolean shouldContinue() {
return true;
}
/**
* Deletes underlying C++ handler pointer.
*/
@Override
protected void disposeInternal() {
assert(isInitialized());
disposeInternal(nativeHandle_);
}
private native void createNewHandler0();
private native void disposeInternal(long handle); private native void disposeInternal(long handle);
}
} }
/** /**

@ -20,6 +20,7 @@
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/utilities/backupable_db.h" #include "rocksdb/utilities/backupable_db.h"
#include "rocksjni/comparatorjnicallback.h" #include "rocksjni/comparatorjnicallback.h"
#include "rocksjni/writebatchhandlerjnicallback.h"
namespace rocksdb { namespace rocksdb {
@ -288,6 +289,79 @@ class WriteBatchJni {
} }
}; };
class WriteBatchHandlerJni {
public:
static jclass getJClass(JNIEnv* env) {
jclass jclazz = env->FindClass("org/rocksdb/WriteBatch$Handler");
assert(jclazz != nullptr);
return jclazz;
}
static jfieldID getHandleFieldID(JNIEnv* env) {
static jfieldID fid = env->GetFieldID(
getJClass(env), "nativeHandle_", "J");
assert(fid != nullptr);
return fid;
}
// Get the java method `put` of org.rocksdb.WriteBatch.Handler.
static jmethodID getPutMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "put", "([B[B)V");
assert(mid != nullptr);
return mid;
}
// Get the java method `merge` of org.rocksdb.WriteBatch.Handler.
static jmethodID getMergeMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "merge", "([B[B)V");
assert(mid != nullptr);
return mid;
}
// Get the java method `delete` of org.rocksdb.WriteBatch.Handler.
static jmethodID getDeleteMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "delete", "([B)V");
assert(mid != nullptr);
return mid;
}
// Get the java method `logData` of org.rocksdb.WriteBatch.Handler.
static jmethodID getLogDataMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "logData", "([B)V");
assert(mid != nullptr);
return mid;
}
// Get the java method `shouldContinue` of org.rocksdb.WriteBatch.Handler.
static jmethodID getContinueMethodId(JNIEnv* env) {
static jmethodID mid = env->GetMethodID(
getJClass(env), "shouldContinue", "()Z");
assert(mid != nullptr);
return mid;
}
// Get the pointer to rocksdb::WriteBatchHandlerJniCallback of the specified
// org.rocksdb.WriteBatchHandler.
static rocksdb::WriteBatchHandlerJniCallback* getHandle(
JNIEnv* env, jobject jobj) {
return reinterpret_cast<rocksdb::WriteBatchHandlerJniCallback*>(
env->GetLongField(jobj, getHandleFieldID(env)));
}
// Pass the rocksdb::WriteBatchHandlerJniCallback pointer to the java side.
static void setHandle(
JNIEnv* env, jobject jobj,
const rocksdb::WriteBatchHandlerJniCallback* op) {
env->SetLongField(
jobj, getHandleFieldID(env),
reinterpret_cast<jlong>(op));
}
};
class HistogramDataJni { class HistogramDataJni {
public: public:
static jmethodID getConstructorMethodId(JNIEnv* env, jclass jclazz) { static jmethodID getConstructorMethodId(JNIEnv* env, jclass jclazz) {

@ -8,13 +8,16 @@
#include <memory> #include <memory>
#include "include/org_rocksdb_WriteBatch.h" #include "include/org_rocksdb_WriteBatch.h"
#include "include/org_rocksdb_WriteBatch_Handler.h"
#include "include/org_rocksdb_WriteBatchInternal.h" #include "include/org_rocksdb_WriteBatchInternal.h"
#include "include/org_rocksdb_WriteBatchTest.h" #include "include/org_rocksdb_WriteBatchTest.h"
#include "rocksjni/portal.h" #include "rocksjni/portal.h"
#include "rocksjni/writebatchhandlerjnicallback.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/immutable_options.h" #include "rocksdb/immutable_options.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "rocksdb/status.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
@ -224,6 +227,25 @@ void Java_org_rocksdb_WriteBatch_putLogData(
env->ReleaseByteArrayElements(jblob, blob, JNI_ABORT); env->ReleaseByteArrayElements(jblob, blob, JNI_ABORT);
} }
/*
* Class: org_rocksdb_WriteBatch
* Method: iterate
* Signature: (J)V
*/
void Java_org_rocksdb_WriteBatch_iterate(
JNIEnv* env , jobject jobj, jlong handlerHandle) {
rocksdb::WriteBatch* wb = rocksdb::WriteBatchJni::getHandle(env, jobj);
assert(wb != nullptr);
rocksdb::Status s = wb->Iterate(
reinterpret_cast<rocksdb::WriteBatchHandlerJniCallback*>(handlerHandle));
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/* /*
* Class: org_rocksdb_WriteBatch * Class: org_rocksdb_WriteBatch
* Method: disposeInternal * Method: disposeInternal
@ -276,6 +298,28 @@ void Java_org_rocksdb_WriteBatchInternal_append(
rocksdb::WriteBatchInternal::Append(wb1, wb2); rocksdb::WriteBatchInternal::Append(wb1, wb2);
} }
/*
* Class: org_rocksdb_WriteBatch_Handler
* Method: createNewHandler0
* Signature: ()V
*/
void Java_org_rocksdb_WriteBatch_00024Handler_createNewHandler0(
JNIEnv* env, jobject jobj) {
const rocksdb::WriteBatchHandlerJniCallback* h =
new rocksdb::WriteBatchHandlerJniCallback(env, jobj);
rocksdb::WriteBatchHandlerJni::setHandle(env, jobj, h);
}
/*
* Class: org_rocksdb_WriteBatch_Handler
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_WriteBatch_00024Handler_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
delete reinterpret_cast<rocksdb::WriteBatchHandlerJniCallback*>(handle);
}
/* /*
* Class: org_rocksdb_WriteBatchTest * Class: org_rocksdb_WriteBatchTest
* Method: getContents * Method: getContents

@ -0,0 +1,98 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the callback "bridge" between Java and C++ for
// rocksdb::Comparator.
#include "rocksjni/writebatchhandlerjnicallback.h"
#include "rocksjni/portal.h"
namespace rocksdb {
WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback(
JNIEnv* env, jobject jWriteBatchHandler) {
// Note: WriteBatchHandler methods may be accessed by multiple threads,
// so we ref the jvm not the env
const jint rs = env->GetJavaVM(&m_jvm);
assert(rs == JNI_OK);
// Note: we want to access the Java WriteBatchHandler instance
// across multiple method calls, so we create a global ref
m_jWriteBatchHandler = env->NewGlobalRef(jWriteBatchHandler);
m_jPutMethodId = WriteBatchHandlerJni::getPutMethodId(env);
m_jMergeMethodId = WriteBatchHandlerJni::getMergeMethodId(env);
m_jDeleteMethodId = WriteBatchHandlerJni::getDeleteMethodId(env);
m_jLogDataMethodId = WriteBatchHandlerJni::getLogDataMethodId(env);
m_jContinueMethodId = WriteBatchHandlerJni::getContinueMethodId(env);
}
/**
* Attach/Get a JNIEnv for the current native thread
*/
JNIEnv* WriteBatchHandlerJniCallback::getJniEnv() const {
JNIEnv *env;
jint rs = m_jvm->AttachCurrentThread(reinterpret_cast<void **>(&env), NULL);
assert(rs == JNI_OK);
return env;
}
void WriteBatchHandlerJniCallback::Put(const Slice& key, const Slice& value) {
getJniEnv()->CallVoidMethod(
m_jWriteBatchHandler,
m_jPutMethodId,
sliceToJArray(key),
sliceToJArray(value));
}
void WriteBatchHandlerJniCallback::Merge(const Slice& key, const Slice& value) {
getJniEnv()->CallVoidMethod(
m_jWriteBatchHandler,
m_jMergeMethodId,
sliceToJArray(key),
sliceToJArray(value));
}
void WriteBatchHandlerJniCallback::Delete(const Slice& key) {
getJniEnv()->CallVoidMethod(
m_jWriteBatchHandler,
m_jDeleteMethodId,
sliceToJArray(key));
}
void WriteBatchHandlerJniCallback::LogData(const Slice& blob) {
getJniEnv()->CallVoidMethod(
m_jWriteBatchHandler,
m_jLogDataMethodId,
sliceToJArray(blob));
}
bool WriteBatchHandlerJniCallback::Continue() {
jboolean jContinue = getJniEnv()->CallBooleanMethod(
m_jWriteBatchHandler,
m_jContinueMethodId);
return static_cast<bool>(jContinue == JNI_TRUE);
}
jbyteArray WriteBatchHandlerJniCallback::sliceToJArray(const Slice& s) {
jbyteArray ja = getJniEnv()->NewByteArray(s.size());
getJniEnv()->SetByteArrayRegion(
ja, 0, s.size(),
reinterpret_cast<const jbyte*>(s.data()));
return ja;
}
WriteBatchHandlerJniCallback::~WriteBatchHandlerJniCallback() {
JNIEnv* m_env = getJniEnv();
m_env->DeleteGlobalRef(m_jWriteBatchHandler);
// Note: do not need to explicitly detach, as this function is effectively
// called from the Java class's disposeInternal method, and so already
// has an attached thread, getJniEnv above is just a no-op Attach to get
// the env jvm->DetachCurrentThread();
}
} // namespace rocksdb

@ -0,0 +1,47 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the callback "bridge" between Java and C++ for
// rocksdb::WriteBatch::Handler.
#ifndef JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_
#define JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_
#include <jni.h>
#include "rocksdb/write_batch.h"
namespace rocksdb {
/**
* This class acts as a bridge between C++
* and Java. The methods in this class will be
* called back from the RocksDB storage engine (C++)
* we then callback to the appropriate Java method
* this enables Write Batch Handlers to be implemented in Java.
*/
class WriteBatchHandlerJniCallback : public WriteBatch::Handler {
public:
WriteBatchHandlerJniCallback(
JNIEnv* env, jobject jWriteBackHandler);
~WriteBatchHandlerJniCallback();
void Put(const Slice& key, const Slice& value);
void Merge(const Slice& key, const Slice& value);
void Delete(const Slice& key);
void LogData(const Slice& blob);
bool Continue();
private:
JavaVM* m_jvm;
jobject m_jWriteBatchHandler;
JNIEnv* getJniEnv() const;
jbyteArray sliceToJArray(const Slice& s);
jmethodID m_jPutMethodId;
jmethodID m_jMergeMethodId;
jmethodID m_jDeleteMethodId;
jmethodID m_jLogDataMethodId;
jmethodID m_jContinueMethodId;
};
} // namespace rocksdb
#endif // JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_
Loading…
Cancel
Save