Brings the Java API for WriteBatch inline with the C++ API

Summary:
* Exposes status
* Corrects some method naming
* Adds missing functionality
Closes https://github.com/facebook/rocksdb/pull/3550

Differential Revision: D7140790

Pulled By: sagar0

fbshipit-source-id: cbdab6c5a7ae4f3030fb46739e9060e381b26fa6
main
Adam Retter 7 years ago committed by Facebook Github Bot
parent 1209b6db5c
commit db2445ad24
  1. 4
      java/CMakeLists.txt
  2. 1039
      java/rocksjni/portal.h
  3. 364
      java/rocksjni/write_batch.cc
  4. 32
      java/rocksjni/write_batch_test.cc
  5. 206
      java/rocksjni/write_batch_with_index.cc
  6. 535
      java/rocksjni/writebatchhandlerjnicallback.cc
  7. 39
      java/rocksjni/writebatchhandlerjnicallback.h
  8. 101
      java/src/main/java/org/rocksdb/AbstractWriteBatch.java
  9. 18
      java/src/main/java/org/rocksdb/Status.java
  10. 11
      java/src/main/java/org/rocksdb/WBWIRocksIterator.java
  11. 257
      java/src/main/java/org/rocksdb/WriteBatch.java
  12. 118
      java/src/main/java/org/rocksdb/WriteBatchInterface.java
  13. 18
      java/src/main/java/org/rocksdb/WriteBatchWithIndex.java
  14. 133
      java/src/test/java/org/rocksdb/WriteBatchHandlerTest.java
  15. 365
      java/src/test/java/org/rocksdb/WriteBatchTest.java
  16. 178
      java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java
  17. 171
      java/src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java
  18. 133
      java/src/test/java/org/rocksdb/util/WriteBatchGetter.java

@ -122,6 +122,8 @@ set(NATIVE_JAVA_CLASSES
org.rocksdb.WriteBatchTestInternalHelper
org.rocksdb.WriteBatchWithIndex
org.rocksdb.WriteOptions
org.rocksdb.util.CapturingWriteBatchHandler
org.rocksdb.util.WriteBatchGetter
)
include(FindJava)
@ -249,6 +251,8 @@ add_jar(
src/test/java/org/rocksdb/RocksMemoryResource.java
src/test/java/org/rocksdb/SnapshotTest.java
src/test/java/org/rocksdb/WriteBatchTest.java
src/test/java/org/rocksdb/util/CapturingWriteBatchHandler.java
src/test/java/org/rocksdb/util/WriteBatchGetter.java
INCLUDE_JARS ${JAVA_TESTCLASSPATH}
)

File diff suppressed because it is too large Load Diff

@ -27,12 +27,34 @@
* Method: newWriteBatch
* Signature: (I)J
*/
jlong Java_org_rocksdb_WriteBatch_newWriteBatch(
jlong Java_org_rocksdb_WriteBatch_newWriteBatch__I(
JNIEnv* env, jclass jcls, jint jreserved_bytes) {
auto* wb = new rocksdb::WriteBatch(static_cast<size_t>(jreserved_bytes));
return reinterpret_cast<jlong>(wb);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: newWriteBatch
* Signature: ([BI)J
*/
jlong Java_org_rocksdb_WriteBatch_newWriteBatch___3BI(
JNIEnv* env, jclass jcls, jbyteArray jserialized,
jint jserialized_length) {
jboolean has_exception = JNI_FALSE;
std::string serialized = rocksdb::JniUtil::byteString<std::string>(env,
jserialized, jserialized_length,
[](const char* str, const size_t len) { return std::string(str, len); },
&has_exception);
if(has_exception == JNI_TRUE) {
// exception occurred
return 0;
}
auto* wb = new rocksdb::WriteBatch(serialized);
return reinterpret_cast<jlong>(wb);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: count0
@ -90,6 +112,37 @@ void Java_org_rocksdb_WriteBatch_rollbackToSavePoint0(
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: popSavePoint
* Signature: (J)V
*/
void Java_org_rocksdb_WriteBatch_popSavePoint(
JNIEnv* env, jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto s = wb->PopSavePoint();
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: setMaxBytes
* Signature: (JJ)V
*/
void Java_org_rocksdb_WriteBatch_setMaxBytes(
JNIEnv* env, jobject jobj, jlong jwb_handle, jlong jmax_bytes) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
wb->SetMaxBytes(static_cast<size_t>(jmax_bytes));
}
/*
* Class: org_rocksdb_WriteBatch
* Method: put
@ -102,10 +155,13 @@ void Java_org_rocksdb_WriteBatch_put__J_3BI_3BI(
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto put = [&wb] (rocksdb::Slice key, rocksdb::Slice value) {
wb->Put(key, value);
return wb->Put(key, value);
};
rocksdb::JniUtil::kv_op(put, env, jobj, jkey, jkey_len, jentry_value,
jentry_value_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(put, env,
jobj, jkey, jkey_len, jentry_value, jentry_value_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -122,10 +178,13 @@ void Java_org_rocksdb_WriteBatch_put__J_3BI_3BIJ(
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto put = [&wb, &cf_handle] (rocksdb::Slice key, rocksdb::Slice value) {
wb->Put(cf_handle, key, value);
return wb->Put(cf_handle, key, value);
};
rocksdb::JniUtil::kv_op(put, env, jobj, jkey, jkey_len, jentry_value,
jentry_value_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(put, env,
jobj, jkey, jkey_len, jentry_value, jentry_value_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -140,10 +199,13 @@ void Java_org_rocksdb_WriteBatch_merge__J_3BI_3BI(
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto merge = [&wb] (rocksdb::Slice key, rocksdb::Slice value) {
wb->Merge(key, value);
return wb->Merge(key, value);
};
rocksdb::JniUtil::kv_op(merge, env, jobj, jkey, jkey_len, jentry_value,
jentry_value_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(merge, env,
jobj, jkey, jkey_len, jentry_value, jentry_value_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -160,34 +222,41 @@ void Java_org_rocksdb_WriteBatch_merge__J_3BI_3BIJ(
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto merge = [&wb, &cf_handle] (rocksdb::Slice key, rocksdb::Slice value) {
wb->Merge(cf_handle, key, value);
return wb->Merge(cf_handle, key, value);
};
rocksdb::JniUtil::kv_op(merge, env, jobj, jkey, jkey_len, jentry_value,
jentry_value_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(merge, env,
jobj, jkey, jkey_len, jentry_value, jentry_value_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
* Class: org_rocksdb_WriteBatch
* Method: remove
* Method: delete
* Signature: (J[BI)V
*/
void Java_org_rocksdb_WriteBatch_remove__J_3BI(
void Java_org_rocksdb_WriteBatch_delete__J_3BI(
JNIEnv* env, jobject jobj, jlong jwb_handle,
jbyteArray jkey, jint jkey_len) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto remove = [&wb] (rocksdb::Slice key) {
wb->Delete(key);
return wb->Delete(key);
};
rocksdb::JniUtil::k_op(remove, env, jobj, jkey, jkey_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(remove, env,
jobj, jkey, jkey_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
* Class: org_rocksdb_WriteBatch
* Method: remove
* Method: delete
* Signature: (J[BIJ)V
*/
void Java_org_rocksdb_WriteBatch_remove__J_3BIJ(
void Java_org_rocksdb_WriteBatch_delete__J_3BIJ(
JNIEnv* env, jobject jobj, jlong jwb_handle,
jbyteArray jkey, jint jkey_len, jlong jcf_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
@ -195,9 +264,55 @@ void Java_org_rocksdb_WriteBatch_remove__J_3BIJ(
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto remove = [&wb, &cf_handle] (rocksdb::Slice key) {
wb->Delete(cf_handle, key);
return wb->Delete(cf_handle, key);
};
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(remove, env,
jobj, jkey, jkey_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
* Class: org_rocksdb_WriteBatch
* Method: singleDelete
* Signature: (J[BI)V
*/
void Java_org_rocksdb_WriteBatch_singleDelete__J_3BI(
JNIEnv* env, jobject jobj, jlong jwb_handle, jbyteArray jkey,
jint jkey_len) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto single_delete = [&wb] (rocksdb::Slice key) {
return wb->SingleDelete(key);
};
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(single_delete,
env, jobj, jkey, jkey_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
* Class: org_rocksdb_WriteBatch
* Method: singleDelete
* Signature: (J[BIJ)V
*/
void Java_org_rocksdb_WriteBatch_singleDelete__J_3BIJ(
JNIEnv* env, jobject jobj, jlong jwb_handle, jbyteArray jkey,
jint jkey_len, jlong jcf_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto single_delete = [&wb, &cf_handle] (rocksdb::Slice key) {
return wb->SingleDelete(cf_handle, key);
};
rocksdb::JniUtil::k_op(remove, env, jobj, jkey, jkey_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(single_delete,
env, jobj, jkey, jkey_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -205,19 +320,20 @@ void Java_org_rocksdb_WriteBatch_remove__J_3BIJ(
* Method: deleteRange
* Signature: (J[BI[BI)V
*/
JNIEXPORT void JNICALL Java_org_rocksdb_WriteBatch_deleteRange__J_3BI_3BI(
JNIEnv*, jobject, jlong, jbyteArray, jint, jbyteArray, jint);
void Java_org_rocksdb_WriteBatch_deleteRange__J_3BI_3BI(
JNIEnv* env, jobject jobj, jlong jwb_handle, jbyteArray jbegin_key,
jint jbegin_key_len, jbyteArray jend_key, jint jend_key_len) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto deleteRange = [&wb](rocksdb::Slice beginKey, rocksdb::Slice endKey) {
wb->DeleteRange(beginKey, endKey);
return wb->DeleteRange(beginKey, endKey);
};
rocksdb::JniUtil::kv_op(deleteRange, env, jobj, jbegin_key, jbegin_key_len,
jend_key, jend_key_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(
deleteRange, env, jobj, jbegin_key, jbegin_key_len, jend_key,
jend_key_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -234,11 +350,15 @@ void Java_org_rocksdb_WriteBatch_deleteRange__J_3BI_3BIJ(
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto deleteRange = [&wb, &cf_handle](rocksdb::Slice beginKey,
rocksdb::Slice endKey) {
wb->DeleteRange(cf_handle, beginKey, endKey);
rocksdb::Slice endKey) {
return wb->DeleteRange(cf_handle, beginKey, endKey);
};
rocksdb::JniUtil::kv_op(deleteRange, env, jobj, jbegin_key, jbegin_key_len,
jend_key, jend_key_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(
deleteRange, env, jobj, jbegin_key, jbegin_key_len, jend_key,
jend_key_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -252,9 +372,13 @@ void Java_org_rocksdb_WriteBatch_putLogData(
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto putLogData = [&wb] (rocksdb::Slice blob) {
wb->PutLogData(blob);
return wb->PutLogData(blob);
};
rocksdb::JniUtil::k_op(putLogData, env, jobj, jblob, jblob_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(putLogData,
env, jobj, jblob, jblob_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -263,7 +387,7 @@ void Java_org_rocksdb_WriteBatch_putLogData(
* Signature: (JJ)V
*/
void Java_org_rocksdb_WriteBatch_iterate(
JNIEnv* env , jobject jobj, jlong jwb_handle, jlong handlerHandle) {
JNIEnv* env, jobject jobj, jlong jwb_handle, jlong handlerHandle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
@ -276,6 +400,178 @@ void Java_org_rocksdb_WriteBatch_iterate(
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: data
* Signature: (J)[B
*/
jbyteArray Java_org_rocksdb_WriteBatch_data(
JNIEnv* env, jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto data = wb->Data();
return rocksdb::JniUtil::copyBytes(env, data);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: getDataSize
* Signature: (J)J
*/
jlong Java_org_rocksdb_WriteBatch_getDataSize(
JNIEnv* env, jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto data_size = wb->GetDataSize();
return static_cast<jlong>(data_size);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: hasPut
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_WriteBatch_hasPut(
JNIEnv* env, jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
return wb->HasPut();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: hasDelete
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_WriteBatch_hasDelete(
JNIEnv* env, jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
return wb->HasDelete();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: hasSingleDelete
* Signature: (J)Z
*/
JNIEXPORT jboolean JNICALL Java_org_rocksdb_WriteBatch_hasSingleDelete(
JNIEnv* env , jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
return wb->HasSingleDelete();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: hasDeleteRange
* Signature: (J)Z
*/
JNIEXPORT jboolean JNICALL Java_org_rocksdb_WriteBatch_hasDeleteRange(
JNIEnv* env , jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
return wb->HasDeleteRange();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: hasMerge
* Signature: (J)Z
*/
JNIEXPORT jboolean JNICALL Java_org_rocksdb_WriteBatch_hasMerge(
JNIEnv* env , jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
return wb->HasMerge();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: hasBeginPrepare
* Signature: (J)Z
*/
JNIEXPORT jboolean JNICALL Java_org_rocksdb_WriteBatch_hasBeginPrepare(
JNIEnv* env , jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
return wb->HasBeginPrepare();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: hasEndPrepare
* Signature: (J)Z
*/
JNIEXPORT jboolean JNICALL Java_org_rocksdb_WriteBatch_hasEndPrepare(
JNIEnv* env , jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
return wb->HasEndPrepare();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: hasCommit
* Signature: (J)Z
*/
JNIEXPORT jboolean JNICALL Java_org_rocksdb_WriteBatch_hasCommit(
JNIEnv* env , jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
return wb->HasCommit();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: hasRollback
* Signature: (J)Z
*/
JNIEXPORT jboolean JNICALL Java_org_rocksdb_WriteBatch_hasRollback(
JNIEnv* env , jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
return wb->HasRollback();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: markWalTerminationPoint
* Signature: (J)V
*/
void Java_org_rocksdb_WriteBatch_markWalTerminationPoint(
JNIEnv* env, jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
wb->MarkWalTerminationPoint();
}
/*
* Class: org_rocksdb_WriteBatch
* Method: getWalTerminationPoint
* Signature: (J)Lorg/rocksdb/WriteBatch/SavePoint;
*/
jobject Java_org_rocksdb_WriteBatch_getWalTerminationPoint(
JNIEnv* env, jobject jobj, jlong jwb_handle) {
auto* wb = reinterpret_cast<rocksdb::WriteBatch*>(jwb_handle);
assert(wb != nullptr);
auto save_point = wb->GetWalTerminationPoint();
return rocksdb::WriteBatchSavePointJni::construct(env, save_point);
}
/*
* Class: org_rocksdb_WriteBatch
* Method: disposeInternal

@ -87,8 +87,32 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(
state.append(")");
count++;
break;
case rocksdb::kTypeSingleDeletion:
state.append("SingleDelete(");
state.append(ikey.user_key.ToString());
state.append(")");
count++;
break;
case rocksdb::kTypeRangeDeletion:
state.append("DeleteRange(");
state.append(ikey.user_key.ToString());
state.append(", ");
state.append(iter->value().ToString());
state.append(")");
count++;
break;
case rocksdb::kTypeLogData:
state.append("LogData(");
state.append(ikey.user_key.ToString());
state.append(")");
count++;
break;
default:
assert(false);
state.append("Err:Expected(");
state.append(std::to_string(ikey.type));
state.append(")");
count++;
break;
}
state.append("@");
@ -96,8 +120,12 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(
}
if (!s.ok()) {
state.append(s.ToString());
} else if (count != rocksdb::WriteBatchInternal::Count(b)) {
state.append("CountMismatch()");
} else if (rocksdb::WriteBatchInternal::Count(b) != count) {
state.append("Err:CountMismatch(expected=");
state.append(std::to_string(rocksdb::WriteBatchInternal::Count(b)));
state.append(", actual=");
state.append(std::to_string(count));
state.append(")");
}
delete mem->Unref();

@ -83,10 +83,13 @@ void Java_org_rocksdb_WriteBatchWithIndex_put__J_3BI_3BI(
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
auto put = [&wbwi] (rocksdb::Slice key, rocksdb::Slice value) {
wbwi->Put(key, value);
return wbwi->Put(key, value);
};
rocksdb::JniUtil::kv_op(put, env, jobj, jkey, jkey_len, jentry_value,
jentry_value_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(put, env,
jobj, jkey, jkey_len, jentry_value, jentry_value_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -103,10 +106,13 @@ void Java_org_rocksdb_WriteBatchWithIndex_put__J_3BI_3BIJ(
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto put = [&wbwi, &cf_handle] (rocksdb::Slice key, rocksdb::Slice value) {
wbwi->Put(cf_handle, key, value);
return wbwi->Put(cf_handle, key, value);
};
rocksdb::JniUtil::kv_op(put, env, jobj, jkey, jkey_len, jentry_value,
jentry_value_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(put, env,
jobj, jkey, jkey_len, jentry_value, jentry_value_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -120,10 +126,13 @@ void Java_org_rocksdb_WriteBatchWithIndex_merge__J_3BI_3BI(
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
auto merge = [&wbwi] (rocksdb::Slice key, rocksdb::Slice value) {
wbwi->Merge(key, value);
return wbwi->Merge(key, value);
};
rocksdb::JniUtil::kv_op(merge, env, jobj, jkey, jkey_len, jentry_value,
jentry_value_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(merge, env,
jobj, jkey, jkey_len, jentry_value, jentry_value_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -140,34 +149,41 @@ void Java_org_rocksdb_WriteBatchWithIndex_merge__J_3BI_3BIJ(
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto merge = [&wbwi, &cf_handle] (rocksdb::Slice key, rocksdb::Slice value) {
wbwi->Merge(cf_handle, key, value);
return wbwi->Merge(cf_handle, key, value);
};
rocksdb::JniUtil::kv_op(merge, env, jobj, jkey, jkey_len, jentry_value,
jentry_value_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(merge, env,
jobj, jkey, jkey_len, jentry_value, jentry_value_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: remove
* Method: delete
* Signature: (J[BI)V
*/
void Java_org_rocksdb_WriteBatchWithIndex_remove__J_3BI(
void Java_org_rocksdb_WriteBatchWithIndex_delete__J_3BI(
JNIEnv* env, jobject jobj, jlong jwbwi_handle, jbyteArray jkey,
jint jkey_len) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
auto remove = [&wbwi] (rocksdb::Slice key) {
wbwi->Delete(key);
return wbwi->Delete(key);
};
rocksdb::JniUtil::k_op(remove, env, jobj, jkey, jkey_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(remove, env,
jobj, jkey, jkey_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: remove
* Method: delete
* Signature: (J[BIJ)V
*/
void Java_org_rocksdb_WriteBatchWithIndex_remove__J_3BIJ(
void Java_org_rocksdb_WriteBatchWithIndex_delete__J_3BIJ(
JNIEnv* env, jobject jobj, jlong jwbwi_handle, jbyteArray jkey,
jint jkey_len, jlong jcf_handle) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
@ -175,9 +191,55 @@ void Java_org_rocksdb_WriteBatchWithIndex_remove__J_3BIJ(
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto remove = [&wbwi, &cf_handle] (rocksdb::Slice key) {
wbwi->Delete(cf_handle, key);
return wbwi->Delete(cf_handle, key);
};
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(remove, env,
jobj, jkey, jkey_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: singleDelete
* Signature: (J[BI)V
*/
void Java_org_rocksdb_WriteBatchWithIndex_singleDelete__J_3BI(
JNIEnv* env, jobject jobj, jlong jwbwi_handle, jbyteArray jkey,
jint jkey_len) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
auto single_delete = [&wbwi] (rocksdb::Slice key) {
return wbwi->SingleDelete(key);
};
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(single_delete,
env, jobj, jkey, jkey_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: singleDelete
* Signature: (J[BIJ)V
*/
void Java_org_rocksdb_WriteBatchWithIndex_singleDelete__J_3BIJ(
JNIEnv* env, jobject jobj, jlong jwbwi_handle, jbyteArray jkey,
jint jkey_len, jlong jcf_handle) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto single_delete = [&wbwi, &cf_handle] (rocksdb::Slice key) {
return wbwi->SingleDelete(cf_handle, key);
};
rocksdb::JniUtil::k_op(remove, env, jobj, jkey, jkey_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(single_delete,
env, jobj, jkey, jkey_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -191,10 +253,14 @@ void Java_org_rocksdb_WriteBatchWithIndex_deleteRange__J_3BI_3BI(
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
auto deleteRange = [&wbwi](rocksdb::Slice beginKey, rocksdb::Slice endKey) {
wbwi->DeleteRange(beginKey, endKey);
return wbwi->DeleteRange(beginKey, endKey);
};
rocksdb::JniUtil::kv_op(deleteRange, env, jobj, jbegin_key, jbegin_key_len,
jend_key, jend_key_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(
deleteRange, env, jobj, jbegin_key, jbegin_key_len, jend_key,
jend_key_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -211,11 +277,15 @@ void Java_org_rocksdb_WriteBatchWithIndex_deleteRange__J_3BI_3BIJ(
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
assert(cf_handle != nullptr);
auto deleteRange = [&wbwi, &cf_handle](rocksdb::Slice beginKey,
rocksdb::Slice endKey) {
wbwi->DeleteRange(cf_handle, beginKey, endKey);
rocksdb::Slice endKey) {
return wbwi->DeleteRange(cf_handle, beginKey, endKey);
};
rocksdb::JniUtil::kv_op(deleteRange, env, jobj, jbegin_key, jbegin_key_len,
jend_key, jend_key_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::kv_op(
deleteRange, env, jobj, jbegin_key, jbegin_key_len, jend_key,
jend_key_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -229,9 +299,13 @@ void Java_org_rocksdb_WriteBatchWithIndex_putLogData(
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
auto putLogData = [&wbwi] (rocksdb::Slice blob) {
wbwi->PutLogData(blob);
return wbwi->PutLogData(blob);
};
rocksdb::JniUtil::k_op(putLogData, env, jobj, jblob, jblob_len);
std::unique_ptr<rocksdb::Status> status = rocksdb::JniUtil::k_op(putLogData,
env, jobj, jblob, jblob_len);
if (status != nullptr && !status->ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, status);
}
}
/*
@ -279,6 +353,54 @@ void Java_org_rocksdb_WriteBatchWithIndex_rollbackToSavePoint0(
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: popSavePoint
* Signature: (J)V
*/
void Java_org_rocksdb_WriteBatchWithIndex_popSavePoint(
JNIEnv* env, jobject jobj, jlong jwbwi_handle) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
auto s = wbwi->PopSavePoint();
if (s.ok()) {
return;
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: setMaxBytes
* Signature: (JJ)V
*/
void Java_org_rocksdb_WriteBatchWithIndex_setMaxBytes(
JNIEnv* env, jobject jobj, jlong jwbwi_handle, jlong jmax_bytes) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
wbwi->SetMaxBytes(static_cast<size_t>(jmax_bytes));
}
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: getWriteBatch
* Signature: (J)Lorg/rocksdb/WriteBatch;
*/
jobject Java_org_rocksdb_WriteBatchWithIndex_getWriteBatch(
JNIEnv* env, jobject jobj, jlong jwbwi_handle) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
assert(wbwi != nullptr);
auto* wb = wbwi->GetWriteBatch();
// TODO(AR) is the `wb` object owned by us?
return rocksdb::WriteBatchJni::construct(env, wb);
}
/*
* Class: org_rocksdb_WriteBatchWithIndex
* Method: iterator0
@ -551,33 +673,15 @@ jlongArray Java_org_rocksdb_WBWIRocksIterator_entry1(
jlong results[3];
//set the type of the write entry
switch (we.type) {
case rocksdb::kPutRecord:
results[0] = 0x1;
break;
case rocksdb::kMergeRecord:
results[0] = 0x2;
break;
case rocksdb::kDeleteRecord:
results[0] = 0x4;
break;
case rocksdb::kLogDataRecord:
results[0] = 0x8;
break;
default:
results[0] = 0x0;
}
// set the type of the write entry
results[0] = rocksdb::WriteTypeJni::toJavaWriteType(we.type);
// key_slice and value_slice will be freed by org.rocksdb.DirectSlice#close
// NOTE: key_slice and value_slice will be freed by org.rocksdb.DirectSlice#close
auto* key_slice = new rocksdb::Slice(we.key.data(), we.key.size());
results[1] = reinterpret_cast<jlong>(key_slice);
if (we.type == rocksdb::kDeleteRecord
|| we.type == rocksdb::kSingleDeleteRecord
|| we.type == rocksdb::kLogDataRecord) {
// set native handle of value slice to null if no value available
results[2] = 0;

@ -14,24 +14,62 @@ WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback(
JNIEnv* env, jobject jWriteBatchHandler)
: JniCallback(env, jWriteBatchHandler), m_env(env) {
m_jPutCfMethodId = WriteBatchHandlerJni::getPutCfMethodId(env);
if(m_jPutCfMethodId == nullptr) {
// exception thrown
return;
}
m_jPutMethodId = WriteBatchHandlerJni::getPutMethodId(env);
if(m_jPutMethodId == nullptr) {
// exception thrown
return;
}
m_jMergeCfMethodId = WriteBatchHandlerJni::getMergeCfMethodId(env);
if(m_jMergeCfMethodId == nullptr) {
// exception thrown
return;
}
m_jMergeMethodId = WriteBatchHandlerJni::getMergeMethodId(env);
if(m_jMergeMethodId == nullptr) {
// exception thrown
return;
}
m_jDeleteCfMethodId = WriteBatchHandlerJni::getDeleteCfMethodId(env);
if(m_jDeleteCfMethodId == nullptr) {
// exception thrown
return;
}
m_jDeleteMethodId = WriteBatchHandlerJni::getDeleteMethodId(env);
if(m_jDeleteMethodId == nullptr) {
// exception thrown
return;
}
m_jSingleDeleteCfMethodId =
WriteBatchHandlerJni::getSingleDeleteCfMethodId(env);
if(m_jSingleDeleteCfMethodId == nullptr) {
// exception thrown
return;
}
m_jSingleDeleteMethodId = WriteBatchHandlerJni::getSingleDeleteMethodId(env);
if(m_jSingleDeleteMethodId == nullptr) {
// exception thrown
return;
}
m_jDeleteRangeCfMethodId =
WriteBatchHandlerJni::getDeleteRangeCfMethodId(env);
if (m_jDeleteRangeCfMethodId == nullptr) {
// exception thrown
return;
}
m_jDeleteRangeMethodId = WriteBatchHandlerJni::getDeleteRangeMethodId(env);
if (m_jDeleteRangeMethodId == nullptr) {
// exception thrown
@ -44,203 +82,318 @@ WriteBatchHandlerJniCallback::WriteBatchHandlerJniCallback(
return;
}
m_jContinueMethodId = WriteBatchHandlerJni::getContinueMethodId(env);
if(m_jContinueMethodId == nullptr) {
m_jPutBlobIndexCfMethodId =
WriteBatchHandlerJni::getPutBlobIndexCfMethodId(env);
if(m_jPutBlobIndexCfMethodId == nullptr) {
// exception thrown
return;
}
}
void WriteBatchHandlerJniCallback::Put(const Slice& key, const Slice& value) {
const jbyteArray j_key = sliceToJArray(key);
if(j_key == nullptr) {
m_jMarkBeginPrepareMethodId =
WriteBatchHandlerJni::getMarkBeginPrepareMethodId(env);
if(m_jMarkBeginPrepareMethodId == nullptr) {
// exception thrown
if(m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
return;
}
const jbyteArray j_value = sliceToJArray(value);
if(j_value == nullptr) {
m_jMarkEndPrepareMethodId =
WriteBatchHandlerJni::getMarkEndPrepareMethodId(env);
if(m_jMarkEndPrepareMethodId == nullptr) {
// exception thrown
if(m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
if(j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
return;
}
m_env->CallVoidMethod(
m_jcallback_obj,
m_jPutMethodId,
j_key,
j_value);
if(m_env->ExceptionCheck()) {
m_jMarkNoopMethodId = WriteBatchHandlerJni::getMarkNoopMethodId(env);
if(m_jMarkNoopMethodId == nullptr) {
// exception thrown
m_env->ExceptionDescribe();
if(j_value != nullptr) {
m_env->DeleteLocalRef(j_value);
}
if(j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
return;
}
if(j_value != nullptr) {
m_env->DeleteLocalRef(j_value);
}
if(j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
m_jMarkRollbackMethodId = WriteBatchHandlerJni::getMarkRollbackMethodId(env);
if(m_jMarkRollbackMethodId == nullptr) {
// exception thrown
return;
}
}
void WriteBatchHandlerJniCallback::Merge(const Slice& key, const Slice& value) {
const jbyteArray j_key = sliceToJArray(key);
if(j_key == nullptr) {
m_jMarkCommitMethodId = WriteBatchHandlerJni::getMarkCommitMethodId(env);
if(m_jMarkCommitMethodId == nullptr) {
// exception thrown
if(m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
return;
}
const jbyteArray j_value = sliceToJArray(value);
if(j_value == nullptr) {
m_jContinueMethodId = WriteBatchHandlerJni::getContinueMethodId(env);
if(m_jContinueMethodId == nullptr) {
// exception thrown
if(m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
if(j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
return;
}
}
m_env->CallVoidMethod(
rocksdb::Status WriteBatchHandlerJniCallback::PutCF(uint32_t column_family_id,
const Slice& key, const Slice& value) {
auto put = [this, column_family_id] (
jbyteArray j_key, jbyteArray j_value) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jMergeMethodId,
m_jPutCfMethodId,
static_cast<jint>(column_family_id),
j_key,
j_value);
if(m_env->ExceptionCheck()) {
// exception thrown
m_env->ExceptionDescribe();
if(j_value != nullptr) {
m_env->DeleteLocalRef(j_value);
}
if(j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
return;
};
auto status = WriteBatchHandlerJniCallback::kv_op(key, value, put);
if(status == nullptr) {
return rocksdb::Status::OK(); // TODO(AR) what to do if there is an Exception but we don't know the rocksdb::Status?
} else {
return rocksdb::Status(*status);
}
}
if(j_value != nullptr) {
m_env->DeleteLocalRef(j_value);
}
if(j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
void WriteBatchHandlerJniCallback::Put(const Slice& key, const Slice& value) {
auto put = [this] (
jbyteArray j_key, jbyteArray j_value) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jPutMethodId,
j_key,
j_value);
};
WriteBatchHandlerJniCallback::kv_op(key, value, put);
}
rocksdb::Status WriteBatchHandlerJniCallback::MergeCF(uint32_t column_family_id,
const Slice& key, const Slice& value) {
auto merge = [this, column_family_id] (
jbyteArray j_key, jbyteArray j_value) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jMergeCfMethodId,
static_cast<jint>(column_family_id),
j_key,
j_value);
};
auto status = WriteBatchHandlerJniCallback::kv_op(key, value, merge);
if(status == nullptr) {
return rocksdb::Status::OK(); // TODO(AR) what to do if there is an Exception but we don't know the rocksdb::Status?
} else {
return rocksdb::Status(*status);
}
}
void WriteBatchHandlerJniCallback::Delete(const Slice& key) {
const jbyteArray j_key = sliceToJArray(key);
if(j_key == nullptr) {
// exception thrown
if(m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
return;
void WriteBatchHandlerJniCallback::Merge(const Slice& key, const Slice& value) {
auto merge = [this] (
jbyteArray j_key, jbyteArray j_value) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jMergeMethodId,
j_key,
j_value);
};
WriteBatchHandlerJniCallback::kv_op(key, value, merge);
}
rocksdb::Status WriteBatchHandlerJniCallback::DeleteCF(uint32_t column_family_id,
const Slice& key) {
auto remove = [this, column_family_id] (jbyteArray j_key) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jDeleteCfMethodId,
static_cast<jint>(column_family_id),
j_key);
};
auto status = WriteBatchHandlerJniCallback::k_op(key, remove);
if(status == nullptr) {
return rocksdb::Status::OK(); // TODO(AR) what to do if there is an Exception but we don't know the rocksdb::Status?
} else {
return rocksdb::Status(*status);
}
}
m_env->CallVoidMethod(
void WriteBatchHandlerJniCallback::Delete(const Slice& key) {
auto remove = [this] (jbyteArray j_key) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jDeleteMethodId,
j_key);
if(m_env->ExceptionCheck()) {
// exception thrown
m_env->ExceptionDescribe();
if(j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
return;
};
WriteBatchHandlerJniCallback::k_op(key, remove);
}
rocksdb::Status WriteBatchHandlerJniCallback::SingleDeleteCF(uint32_t column_family_id,
const Slice& key) {
auto singleDelete = [this, column_family_id] (jbyteArray j_key) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jSingleDeleteCfMethodId,
static_cast<jint>(column_family_id),
j_key);
};
auto status = WriteBatchHandlerJniCallback::k_op(key, singleDelete);
if(status == nullptr) {
return rocksdb::Status::OK(); // TODO(AR) what to do if there is an Exception but we don't know the rocksdb::Status?
} else {
return rocksdb::Status(*status);
}
}
if(j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
void WriteBatchHandlerJniCallback::SingleDelete(const Slice& key) {
auto singleDelete = [this] (jbyteArray j_key) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jSingleDeleteMethodId,
j_key);
};
WriteBatchHandlerJniCallback::k_op(key, singleDelete);
}
rocksdb::Status WriteBatchHandlerJniCallback::DeleteRangeCF(uint32_t column_family_id,
const Slice& beginKey, const Slice& endKey) {
auto deleteRange = [this, column_family_id] (
jbyteArray j_beginKey, jbyteArray j_endKey) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jDeleteRangeCfMethodId,
static_cast<jint>(column_family_id),
j_beginKey,
j_endKey);
};
auto status = WriteBatchHandlerJniCallback::kv_op(beginKey, endKey, deleteRange);
if(status == nullptr) {
return rocksdb::Status::OK(); // TODO(AR) what to do if there is an Exception but we don't know the rocksdb::Status?
} else {
return rocksdb::Status(*status);
}
}
void WriteBatchHandlerJniCallback::DeleteRange(const Slice& beginKey,
const Slice& endKey) {
const jbyteArray j_beginKey = sliceToJArray(beginKey);
if (j_beginKey == nullptr) {
// exception thrown
if (m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
return;
}
const Slice& endKey) {
auto deleteRange = [this] (
jbyteArray j_beginKey, jbyteArray j_endKey) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jDeleteRangeMethodId,
j_beginKey,
j_endKey);
};
WriteBatchHandlerJniCallback::kv_op(beginKey, endKey, deleteRange);
}
const jbyteArray j_endKey = sliceToJArray(beginKey);
if (j_endKey == nullptr) {
// exception thrown
if (m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
return;
void WriteBatchHandlerJniCallback::LogData(const Slice& blob) {
auto logData = [this] (jbyteArray j_blob) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jLogDataMethodId,
j_blob);
};
WriteBatchHandlerJniCallback::k_op(blob, logData);
}
rocksdb::Status WriteBatchHandlerJniCallback::PutBlobIndexCF(uint32_t column_family_id,
const Slice& key, const Slice& value) {
auto putBlobIndex = [this, column_family_id] (
jbyteArray j_key, jbyteArray j_value) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jPutBlobIndexCfMethodId,
static_cast<jint>(column_family_id),
j_key,
j_value);
};
auto status = WriteBatchHandlerJniCallback::kv_op(key, value, putBlobIndex);
if(status == nullptr) {
return rocksdb::Status::OK(); // TODO(AR) what to do if there is an Exception but we don't know the rocksdb::Status?
} else {
return rocksdb::Status(*status);
}
}
rocksdb::Status WriteBatchHandlerJniCallback::MarkBeginPrepare() {
m_env->CallVoidMethod(m_jcallback_obj, m_jMarkBeginPrepareMethodId);
m_env->CallVoidMethod(m_jcallback_obj, m_jDeleteRangeMethodId,
j_beginKey, j_endKey);
// check for Exception, in-particular RocksDBException
if (m_env->ExceptionCheck()) {
// exception thrown
m_env->ExceptionDescribe();
if (j_beginKey != nullptr) {
m_env->DeleteLocalRef(j_beginKey);
}
if (j_endKey != nullptr) {
m_env->DeleteLocalRef(j_endKey);
jthrowable exception = m_env->ExceptionOccurred();
std::unique_ptr<rocksdb::Status> status = rocksdb::RocksDBExceptionJni::toCppStatus(m_env, exception);
if (status == nullptr) {
// unkown status or exception occurred extracting status
m_env->ExceptionDescribe();
return rocksdb::Status::OK(); // TODO(AR) probably need a better error code here
} else {
m_env->ExceptionClear(); // clear the exception, as we have extracted the status
return rocksdb::Status(*status);
}
return;
}
if (j_beginKey != nullptr) {
m_env->DeleteLocalRef(j_beginKey);
}
return rocksdb::Status::OK();
}
if (j_endKey != nullptr) {
m_env->DeleteLocalRef(j_endKey);
rocksdb::Status WriteBatchHandlerJniCallback::MarkEndPrepare(const Slice& xid) {
auto markEndPrepare = [this] (
jbyteArray j_xid) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jMarkEndPrepareMethodId,
j_xid);
};
auto status = WriteBatchHandlerJniCallback::k_op(xid, markEndPrepare);
if(status == nullptr) {
return rocksdb::Status::OK(); // TODO(AR) what to do if there is an Exception but we don't know the rocksdb::Status?
} else {
return rocksdb::Status(*status);
}
}
void WriteBatchHandlerJniCallback::LogData(const Slice& blob) {
const jbyteArray j_blob = sliceToJArray(blob);
if(j_blob == nullptr) {
rocksdb::Status WriteBatchHandlerJniCallback::MarkNoop(bool empty_batch) {
m_env->CallVoidMethod(m_jcallback_obj, m_jMarkNoopMethodId, static_cast<jboolean>(empty_batch));
// check for Exception, in-particular RocksDBException
if (m_env->ExceptionCheck()) {
// exception thrown
if(m_env->ExceptionCheck()) {
jthrowable exception = m_env->ExceptionOccurred();
std::unique_ptr<rocksdb::Status> status = rocksdb::RocksDBExceptionJni::toCppStatus(m_env, exception);
if (status == nullptr) {
// unkown status or exception occurred extracting status
m_env->ExceptionDescribe();
return rocksdb::Status::OK(); // TODO(AR) probably need a better error code here
} else {
m_env->ExceptionClear(); // clear the exception, as we have extracted the status
return rocksdb::Status(*status);
}
return;
}
m_env->CallVoidMethod(
return rocksdb::Status::OK();
}
rocksdb::Status WriteBatchHandlerJniCallback::MarkRollback(const Slice& xid) {
auto markRollback = [this] (
jbyteArray j_xid) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jLogDataMethodId,
j_blob);
if(m_env->ExceptionCheck()) {
// exception thrown
m_env->ExceptionDescribe();
if(j_blob != nullptr) {
m_env->DeleteLocalRef(j_blob);
}
return;
m_jMarkRollbackMethodId,
j_xid);
};
auto status = WriteBatchHandlerJniCallback::k_op(xid, markRollback);
if(status == nullptr) {
return rocksdb::Status::OK(); // TODO(AR) what to do if there is an Exception but we don't know the rocksdb::Status?
} else {
return rocksdb::Status(*status);
}
}
if(j_blob != nullptr) {
m_env->DeleteLocalRef(j_blob);
rocksdb::Status WriteBatchHandlerJniCallback::MarkCommit(const Slice& xid) {
auto markCommit = [this] (
jbyteArray j_xid) {
m_env->CallVoidMethod(
m_jcallback_obj,
m_jMarkCommitMethodId,
j_xid);
};
auto status = WriteBatchHandlerJniCallback::k_op(xid, markCommit);
if(status == nullptr) {
return rocksdb::Status::OK(); // TODO(AR) what to do if there is an Exception but we don't know the rocksdb::Status?
} else {
return rocksdb::Status(*status);
}
}
@ -256,39 +409,101 @@ bool WriteBatchHandlerJniCallback::Continue() {
return static_cast<bool>(jContinue == JNI_TRUE);
}
/*
* Creates a Java Byte Array from the data in a Slice
*
* When calling this function
* you must remember to call env->DeleteLocalRef
* on the result after you have finished with it
*
* @param s A Slice to convery to a Java byte array
*
* @return A reference to a Java byte array, or a nullptr if an
* exception occurs
*/
jbyteArray WriteBatchHandlerJniCallback::sliceToJArray(const Slice& s) {
// TODO(AR) move to JniUtil
jbyteArray ja = m_env->NewByteArray(static_cast<jsize>(s.size()));
if(ja == nullptr) {
// exception thrown: OutOfMemoryError
std::unique_ptr<rocksdb::Status> WriteBatchHandlerJniCallback::kv_op(const Slice& key, const Slice& value, std::function<void(jbyteArray, jbyteArray)> kvFn) {
const jbyteArray j_key = JniUtil::copyBytes(m_env, key);
if (j_key == nullptr) {
// exception thrown
if (m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
return nullptr;
}
m_env->SetByteArrayRegion(
ja, 0, static_cast<jsize>(s.size()),
const_cast<jbyte*>(reinterpret_cast<const jbyte*>(s.data())));
if(m_env->ExceptionCheck()) {
if(ja != nullptr) {
m_env->DeleteLocalRef(ja);
const jbyteArray j_value = JniUtil::copyBytes(m_env, value);
if (j_value == nullptr) {
// exception thrown
if (m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
if (j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
return nullptr;
}
kvFn(j_key, j_value);
// check for Exception, in-particular RocksDBException
if (m_env->ExceptionCheck()) {
if (j_value != nullptr) {
m_env->DeleteLocalRef(j_value);
}
if (j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
// exception thrown
jthrowable exception = m_env->ExceptionOccurred();
std::unique_ptr<rocksdb::Status> status = rocksdb::RocksDBExceptionJni::toCppStatus(m_env, exception);
if (status == nullptr) {
// unkown status or exception occurred extracting status
m_env->ExceptionDescribe();
return nullptr;
} else {
m_env->ExceptionClear(); // clear the exception, as we have extracted the status
return status;
}
}
if (j_value != nullptr) {
m_env->DeleteLocalRef(j_value);
}
if (j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
// all OK
return std::unique_ptr<rocksdb::Status>(new rocksdb::Status(rocksdb::Status::OK()));
}
std::unique_ptr<rocksdb::Status> WriteBatchHandlerJniCallback::k_op(const Slice& key, std::function<void(jbyteArray)> kFn) {
const jbyteArray j_key = JniUtil::copyBytes(m_env, key);
if (j_key == nullptr) {
// exception thrown
if (m_env->ExceptionCheck()) {
m_env->ExceptionDescribe();
}
// exception thrown: ArrayIndexOutOfBoundsException
return nullptr;
}
return ja;
kFn(j_key);
// check for Exception, in-particular RocksDBException
if (m_env->ExceptionCheck()) {
if (j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
// exception thrown
jthrowable exception = m_env->ExceptionOccurred();
std::unique_ptr<rocksdb::Status> status = rocksdb::RocksDBExceptionJni::toCppStatus(m_env, exception);
if (status == nullptr) {
// unkown status or exception occurred extracting status
m_env->ExceptionDescribe();
return nullptr;
} else {
m_env->ExceptionClear(); // clear the exception, as we have extracted the status
return status;
}
}
if (j_key != nullptr) {
m_env->DeleteLocalRef(j_key);
}
// all OK
return std::unique_ptr<rocksdb::Status>(new rocksdb::Status(rocksdb::Status::OK()));
}
} // namespace rocksdb

@ -9,7 +9,9 @@
#ifndef JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_
#define JAVA_ROCKSJNI_WRITEBATCHHANDLERJNICALLBACK_H_
#include <functional>
#include <jni.h>
#include <memory>
#include "rocksjni/jnicallback.h"
#include "rocksdb/write_batch.h"
@ -25,22 +27,57 @@ class WriteBatchHandlerJniCallback : public JniCallback, public WriteBatch::Hand
public:
WriteBatchHandlerJniCallback(
JNIEnv* env, jobject jWriteBackHandler);
Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value);
void Put(const Slice& key, const Slice& value);
Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value);
void Merge(const Slice& key, const Slice& value);
Status DeleteCF(uint32_t column_family_id, const Slice& key);
void Delete(const Slice& key);
Status SingleDeleteCF(uint32_t column_family_id, const Slice& key);
void SingleDelete(const Slice& key);
Status DeleteRangeCF(uint32_t column_family_id, const Slice& beginKey,
const Slice& endKey);
void DeleteRange(const Slice& beginKey, const Slice& endKey);
void LogData(const Slice& blob);
Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
const Slice& value);
Status MarkBeginPrepare();
Status MarkEndPrepare(const Slice& xid);
Status MarkNoop(bool empty_batch);
Status MarkRollback(const Slice& xid);
Status MarkCommit(const Slice& xid);
bool Continue();
private:
JNIEnv* m_env;
jbyteArray sliceToJArray(const Slice& s);
jmethodID m_jPutCfMethodId;
jmethodID m_jPutMethodId;
jmethodID m_jMergeCfMethodId;
jmethodID m_jMergeMethodId;
jmethodID m_jDeleteCfMethodId;
jmethodID m_jDeleteMethodId;
jmethodID m_jSingleDeleteCfMethodId;
jmethodID m_jSingleDeleteMethodId;
jmethodID m_jDeleteRangeCfMethodId;
jmethodID m_jDeleteRangeMethodId;
jmethodID m_jLogDataMethodId;
jmethodID m_jPutBlobIndexCfMethodId;
jmethodID m_jMarkBeginPrepareMethodId;
jmethodID m_jMarkEndPrepareMethodId;
jmethodID m_jMarkNoopMethodId;
jmethodID m_jMarkRollbackMethodId;
jmethodID m_jMarkCommitMethodId;
jmethodID m_jContinueMethodId;
/**
* @return A pointer to a rocksdb::Status or nullptr if an unexpected exception occurred
*/
std::unique_ptr<rocksdb::Status> kv_op(const Slice& key, const Slice& value, std::function<void(jbyteArray, jbyteArray)> kvFn);
/**
* @return A pointer to a rocksdb::Status or nullptr if an unexpected exception occurred
*/
std::unique_ptr<rocksdb::Status> k_op(const Slice& key, std::function<void(jbyteArray)> kFn);
};
} // namespace rocksdb

@ -18,52 +18,80 @@ public abstract class AbstractWriteBatch extends RocksObject
}
@Override
public void put(byte[] key, byte[] value) {
public void put(byte[] key, byte[] value) throws RocksDBException {
put(nativeHandle_, key, key.length, value, value.length);
}
@Override
public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key,
byte[] value) {
byte[] value) throws RocksDBException {
put(nativeHandle_, key, key.length, value, value.length,
columnFamilyHandle.nativeHandle_);
}
@Override
public void merge(byte[] key, byte[] value) {
public void merge(byte[] key, byte[] value) throws RocksDBException {
merge(nativeHandle_, key, key.length, value, value.length);
}
@Override
public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key,
byte[] value) {
byte[] value) throws RocksDBException {
merge(nativeHandle_, key, key.length, value, value.length,
columnFamilyHandle.nativeHandle_);
}
@Override
public void remove(byte[] key) {
remove(nativeHandle_, key, key.length);
@Deprecated
public void remove(byte[] key) throws RocksDBException {
delete(nativeHandle_, key, key.length);
}
@Override
public void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key) {
remove(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_);
@Deprecated
public void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key)
throws RocksDBException {
delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_);
}
@Override
public void deleteRange(byte[] beginKey, byte[] endKey) {
public void delete(byte[] key) throws RocksDBException {
delete(nativeHandle_, key, key.length);
}
@Override
public void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key)
throws RocksDBException {
delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_);
}
@Override
public void singleDelete(byte[] key) throws RocksDBException {
singleDelete(nativeHandle_, key, key.length);
}
@Override
public void singleDelete(ColumnFamilyHandle columnFamilyHandle, byte[] key)
throws RocksDBException {
singleDelete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_);
}
@Override
public void deleteRange(byte[] beginKey, byte[] endKey)
throws RocksDBException {
deleteRange(nativeHandle_, beginKey, beginKey.length, endKey, endKey.length);
}
@Override
public void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey) {
public void deleteRange(ColumnFamilyHandle columnFamilyHandle,
byte[] beginKey, byte[] endKey) throws RocksDBException {
deleteRange(nativeHandle_, beginKey, beginKey.length, endKey, endKey.length,
columnFamilyHandle.nativeHandle_);
}
@Override
public void putLogData(byte[] blob) {
public void putLogData(byte[] blob) throws RocksDBException {
putLogData(nativeHandle_, blob, blob.length);
}
@ -82,38 +110,67 @@ public abstract class AbstractWriteBatch extends RocksObject
rollbackToSavePoint0(nativeHandle_);
}
@Override
public void popSavePoint() throws RocksDBException {
popSavePoint(nativeHandle_);
}
@Override
public void setMaxBytes(final long maxBytes) {
setMaxBytes(nativeHandle_, maxBytes);
}
@Override
public WriteBatch getWriteBatch() {
return getWriteBatch(nativeHandle_);
}
abstract int count0(final long handle);
abstract void put(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen);
final byte[] value, final int valueLen) throws RocksDBException;
abstract void put(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen, final long cfHandle);
final byte[] value, final int valueLen, final long cfHandle)
throws RocksDBException;
abstract void merge(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen);
final byte[] value, final int valueLen) throws RocksDBException;
abstract void merge(final long handle, final byte[] key, final int keyLen,
final byte[] value, final int valueLen, final long cfHandle);
final byte[] value, final int valueLen, final long cfHandle)
throws RocksDBException;
abstract void delete(final long handle, final byte[] key,
final int keyLen) throws RocksDBException;
abstract void delete(final long handle, final byte[] key,
final int keyLen, final long cfHandle) throws RocksDBException;
abstract void remove(final long handle, final byte[] key,
final int keyLen);
abstract void singleDelete(final long handle, final byte[] key,
final int keyLen) throws RocksDBException;
abstract void remove(final long handle, final byte[] key,
final int keyLen, final long cfHandle);
abstract void singleDelete(final long handle, final byte[] key,
final int keyLen, final long cfHandle) throws RocksDBException;
abstract void deleteRange(final long handle, final byte[] beginKey, final int beginKeyLen,
final byte[] endKey, final int endKeyLen);
final byte[] endKey, final int endKeyLen) throws RocksDBException;
abstract void deleteRange(final long handle, final byte[] beginKey, final int beginKeyLen,
final byte[] endKey, final int endKeyLen, final long cfHandle);
final byte[] endKey, final int endKeyLen, final long cfHandle) throws RocksDBException;
abstract void putLogData(final long handle, final byte[] blob,
final int blobLen);
final int blobLen) throws RocksDBException;
abstract void clear0(final long handle);
abstract void setSavePoint0(final long handle);
abstract void rollbackToSavePoint0(final long handle);
abstract void popSavePoint(final long handle) throws RocksDBException;
abstract void setMaxBytes(final long handle, long maxBytes);
abstract WriteBatch getWriteBatch(final long handle);
}

@ -87,6 +87,15 @@ public class Status {
throw new IllegalArgumentException(
"Illegal value provided for Code (" + value + ").");
}
/**
* Returns the byte value of the enumerations value.
*
* @return byte representation
*/
public byte getValue() {
return value;
}
}
// should stay in sync with /include/rocksdb/status.h:SubCode and /java/rocksjni/portal.h:toJavaStatusSubCode
@ -116,5 +125,14 @@ public class Status {
throw new IllegalArgumentException(
"Illegal value provided for SubCode (" + value + ").");
}
/**
* Returns the byte value of the enumerations value.
*
* @return byte representation
*/
public byte getValue() {
return value;
}
}
}

@ -55,10 +55,13 @@ public class WBWIRocksIterator
* that created the record in the Write Batch
*/
public enum WriteType {
PUT((byte)0x1),
MERGE((byte)0x2),
DELETE((byte)0x4),
LOG((byte)0x8);
PUT((byte)0x0),
MERGE((byte)0x1),
DELETE((byte)0x2),
SINGLE_DELETE((byte)0x3),
DELETE_RANGE((byte)0x4),
LOG((byte)0x5),
XID((byte)0x6);
final byte id;
WriteType(final byte id) {

@ -39,6 +39,14 @@ public class WriteBatch extends AbstractWriteBatch {
super(newWriteBatch(reserved_bytes));
}
/**
* Constructs a WriteBatch instance from a serialized representation
* as returned by {@link #data()}.
*/
public WriteBatch(final byte[] serialized) {
super(newWriteBatch(serialized, serialized.length));
}
/**
* Support for iterating over the contents of a batch.
*
@ -51,6 +59,134 @@ public class WriteBatch extends AbstractWriteBatch {
iterate(nativeHandle_, handler.nativeHandle_);
}
/**
* Retrieve the serialized version of this batch.
*
* @return the serialized representation of this write batch.
*/
public byte[] data() {
return data(nativeHandle_);
}
/**
* Retrieve data size of the batch.
*
* @return the serialized data size of the batch.
*/
public long getDataSize() {
return getDataSize(nativeHandle_);
}
/**
* Returns true if PutCF will be called during Iterate.
*
* Return true if PutCF will be called during Iterate.
*/
public boolean hasPut() {
return hasPut(nativeHandle_);
}
/**
* Returns true if DeleteCF will be called during Iterate.
*
* Return true if DeleteCF will be called during Iterate.
*/
public boolean hasDelete() {
return hasDelete(nativeHandle_);
}
/**
* Returns true if SingleDeleteCF will be called during Iterate.
*
* Return true if SingleDeleteCF will be called during Iterate.
*/
public boolean hasSingleDelete() {
return hasSingleDelete(nativeHandle_);
}
/**
* Returns true if DeleteRangeCF will be called during Iterate.
*
* Return true if DeleteRangeCF will be called during Iterate.
*/
public boolean hasDeleteRange() {
return hasDeleteRange(nativeHandle_);
}
/**
* Returns true if MergeCF will be called during Iterate.
*
* Return true if MergeCF will be called during Iterate.
*/
public boolean hasMerge() {
return hasMerge(nativeHandle_);
}
/**
* Returns true if MarkBeginPrepare will be called during Iterate.
*
* Return true if MarkBeginPrepare will be called during Iterate.
*/
public boolean hasBeginPrepare() {
return hasBeginPrepare(nativeHandle_);
}
/**
* Returns true if MarkEndPrepare will be called during Iterate.
*
* Return true if MarkEndPrepare will be called during Iterate.
*/
public boolean hasEndPrepare() {
return hasEndPrepare(nativeHandle_);
}
/**
* Returns true if MarkCommit will be called during Iterate.
*
* Return true if MarkCommit will be called during Iterate.
*/
public boolean hasCommit() {
return hasCommit(nativeHandle_);
}
/**
* Returns true if MarkRollback will be called during Iterate.
*
* Return true if MarkRollback will be called during Iterate.
*/
public boolean hasRollback() {
return hasRollback(nativeHandle_);
}
@Override
public WriteBatch getWriteBatch() {
return this;
}
/**
* Marks this point in the WriteBatch as the last record to
* be inserted into the WAL, provided the WAL is enabled.
*/
public void markWalTerminationPoint() {
markWalTerminationPoint(nativeHandle_);
}
/**
* Gets the WAL termination point.
*
* See {@link #markWalTerminationPoint()}
*
* @return the WAL termination point
*/
public SavePoint getWalTerminationPoint() {
return getWalTerminationPoint(nativeHandle_);
}
@Override
WriteBatch getWriteBatch(final long handle) {
return this;
}
/**
* <p>Private WriteBatch constructor which is used to construct
* WriteBatch instances from C++ side. As the reference to this
@ -87,10 +223,14 @@ public class WriteBatch extends AbstractWriteBatch {
@Override final native void merge(final long handle, final byte[] key,
final int keyLen, final byte[] value, final int valueLen,
final long cfHandle);
@Override final native void remove(final long handle, final byte[] key,
final int keyLen);
@Override final native void remove(final long handle, final byte[] key,
final int keyLen, final long cfHandle);
@Override final native void delete(final long handle, final byte[] key,
final int keyLen) throws RocksDBException;
@Override final native void delete(final long handle, final byte[] key,
final int keyLen, final long cfHandle) throws RocksDBException;
@Override final native void singleDelete(final long handle, final byte[] key,
final int keyLen) throws RocksDBException;
@Override final native void singleDelete(final long handle, final byte[] key,
final int keyLen, final long cfHandle) throws RocksDBException;
@Override
final native void deleteRange(final long handle, final byte[] beginKey, final int beginKeyLen,
final byte[] endKey, final int endKeyLen);
@ -98,15 +238,32 @@ public class WriteBatch extends AbstractWriteBatch {
final native void deleteRange(final long handle, final byte[] beginKey, final int beginKeyLen,
final byte[] endKey, final int endKeyLen, final long cfHandle);
@Override final native void putLogData(final long handle,
final byte[] blob, final int blobLen);
final byte[] blob, final int blobLen) throws RocksDBException;
@Override final native void clear0(final long handle);
@Override final native void setSavePoint0(final long handle);
@Override final native void rollbackToSavePoint0(final long handle);
@Override final native void popSavePoint(final long handle) throws RocksDBException;
@Override final native void setMaxBytes(final long nativeHandle,
final long maxBytes);
private native static long newWriteBatch(final int reserved_bytes);
private native static long newWriteBatch(final byte[] serialized,
final int serializedLength);
private native void iterate(final long handle, final long handlerHandle)
throws RocksDBException;
private native byte[] data(final long nativeHandle);
private native long getDataSize(final long nativeHandle);
private native boolean hasPut(final long nativeHandle);
private native boolean hasDelete(final long nativeHandle);
private native boolean hasSingleDelete(final long nativeHandle);
private native boolean hasDeleteRange(final long nativeHandle);
private native boolean hasMerge(final long nativeHandle);
private native boolean hasBeginPrepare(final long nativeHandle);
private native boolean hasEndPrepare(final long nativeHandle);
private native boolean hasCommit(final long nativeHandle);
private native boolean hasRollback(final long nativeHandle);
private native void markWalTerminationPoint(final long nativeHandle);
private native SavePoint getWalTerminationPoint(final long nativeHandle);
/**
* Handler callback for iterating over the contents of a batch.
@ -122,15 +279,38 @@ public class WriteBatch extends AbstractWriteBatch {
return createNewHandler0();
}
public abstract void put(byte[] key, byte[] value);
public abstract void merge(byte[] key, byte[] value);
public abstract void delete(byte[] key);
public abstract void deleteRange(byte[] beginKey, byte[] endKey);
public abstract void logData(byte[] blob);
public abstract void put(final int columnFamilyId, final byte[] key,
final byte[] value) throws RocksDBException;
public abstract void put(final byte[] key, final byte[] value);
public abstract void merge(final int columnFamilyId, final byte[] key,
final byte[] value) throws RocksDBException;
public abstract void merge(final byte[] key, final byte[] value);
public abstract void delete(final int columnFamilyId, final byte[] key)
throws RocksDBException;
public abstract void delete(final byte[] key);
public abstract void singleDelete(final int columnFamilyId,
final byte[] key) throws RocksDBException;
public abstract void singleDelete(final byte[] key);
public abstract void deleteRange(final int columnFamilyId,
final byte[] beginKey, final byte[] endKey) throws RocksDBException;
public abstract void deleteRange(final byte[] beginKey,
final byte[] endKey);
public abstract void logData(final byte[] blob);
public abstract void putBlobIndex(final int columnFamilyId,
final byte[] key, final byte[] value) throws RocksDBException;
public abstract void markBeginPrepare() throws RocksDBException;
public abstract void markEndPrepare(final byte[] xid)
throws RocksDBException;
public abstract void markNoop(final boolean emptyBatch)
throws RocksDBException;
public abstract void markRollback(final byte[] xid)
throws RocksDBException;
public abstract void markCommit(final byte[] xid)
throws RocksDBException;
/**
* shouldContinue is called by the underlying iterator
* WriteBatch::Iterate. If it returns false,
* {@link WriteBatch#iterate(Handler)}. If it returns false,
* iteration is halted. Otherwise, it continues
* iterating. The default implementation always
* returns true.
@ -144,4 +324,57 @@ public class WriteBatch extends AbstractWriteBatch {
private native long createNewHandler0();
}
/**
* A structure for describing the save point in the Write Batch.
*/
public static class SavePoint {
private long size;
private long count;
private long contentFlags;
public SavePoint(final long size, final long count,
final long contentFlags) {
this.size = size;
this.count = count;
this.contentFlags = contentFlags;
}
public void clear() {
this.size = 0;
this.count = 0;
this.contentFlags = 0;
}
/**
* Get the size of the serialized representation.
*
* @return the size of the serialized representation.
*/
public long getSize() {
return size;
}
/**
* Get the number of elements.
*
* @return the number of elements.
*/
public long getCount() {
return count;
}
/**
* Get the content flags.
*
* @return the content flags.
*/
public long getContentFlags() {
return contentFlags;
}
public boolean isCleared() {
return (size | count | contentFlags) == 0;
}
}
}

@ -24,7 +24,7 @@ public interface WriteBatchInterface {
* @param key the specified key to be inserted.
* @param value the value associated with the specified key.
*/
void put(byte[] key, byte[] value);
void put(byte[] key, byte[] value) throws RocksDBException;
/**
* <p>Store the mapping "key-&gt;value" within given column
@ -36,7 +36,7 @@ public interface WriteBatchInterface {
* @param value the value associated with the specified key.
*/
void put(ColumnFamilyHandle columnFamilyHandle,
byte[] key, byte[] value);
byte[] key, byte[] value) throws RocksDBException;
/**
* <p>Merge "value" with the existing value of "key" in the database.
@ -46,7 +46,7 @@ public interface WriteBatchInterface {
* @param value the value to be merged with the current value for
* the specified key.
*/
void merge(byte[] key, byte[] value);
void merge(byte[] key, byte[] value) throws RocksDBException;
/**
* <p>Merge "value" with the existing value of "key" in given column family.
@ -58,14 +58,36 @@ public interface WriteBatchInterface {
* the specified key.
*/
void merge(ColumnFamilyHandle columnFamilyHandle,
byte[] key, byte[] value);
byte[] key, byte[] value) throws RocksDBException;
/**
* <p>If the database contains a mapping for "key", erase it. Else do nothing.</p>
*
* @param key Key to delete within database
*
* @deprecated Use {@link #delete(byte[])}
*/
@Deprecated
void remove(byte[] key) throws RocksDBException;
/**
* <p>If column family contains a mapping for "key", erase it. Else do nothing.</p>
*
* @param columnFamilyHandle {@link ColumnFamilyHandle} instance
* @param key Key to delete within database
*
* @deprecated Use {@link #delete(ColumnFamilyHandle, byte[])}
*/
@Deprecated
void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key)
throws RocksDBException;
/**
* <p>If the database contains a mapping for "key", erase it. Else do nothing.</p>
*
* @param key Key to delete within database
*/
void remove(byte[] key);
void delete(byte[] key) throws RocksDBException;
/**
* <p>If column family contains a mapping for "key", erase it. Else do nothing.</p>
@ -73,7 +95,58 @@ public interface WriteBatchInterface {
* @param columnFamilyHandle {@link ColumnFamilyHandle} instance
* @param key Key to delete within database
*/
void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key);
void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key)
throws RocksDBException;
/**
* Remove the database entry for {@code key}. Requires that the key exists
* and was not overwritten. It is not an error if the key did not exist
* in the database.
*
* If a key is overwritten (by calling {@link #put(byte[], byte[])} multiple
* times), then the result of calling SingleDelete() on this key is undefined.
* SingleDelete() only behaves correctly if there has been only one Put()
* for this key since the previous call to SingleDelete() for this key.
*
* This feature is currently an experimental performance optimization
* for a very specific workload. It is up to the caller to ensure that
* SingleDelete is only used for a key that is not deleted using Delete() or
* written using Merge(). Mixing SingleDelete operations with Deletes and
* Merges can result in undefined behavior.
*
* @param key Key to delete within database
*
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
@Experimental("Performance optimization for a very specific workload")
void singleDelete(final byte[] key) throws RocksDBException;
/**
* Remove the database entry for {@code key}. Requires that the key exists
* and was not overwritten. It is not an error if the key did not exist
* in the database.
*
* If a key is overwritten (by calling {@link #put(byte[], byte[])} multiple
* times), then the result of calling SingleDelete() on this key is undefined.
* SingleDelete() only behaves correctly if there has been only one Put()
* for this key since the previous call to SingleDelete() for this key.
*
* This feature is currently an experimental performance optimization
* for a very specific workload. It is up to the caller to ensure that
* SingleDelete is only used for a key that is not deleted using Delete() or
* written using Merge(). Mixing SingleDelete operations with Deletes and
* Merges can result in undefined behavior.
*
* @param columnFamilyHandle The column family to delete the key from
* @param key Key to delete within database
*
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
@Experimental("Performance optimization for a very specific workload")
void singleDelete(final ColumnFamilyHandle columnFamilyHandle,
final byte[] key) throws RocksDBException;
/**
* Removes the database entries in the range ["beginKey", "endKey"), i.e.,
@ -89,7 +162,7 @@ public interface WriteBatchInterface {
* @param endKey
* Last key to delete within database (excluded)
*/
void deleteRange(byte[] beginKey, byte[] endKey);
void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDBException;
/**
* Removes the database entries in the range ["beginKey", "endKey"), i.e.,
@ -106,7 +179,8 @@ public interface WriteBatchInterface {
* @param endKey
* Last key to delete within database (excluded)
*/
void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey);
void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey,
byte[] endKey) throws RocksDBException;
/**
* Append a blob of arbitrary size to the records in this batch. The blob will
@ -122,7 +196,7 @@ public interface WriteBatchInterface {
*
* @param blob binary object to be inserted
*/
void putLogData(byte[] blob);
void putLogData(byte[] blob) throws RocksDBException;
/**
* Clear all updates buffered in this batch
@ -143,4 +217,30 @@ public interface WriteBatchInterface {
* @throws RocksDBException if there is no previous call to SetSavePoint()
*/
void rollbackToSavePoint() throws RocksDBException;
/**
* Pop the most recent save point.
*
* That is to say that it removes the last save point,
* which was set by {@link #setSavePoint()}.
*
* @throws RocksDBException If there is no previous call to
* {@link #setSavePoint()}, an exception with
* {@link Status.Code#NotFound} will be thrown.
*/
void popSavePoint() throws RocksDBException;
/**
* Set the maximum size of the write batch.
*
* @param maxBytes the maximum size in bytes.
*/
void setMaxBytes(long maxBytes);
/**
* Get the underlying Write Batch.
*
* @return the underlying WriteBatch.
*/
WriteBatch getWriteBatch();
}

@ -256,10 +256,14 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
@Override final native void merge(final long handle, final byte[] key,
final int keyLen, final byte[] value, final int valueLen,
final long cfHandle);
@Override final native void remove(final long handle, final byte[] key,
final int keyLen);
@Override final native void remove(final long handle, final byte[] key,
final int keyLen, final long cfHandle);
@Override final native void delete(final long handle, final byte[] key,
final int keyLen) throws RocksDBException;
@Override final native void delete(final long handle, final byte[] key,
final int keyLen, final long cfHandle) throws RocksDBException;
@Override final native void singleDelete(final long handle, final byte[] key,
final int keyLen) throws RocksDBException;
@Override final native void singleDelete(final long handle, final byte[] key,
final int keyLen, final long cfHandle) throws RocksDBException;
@Override
final native void deleteRange(final long handle, final byte[] beginKey, final int beginKeyLen,
final byte[] endKey, final int endKeyLen);
@ -267,10 +271,14 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
final native void deleteRange(final long handle, final byte[] beginKey, final int beginKeyLen,
final byte[] endKey, final int endKeyLen, final long cfHandle);
@Override final native void putLogData(final long handle, final byte[] blob,
final int blobLen);
final int blobLen) throws RocksDBException;
@Override final native void clear0(final long handle);
@Override final native void setSavePoint0(final long handle);
@Override final native void rollbackToSavePoint0(final long handle);
@Override final native void popSavePoint(final long handle) throws RocksDBException;
@Override final native void setMaxBytes(final long nativeHandle,
final long maxBytes);
@Override final native WriteBatch getWriteBatch(final long handle);
private native static long newWriteBatchWithIndex();
private native static long newWriteBatchWithIndex(final boolean overwriteKey);

@ -5,15 +5,16 @@
package org.rocksdb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.ClassRule;
import org.junit.Test;
import org.rocksdb.util.CapturingWriteBatchHandler;
import org.rocksdb.util.CapturingWriteBatchHandler.Event;
import static org.assertj.core.api.Assertions.assertThat;
import static org.rocksdb.util.CapturingWriteBatchHandler.Action.*;
public class WriteBatchHandlerTest {
@ -22,45 +23,37 @@ public class WriteBatchHandlerTest {
new RocksMemoryResource();
@Test
public void writeBatchHandler() throws IOException, RocksDBException {
public void writeBatchHandler() throws RocksDBException {
// setup test data
final List<Tuple<Action, Tuple<byte[], byte[]>>> testEvents = Arrays.asList(
new Tuple<>(Action.DELETE,
new Tuple<byte[], byte[]>("k0".getBytes(), null)),
new Tuple<>(Action.PUT,
new Tuple<>("k1".getBytes(), "v1".getBytes())),
new Tuple<>(Action.PUT,
new Tuple<>("k2".getBytes(), "v2".getBytes())),
new Tuple<>(Action.PUT,
new Tuple<>("k3".getBytes(), "v3".getBytes())),
new Tuple<>(Action.LOG,
new Tuple<byte[], byte[]>(null, "log1".getBytes())),
new Tuple<>(Action.MERGE,
new Tuple<>("k2".getBytes(), "v22".getBytes())),
new Tuple<>(Action.DELETE,
new Tuple<byte[], byte[]>("k3".getBytes(), null))
final List<Event> testEvents = Arrays.asList(
new Event(DELETE, "k0".getBytes(), null),
new Event(PUT, "k1".getBytes(), "v1".getBytes()),
new Event(PUT, "k2".getBytes(), "v2".getBytes()),
new Event(PUT, "k3".getBytes(), "v3".getBytes()),
new Event(LOG, null, "log1".getBytes()),
new Event(MERGE, "k2".getBytes(), "v22".getBytes()),
new Event(DELETE, "k3".getBytes(), null)
);
// load test data to the write batch
try (final WriteBatch batch = new WriteBatch()) {
for (final Tuple<Action, Tuple<byte[], byte[]>> testEvent : testEvents) {
final Tuple<byte[], byte[]> data = testEvent.value;
switch (testEvent.key) {
for (final Event testEvent : testEvents) {
switch (testEvent.action) {
case PUT:
batch.put(data.key, data.value);
batch.put(testEvent.key, testEvent.value);
break;
case MERGE:
batch.merge(data.key, data.value);
batch.merge(testEvent.key, testEvent.value);
break;
case DELETE:
batch.remove(data.key);
batch.remove(testEvent.key);
break;
case LOG:
batch.putLogData(data.value);
batch.putLogData(testEvent.value);
break;
}
}
@ -72,98 +65,12 @@ public class WriteBatchHandlerTest {
batch.iterate(handler);
// compare the results to the test data
final List<Tuple<Action, Tuple<byte[], byte[]>>> actualEvents =
final List<Event> actualEvents =
handler.getEvents();
assertThat(testEvents.size()).isSameAs(actualEvents.size());
for (int i = 0; i < testEvents.size(); i++) {
assertThat(equals(testEvents.get(i), actualEvents.get(i))).isTrue();
}
assertThat(testEvents).isEqualTo(actualEvents);
}
}
}
private static boolean equals(
final Tuple<Action, Tuple<byte[], byte[]>> expected,
final Tuple<Action, Tuple<byte[], byte[]>> actual) {
if (!expected.key.equals(actual.key)) {
return false;
}
final Tuple<byte[], byte[]> expectedData = expected.value;
final Tuple<byte[], byte[]> actualData = actual.value;
return equals(expectedData.key, actualData.key)
&& equals(expectedData.value, actualData.value);
}
private static boolean equals(byte[] expected, byte[] actual) {
if (expected != null) {
return Arrays.equals(expected, actual);
} else {
return actual == null;
}
}
private static class Tuple<K, V> {
public final K key;
public final V value;
public Tuple(final K key, final V value) {
this.key = key;
this.value = value;
}
}
/**
* Enumeration of Write Batch
* event actions
*/
private enum Action { PUT, MERGE, DELETE, DELETE_RANGE, LOG }
/**
* A simple WriteBatch Handler which adds a record
* of each event that it receives to a list
*/
private static class CapturingWriteBatchHandler extends WriteBatch.Handler {
private final List<Tuple<Action, Tuple<byte[], byte[]>>> events
= new ArrayList<>();
/**
* Returns a copy of the current events list
*
* @return a list of the events which have happened upto now
*/
public List<Tuple<Action, Tuple<byte[], byte[]>>> getEvents() {
return new ArrayList<>(events);
}
@Override
public void put(final byte[] key, final byte[] value) {
events.add(new Tuple<>(Action.PUT, new Tuple<>(key, value)));
}
@Override
public void merge(final byte[] key, final byte[] value) {
events.add(new Tuple<>(Action.MERGE, new Tuple<>(key, value)));
}
@Override
public void delete(final byte[] key) {
events.add(new Tuple<>(Action.DELETE,
new Tuple<byte[], byte[]>(key, null)));
}
@Override
public void deleteRange(final byte[] beginKey, final byte[] endKey) {
events.add(new Tuple<>(Action.DELETE_RANGE, new Tuple<byte[], byte[]>(beginKey, endKey)));
}
@Override
public void logData(final byte[] blob) {
events.add(new Tuple<>(Action.LOG,
new Tuple<byte[], byte[]>(null, blob)));
}
}
}

@ -12,20 +12,17 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import org.rocksdb.util.CapturingWriteBatchHandler;
import org.rocksdb.util.CapturingWriteBatchHandler.Event;
import org.rocksdb.util.WriteBatchGetter;
import static org.assertj.core.api.Assertions.assertThat;
import static org.rocksdb.util.CapturingWriteBatchHandler.Action.*;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* This class mimics the db/write_batch_test.cc
* in the c++ rocksdb library.
* <p/>
* Not ported yet:
* <p/>
* Continue();
* PutGatherSlices();
*/
public class WriteBatchTest {
@ClassRule
@ -44,27 +41,45 @@ public class WriteBatchTest {
@Test
public void multipleBatchOperations()
throws UnsupportedEncodingException {
try (WriteBatch batch = new WriteBatch()) {
batch.put("foo".getBytes("US-ASCII"), "bar".getBytes("US-ASCII"));
batch.remove("box".getBytes("US-ASCII"));
batch.put("baz".getBytes("US-ASCII"), "boo".getBytes("US-ASCII"));
WriteBatchTestInternalHelper.setSequence(batch, 100);
assertThat(WriteBatchTestInternalHelper.sequence(batch)).
isNotNull().
isEqualTo(100);
assertThat(batch.count()).isEqualTo(3);
assertThat(new String(getContents(batch), "US-ASCII")).
isEqualTo("Put(baz, boo)@102" +
"Delete(box)@101" +
"Put(foo, bar)@100");
throws RocksDBException {
final byte[] foo = "foo".getBytes(UTF_8);
final byte[] bar = "bar".getBytes(UTF_8);
final byte[] box = "box".getBytes(UTF_8);
final byte[] baz = "baz".getBytes(UTF_8);
final byte[] boo = "boo".getBytes(UTF_8);
final byte[] hoo = "hoo".getBytes(UTF_8);
final byte[] hello = "hello".getBytes(UTF_8);
try (final WriteBatch batch = new WriteBatch()) {
batch.put(foo, bar);
batch.delete(box);
batch.put(baz, boo);
batch.merge(baz, hoo);
batch.singleDelete(foo);
batch.deleteRange(baz, foo);
batch.putLogData(hello);
try(final CapturingWriteBatchHandler handler =
new CapturingWriteBatchHandler()) {
batch.iterate(handler);
assertThat(handler.getEvents().size()).isEqualTo(7);
assertThat(handler.getEvents().get(0)).isEqualTo(new Event(PUT, foo, bar));
assertThat(handler.getEvents().get(1)).isEqualTo(new Event(DELETE, box, null));
assertThat(handler.getEvents().get(2)).isEqualTo(new Event(PUT, baz, boo));
assertThat(handler.getEvents().get(3)).isEqualTo(new Event(MERGE, baz, hoo));
assertThat(handler.getEvents().get(4)).isEqualTo(new Event(SINGLE_DELETE, foo, null));
assertThat(handler.getEvents().get(5)).isEqualTo(new Event(DELETE_RANGE, baz, foo));
assertThat(handler.getEvents().get(6)).isEqualTo(new Event(LOG, null, hello));
}
}
}
@Test
public void testAppendOperation()
throws UnsupportedEncodingException {
throws RocksDBException {
try (final WriteBatch b1 = new WriteBatch();
final WriteBatch b2 = new WriteBatch()) {
WriteBatchTestInternalHelper.setSequence(b1, 200);
@ -72,67 +87,66 @@ public class WriteBatchTest {
WriteBatchTestInternalHelper.append(b1, b2);
assertThat(getContents(b1).length).isEqualTo(0);
assertThat(b1.count()).isEqualTo(0);
b2.put("a".getBytes("US-ASCII"), "va".getBytes("US-ASCII"));
b2.put("a".getBytes(UTF_8), "va".getBytes(UTF_8));
WriteBatchTestInternalHelper.append(b1, b2);
assertThat("Put(a, va)@200".equals(new String(getContents(b1),
"US-ASCII")));
UTF_8)));
assertThat(b1.count()).isEqualTo(1);
b2.clear();
b2.put("b".getBytes("US-ASCII"), "vb".getBytes("US-ASCII"));
b2.put("b".getBytes(UTF_8), "vb".getBytes(UTF_8));
WriteBatchTestInternalHelper.append(b1, b2);
assertThat(("Put(a, va)@200" +
"Put(b, vb)@201")
.equals(new String(getContents(b1), "US-ASCII")));
.equals(new String(getContents(b1), UTF_8)));
assertThat(b1.count()).isEqualTo(2);
b2.remove("foo".getBytes("US-ASCII"));
b2.delete("foo".getBytes(UTF_8));
WriteBatchTestInternalHelper.append(b1, b2);
assertThat(("Put(a, va)@200" +
"Put(b, vb)@202" +
"Put(b, vb)@201" +
"Delete(foo)@203")
.equals(new String(getContents(b1), "US-ASCII")));
.equals(new String(getContents(b1), UTF_8)));
assertThat(b1.count()).isEqualTo(4);
}
}
@Test
public void blobOperation()
throws UnsupportedEncodingException {
throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
batch.put("k1".getBytes("US-ASCII"), "v1".getBytes("US-ASCII"));
batch.put("k2".getBytes("US-ASCII"), "v2".getBytes("US-ASCII"));
batch.put("k3".getBytes("US-ASCII"), "v3".getBytes("US-ASCII"));
batch.putLogData("blob1".getBytes("US-ASCII"));
batch.remove("k2".getBytes("US-ASCII"));
batch.putLogData("blob2".getBytes("US-ASCII"));
batch.merge("foo".getBytes("US-ASCII"), "bar".getBytes("US-ASCII"));
batch.put("k1".getBytes(UTF_8), "v1".getBytes(UTF_8));
batch.put("k2".getBytes(UTF_8), "v2".getBytes(UTF_8));
batch.put("k3".getBytes(UTF_8), "v3".getBytes(UTF_8));
batch.putLogData("blob1".getBytes(UTF_8));
batch.delete("k2".getBytes(UTF_8));
batch.putLogData("blob2".getBytes(UTF_8));
batch.merge("foo".getBytes(UTF_8), "bar".getBytes(UTF_8));
assertThat(batch.count()).isEqualTo(5);
assertThat(("Merge(foo, bar)@4" +
"Put(k1, v1)@0" +
"Delete(k2)@3" +
"Put(k2, v2)@1" +
"Put(k3, v3)@2")
.equals(new String(getContents(batch), "US-ASCII")));
.equals(new String(getContents(batch), UTF_8)));
}
}
@Test
public void savePoints()
throws UnsupportedEncodingException, RocksDBException {
throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
batch.put("k1".getBytes("US-ASCII"), "v1".getBytes("US-ASCII"));
batch.put("k2".getBytes("US-ASCII"), "v2".getBytes("US-ASCII"));
batch.put("k3".getBytes("US-ASCII"), "v3".getBytes("US-ASCII"));
batch.put("k1".getBytes(UTF_8), "v1".getBytes(UTF_8));
batch.put("k2".getBytes(UTF_8), "v2".getBytes(UTF_8));
batch.put("k3".getBytes(UTF_8), "v3".getBytes(UTF_8));
assertThat(getFromWriteBatch(batch, "k1")).isEqualTo("v1");
assertThat(getFromWriteBatch(batch, "k2")).isEqualTo("v2");
assertThat(getFromWriteBatch(batch, "k3")).isEqualTo("v3");
batch.setSavePoint();
batch.remove("k2".getBytes("US-ASCII"));
batch.put("k3".getBytes("US-ASCII"), "v3-2".getBytes("US-ASCII"));
batch.delete("k2".getBytes(UTF_8));
batch.put("k3".getBytes(UTF_8), "v3-2".getBytes(UTF_8));
assertThat(getFromWriteBatch(batch, "k2")).isNull();
assertThat(getFromWriteBatch(batch, "k3")).isEqualTo("v3-2");
@ -140,8 +154,8 @@ public class WriteBatchTest {
batch.setSavePoint();
batch.put("k3".getBytes("US-ASCII"), "v3-3".getBytes("US-ASCII"));
batch.put("k4".getBytes("US-ASCII"), "v4".getBytes("US-ASCII"));
batch.put("k3".getBytes(UTF_8), "v3-3".getBytes(UTF_8));
batch.put("k4".getBytes(UTF_8), "v4".getBytes(UTF_8));
assertThat(getFromWriteBatch(batch, "k3")).isEqualTo("v3-3");
assertThat(getFromWriteBatch(batch, "k4")).isEqualTo("v4");
@ -187,6 +201,30 @@ public class WriteBatchTest {
}
}
@Test
public void restorePoints() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
batch.put("k1".getBytes(), "v1".getBytes());
batch.put("k2".getBytes(), "v2".getBytes());
batch.setSavePoint();
batch.put("k1".getBytes(), "123456789".getBytes());
batch.delete("k2".getBytes());
batch.rollbackToSavePoint();
try(final CapturingWriteBatchHandler handler = new CapturingWriteBatchHandler()) {
batch.iterate(handler);
assertThat(handler.getEvents().size()).isEqualTo(2);
assertThat(handler.getEvents().get(0)).isEqualTo(new Event(PUT, "k1".getBytes(), "v1".getBytes()));
assertThat(handler.getEvents().get(1)).isEqualTo(new Event(PUT, "k2".getBytes(), "v2".getBytes()));
}
}
}
@Test(expected = RocksDBException.class)
public void restorePoints_withoutSavePoints() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
@ -206,67 +244,222 @@ public class WriteBatchTest {
}
}
static byte[] getContents(final WriteBatch wb) {
return getContents(wb.nativeHandle_);
@Test
public void popSavePoint() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
batch.put("k1".getBytes(), "v1".getBytes());
batch.put("k2".getBytes(), "v2".getBytes());
batch.setSavePoint();
batch.put("k1".getBytes(), "123456789".getBytes());
batch.delete("k2".getBytes());
batch.setSavePoint();
batch.popSavePoint();
batch.rollbackToSavePoint();
try(final CapturingWriteBatchHandler handler = new CapturingWriteBatchHandler()) {
batch.iterate(handler);
assertThat(handler.getEvents().size()).isEqualTo(2);
assertThat(handler.getEvents().get(0)).isEqualTo(new Event(PUT, "k1".getBytes(), "v1".getBytes()));
assertThat(handler.getEvents().get(1)).isEqualTo(new Event(PUT, "k2".getBytes(), "v2".getBytes()));
}
}
}
static String getFromWriteBatch(final WriteBatch wb, final String key)
throws RocksDBException, UnsupportedEncodingException {
final WriteBatchGetter getter =
new WriteBatchGetter(key.getBytes("US-ASCII"));
wb.iterate(getter);
if(getter.getValue() != null) {
return new String(getter.getValue(), "US-ASCII");
} else {
return null;
@Test(expected = RocksDBException.class)
public void popSavePoint_withoutSavePoints() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
batch.popSavePoint();
}
}
private static native byte[] getContents(final long writeBatchHandle);
@Test(expected = RocksDBException.class)
public void popSavePoint_withoutSavePoints_nested() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
private static class WriteBatchGetter extends WriteBatch.Handler {
batch.setSavePoint();
batch.popSavePoint();
// without previous corresponding setSavePoint
batch.popSavePoint();
}
}
private final byte[] key;
private byte[] value;
@Test
public void maxBytes() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
batch.setMaxBytes(19);
public WriteBatchGetter(final byte[] key) {
this.key = key;
batch.put("k1".getBytes(), "v1".getBytes());
}
}
public byte[] getValue() {
return value;
@Test(expected = RocksDBException.class)
public void maxBytes_over() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
batch.setMaxBytes(1);
batch.put("k1".getBytes(), "v1".getBytes());
}
}
@Override
public void put(final byte[] key, final byte[] value) {
if(Arrays.equals(this.key, key)) {
this.value = value;
@Test
public void data() throws RocksDBException {
try (final WriteBatch batch1 = new WriteBatch()) {
batch1.delete("k0".getBytes());
batch1.put("k1".getBytes(), "v1".getBytes());
batch1.put("k2".getBytes(), "v2".getBytes());
batch1.put("k3".getBytes(), "v3".getBytes());
batch1.putLogData("log1".getBytes());
batch1.merge("k2".getBytes(), "v22".getBytes());
batch1.delete("k3".getBytes());
final byte[] serialized = batch1.data();
try(final WriteBatch batch2 = new WriteBatch(serialized)) {
assertThat(batch2.count()).isEqualTo(batch1.count());
try(final CapturingWriteBatchHandler handler1 = new CapturingWriteBatchHandler()) {
batch1.iterate(handler1);
try (final CapturingWriteBatchHandler handler2 = new CapturingWriteBatchHandler()) {
batch2.iterate(handler2);
assertThat(handler1.getEvents().equals(handler2.getEvents())).isTrue();
}
}
}
}
}
@Override
public void merge(final byte[] key, final byte[] value) {
if(Arrays.equals(this.key, key)) {
throw new UnsupportedOperationException();
}
@Test
public void dataSize() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
batch.put("k1".getBytes(), "v1".getBytes());
assertThat(batch.getDataSize()).isEqualTo(19);
}
}
@Override
public void delete(final byte[] key) {
if(Arrays.equals(this.key, key)) {
this.value = null;
}
@Test
public void hasPut() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
assertThat(batch.hasPut()).isFalse();
batch.put("k1".getBytes(), "v1".getBytes());
assertThat(batch.hasPut()).isTrue();
}
}
@Test
public void hasDelete() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
assertThat(batch.hasDelete()).isFalse();
batch.delete("k1".getBytes());
assertThat(batch.hasDelete()).isTrue();
}
}
@Test
public void hasSingleDelete() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
assertThat(batch.hasSingleDelete()).isFalse();
batch.singleDelete("k1".getBytes());
assertThat(batch.hasSingleDelete()).isTrue();
}
}
@Test
public void hasDeleteRange() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
assertThat(batch.hasDeleteRange()).isFalse();
batch.deleteRange("k1".getBytes(), "k2".getBytes());
assertThat(batch.hasDeleteRange()).isTrue();
}
}
@Test
public void hasBeginPrepareRange() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
assertThat(batch.hasBeginPrepare()).isFalse();
}
}
@Test
public void hasEndrepareRange() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
assertThat(batch.hasEndPrepare()).isFalse();
}
}
@Test
public void hasCommit() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
assertThat(batch.hasCommit()).isFalse();
}
}
@Test
public void hasRollback() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
assertThat(batch.hasRollback()).isFalse();
}
}
@Test
public void walTerminationPoint() throws RocksDBException {
try (final WriteBatch batch = new WriteBatch()) {
WriteBatch.SavePoint walTerminationPoint = batch.getWalTerminationPoint();
assertThat(walTerminationPoint.isCleared()).isTrue();
batch.put("k1".getBytes(UTF_8), "v1".getBytes(UTF_8));
batch.markWalTerminationPoint();
walTerminationPoint = batch.getWalTerminationPoint();
assertThat(walTerminationPoint.getSize()).isEqualTo(19);
assertThat(walTerminationPoint.getCount()).isEqualTo(1);
assertThat(walTerminationPoint.getContentFlags()).isEqualTo(2);
}
}
@Override
public void deleteRange(final byte[] beginKey, final byte[] endKey) {
throw new UnsupportedOperationException();
@Test
public void getWriteBatch() {
try (final WriteBatch batch = new WriteBatch()) {
assertThat(batch.getWriteBatch()).isEqualTo(batch);
}
}
static byte[] getContents(final WriteBatch wb) {
return getContents(wb.nativeHandle_);
}
@Override
public void logData(final byte[] blob) {
static String getFromWriteBatch(final WriteBatch wb, final String key)
throws RocksDBException {
final WriteBatchGetter getter =
new WriteBatchGetter(key.getBytes(UTF_8));
wb.iterate(getter);
if(getter.getValue() != null) {
return new String(getter.getValue(), UTF_8);
} else {
return null;
}
}
private static native byte[] getContents(final long writeBatchHandle);
}
/**

@ -14,11 +14,11 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static org.assertj.core.api.Assertions.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
public class WriteBatchWithIndexTest {
@ -75,8 +75,8 @@ public class WriteBatchWithIndexTest {
assertThat(it.key()).isEqualTo(k2);
assertThat(it.value()).isEqualTo(v2Other);
//remove k1 and make sure we can read back the write
wbwi.remove(k1);
//delete k1 and make sure we can read back the write
wbwi.delete(k1);
it.seek(k1);
assertThat(it.key()).isNotEqualTo(k1);
@ -87,6 +87,19 @@ public class WriteBatchWithIndexTest {
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(k1);
assertThat(it.value()).isEqualTo(v1Other);
//single remove k3 and make sure we can read back the write
wbwi.singleDelete(k3);
it.seek(k3);
assertThat(it.isValid()).isEqualTo(false);
//reinsert k3 and make sure we see the new value
final byte[] v3Other = "otherValue3".getBytes();
wbwi.put(k3, v3Other);
it.seek(k3);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(k3);
assertThat(it.value()).isEqualTo(v3Other);
}
}
}
@ -124,22 +137,39 @@ public class WriteBatchWithIndexTest {
final String v2 = "value2";
final String k3 = "key3";
final String v3 = "value3";
final byte[] k1b = k1.getBytes();
final byte[] v1b = v1.getBytes();
final byte[] k2b = k2.getBytes();
final byte[] v2b = v2.getBytes();
final byte[] k3b = k3.getBytes();
final byte[] v3b = v3.getBytes();
//add put records
final String k4 = "key4";
final String k5 = "key5";
final String k6 = "key6";
final String k7 = "key7";
final String v8 = "value8";
final byte[] k1b = k1.getBytes(UTF_8);
final byte[] v1b = v1.getBytes(UTF_8);
final byte[] k2b = k2.getBytes(UTF_8);
final byte[] v2b = v2.getBytes(UTF_8);
final byte[] k3b = k3.getBytes(UTF_8);
final byte[] v3b = v3.getBytes(UTF_8);
final byte[] k4b = k4.getBytes(UTF_8);
final byte[] k5b = k5.getBytes(UTF_8);
final byte[] k6b = k6.getBytes(UTF_8);
final byte[] k7b = k7.getBytes(UTF_8);
final byte[] v8b = v8.getBytes(UTF_8);
// add put records
wbwi.put(k1b, v1b);
wbwi.put(k2b, v2b);
wbwi.put(k3b, v3b);
//add a deletion record
final String k4 = "key4";
final byte[] k4b = k4.getBytes();
wbwi.remove(k4b);
// add a deletion record
wbwi.delete(k4b);
// add a single deletion record
wbwi.singleDelete(k5b);
// add a delete range record
wbwi.deleteRange(k6b, k7b);
// add a log record
wbwi.putLogData(v8b);
final WBWIRocksIterator.WriteEntry[] expected = {
new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.PUT,
@ -149,12 +179,16 @@ public class WriteBatchWithIndexTest {
new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.PUT,
new DirectSlice(k3), new DirectSlice(v3)),
new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.DELETE,
new DirectSlice(k4), DirectSlice.NONE)
new DirectSlice(k4), DirectSlice.NONE),
new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.SINGLE_DELETE,
new DirectSlice(k5), DirectSlice.NONE),
new WBWIRocksIterator.WriteEntry(WBWIRocksIterator.WriteType.DELETE_RANGE,
new DirectSlice(k6), new DirectSlice(k7)),
};
try (final WBWIRocksIterator it = wbwi.newIterator()) {
//direct access - seek to key offsets
final int[] testOffsets = {2, 0, 1, 3};
final int[] testOffsets = {2, 0, 3, 4, 1, 5};
for (int i = 0; i < testOffsets.length; i++) {
final int testOffset = testOffsets[i];
@ -164,26 +198,26 @@ public class WriteBatchWithIndexTest {
assertThat(it.isValid()).isTrue();
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry.equals(expected[testOffset])).isTrue();
assertThat(entry).isEqualTo(expected[testOffset]);
}
//forward iterative access
int i = 0;
for (it.seekToFirst(); it.isValid(); it.next()) {
assertThat(it.entry().equals(expected[i++])).isTrue();
assertThat(it.entry()).isEqualTo(expected[i++]);
}
//reverse iterative access
i = expected.length - 1;
for (it.seekToLast(); it.isValid(); it.prev()) {
assertThat(it.entry().equals(expected[i--])).isTrue();
assertThat(it.entry()).isEqualTo(expected[i--]);
}
}
}
}
@Test
public void zeroByteTests() {
public void zeroByteTests() throws RocksDBException {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true)) {
final byte[] zeroByteValue = new byte[]{0, 0};
//add zero byte value
@ -207,8 +241,7 @@ public class WriteBatchWithIndexTest {
}
@Test
public void savePoints()
throws UnsupportedEncodingException, RocksDBException {
public void savePoints() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(options,
dbFolder.getRoot().getAbsolutePath())) {
@ -228,7 +261,7 @@ public class WriteBatchWithIndexTest {
wbwi.setSavePoint();
wbwi.remove("k2".getBytes());
wbwi.delete("k2".getBytes());
wbwi.put("k3".getBytes(), "v3-2".getBytes());
assertThat(getFromWriteBatchWithIndex(db, readOptions, wbwi, "k2"))
@ -272,6 +305,27 @@ public class WriteBatchWithIndexTest {
}
}
@Test
public void restorePoints() throws RocksDBException {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) {
wbwi.put("k1".getBytes(UTF_8), "v1".getBytes(UTF_8));
wbwi.put("k2".getBytes(UTF_8), "v2".getBytes(UTF_8));
wbwi.setSavePoint();
wbwi.put("k1".getBytes(UTF_8), "123456789".getBytes(UTF_8));
wbwi.delete("k2".getBytes(UTF_8));
wbwi.rollbackToSavePoint();
try(final DBOptions options = new DBOptions()) {
assertThat(wbwi.getFromBatch(options,"k1".getBytes(UTF_8))).isEqualTo("v1".getBytes());
assertThat(wbwi.getFromBatch(options,"k2".getBytes(UTF_8))).isEqualTo("v2".getBytes());
}
}
}
@Test(expected = RocksDBException.class)
public void restorePoints_withoutSavePoints() throws RocksDBException {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) {
@ -291,6 +345,78 @@ public class WriteBatchWithIndexTest {
}
}
@Test
public void popSavePoint() throws RocksDBException {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) {
wbwi.put("k1".getBytes(), "v1".getBytes());
wbwi.put("k2".getBytes(), "v2".getBytes());
wbwi.setSavePoint();
wbwi.put("k1".getBytes(), "123456789".getBytes());
wbwi.delete("k2".getBytes());
wbwi.setSavePoint();
wbwi.popSavePoint();
wbwi.rollbackToSavePoint();
try(final DBOptions options = new DBOptions()) {
assertThat(wbwi.getFromBatch(options,"k1".getBytes(UTF_8))).isEqualTo("v1".getBytes());
assertThat(wbwi.getFromBatch(options,"k2".getBytes(UTF_8))).isEqualTo("v2".getBytes());
}
}
}
@Test(expected = RocksDBException.class)
public void popSavePoint_withoutSavePoints() throws RocksDBException {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) {
wbwi.popSavePoint();
}
}
@Test(expected = RocksDBException.class)
public void popSavePoint_withoutSavePoints_nested() throws RocksDBException {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) {
wbwi.setSavePoint();
wbwi.popSavePoint();
// without previous corresponding setSavePoint
wbwi.popSavePoint();
}
}
@Test
public void maxBytes() throws RocksDBException {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) {
wbwi.setMaxBytes(19);
wbwi.put("k1".getBytes(), "v1".getBytes());
}
}
@Test(expected = RocksDBException.class)
public void maxBytes_over() throws RocksDBException {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) {
wbwi.setMaxBytes(1);
wbwi.put("k1".getBytes(), "v1".getBytes());
}
}
@Test
public void getWriteBatch() {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) {
final WriteBatch wb = wbwi.getWriteBatch();
assertThat(wb).isNotNull();
assertThat(wb.isOwningHandle()).isFalse();
}
}
private static String getFromWriteBatchWithIndex(final RocksDB db,
final ReadOptions readOptions, final WriteBatchWithIndex wbwi,
final String skey) {
@ -329,7 +455,7 @@ public class WriteBatchWithIndexTest {
assertThat(wbwi.getFromBatch(dbOptions, k3)).isEqualTo(v3);
assertThat(wbwi.getFromBatch(dbOptions, k4)).isNull();
wbwi.remove(k2);
wbwi.delete(k2);
assertThat(wbwi.getFromBatch(dbOptions, k2)).isNull();
}
@ -372,7 +498,7 @@ public class WriteBatchWithIndexTest {
assertThat(wbwi.getFromBatchAndDB(db, readOptions, k3)).isEqualTo(v3);
assertThat(wbwi.getFromBatchAndDB(db, readOptions, k4)).isEqualTo(v4);
wbwi.remove(k4);
wbwi.delete(k4);
assertThat(wbwi.getFromBatchAndDB(db, readOptions, k4)).isNull();
}

@ -0,0 +1,171 @@
package org.rocksdb.util;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/**
* A simple WriteBatch Handler which adds a record
* of each event that it receives to a list
*/
public class CapturingWriteBatchHandler extends WriteBatch.Handler {
private final List<Event> events = new ArrayList<>();
/**
* Returns a copy of the current events list
*
* @return a list of the events which have happened upto now
*/
public List<Event> getEvents() {
return new ArrayList<>(events);
}
@Override
public void put(final int columnFamilyId, final byte[] key,
final byte[] value) {
events.add(new Event(Action.PUT, columnFamilyId, key, value));
}
@Override
public void put(final byte[] key, final byte[] value) {
events.add(new Event(Action.PUT, key, value));
}
@Override
public void merge(final int columnFamilyId, final byte[] key,
final byte[] value) {
events.add(new Event(Action.MERGE, columnFamilyId, key, value));
}
@Override
public void merge(final byte[] key, final byte[] value) {
events.add(new Event(Action.MERGE, key, value));
}
@Override
public void delete(final int columnFamilyId, final byte[] key) {
events.add(new Event(Action.DELETE, columnFamilyId, key, (byte[])null));
}
@Override
public void delete(final byte[] key) {
events.add(new Event(Action.DELETE, key, (byte[])null));
}
@Override
public void singleDelete(final int columnFamilyId, final byte[] key) {
events.add(new Event(Action.SINGLE_DELETE,
columnFamilyId, key, (byte[])null));
}
@Override
public void singleDelete(final byte[] key) {
events.add(new Event(Action.SINGLE_DELETE, key, (byte[])null));
}
@Override
public void deleteRange(final int columnFamilyId, final byte[] beginKey,
final byte[] endKey) {
events.add(new Event(Action.DELETE_RANGE, columnFamilyId, beginKey,
endKey));
}
@Override
public void deleteRange(final byte[] beginKey, final byte[] endKey) {
events.add(new Event(Action.DELETE_RANGE, beginKey, endKey));
}
@Override
public void logData(final byte[] blob) {
events.add(new Event(Action.LOG, (byte[])null, blob));
}
@Override
public void putBlobIndex(final int columnFamilyId, final byte[] key,
final byte[] value) {
events.add(new Event(Action.PUT_BLOB_INDEX, key, value));
}
@Override
public void markBeginPrepare() throws RocksDBException {
events.add(new Event(Action.MARK_BEGIN_PREPARE, (byte[])null,
(byte[])null));
}
@Override
public void markEndPrepare(final byte[] xid) throws RocksDBException {
events.add(new Event(Action.MARK_END_PREPARE, (byte[])null,
(byte[])null));
}
@Override
public void markNoop(final boolean emptyBatch) throws RocksDBException {
events.add(new Event(Action.MARK_NOOP, (byte[])null, (byte[])null));
}
@Override
public void markRollback(final byte[] xid) throws RocksDBException {
events.add(new Event(Action.MARK_ROLLBACK, (byte[])null, (byte[])null));
}
@Override
public void markCommit(final byte[] xid) throws RocksDBException {
events.add(new Event(Action.MARK_COMMIT, (byte[])null, (byte[])null));
}
public static class Event {
public final Action action;
public final int columnFamilyId;
public final byte[] key;
public final byte[] value;
public Event(final Action action, final byte[] key, final byte[] value) {
this(action, 0, key, value);
}
public Event(final Action action, final int columnFamilyId, final byte[] key,
final byte[] value) {
this.action = action;
this.columnFamilyId = columnFamilyId;
this.key = key;
this.value = value;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Event event = (Event) o;
return columnFamilyId == event.columnFamilyId &&
action == event.action &&
((key == null && event.key == null)
|| Arrays.equals(key, event.key)) &&
((value == null && event.value == null)
|| Arrays.equals(value, event.value));
}
@Override
public int hashCode() {
return Objects.hash(action, columnFamilyId, key, value);
}
}
/**
* Enumeration of Write Batch
* event actions
*/
public enum Action {
PUT, MERGE, DELETE, SINGLE_DELETE, DELETE_RANGE, LOG, PUT_BLOB_INDEX,
MARK_BEGIN_PREPARE, MARK_END_PREPARE, MARK_NOOP, MARK_COMMIT,
MARK_ROLLBACK }
}

@ -0,0 +1,133 @@
package org.rocksdb.util;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import java.util.Arrays;
public class WriteBatchGetter extends WriteBatch.Handler {
private int columnFamilyId = -1;
private final byte[] key;
private byte[] value;
public WriteBatchGetter(final byte[] key) {
this.key = key;
}
public byte[] getValue() {
return value;
}
@Override
public void put(final int columnFamilyId, final byte[] key,
final byte[] value) {
if(Arrays.equals(this.key, key)) {
this.columnFamilyId = columnFamilyId;
this.value = value;
}
}
@Override
public void put(final byte[] key, final byte[] value) {
if(Arrays.equals(this.key, key)) {
this.value = value;
}
}
@Override
public void merge(final int columnFamilyId, final byte[] key,
final byte[] value) {
if(Arrays.equals(this.key, key)) {
this.columnFamilyId = columnFamilyId;
this.value = value;
}
}
@Override
public void merge(final byte[] key, final byte[] value) {
if(Arrays.equals(this.key, key)) {
this.value = value;
}
}
@Override
public void delete(final int columnFamilyId, final byte[] key) {
if(Arrays.equals(this.key, key)) {
this.columnFamilyId = columnFamilyId;
this.value = null;
}
}
@Override
public void delete(final byte[] key) {
if(Arrays.equals(this.key, key)) {
this.value = null;
}
}
@Override
public void singleDelete(final int columnFamilyId, final byte[] key) {
if(Arrays.equals(this.key, key)) {
this.columnFamilyId = columnFamilyId;
this.value = null;
}
}
@Override
public void singleDelete(final byte[] key) {
if(Arrays.equals(this.key, key)) {
this.value = null;
}
}
@Override
public void deleteRange(final int columnFamilyId, final byte[] beginKey,
final byte[] endKey) {
throw new UnsupportedOperationException();
}
@Override
public void deleteRange(final byte[] beginKey, final byte[] endKey) {
throw new UnsupportedOperationException();
}
@Override
public void logData(final byte[] blob) {
throw new UnsupportedOperationException();
}
@Override
public void putBlobIndex(final int columnFamilyId, final byte[] key,
final byte[] value) {
if(Arrays.equals(this.key, key)) {
this.columnFamilyId = columnFamilyId;
this.value = value;
}
}
@Override
public void markBeginPrepare() throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public void markEndPrepare(final byte[] xid) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public void markNoop(final boolean emptyBatch) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public void markRollback(final byte[] xid) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public void markCommit(final byte[] xid) throws RocksDBException {
throw new UnsupportedOperationException();
}
}
Loading…
Cancel
Save