Extend Java RocksDB iterators to support indirect Byte Buffers (#9222)

Summary:
Extend Java RocksDB iterators to support indirect byte buffers, to add to the existing support for direct byte buffers.
Code to distinguish direct/indirect buffers is switched in Java, and a 2nd separate JNI call implemented to support indirect
buffers. Indirect support passes contained buffers using byte[]

There are some Java subclasses of iterator (WBWIIterator, SstFileReaderIterator) which also now have parallel JNI support functions implemented, along with direct/indirect switches in Java methods.

Closes https://github.com/facebook/rocksdb/issues/6282

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9222

Reviewed By: ajkr

Differential Revision: D35115283

Pulled By: jay-zhuang

fbshipit-source-id: f8d5d20b975aef700560fbcc99f707bb028dc42e
main
Alan Paxton 2 years ago committed by Facebook GitHub Bot
parent 8ae0c33a7a
commit dec144f172
  1. 119
      java/rocksjni/iterator.cc
  2. 35
      java/rocksjni/portal.h
  3. 100
      java/rocksjni/sst_file_reader_iterator.cc
  4. 70
      java/rocksjni/write_batch_with_index.cc
  5. 53
      java/src/main/java/org/rocksdb/AbstractRocksIterator.java
  6. 35
      java/src/main/java/org/rocksdb/RocksIterator.java
  7. 45
      java/src/main/java/org/rocksdb/SstFileReaderIterator.java
  8. 19
      java/src/main/java/org/rocksdb/WBWIRocksIterator.java
  9. 179
      java/src/test/java/org/rocksdb/RocksIteratorTest.java
  10. 125
      java/src/test/java/org/rocksdb/SstFileReaderTest.java
  11. 335
      java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java
  12. 10
      java/src/test/java/org/rocksdb/util/ByteBufferAllocator.java
  13. 12
      java/src/test/java/org/rocksdb/util/DirectByteBufferAllocator.java
  14. 12
      java/src/test/java/org/rocksdb/util/HeapByteBufferAllocator.java

@ -6,13 +6,15 @@
// This file implements the "bridge" between Java and C++ and enables
// calling c++ ROCKSDB_NAMESPACE::Iterator methods from Java side.
#include "rocksdb/iterator.h"
#include <jni.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include "include/org_rocksdb_RocksIterator.h"
#include "rocksdb/iterator.h"
#include "rocksjni/portal.h"
/*
@ -87,7 +89,7 @@ void Java_org_rocksdb_RocksIterator_prev0(JNIEnv* /*env*/, jobject /*jobj*/,
* Signature: (J)V
*/
void Java_org_rocksdb_RocksIterator_refresh0(JNIEnv* env, jobject /*jobj*/,
jlong handle) {
jlong handle) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
ROCKSDB_NAMESPACE::Status s = it->Refresh();
@ -106,19 +108,31 @@ void Java_org_rocksdb_RocksIterator_refresh0(JNIEnv* env, jobject /*jobj*/,
void Java_org_rocksdb_RocksIterator_seek0(JNIEnv* env, jobject /*jobj*/,
jlong handle, jbyteArray jtarget,
jint jtarget_len) {
jbyte* target = env->GetByteArrayElements(jtarget, nullptr);
if (target == nullptr) {
// exception thrown: OutOfMemoryError
return;
}
ROCKSDB_NAMESPACE::Slice target_slice(reinterpret_cast<char*>(target),
jtarget_len);
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
it->Seek(target_slice);
auto seek = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) {
it->Seek(target_slice);
};
ROCKSDB_NAMESPACE::JniUtil::k_op_region(seek, env, jtarget, 0, jtarget_len);
}
env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT);
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
* In this case, the buffer offset of the key may be non-zero.
*
* Class: org_rocksdb_RocksIterator
* Method: seek0
* Signature: (J[BII)V
*/
void Java_org_rocksdb_RocksIterator_seekByteArray0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget,
jint jtarget_off, jint jtarget_len) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
auto seek = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) {
it->Seek(target_slice);
};
ROCKSDB_NAMESPACE::JniUtil::k_op_region(seek, env, jtarget, jtarget_off,
jtarget_len);
}
/*
@ -163,19 +177,31 @@ void Java_org_rocksdb_RocksIterator_seekForPrev0(JNIEnv* env, jobject /*jobj*/,
jlong handle,
jbyteArray jtarget,
jint jtarget_len) {
jbyte* target = env->GetByteArrayElements(jtarget, nullptr);
if (target == nullptr) {
// exception thrown: OutOfMemoryError
return;
}
ROCKSDB_NAMESPACE::Slice target_slice(reinterpret_cast<char*>(target),
jtarget_len);
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
it->SeekForPrev(target_slice);
auto seek = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) {
it->SeekForPrev(target_slice);
};
ROCKSDB_NAMESPACE::JniUtil::k_op_region(seek, env, jtarget, 0, jtarget_len);
}
env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT);
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
* In this case, the buffer offset of the key may be non-zero.
*
* Class: org_rocksdb_RocksIterator
* Method: seek0
* Signature: (J[BII)V
*/
void Java_org_rocksdb_RocksIterator_seekForPrevByteArray0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget,
jint jtarget_off, jint jtarget_len) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
auto seek = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) {
it->SeekForPrev(target_slice);
};
ROCKSDB_NAMESPACE::JniUtil::k_op_region(seek, env, jtarget, jtarget_off,
jtarget_len);
}
/*
@ -231,6 +257,29 @@ jint Java_org_rocksdb_RocksIterator_keyDirect0(JNIEnv* env, jobject /*jobj*/,
jtarget_off, jtarget_len);
}
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
*
* Class: org_rocksdb_RocksIterator
* Method: keyByteArray0
* Signature: (J[BII)I
*/
jint Java_org_rocksdb_RocksIterator_keyByteArray0(JNIEnv* env, jobject /*jobj*/,
jlong handle, jbyteArray jkey,
jint jkey_off,
jint jkey_len) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
ROCKSDB_NAMESPACE::Slice key_slice = it->key();
jsize copy_size = std::min(static_cast<uint32_t>(key_slice.size()),
static_cast<uint32_t>(jkey_len));
env->SetByteArrayRegion(
jkey, jkey_off, copy_size,
const_cast<jbyte*>(reinterpret_cast<const jbyte*>(key_slice.data())));
return static_cast<jsize>(key_slice.size());
}
/*
* Class: org_rocksdb_RocksIterator
* Method: value0
@ -267,3 +316,25 @@ jint Java_org_rocksdb_RocksIterator_valueDirect0(JNIEnv* env, jobject /*jobj*/,
return ROCKSDB_NAMESPACE::JniUtil::copyToDirect(env, value_slice, jtarget,
jtarget_off, jtarget_len);
}
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
*
* Class: org_rocksdb_RocksIterator
* Method: valueByteArray0
* Signature: (J[BII)I
*/
jint Java_org_rocksdb_RocksIterator_valueByteArray0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jvalue_target,
jint jvalue_off, jint jvalue_len) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
ROCKSDB_NAMESPACE::Slice value_slice = it->value();
jsize copy_size = std::min(static_cast<uint32_t>(value_slice.size()),
static_cast<uint32_t>(jvalue_len));
env->SetByteArrayRegion(
jvalue_target, jvalue_off, copy_size,
const_cast<jbyte*>(reinterpret_cast<const jbyte*>(value_slice.data())));
return static_cast<jsize>(value_slice.size());
}

@ -2127,7 +2127,7 @@ class JniUtil {
std::function<ROCKSDB_NAMESPACE::Status(ROCKSDB_NAMESPACE::Slice)> op,
JNIEnv* env, jobject /*jobj*/, jbyteArray jkey, jint jkey_len) {
jbyte* key = env->GetByteArrayElements(jkey, nullptr);
if(env->ExceptionCheck()) {
if (env->ExceptionCheck()) {
// exception thrown: OutOfMemoryError
return nullptr;
}
@ -2137,7 +2137,7 @@ class JniUtil {
auto status = op(key_slice);
if(key != nullptr) {
if (key != nullptr) {
env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
}
@ -2145,6 +2145,37 @@ class JniUtil {
new ROCKSDB_NAMESPACE::Status(status));
}
/*
* Helper for operations on a key which is a region of an array
* Used to extract the common code from seek/seekForPrev.
* Possible that it can be generalised from that.
*
* We use GetByteArrayRegion to copy the key region of the whole array into
* a char[] We suspect this is not much slower than GetByteArrayElements,
* which probably copies anyway.
*/
static void k_op_region(std::function<void(ROCKSDB_NAMESPACE::Slice&)> op,
JNIEnv* env, jbyteArray jkey, jint jkey_off,
jint jkey_len) {
const std::unique_ptr<char[]> key(new char[jkey_len]);
if (key == nullptr) {
jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError");
env->ThrowNew(oom_class,
"Memory allocation failed in RocksDB JNI function");
return;
}
env->GetByteArrayRegion(jkey, jkey_off, jkey_len,
reinterpret_cast<jbyte*>(key.get()));
if (env->ExceptionCheck()) {
// exception thrown: OutOfMemoryError
return;
}
ROCKSDB_NAMESPACE::Slice key_slice(reinterpret_cast<char*>(key.get()),
jkey_len);
op(key_slice);
}
/*
* Helper for operations on a value
* for example WriteBatchWithIndex->GetFromBatch

@ -206,6 +206,29 @@ jint Java_org_rocksdb_SstFileReaderIterator_keyDirect0(
jtarget_off, jtarget_len);
}
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
*
* Class: org_rocksdb_SstFileReaderIterator
* Method: keyByteArray0
* Signature: (J[BII)I
*/
jint Java_org_rocksdb_SstFileReaderIterator_keyByteArray0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jkey, jint jkey_off,
jint jkey_len) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
ROCKSDB_NAMESPACE::Slice key_slice = it->key();
auto slice_size = key_slice.size();
jsize copy_size = std::min(static_cast<uint32_t>(slice_size),
static_cast<uint32_t>(jkey_len));
env->SetByteArrayRegion(
jkey, jkey_off, copy_size,
const_cast<jbyte*>(reinterpret_cast<const jbyte*>(key_slice.data())));
return static_cast<jsize>(slice_size);
}
/*
* Class: org_rocksdb_SstFileReaderIterator
* Method: valueDirect0
@ -220,6 +243,29 @@ jint Java_org_rocksdb_SstFileReaderIterator_valueDirect0(
jtarget_off, jtarget_len);
}
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
*
* Class: org_rocksdb_SstFileReaderIterator
* Method: valueByteArray0
* Signature: (J[BII)I
*/
jint Java_org_rocksdb_SstFileReaderIterator_valueByteArray0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jvalue_target,
jint jvalue_off, jint jvalue_len) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
ROCKSDB_NAMESPACE::Slice value_slice = it->value();
auto slice_size = value_slice.size();
jsize copy_size = std::min(static_cast<uint32_t>(slice_size),
static_cast<uint32_t>(jvalue_len));
env->SetByteArrayRegion(
jvalue_target, jvalue_off, copy_size,
const_cast<jbyte*>(reinterpret_cast<const jbyte*>(value_slice.data())));
return static_cast<jsize>(slice_size);
}
/*
* Class: org_rocksdb_SstFileReaderIterator
* Method: seekDirect0
@ -252,6 +298,60 @@ void Java_org_rocksdb_SstFileReaderIterator_seekForPrevDirect0(
jtarget_len);
}
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
*
* Class: org_rocksdb_SstFileReaderIterator
* Method: seekByteArray0
* Signature: (J[BII)V
*/
void Java_org_rocksdb_SstFileReaderIterator_seekByteArray0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget,
jint jtarget_off, jint jtarget_len) {
const std::unique_ptr<char[]> target(new char[jtarget_len]);
if (target == nullptr) {
jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError");
env->ThrowNew(oom_class,
"Memory allocation failed in RocksDB JNI function");
return;
}
env->GetByteArrayRegion(jtarget, jtarget_off, jtarget_len,
reinterpret_cast<jbyte*>(target.get()));
ROCKSDB_NAMESPACE::Slice target_slice(target.get(), jtarget_len);
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
it->Seek(target_slice);
}
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
*
* Class: org_rocksdb_SstFileReaderIterator
* Method: seekForPrevByteArray0
* Signature: (J[BII)V
*/
void Java_org_rocksdb_SstFileReaderIterator_seekForPrevByteArray0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget,
jint jtarget_off, jint jtarget_len) {
const std::unique_ptr<char[]> target(new char[jtarget_len]);
if (target == nullptr) {
jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError");
env->ThrowNew(oom_class,
"Memory allocation failed in RocksDB JNI function");
return;
}
env->GetByteArrayRegion(jtarget, jtarget_off, jtarget_len,
reinterpret_cast<jbyte*>(target.get()));
ROCKSDB_NAMESPACE::Slice target_slice(target.get(), jtarget_len);
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(handle);
it->SeekForPrev(target_slice);
}
/*
* Class: org_rocksdb_SstFileReaderIterator
* Method: refresh0

@ -765,6 +765,33 @@ void Java_org_rocksdb_WBWIRocksIterator_seekDirect0(
jtarget_len);
}
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
*
* Class: org_rocksdb_WBWIRocksIterator
* Method: seekByteArray0
* Signature: (J[BII)V
*/
void Java_org_rocksdb_WBWIRocksIterator_seekByteArray0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget,
jint jtarget_off, jint jtarget_len) {
const std::unique_ptr<char[]> target(new char[jtarget_len]);
if (target == nullptr) {
jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError");
env->ThrowNew(oom_class,
"Memory allocation failed in RocksDB JNI function");
return;
}
env->GetByteArrayRegion(jtarget, jtarget_off, jtarget_len,
reinterpret_cast<jbyte*>(target.get()));
ROCKSDB_NAMESPACE::Slice target_slice(target.get(), jtarget_len);
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::WBWIIterator*>(handle);
it->Seek(target_slice);
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: seekForPrev0
@ -790,6 +817,49 @@ void Java_org_rocksdb_WBWIRocksIterator_seekForPrev0(JNIEnv* env,
env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT);
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: seekForPrevDirect0
* Signature: (JLjava/nio/ByteBuffer;II)V
*/
void Java_org_rocksdb_WBWIRocksIterator_seekForPrevDirect0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jobject jtarget,
jint jtarget_off, jint jtarget_len) {
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::WBWIIterator*>(handle);
auto seek_for_prev = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) {
it->SeekForPrev(target_slice);
};
ROCKSDB_NAMESPACE::JniUtil::k_op_direct(seek_for_prev, env, jtarget,
jtarget_off, jtarget_len);
}
/*
* This method supports fetching into indirect byte buffers;
* the Java wrapper extracts the byte[] and passes it here.
*
* Class: org_rocksdb_WBWIRocksIterator
* Method: seekForPrevByteArray0
* Signature: (J[BII)V
*/
void Java_org_rocksdb_WBWIRocksIterator_seekForPrevByteArray0(
JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget,
jint jtarget_off, jint jtarget_len) {
const std::unique_ptr<char[]> target(new char[jtarget_len]);
if (target == nullptr) {
jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError");
env->ThrowNew(oom_class,
"Memory allocation failed in RocksDB JNI function");
return;
}
env->GetByteArrayRegion(jtarget, jtarget_off, jtarget_len,
reinterpret_cast<jbyte*>(target.get()));
ROCKSDB_NAMESPACE::Slice target_slice(target.get(), jtarget_len);
auto* it = reinterpret_cast<ROCKSDB_NAMESPACE::WBWIIterator*>(handle);
it->SeekForPrev(target_slice);
}
/*
* Class: org_rocksdb_WBWIRocksIterator
* Method: status0

@ -55,30 +55,40 @@ public abstract class AbstractRocksIterator<P extends RocksObject>
}
@Override
public void seek(byte[] target) {
public void seek(final byte[] target) {
assert (isOwningHandle());
seek0(nativeHandle_, target, target.length);
}
@Override
public void seekForPrev(byte[] target) {
assert (isOwningHandle());
seekForPrev0(nativeHandle_, target, target.length);
}
@Override
public void seek(ByteBuffer target) {
assert (isOwningHandle() && target.isDirect());
seekDirect0(nativeHandle_, target, target.position(), target.remaining());
target.position(target.limit());
}
@Override
public void seekForPrev(ByteBuffer target) {
assert (isOwningHandle() && target.isDirect());
seekForPrevDirect0(nativeHandle_, target, target.position(), target.remaining());
target.position(target.limit());
}
@Override
public void seekForPrev(final byte[] target) {
assert (isOwningHandle());
seekForPrev0(nativeHandle_, target, target.length);
}
@Override
public void seek(final ByteBuffer target) {
assert (isOwningHandle());
if (target.isDirect()) {
seekDirect0(nativeHandle_, target, target.position(), target.remaining());
} else {
seekByteArray0(nativeHandle_, target.array(), target.arrayOffset() + target.position(),
target.remaining());
}
target.position(target.limit());
}
@Override
public void seekForPrev(final ByteBuffer target) {
assert (isOwningHandle());
if (target.isDirect()) {
seekForPrevDirect0(nativeHandle_, target, target.position(), target.remaining());
} else {
seekForPrevByteArray0(nativeHandle_, target.array(), target.arrayOffset() + target.position(),
target.remaining());
}
target.position(target.limit());
}
@Override
public void next() {
@ -129,5 +139,8 @@ public abstract class AbstractRocksIterator<P extends RocksObject>
abstract void seekForPrev0(long handle, byte[] target, int targetLen);
abstract void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen);
abstract void seekForPrevDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen);
abstract void seekByteArray0(long handle, byte[] target, int targetOffset, int targetLen);
abstract void seekForPrevByteArray0(long handle, byte[] target, int targetOffset, int targetLen);
abstract void status0(long handle) throws RocksDBException;
}

@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
* @see org.rocksdb.RocksObject
*/
public class RocksIterator extends AbstractRocksIterator<RocksDB> {
protected RocksIterator(RocksDB rocksDB, long nativeHandle) {
protected RocksIterator(final RocksDB rocksDB, final long nativeHandle) {
super(rocksDB, nativeHandle);
}
@ -54,9 +54,16 @@ public class RocksIterator extends AbstractRocksIterator<RocksDB> {
* input buffer {@code key} is insufficient and partial result will
* be returned.
*/
public int key(ByteBuffer key) {
assert (isOwningHandle() && key.isDirect());
int result = keyDirect0(nativeHandle_, key, key.position(), key.remaining());
public int key(final ByteBuffer key) {
assert isOwningHandle();
final int result;
if (key.isDirect()) {
result = keyDirect0(nativeHandle_, key, key.position(), key.remaining());
} else {
assert key.hasArray();
result = keyByteArray0(
nativeHandle_, key.array(), key.arrayOffset() + key.position(), key.remaining());
}
key.limit(Math.min(key.position() + result, key.limit()));
return result;
}
@ -89,9 +96,16 @@ public class RocksIterator extends AbstractRocksIterator<RocksDB> {
* input buffer {@code value} is insufficient and partial result will
* be returned.
*/
public int value(ByteBuffer value) {
assert (isOwningHandle() && value.isDirect());
int result = valueDirect0(nativeHandle_, value, value.position(), value.remaining());
public int value(final ByteBuffer value) {
assert isOwningHandle();
final int result;
if (value.isDirect()) {
result = valueDirect0(nativeHandle_, value, value.position(), value.remaining());
} else {
assert value.hasArray();
result = valueByteArray0(
nativeHandle_, value.array(), value.arrayOffset() + value.position(), value.remaining());
}
value.limit(Math.min(value.position() + result, value.limit()));
return result;
}
@ -108,12 +122,19 @@ public class RocksIterator extends AbstractRocksIterator<RocksDB> {
@Override
final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen);
@Override
final native void seekByteArray0(long handle, byte[] target, int targetOffset, int targetLen);
@Override
final native void seekForPrevDirect0(
long handle, ByteBuffer target, int targetOffset, int targetLen);
@Override
final native void seekForPrevByteArray0(
long handle, byte[] target, int targetOffset, int targetLen);
@Override final native void status0(long handle) throws RocksDBException;
private native byte[] key0(long handle);
private native byte[] value0(long handle);
private native int keyDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen);
private native int keyByteArray0(long handle, byte[] array, int arrayOffset, int arrayLen);
private native int valueDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen);
private native int valueByteArray0(long handle, byte[] array, int arrayOffset, int arrayLen);
}

@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
* @see RocksObject
*/
public class SstFileReaderIterator extends AbstractRocksIterator<SstFileReader> {
protected SstFileReaderIterator(SstFileReader reader, long nativeHandle) {
protected SstFileReaderIterator(final SstFileReader reader, final long nativeHandle) {
super(reader, nativeHandle);
}
@ -54,9 +54,15 @@ public class SstFileReaderIterator extends AbstractRocksIterator<SstFileReader>
* input buffer {@code key} is insufficient and partial result will
* be returned.
*/
public int key(ByteBuffer key) {
assert (isOwningHandle() && key.isDirect());
int result = keyDirect0(nativeHandle_, key, key.position(), key.remaining());
public int key(final ByteBuffer key) {
assert (isOwningHandle());
final int result;
if (key.isDirect()) {
result = keyDirect0(nativeHandle_, key, key.position(), key.remaining());
} else {
result = keyByteArray0(
nativeHandle_, key.array(), key.arrayOffset() + key.position(), key.remaining());
}
key.limit(Math.min(key.position() + result, key.limit()));
return result;
}
@ -89,9 +95,15 @@ public class SstFileReaderIterator extends AbstractRocksIterator<SstFileReader>
* input buffer {@code value} is insufficient and partial result will
* be returned.
*/
public int value(ByteBuffer value) {
assert (isOwningHandle() && value.isDirect());
int result = valueDirect0(nativeHandle_, value, value.position(), value.remaining());
public int value(final ByteBuffer value) {
assert (isOwningHandle());
final int result;
if (value.isDirect()) {
result = valueDirect0(nativeHandle_, value, value.position(), value.remaining());
} else {
result = valueByteArray0(
nativeHandle_, value.array(), value.arrayOffset() + value.position(), value.remaining());
}
value.limit(Math.min(value.position() + result, value.limit()));
return result;
}
@ -106,16 +118,23 @@ public class SstFileReaderIterator extends AbstractRocksIterator<SstFileReader>
@Override final native void seek0(long handle, byte[] target, int targetLen);
@Override final native void seekForPrev0(long handle, byte[] target, int targetLen);
@Override final native void status0(long handle) throws RocksDBException;
@Override
final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen);
@Override
final native void seekForPrevDirect0(
long handle, ByteBuffer target, int targetOffset, int targetLen);
@Override
final native void seekByteArray0(
final long handle, final byte[] target, final int targetOffset, final int targetLen);
@Override
final native void seekForPrevByteArray0(
final long handle, final byte[] target, final int targetOffset, final int targetLen);
private native byte[] key0(long handle);
private native byte[] value0(long handle);
private native int keyDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen);
private native int keyByteArray0(long handle, byte[] buffer, int bufferOffset, int bufferLen);
private native int valueDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen);
@Override
final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen);
@Override
final native void seekForPrevDirect0(
long handle, ByteBuffer target, int targetOffset, int targetLen);
private native int valueByteArray0(long handle, byte[] buffer, int bufferOffset, int bufferLen);
}

@ -31,7 +31,7 @@ public class WBWIRocksIterator
*/
public WriteEntry entry() {
assert(isOwningHandle());
final long ptrs[] = entry1(nativeHandle_);
final long[] ptrs = entry1(nativeHandle_);
entry.type = WriteType.fromId((byte)ptrs[0]);
entry.key.resetNativeHandle(ptrs[1], ptrs[1] != 0);
@ -51,7 +51,17 @@ public class WBWIRocksIterator
@Override final native void seekForPrev0(long handle, byte[] target, int targetLen);
@Override final native void status0(long handle) throws RocksDBException;
@Override
final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen);
final native void seekDirect0(
final long handle, final ByteBuffer target, final int targetOffset, final int targetLen);
@Override
final native void seekForPrevDirect0(
final long handle, final ByteBuffer target, final int targetOffset, final int targetLen);
@Override
final native void seekByteArray0(
final long handle, final byte[] target, final int targetOffset, final int targetLen);
@Override
final native void seekForPrevByteArray0(
final long handle, final byte[] target, final int targetOffset, final int targetLen);
private native long[] entry1(final long handle);
@ -190,9 +200,4 @@ public class WBWIRocksIterator
key.close();
}
}
@Override
void seekForPrevDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen) {
throw new IllegalAccessError("Not implemented");
}
}

@ -7,6 +7,7 @@ package org.rocksdb;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@ -21,13 +22,33 @@ public class RocksIteratorTest {
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
private void validateByteBufferResult(
final int fetched, final ByteBuffer byteBuffer, final String expected) {
assertThat(fetched).isEqualTo(expected.length());
assertThat(byteBuffer.position()).isEqualTo(0);
assertThat(byteBuffer.limit()).isEqualTo(Math.min(byteBuffer.remaining(), expected.length()));
final int bufferSpace = byteBuffer.remaining();
final byte[] contents = new byte[bufferSpace];
byteBuffer.get(contents, 0, bufferSpace);
assertThat(contents).isEqualTo(
expected.substring(0, bufferSpace).getBytes(StandardCharsets.UTF_8));
}
private void validateKey(
final RocksIterator iterator, final ByteBuffer byteBuffer, final String key) {
validateByteBufferResult(iterator.key(byteBuffer), byteBuffer, key);
}
private void validateValue(
final RocksIterator iterator, final ByteBuffer byteBuffer, final String value) {
validateByteBufferResult(iterator.value(byteBuffer), byteBuffer, value);
}
@Test
public void rocksIterator() throws RocksDBException {
try (final Options options = new Options()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(options,
dbFolder.getRoot().getAbsolutePath())) {
try (final Options options =
new Options().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
db.put("key1".getBytes(), "value1".getBytes());
db.put("key2".getBytes(), "value2".getBytes());
@ -37,37 +58,20 @@ public class RocksIteratorTest {
assertThat(iterator.key()).isEqualTo("key1".getBytes());
assertThat(iterator.value()).isEqualTo("value1".getBytes());
ByteBuffer key = ByteBuffer.allocateDirect(2);
ByteBuffer value = ByteBuffer.allocateDirect(2);
assertThat(iterator.key(key)).isEqualTo(4);
assertThat(iterator.value(value)).isEqualTo(6);
assertThat(key.position()).isEqualTo(0);
assertThat(key.limit()).isEqualTo(2);
assertThat(value.position()).isEqualTo(0);
assertThat(value.limit()).isEqualTo(2);
byte[] tmp = new byte[2];
key.get(tmp);
assertThat(tmp).isEqualTo("ke".getBytes());
value.get(tmp);
assertThat(tmp).isEqualTo("va".getBytes());
key = ByteBuffer.allocateDirect(12);
value = ByteBuffer.allocateDirect(12);
assertThat(iterator.key(key)).isEqualTo(4);
assertThat(iterator.value(value)).isEqualTo(6);
assertThat(key.position()).isEqualTo(0);
assertThat(key.limit()).isEqualTo(4);
assertThat(value.position()).isEqualTo(0);
assertThat(value.limit()).isEqualTo(6);
tmp = new byte[4];
key.get(tmp);
assertThat(tmp).isEqualTo("key1".getBytes());
tmp = new byte[6];
value.get(tmp);
assertThat(tmp).isEqualTo("value1".getBytes());
validateKey(iterator, ByteBuffer.allocateDirect(2), "key1");
validateKey(iterator, ByteBuffer.allocateDirect(2), "key0");
validateKey(iterator, ByteBuffer.allocateDirect(4), "key1");
validateKey(iterator, ByteBuffer.allocateDirect(5), "key1");
validateValue(iterator, ByteBuffer.allocateDirect(2), "value2");
validateValue(iterator, ByteBuffer.allocateDirect(2), "vasicu");
validateValue(iterator, ByteBuffer.allocateDirect(8), "value1");
validateKey(iterator, ByteBuffer.allocate(2), "key1");
validateKey(iterator, ByteBuffer.allocate(2), "key0");
validateKey(iterator, ByteBuffer.allocate(4), "key1");
validateKey(iterator, ByteBuffer.allocate(5), "key1");
validateValue(iterator, ByteBuffer.allocate(2), "value1");
validateValue(iterator, ByteBuffer.allocate(8), "value1");
iterator.next();
assertThat(iterator.isValid()).isTrue();
@ -87,24 +91,85 @@ public class RocksIteratorTest {
assertThat(iterator.value()).isEqualTo("value2".getBytes());
iterator.status();
key.clear();
key.put("key1".getBytes());
key.flip();
iterator.seek(key);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.value()).isEqualTo("value1".getBytes());
assertThat(key.position()).isEqualTo(4);
assertThat(key.limit()).isEqualTo(4);
key.clear();
key.put("key2".getBytes());
key.flip();
iterator.seekForPrev(key);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.value()).isEqualTo("value2".getBytes());
assertThat(key.position()).isEqualTo(4);
assertThat(key.limit()).isEqualTo(4);
{
final ByteBuffer key = ByteBuffer.allocate(12);
key.put("key1".getBytes()).flip();
iterator.seek(key);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.value()).isEqualTo("value1".getBytes());
assertThat(key.position()).isEqualTo(4);
assertThat(key.limit()).isEqualTo(4);
validateValue(iterator, ByteBuffer.allocateDirect(12), "value1");
validateValue(iterator, ByteBuffer.allocateDirect(4), "valu56");
}
{
final ByteBuffer key = ByteBuffer.allocate(12);
key.put("key2".getBytes()).flip();
iterator.seekForPrev(key);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.value()).isEqualTo("value2".getBytes());
assertThat(key.position()).isEqualTo(4);
assertThat(key.limit()).isEqualTo(4);
}
{
final ByteBuffer key = ByteBuffer.allocate(12);
key.put("key1".getBytes()).flip();
iterator.seek(key);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.value()).isEqualTo("value1".getBytes());
assertThat(key.position()).isEqualTo(4);
assertThat(key.limit()).isEqualTo(4);
}
{
// Check offsets of slice byte buffers
final ByteBuffer key0 = ByteBuffer.allocate(24);
key0.put("key2key2".getBytes());
final ByteBuffer key = key0.slice();
key.put("key1".getBytes()).flip();
iterator.seek(key);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.value()).isEqualTo("value1".getBytes());
assertThat(key.position()).isEqualTo(4);
assertThat(key.limit()).isEqualTo(4);
}
{
// Check offsets of slice byte buffers
final ByteBuffer key0 = ByteBuffer.allocateDirect(24);
key0.put("key2key2".getBytes());
final ByteBuffer key = key0.slice();
key.put("key1".getBytes()).flip();
iterator.seek(key);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.value()).isEqualTo("value1".getBytes());
assertThat(key.position()).isEqualTo(4);
assertThat(key.limit()).isEqualTo(4);
}
{
final ByteBuffer key = ByteBuffer.allocate(12);
key.put("key2".getBytes()).flip();
iterator.seekForPrev(key);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.value()).isEqualTo("value2".getBytes());
assertThat(key.position()).isEqualTo(4);
assertThat(key.limit()).isEqualTo(4);
}
}
}
}
@Test
public void rocksIteratorSeekAndInsert() throws RocksDBException {
try (final Options options =
new Options().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
db.put("key1".getBytes(), "value1".getBytes());
db.put("key2".getBytes(), "value2".getBytes());
try (final RocksIterator iterator = db.newIterator()) {
iterator.seek("key0".getBytes());
@ -192,8 +257,8 @@ public class RocksIteratorTest {
}
// Test case: release iterator after custom CF close
ColumnFamilyDescriptor cfd1 = new ColumnFamilyDescriptor("cf1".getBytes());
ColumnFamilyHandle cfHandle1 = db.createColumnFamily(cfd1);
final ColumnFamilyDescriptor cfd1 = new ColumnFamilyDescriptor("cf1".getBytes());
final ColumnFamilyHandle cfHandle1 = db.createColumnFamily(cfd1);
db.put(cfHandle1, "key1".getBytes(), "value1".getBytes());
try (final RocksIterator iterator = db.newIterator(cfHandle1)) {
@ -206,8 +271,8 @@ public class RocksIteratorTest {
}
// Test case: release iterator after custom CF drop & close
ColumnFamilyDescriptor cfd2 = new ColumnFamilyDescriptor("cf2".getBytes());
ColumnFamilyHandle cfHandle2 = db.createColumnFamily(cfd2);
final ColumnFamilyDescriptor cfd2 = new ColumnFamilyDescriptor("cf2".getBytes());
final ColumnFamilyHandle cfHandle2 = db.createColumnFamily(cfd2);
db.put(cfHandle2, "key2".getBytes(), "value2".getBytes());
try (final RocksIterator iterator = db.newIterator(cfHandle2)) {

@ -13,17 +13,21 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.util.BytewiseComparator;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.rocksdb.util.ByteBufferAllocator;
@RunWith(Parameterized.class)
public class SstFileReaderTest {
private static final String SST_FILE_NAME = "test.sst";
class KeyValueWithOp {
KeyValueWithOp(String key, String value, OpType opType) {
static class KeyValueWithOp {
KeyValueWithOp(final String key, final String value, final OpType opType) {
this.key = key;
this.value = value;
this.opType = opType;
@ -41,13 +45,23 @@ public class SstFileReaderTest {
return opType;
}
private String key;
private String value;
private OpType opType;
private final String key;
private final String value;
private final OpType opType;
}
@Rule public TemporaryFolder parentFolder = new TemporaryFolder();
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{"direct", ByteBufferAllocator.DIRECT}, {"indirect", ByteBufferAllocator.HEAP}});
}
@Parameterized.Parameter(0) public String name;
@Parameterized.Parameter(1) public ByteBufferAllocator byteBufferAllocator;
enum OpType { PUT, PUT_BYTES, MERGE, MERGE_BYTES, DELETE, DELETE_BYTES }
private File newSstFile(final List<KeyValueWithOp> keyValues)
@ -55,17 +69,17 @@ public class SstFileReaderTest {
final EnvOptions envOptions = new EnvOptions();
final StringAppendOperator stringAppendOperator = new StringAppendOperator();
final Options options = new Options().setMergeOperator(stringAppendOperator);
SstFileWriter sstFileWriter;
final SstFileWriter sstFileWriter;
sstFileWriter = new SstFileWriter(envOptions, options);
final File sstFile = parentFolder.newFile(SST_FILE_NAME);
try {
sstFileWriter.open(sstFile.getAbsolutePath());
for (KeyValueWithOp keyValue : keyValues) {
Slice keySlice = new Slice(keyValue.getKey());
Slice valueSlice = new Slice(keyValue.getValue());
byte[] keyBytes = keyValue.getKey().getBytes();
byte[] valueBytes = keyValue.getValue().getBytes();
for (final KeyValueWithOp keyValue : keyValues) {
final Slice keySlice = new Slice(keyValue.getKey());
final Slice valueSlice = new Slice(keyValue.getValue());
final byte[] keyBytes = keyValue.getKey().getBytes();
final byte[] valueBytes = keyValue.getValue().getBytes();
switch (keyValue.getOpType()) {
case PUT:
sstFileWriter.put(keySlice, valueSlice);
@ -105,6 +119,8 @@ public class SstFileReaderTest {
public void readSstFile() throws RocksDBException, IOException {
final List<KeyValueWithOp> keyValues = new ArrayList<>();
keyValues.add(new KeyValueWithOp("key1", "value1", OpType.PUT));
keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT));
keyValues.add(new KeyValueWithOp("key3", "value3", OpType.PUT));
final File sstFile = newSstFile(keyValues);
try (final StringAppendOperator stringAppendOperator = new StringAppendOperator();
@ -123,33 +139,84 @@ public class SstFileReaderTest {
reader.verifyChecksum();
// Verify Table Properties
assertEquals(reader.getTableProperties().getNumEntries(), 1);
assertEquals(reader.getTableProperties().getNumEntries(), 3);
// Check key and value
assertThat(iterator.key()).isEqualTo("key1".getBytes());
assertThat(iterator.value()).isEqualTo("value1".getBytes());
ByteBuffer direct = ByteBuffer.allocateDirect(128);
direct.put("key1".getBytes()).flip();
iterator.seek(direct);
assertThat(direct.position()).isEqualTo(4);
assertThat(direct.limit()).isEqualTo(4);
final ByteBuffer byteBuffer = byteBufferAllocator.allocate(128);
byteBuffer.put("key1".getBytes()).flip();
iterator.seek(byteBuffer);
assertThat(byteBuffer.position()).isEqualTo(4);
assertThat(byteBuffer.limit()).isEqualTo(4);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());
assertThat(iterator.value()).isEqualTo("value1".getBytes());
{
byteBuffer.clear();
assertThat(iterator.key(byteBuffer)).isEqualTo("key1".getBytes().length);
final byte[] dst = new byte["key1".getBytes().length];
byteBuffer.get(dst);
assertThat(new String(dst)).isEqualTo("key1");
}
{
byteBuffer.clear();
byteBuffer.put("PREFIX".getBytes());
final ByteBuffer slice = byteBuffer.slice();
assertThat(iterator.key(byteBuffer)).isEqualTo("key1".getBytes().length);
final byte[] dst = new byte["key1".getBytes().length];
slice.get(dst);
assertThat(new String(dst)).isEqualTo("key1");
}
{
byteBuffer.clear();
assertThat(iterator.value(byteBuffer)).isEqualTo("value1".getBytes().length);
final byte[] dst = new byte["value1".getBytes().length];
byteBuffer.get(dst);
assertThat(new String(dst)).isEqualTo("value1");
}
byteBuffer.clear();
byteBuffer.put("key1point5".getBytes()).flip();
iterator.seek(byteBuffer);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key2".getBytes());
assertThat(iterator.value()).isEqualTo("value2".getBytes());
byteBuffer.clear();
byteBuffer.put("key1point5".getBytes()).flip();
iterator.seekForPrev(byteBuffer);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());
assertThat(iterator.value()).isEqualTo("value1".getBytes());
direct.clear();
assertThat(iterator.key(direct)).isEqualTo("key1".getBytes().length);
byte[] dst = new byte["key1".getBytes().length];
direct.get(dst);
assertThat(new String(dst)).isEqualTo("key1");
direct.clear();
assertThat(iterator.value(direct)).isEqualTo("value1".getBytes().length);
dst = new byte["value1".getBytes().length];
direct.get(dst);
assertThat(new String(dst)).isEqualTo("value1");
byteBuffer.clear();
byteBuffer.put("key2point5".getBytes()).flip();
iterator.seek(byteBuffer);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key3".getBytes());
assertThat(iterator.value()).isEqualTo("value3".getBytes());
byteBuffer.clear();
byteBuffer.put("key2point5".getBytes()).flip();
iterator.seekForPrev(byteBuffer);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key2".getBytes());
assertThat(iterator.value()).isEqualTo("value2".getBytes());
byteBuffer.clear();
byteBuffer.put("PREFIX".getBytes());
final ByteBuffer slice = byteBuffer.slice();
slice.put("key1point5".getBytes()).flip();
iterator.seekForPrev(slice);
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());
assertThat(iterator.value()).isEqualTo("value1".getBytes());
}
}
}

@ -20,6 +20,7 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.util.ByteBufferAllocator;
public class WriteBatchWithIndexTest {
@ -192,6 +193,236 @@ public class WriteBatchWithIndexTest {
}
}
@Test
public void readYourOwnWritesCfIterDirectBB() throws RocksDBException {
readYourOwnWritesCfIterDirect(ByteBufferAllocator.DIRECT);
}
@Test
public void readYourOwnWritesCfIterIndirectBB() throws RocksDBException {
readYourOwnWritesCfIterDirect(ByteBufferAllocator.HEAP);
}
public void readYourOwnWritesCfIterDirect(final ByteBufferAllocator byteBufferAllocator)
throws RocksDBException {
final List<ColumnFamilyDescriptor> cfNames =
Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
new ColumnFamilyDescriptor("new_cf".getBytes()));
final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
// Test open database with column family names
try (final DBOptions options =
new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(
options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) {
final ColumnFamilyHandle newCf = columnFamilyHandleList.get(1);
try {
final byte[] kv1 = "key1".getBytes();
final byte[] vv1 = "value1".getBytes();
final ByteBuffer k1 = byteBufferAllocator.allocate(12);
k1.put(kv1);
final byte[] kv2 = "key2".getBytes();
final byte[] vv2 = "value2".getBytes();
final ByteBuffer k2 = byteBufferAllocator.allocate(12);
k2.put(kv2);
db.put(newCf, kv1, vv1);
db.put(newCf, kv2, vv2);
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
final ReadOptions readOptions = new ReadOptions();
final RocksIterator base = db.newIterator(newCf, readOptions);
final RocksIterator it = wbwi.newIteratorWithBase(newCf, base, readOptions)) {
k1.flip();
it.seek(k1);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv1);
assertThat(it.value()).isEqualTo(vv1);
k2.flip();
it.seek(k2);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv2);
assertThat(it.value()).isEqualTo(vv2);
final byte[] kv1point5 = "key1point5".getBytes();
final ByteBuffer k1point5 = byteBufferAllocator.allocate(12);
k1point5.put(kv1point5);
k1point5.flip();
it.seek(k1point5);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv2);
assertThat(it.value()).isEqualTo(vv2);
k1point5.flip();
it.seekForPrev(k1point5);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv1);
assertThat(it.value()).isEqualTo(vv1);
// put data to the write batch and make sure we can read it.
final byte[] kv3 = "key3".getBytes();
final ByteBuffer k3 = byteBufferAllocator.allocate(12);
k3.put(kv3);
final byte[] vv3 = "value3".getBytes();
wbwi.put(newCf, kv3, vv3);
k3.flip();
it.seek(k3);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv3);
assertThat(it.value()).isEqualTo(vv3);
// update k2 in the write batch and check the value
final byte[] v2Other = "otherValue2".getBytes();
wbwi.put(newCf, kv2, v2Other);
k2.flip();
it.seek(k2);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv2);
assertThat(it.value()).isEqualTo(v2Other);
// delete k1 and make sure we can read back the write
wbwi.delete(newCf, kv1);
k1.flip();
it.seek(k1);
assertThat(it.key()).isNotEqualTo(kv1);
// reinsert k1 and make sure we see the new value
final byte[] v1Other = "otherValue1".getBytes();
wbwi.put(newCf, kv1, v1Other);
k1.flip();
it.seek(k1);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv1);
assertThat(it.value()).isEqualTo(v1Other);
// single remove k3 and make sure we can read back the write
wbwi.singleDelete(newCf, kv3);
k3.flip();
it.seek(k3);
assertThat(it.isValid()).isEqualTo(false);
// reinsert k3 and make sure we see the new value
final byte[] v3Other = "otherValue3".getBytes();
wbwi.put(newCf, kv3, v3Other);
k3.flip();
it.seek(k3);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv3);
assertThat(it.value()).isEqualTo(v3Other);
}
} finally {
for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) {
columnFamilyHandle.close();
}
}
}
}
@Test
public void readYourOwnWritesCfIterIndirect() throws RocksDBException {
final List<ColumnFamilyDescriptor> cfNames =
Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
new ColumnFamilyDescriptor("new_cf".getBytes()));
final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
// Test open database with column family names
try (final DBOptions options =
new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(
options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) {
final ColumnFamilyHandle newCf = columnFamilyHandleList.get(1);
try {
final byte[] kv1 = "key1".getBytes();
final byte[] vv1 = "value1".getBytes();
final ByteBuffer k1 = ByteBuffer.allocate(12);
k1.put(kv1).flip();
final byte[] kv2 = "key2".getBytes();
final byte[] vv2 = "value2".getBytes();
final ByteBuffer k2 = ByteBuffer.allocate(12);
k2.put(kv2).flip();
db.put(newCf, kv1, vv1);
db.put(newCf, kv2, vv2);
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
final ReadOptions readOptions = new ReadOptions();
final RocksIterator base = db.newIterator(newCf, readOptions);
final RocksIterator it = wbwi.newIteratorWithBase(newCf, base, readOptions)) {
it.seek(k1);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv1);
assertThat(it.value()).isEqualTo(vv1);
it.seek(k2);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv2);
assertThat(it.value()).isEqualTo(vv2);
// put data to the write batch and make sure we can read it.
final byte[] kv3 = "key3".getBytes();
final ByteBuffer k3 = ByteBuffer.allocate(12);
k3.put(kv3);
final byte[] vv3 = "value3".getBytes();
wbwi.put(newCf, kv3, vv3);
k3.flip();
it.seek(k3);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv3);
assertThat(it.value()).isEqualTo(vv3);
// update k2 in the write batch and check the value
final byte[] v2Other = "otherValue2".getBytes();
wbwi.put(newCf, kv2, v2Other);
k2.flip();
it.seek(k2);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv2);
assertThat(it.value()).isEqualTo(v2Other);
// delete k1 and make sure we can read back the write
wbwi.delete(newCf, kv1);
k1.flip();
it.seek(k1);
assertThat(it.key()).isNotEqualTo(kv1);
// reinsert k1 and make sure we see the new value
final byte[] v1Other = "otherValue1".getBytes();
wbwi.put(newCf, kv1, v1Other);
k1.flip();
it.seek(k1);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv1);
assertThat(it.value()).isEqualTo(v1Other);
// single remove k3 and make sure we can read back the write
wbwi.singleDelete(newCf, kv3);
k3.flip();
it.seek(k3);
assertThat(it.isValid()).isEqualTo(false);
// reinsert k3 and make sure we see the new value
final byte[] v3Other = "otherValue3".getBytes();
wbwi.put(newCf, kv3, v3Other);
k3.flip();
it.seek(k3);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(kv3);
assertThat(it.value()).isEqualTo(v3Other);
}
} finally {
for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) {
columnFamilyHandle.close();
}
}
}
}
@Test
public void writeBatchWithIndex() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
@ -220,10 +451,10 @@ public class WriteBatchWithIndexTest {
public void write_writeBatchWithIndexDirect() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
ByteBuffer k1 = ByteBuffer.allocateDirect(16);
ByteBuffer v1 = ByteBuffer.allocateDirect(16);
ByteBuffer k2 = ByteBuffer.allocateDirect(16);
ByteBuffer v2 = ByteBuffer.allocateDirect(16);
final ByteBuffer k1 = ByteBuffer.allocateDirect(16);
final ByteBuffer v1 = ByteBuffer.allocateDirect(16);
final ByteBuffer k2 = ByteBuffer.allocateDirect(16);
final ByteBuffer v2 = ByteBuffer.allocateDirect(16);
k1.put("key1".getBytes()).flip();
v1.put("value1".getBytes()).flip();
k2.put("key2".getBytes()).flip();
@ -258,8 +489,6 @@ public class WriteBatchWithIndexTest {
final String v3 = "value3";
final String k4 = "key4";
final String k5 = "key5";
final String k6 = "key6";
final String k7 = "key7";
final String v8 = "value8";
final byte[] k1b = k1.getBytes(UTF_8);
final byte[] v1b = v1.getBytes(UTF_8);
@ -269,10 +498,11 @@ public class WriteBatchWithIndexTest {
final byte[] v3b = v3.getBytes(UTF_8);
final byte[] k4b = k4.getBytes(UTF_8);
final byte[] k5b = k5.getBytes(UTF_8);
final byte[] k6b = k6.getBytes(UTF_8);
final byte[] k7b = k7.getBytes(UTF_8);
final byte[] v8b = v8.getBytes(UTF_8);
final String k1point5 = "key1point5";
final String k2point5 = "key2point5";
// add put records
wbwi.put(k1b, v1b);
wbwi.put(k2b, v2b);
@ -303,9 +533,7 @@ public class WriteBatchWithIndexTest {
try (final WBWIRocksIterator it = wbwi.newIterator()) {
//direct access - seek to key offsets
final int[] testOffsets = {2, 0, 3, 4, 1};
for (int i = 0; i < testOffsets.length; i++) {
final int testOffset = testOffsets[i];
for (final int testOffset : testOffsets) {
final byte[] key = toArray(expected[testOffset].getKey().data());
it.seek(key);
@ -313,13 +541,94 @@ public class WriteBatchWithIndexTest {
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[testOffset]);
}
for (final int testOffset : testOffsets) {
final byte[] key = toArray(expected[testOffset].getKey().data());
// Direct buffer seek
final ByteBuffer db = expected[testOffset].getKey().data();
it.seek(db);
assertThat(db.position()).isEqualTo(key.length);
assertThat(it.isValid()).isTrue();
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[testOffset]);
}
for (final int testOffset : testOffsets) {
final byte[] key = toArray(expected[testOffset].getKey().data());
// Direct buffer seek
expected[testOffset].getKey().data().mark();
ByteBuffer db = expected[testOffset].getKey().data();
final ByteBuffer db = expected[testOffset].getKey().data();
it.seekForPrev(db);
assertThat(db.position()).isEqualTo(key.length);
assertThat(it.isValid()).isTrue();
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[testOffset]);
}
for (final int testOffset : testOffsets) {
final byte[] key = toArray(expected[testOffset].getKey().data());
// Indirect buffer seek
final ByteBuffer db = ByteBuffer.allocate(key.length);
System.arraycopy(key, 0, db.array(), 0, key.length);
it.seek(db);
assertThat(db.position()).isEqualTo(key.length);
assertThat(it.isValid()).isTrue();
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[testOffset]);
}
for (final int testOffset : testOffsets) {
final byte[] key = toArray(expected[testOffset].getKey().data());
// Indirect buffer seek for prev
final ByteBuffer db = ByteBuffer.allocate(key.length);
System.arraycopy(key, 0, db.array(), 0, key.length);
it.seekForPrev(db);
assertThat(db.position()).isEqualTo(key.length);
assertThat(it.isValid()).isTrue();
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[testOffset]);
}
{
it.seekForPrev(k2point5.getBytes());
assertThat(it.isValid()).isTrue();
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[1]);
}
{
it.seekForPrev(k1point5.getBytes());
assertThat(it.isValid()).isTrue();
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[0]);
}
{
final ByteBuffer db = ByteBuffer.allocate(k2point5.length());
db.put(k2point5.getBytes());
db.flip();
it.seekForPrev(db);
assertThat(it.isValid()).isTrue();
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[1]);
}
{
final ByteBuffer db = ByteBuffer.allocate(k1point5.length());
db.put(k1point5.getBytes());
db.flip();
it.seekForPrev(db);
assertThat(it.isValid()).isTrue();
final WBWIRocksIterator.WriteEntry entry = it.entry();
assertThat(entry).isEqualTo(expected[0]);
}
//forward iterative access

@ -0,0 +1,10 @@
package org.rocksdb.util;
import java.nio.ByteBuffer;
public interface ByteBufferAllocator {
ByteBuffer allocate(int capacity);
ByteBufferAllocator DIRECT = new DirectByteBufferAllocator();
ByteBufferAllocator HEAP = new HeapByteBufferAllocator();
}

@ -0,0 +1,12 @@
package org.rocksdb.util;
import java.nio.ByteBuffer;
public final class DirectByteBufferAllocator implements ByteBufferAllocator {
DirectByteBufferAllocator(){};
@Override
public ByteBuffer allocate(final int capacity) {
return ByteBuffer.allocateDirect(capacity);
}
}

@ -0,0 +1,12 @@
package org.rocksdb.util;
import java.nio.ByteBuffer;
public final class HeapByteBufferAllocator implements ByteBufferAllocator {
HeapByteBufferAllocator(){};
@Override
public ByteBuffer allocate(final int capacity) {
return ByteBuffer.allocate(capacity);
}
}
Loading…
Cancel
Save