From dec144f172165e9ce007aa12fbeaacf748cea5c1 Mon Sep 17 00:00:00 2001 From: Alan Paxton Date: Thu, 24 Mar 2022 12:50:38 -0700 Subject: [PATCH] Extend Java RocksDB iterators to support indirect Byte Buffers (#9222) Summary: Extend Java RocksDB iterators to support indirect byte buffers, to add to the existing support for direct byte buffers. Code to distinguish direct/indirect buffers is switched in Java, and a 2nd separate JNI call implemented to support indirect buffers. Indirect support passes contained buffers using byte[] There are some Java subclasses of iterator (WBWIIterator, SstFileReaderIterator) which also now have parallel JNI support functions implemented, along with direct/indirect switches in Java methods. Closes https://github.com/facebook/rocksdb/issues/6282 Pull Request resolved: https://github.com/facebook/rocksdb/pull/9222 Reviewed By: ajkr Differential Revision: D35115283 Pulled By: jay-zhuang fbshipit-source-id: f8d5d20b975aef700560fbcc99f707bb028dc42e --- java/rocksjni/iterator.cc | 119 +++++-- java/rocksjni/portal.h | 35 +- java/rocksjni/sst_file_reader_iterator.cc | 100 ++++++ java/rocksjni/write_batch_with_index.cc | 70 ++++ .../org/rocksdb/AbstractRocksIterator.java | 53 +-- .../main/java/org/rocksdb/RocksIterator.java | 35 +- .../org/rocksdb/SstFileReaderIterator.java | 45 ++- .../java/org/rocksdb/WBWIRocksIterator.java | 19 +- .../java/org/rocksdb/RocksIteratorTest.java | 179 +++++++--- .../java/org/rocksdb/SstFileReaderTest.java | 125 +++++-- .../org/rocksdb/WriteBatchWithIndexTest.java | 335 +++++++++++++++++- .../org/rocksdb/util/ByteBufferAllocator.java | 10 + .../util/DirectByteBufferAllocator.java | 12 + .../rocksdb/util/HeapByteBufferAllocator.java | 12 + 14 files changed, 977 insertions(+), 172 deletions(-) create mode 100644 java/src/test/java/org/rocksdb/util/ByteBufferAllocator.java create mode 100644 java/src/test/java/org/rocksdb/util/DirectByteBufferAllocator.java create mode 100644 java/src/test/java/org/rocksdb/util/HeapByteBufferAllocator.java diff --git a/java/rocksjni/iterator.cc b/java/rocksjni/iterator.cc index 1660dad45..3ddb9778b 100644 --- a/java/rocksjni/iterator.cc +++ b/java/rocksjni/iterator.cc @@ -6,13 +6,15 @@ // This file implements the "bridge" between Java and C++ and enables // calling c++ ROCKSDB_NAMESPACE::Iterator methods from Java side. +#include "rocksdb/iterator.h" + #include #include #include + #include #include "include/org_rocksdb_RocksIterator.h" -#include "rocksdb/iterator.h" #include "rocksjni/portal.h" /* @@ -87,7 +89,7 @@ void Java_org_rocksdb_RocksIterator_prev0(JNIEnv* /*env*/, jobject /*jobj*/, * Signature: (J)V */ void Java_org_rocksdb_RocksIterator_refresh0(JNIEnv* env, jobject /*jobj*/, - jlong handle) { + jlong handle) { auto* it = reinterpret_cast(handle); ROCKSDB_NAMESPACE::Status s = it->Refresh(); @@ -106,19 +108,31 @@ void Java_org_rocksdb_RocksIterator_refresh0(JNIEnv* env, jobject /*jobj*/, void Java_org_rocksdb_RocksIterator_seek0(JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget, jint jtarget_len) { - jbyte* target = env->GetByteArrayElements(jtarget, nullptr); - if (target == nullptr) { - // exception thrown: OutOfMemoryError - return; - } - - ROCKSDB_NAMESPACE::Slice target_slice(reinterpret_cast(target), - jtarget_len); - auto* it = reinterpret_cast(handle); - it->Seek(target_slice); + auto seek = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) { + it->Seek(target_slice); + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_region(seek, env, jtarget, 0, jtarget_len); +} - env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT); +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * In this case, the buffer offset of the key may be non-zero. + * + * Class: org_rocksdb_RocksIterator + * Method: seek0 + * Signature: (J[BII)V + */ +void Java_org_rocksdb_RocksIterator_seekByteArray0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget, + jint jtarget_off, jint jtarget_len) { + auto* it = reinterpret_cast(handle); + auto seek = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) { + it->Seek(target_slice); + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_region(seek, env, jtarget, jtarget_off, + jtarget_len); } /* @@ -163,19 +177,31 @@ void Java_org_rocksdb_RocksIterator_seekForPrev0(JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget, jint jtarget_len) { - jbyte* target = env->GetByteArrayElements(jtarget, nullptr); - if (target == nullptr) { - // exception thrown: OutOfMemoryError - return; - } - - ROCKSDB_NAMESPACE::Slice target_slice(reinterpret_cast(target), - jtarget_len); - auto* it = reinterpret_cast(handle); - it->SeekForPrev(target_slice); + auto seek = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) { + it->SeekForPrev(target_slice); + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_region(seek, env, jtarget, 0, jtarget_len); +} - env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT); +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * In this case, the buffer offset of the key may be non-zero. + * + * Class: org_rocksdb_RocksIterator + * Method: seek0 + * Signature: (J[BII)V + */ +void Java_org_rocksdb_RocksIterator_seekForPrevByteArray0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget, + jint jtarget_off, jint jtarget_len) { + auto* it = reinterpret_cast(handle); + auto seek = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) { + it->SeekForPrev(target_slice); + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_region(seek, env, jtarget, jtarget_off, + jtarget_len); } /* @@ -231,6 +257,29 @@ jint Java_org_rocksdb_RocksIterator_keyDirect0(JNIEnv* env, jobject /*jobj*/, jtarget_off, jtarget_len); } +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * + * Class: org_rocksdb_RocksIterator + * Method: keyByteArray0 + * Signature: (J[BII)I + */ +jint Java_org_rocksdb_RocksIterator_keyByteArray0(JNIEnv* env, jobject /*jobj*/, + jlong handle, jbyteArray jkey, + jint jkey_off, + jint jkey_len) { + auto* it = reinterpret_cast(handle); + ROCKSDB_NAMESPACE::Slice key_slice = it->key(); + jsize copy_size = std::min(static_cast(key_slice.size()), + static_cast(jkey_len)); + env->SetByteArrayRegion( + jkey, jkey_off, copy_size, + const_cast(reinterpret_cast(key_slice.data()))); + + return static_cast(key_slice.size()); +} + /* * Class: org_rocksdb_RocksIterator * Method: value0 @@ -267,3 +316,25 @@ jint Java_org_rocksdb_RocksIterator_valueDirect0(JNIEnv* env, jobject /*jobj*/, return ROCKSDB_NAMESPACE::JniUtil::copyToDirect(env, value_slice, jtarget, jtarget_off, jtarget_len); } + +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * + * Class: org_rocksdb_RocksIterator + * Method: valueByteArray0 + * Signature: (J[BII)I + */ +jint Java_org_rocksdb_RocksIterator_valueByteArray0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jvalue_target, + jint jvalue_off, jint jvalue_len) { + auto* it = reinterpret_cast(handle); + ROCKSDB_NAMESPACE::Slice value_slice = it->value(); + jsize copy_size = std::min(static_cast(value_slice.size()), + static_cast(jvalue_len)); + env->SetByteArrayRegion( + jvalue_target, jvalue_off, copy_size, + const_cast(reinterpret_cast(value_slice.data()))); + + return static_cast(value_slice.size()); +} diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index d14c95b64..f030c947e 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -2127,7 +2127,7 @@ class JniUtil { std::function op, JNIEnv* env, jobject /*jobj*/, jbyteArray jkey, jint jkey_len) { jbyte* key = env->GetByteArrayElements(jkey, nullptr); - if(env->ExceptionCheck()) { + if (env->ExceptionCheck()) { // exception thrown: OutOfMemoryError return nullptr; } @@ -2137,7 +2137,7 @@ class JniUtil { auto status = op(key_slice); - if(key != nullptr) { + if (key != nullptr) { env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); } @@ -2145,6 +2145,37 @@ class JniUtil { new ROCKSDB_NAMESPACE::Status(status)); } + /* + * Helper for operations on a key which is a region of an array + * Used to extract the common code from seek/seekForPrev. + * Possible that it can be generalised from that. + * + * We use GetByteArrayRegion to copy the key region of the whole array into + * a char[] We suspect this is not much slower than GetByteArrayElements, + * which probably copies anyway. + */ + static void k_op_region(std::function op, + JNIEnv* env, jbyteArray jkey, jint jkey_off, + jint jkey_len) { + const std::unique_ptr key(new char[jkey_len]); + if (key == nullptr) { + jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError"); + env->ThrowNew(oom_class, + "Memory allocation failed in RocksDB JNI function"); + return; + } + env->GetByteArrayRegion(jkey, jkey_off, jkey_len, + reinterpret_cast(key.get())); + if (env->ExceptionCheck()) { + // exception thrown: OutOfMemoryError + return; + } + + ROCKSDB_NAMESPACE::Slice key_slice(reinterpret_cast(key.get()), + jkey_len); + op(key_slice); + } + /* * Helper for operations on a value * for example WriteBatchWithIndex->GetFromBatch diff --git a/java/rocksjni/sst_file_reader_iterator.cc b/java/rocksjni/sst_file_reader_iterator.cc index 6e88ec5bd..ac92285b6 100644 --- a/java/rocksjni/sst_file_reader_iterator.cc +++ b/java/rocksjni/sst_file_reader_iterator.cc @@ -206,6 +206,29 @@ jint Java_org_rocksdb_SstFileReaderIterator_keyDirect0( jtarget_off, jtarget_len); } +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * + * Class: org_rocksdb_SstFileReaderIterator + * Method: keyByteArray0 + * Signature: (J[BII)I + */ +jint Java_org_rocksdb_SstFileReaderIterator_keyByteArray0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jkey, jint jkey_off, + jint jkey_len) { + auto* it = reinterpret_cast(handle); + ROCKSDB_NAMESPACE::Slice key_slice = it->key(); + auto slice_size = key_slice.size(); + jsize copy_size = std::min(static_cast(slice_size), + static_cast(jkey_len)); + env->SetByteArrayRegion( + jkey, jkey_off, copy_size, + const_cast(reinterpret_cast(key_slice.data()))); + + return static_cast(slice_size); +} + /* * Class: org_rocksdb_SstFileReaderIterator * Method: valueDirect0 @@ -220,6 +243,29 @@ jint Java_org_rocksdb_SstFileReaderIterator_valueDirect0( jtarget_off, jtarget_len); } +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * + * Class: org_rocksdb_SstFileReaderIterator + * Method: valueByteArray0 + * Signature: (J[BII)I + */ +jint Java_org_rocksdb_SstFileReaderIterator_valueByteArray0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jvalue_target, + jint jvalue_off, jint jvalue_len) { + auto* it = reinterpret_cast(handle); + ROCKSDB_NAMESPACE::Slice value_slice = it->value(); + auto slice_size = value_slice.size(); + jsize copy_size = std::min(static_cast(slice_size), + static_cast(jvalue_len)); + env->SetByteArrayRegion( + jvalue_target, jvalue_off, copy_size, + const_cast(reinterpret_cast(value_slice.data()))); + + return static_cast(slice_size); +} + /* * Class: org_rocksdb_SstFileReaderIterator * Method: seekDirect0 @@ -252,6 +298,60 @@ void Java_org_rocksdb_SstFileReaderIterator_seekForPrevDirect0( jtarget_len); } +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * + * Class: org_rocksdb_SstFileReaderIterator + * Method: seekByteArray0 + * Signature: (J[BII)V + */ +void Java_org_rocksdb_SstFileReaderIterator_seekByteArray0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget, + jint jtarget_off, jint jtarget_len) { + const std::unique_ptr target(new char[jtarget_len]); + if (target == nullptr) { + jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError"); + env->ThrowNew(oom_class, + "Memory allocation failed in RocksDB JNI function"); + return; + } + env->GetByteArrayRegion(jtarget, jtarget_off, jtarget_len, + reinterpret_cast(target.get())); + + ROCKSDB_NAMESPACE::Slice target_slice(target.get(), jtarget_len); + + auto* it = reinterpret_cast(handle); + it->Seek(target_slice); +} + +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * + * Class: org_rocksdb_SstFileReaderIterator + * Method: seekForPrevByteArray0 + * Signature: (J[BII)V + */ +void Java_org_rocksdb_SstFileReaderIterator_seekForPrevByteArray0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget, + jint jtarget_off, jint jtarget_len) { + const std::unique_ptr target(new char[jtarget_len]); + if (target == nullptr) { + jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError"); + env->ThrowNew(oom_class, + "Memory allocation failed in RocksDB JNI function"); + return; + } + env->GetByteArrayRegion(jtarget, jtarget_off, jtarget_len, + reinterpret_cast(target.get())); + + ROCKSDB_NAMESPACE::Slice target_slice(target.get(), jtarget_len); + + auto* it = reinterpret_cast(handle); + it->SeekForPrev(target_slice); +} + /* * Class: org_rocksdb_SstFileReaderIterator * Method: refresh0 diff --git a/java/rocksjni/write_batch_with_index.cc b/java/rocksjni/write_batch_with_index.cc index 45834264f..615921b65 100644 --- a/java/rocksjni/write_batch_with_index.cc +++ b/java/rocksjni/write_batch_with_index.cc @@ -765,6 +765,33 @@ void Java_org_rocksdb_WBWIRocksIterator_seekDirect0( jtarget_len); } +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * + * Class: org_rocksdb_WBWIRocksIterator + * Method: seekByteArray0 + * Signature: (J[BII)V + */ +void Java_org_rocksdb_WBWIRocksIterator_seekByteArray0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget, + jint jtarget_off, jint jtarget_len) { + const std::unique_ptr target(new char[jtarget_len]); + if (target == nullptr) { + jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError"); + env->ThrowNew(oom_class, + "Memory allocation failed in RocksDB JNI function"); + return; + } + env->GetByteArrayRegion(jtarget, jtarget_off, jtarget_len, + reinterpret_cast(target.get())); + + ROCKSDB_NAMESPACE::Slice target_slice(target.get(), jtarget_len); + + auto* it = reinterpret_cast(handle); + it->Seek(target_slice); +} + /* * Class: org_rocksdb_WBWIRocksIterator * Method: seekForPrev0 @@ -790,6 +817,49 @@ void Java_org_rocksdb_WBWIRocksIterator_seekForPrev0(JNIEnv* env, env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT); } +/* + * Class: org_rocksdb_WBWIRocksIterator + * Method: seekForPrevDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)V + */ +void Java_org_rocksdb_WBWIRocksIterator_seekForPrevDirect0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jobject jtarget, + jint jtarget_off, jint jtarget_len) { + auto* it = reinterpret_cast(handle); + auto seek_for_prev = [&it](ROCKSDB_NAMESPACE::Slice& target_slice) { + it->SeekForPrev(target_slice); + }; + ROCKSDB_NAMESPACE::JniUtil::k_op_direct(seek_for_prev, env, jtarget, + jtarget_off, jtarget_len); +} + +/* + * This method supports fetching into indirect byte buffers; + * the Java wrapper extracts the byte[] and passes it here. + * + * Class: org_rocksdb_WBWIRocksIterator + * Method: seekForPrevByteArray0 + * Signature: (J[BII)V + */ +void Java_org_rocksdb_WBWIRocksIterator_seekForPrevByteArray0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jbyteArray jtarget, + jint jtarget_off, jint jtarget_len) { + const std::unique_ptr target(new char[jtarget_len]); + if (target == nullptr) { + jclass oom_class = env->FindClass("/lang/java/OutOfMemoryError"); + env->ThrowNew(oom_class, + "Memory allocation failed in RocksDB JNI function"); + return; + } + env->GetByteArrayRegion(jtarget, jtarget_off, jtarget_len, + reinterpret_cast(target.get())); + + ROCKSDB_NAMESPACE::Slice target_slice(target.get(), jtarget_len); + + auto* it = reinterpret_cast(handle); + it->SeekForPrev(target_slice); +} + /* * Class: org_rocksdb_WBWIRocksIterator * Method: status0 diff --git a/java/src/main/java/org/rocksdb/AbstractRocksIterator.java b/java/src/main/java/org/rocksdb/AbstractRocksIterator.java index e875c9382..1aade1b89 100644 --- a/java/src/main/java/org/rocksdb/AbstractRocksIterator.java +++ b/java/src/main/java/org/rocksdb/AbstractRocksIterator.java @@ -55,30 +55,40 @@ public abstract class AbstractRocksIterator

} @Override - public void seek(byte[] target) { + public void seek(final byte[] target) { assert (isOwningHandle()); seek0(nativeHandle_, target, target.length); } - @Override - public void seekForPrev(byte[] target) { - assert (isOwningHandle()); - seekForPrev0(nativeHandle_, target, target.length); - } - - @Override - public void seek(ByteBuffer target) { - assert (isOwningHandle() && target.isDirect()); - seekDirect0(nativeHandle_, target, target.position(), target.remaining()); - target.position(target.limit()); - } - - @Override - public void seekForPrev(ByteBuffer target) { - assert (isOwningHandle() && target.isDirect()); - seekForPrevDirect0(nativeHandle_, target, target.position(), target.remaining()); - target.position(target.limit()); - } + @Override + public void seekForPrev(final byte[] target) { + assert (isOwningHandle()); + seekForPrev0(nativeHandle_, target, target.length); + } + + @Override + public void seek(final ByteBuffer target) { + assert (isOwningHandle()); + if (target.isDirect()) { + seekDirect0(nativeHandle_, target, target.position(), target.remaining()); + } else { + seekByteArray0(nativeHandle_, target.array(), target.arrayOffset() + target.position(), + target.remaining()); + } + target.position(target.limit()); + } + + @Override + public void seekForPrev(final ByteBuffer target) { + assert (isOwningHandle()); + if (target.isDirect()) { + seekForPrevDirect0(nativeHandle_, target, target.position(), target.remaining()); + } else { + seekForPrevByteArray0(nativeHandle_, target.array(), target.arrayOffset() + target.position(), + target.remaining()); + } + target.position(target.limit()); + } @Override public void next() { @@ -129,5 +139,8 @@ public abstract class AbstractRocksIterator

abstract void seekForPrev0(long handle, byte[] target, int targetLen); abstract void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); abstract void seekForPrevDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); + abstract void seekByteArray0(long handle, byte[] target, int targetOffset, int targetLen); + abstract void seekForPrevByteArray0(long handle, byte[] target, int targetOffset, int targetLen); + abstract void status0(long handle) throws RocksDBException; } diff --git a/java/src/main/java/org/rocksdb/RocksIterator.java b/java/src/main/java/org/rocksdb/RocksIterator.java index f2919fe42..20e56d2eb 100644 --- a/java/src/main/java/org/rocksdb/RocksIterator.java +++ b/java/src/main/java/org/rocksdb/RocksIterator.java @@ -21,7 +21,7 @@ import java.nio.ByteBuffer; * @see org.rocksdb.RocksObject */ public class RocksIterator extends AbstractRocksIterator { - protected RocksIterator(RocksDB rocksDB, long nativeHandle) { + protected RocksIterator(final RocksDB rocksDB, final long nativeHandle) { super(rocksDB, nativeHandle); } @@ -54,9 +54,16 @@ public class RocksIterator extends AbstractRocksIterator { * input buffer {@code key} is insufficient and partial result will * be returned. */ - public int key(ByteBuffer key) { - assert (isOwningHandle() && key.isDirect()); - int result = keyDirect0(nativeHandle_, key, key.position(), key.remaining()); + public int key(final ByteBuffer key) { + assert isOwningHandle(); + final int result; + if (key.isDirect()) { + result = keyDirect0(nativeHandle_, key, key.position(), key.remaining()); + } else { + assert key.hasArray(); + result = keyByteArray0( + nativeHandle_, key.array(), key.arrayOffset() + key.position(), key.remaining()); + } key.limit(Math.min(key.position() + result, key.limit())); return result; } @@ -89,9 +96,16 @@ public class RocksIterator extends AbstractRocksIterator { * input buffer {@code value} is insufficient and partial result will * be returned. */ - public int value(ByteBuffer value) { - assert (isOwningHandle() && value.isDirect()); - int result = valueDirect0(nativeHandle_, value, value.position(), value.remaining()); + public int value(final ByteBuffer value) { + assert isOwningHandle(); + final int result; + if (value.isDirect()) { + result = valueDirect0(nativeHandle_, value, value.position(), value.remaining()); + } else { + assert value.hasArray(); + result = valueByteArray0( + nativeHandle_, value.array(), value.arrayOffset() + value.position(), value.remaining()); + } value.limit(Math.min(value.position() + result, value.limit())); return result; } @@ -108,12 +122,19 @@ public class RocksIterator extends AbstractRocksIterator { @Override final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); @Override + final native void seekByteArray0(long handle, byte[] target, int targetOffset, int targetLen); + @Override final native void seekForPrevDirect0( long handle, ByteBuffer target, int targetOffset, int targetLen); + @Override + final native void seekForPrevByteArray0( + long handle, byte[] target, int targetOffset, int targetLen); @Override final native void status0(long handle) throws RocksDBException; private native byte[] key0(long handle); private native byte[] value0(long handle); private native int keyDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen); + private native int keyByteArray0(long handle, byte[] array, int arrayOffset, int arrayLen); private native int valueDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen); + private native int valueByteArray0(long handle, byte[] array, int arrayOffset, int arrayLen); } diff --git a/java/src/main/java/org/rocksdb/SstFileReaderIterator.java b/java/src/main/java/org/rocksdb/SstFileReaderIterator.java index b64956a3e..a4a08167b 100644 --- a/java/src/main/java/org/rocksdb/SstFileReaderIterator.java +++ b/java/src/main/java/org/rocksdb/SstFileReaderIterator.java @@ -21,7 +21,7 @@ import java.nio.ByteBuffer; * @see RocksObject */ public class SstFileReaderIterator extends AbstractRocksIterator { - protected SstFileReaderIterator(SstFileReader reader, long nativeHandle) { + protected SstFileReaderIterator(final SstFileReader reader, final long nativeHandle) { super(reader, nativeHandle); } @@ -54,9 +54,15 @@ public class SstFileReaderIterator extends AbstractRocksIterator * input buffer {@code key} is insufficient and partial result will * be returned. */ - public int key(ByteBuffer key) { - assert (isOwningHandle() && key.isDirect()); - int result = keyDirect0(nativeHandle_, key, key.position(), key.remaining()); + public int key(final ByteBuffer key) { + assert (isOwningHandle()); + final int result; + if (key.isDirect()) { + result = keyDirect0(nativeHandle_, key, key.position(), key.remaining()); + } else { + result = keyByteArray0( + nativeHandle_, key.array(), key.arrayOffset() + key.position(), key.remaining()); + } key.limit(Math.min(key.position() + result, key.limit())); return result; } @@ -89,9 +95,15 @@ public class SstFileReaderIterator extends AbstractRocksIterator * input buffer {@code value} is insufficient and partial result will * be returned. */ - public int value(ByteBuffer value) { - assert (isOwningHandle() && value.isDirect()); - int result = valueDirect0(nativeHandle_, value, value.position(), value.remaining()); + public int value(final ByteBuffer value) { + assert (isOwningHandle()); + final int result; + if (value.isDirect()) { + result = valueDirect0(nativeHandle_, value, value.position(), value.remaining()); + } else { + result = valueByteArray0( + nativeHandle_, value.array(), value.arrayOffset() + value.position(), value.remaining()); + } value.limit(Math.min(value.position() + result, value.limit())); return result; } @@ -106,16 +118,23 @@ public class SstFileReaderIterator extends AbstractRocksIterator @Override final native void seek0(long handle, byte[] target, int targetLen); @Override final native void seekForPrev0(long handle, byte[] target, int targetLen); @Override final native void status0(long handle) throws RocksDBException; + @Override + final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); + @Override + final native void seekForPrevDirect0( + long handle, ByteBuffer target, int targetOffset, int targetLen); + @Override + final native void seekByteArray0( + final long handle, final byte[] target, final int targetOffset, final int targetLen); + @Override + final native void seekForPrevByteArray0( + final long handle, final byte[] target, final int targetOffset, final int targetLen); private native byte[] key0(long handle); private native byte[] value0(long handle); private native int keyDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen); + private native int keyByteArray0(long handle, byte[] buffer, int bufferOffset, int bufferLen); private native int valueDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen); - - @Override - final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); - @Override - final native void seekForPrevDirect0( - long handle, ByteBuffer target, int targetOffset, int targetLen); + private native int valueByteArray0(long handle, byte[] buffer, int bufferOffset, int bufferLen); } diff --git a/java/src/main/java/org/rocksdb/WBWIRocksIterator.java b/java/src/main/java/org/rocksdb/WBWIRocksIterator.java index 3501b1a61..ce146eb3f 100644 --- a/java/src/main/java/org/rocksdb/WBWIRocksIterator.java +++ b/java/src/main/java/org/rocksdb/WBWIRocksIterator.java @@ -31,7 +31,7 @@ public class WBWIRocksIterator */ public WriteEntry entry() { assert(isOwningHandle()); - final long ptrs[] = entry1(nativeHandle_); + final long[] ptrs = entry1(nativeHandle_); entry.type = WriteType.fromId((byte)ptrs[0]); entry.key.resetNativeHandle(ptrs[1], ptrs[1] != 0); @@ -51,7 +51,17 @@ public class WBWIRocksIterator @Override final native void seekForPrev0(long handle, byte[] target, int targetLen); @Override final native void status0(long handle) throws RocksDBException; @Override - final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); + final native void seekDirect0( + final long handle, final ByteBuffer target, final int targetOffset, final int targetLen); + @Override + final native void seekForPrevDirect0( + final long handle, final ByteBuffer target, final int targetOffset, final int targetLen); + @Override + final native void seekByteArray0( + final long handle, final byte[] target, final int targetOffset, final int targetLen); + @Override + final native void seekForPrevByteArray0( + final long handle, final byte[] target, final int targetOffset, final int targetLen); private native long[] entry1(final long handle); @@ -190,9 +200,4 @@ public class WBWIRocksIterator key.close(); } } - - @Override - void seekForPrevDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen) { - throw new IllegalAccessError("Not implemented"); - } } diff --git a/java/src/test/java/org/rocksdb/RocksIteratorTest.java b/java/src/test/java/org/rocksdb/RocksIteratorTest.java index 393284746..2a13550b7 100644 --- a/java/src/test/java/org/rocksdb/RocksIteratorTest.java +++ b/java/src/test/java/org/rocksdb/RocksIteratorTest.java @@ -7,6 +7,7 @@ package org.rocksdb; import static org.assertj.core.api.Assertions.assertThat; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -21,13 +22,33 @@ public class RocksIteratorTest { @Rule public TemporaryFolder dbFolder = new TemporaryFolder(); + private void validateByteBufferResult( + final int fetched, final ByteBuffer byteBuffer, final String expected) { + assertThat(fetched).isEqualTo(expected.length()); + assertThat(byteBuffer.position()).isEqualTo(0); + assertThat(byteBuffer.limit()).isEqualTo(Math.min(byteBuffer.remaining(), expected.length())); + final int bufferSpace = byteBuffer.remaining(); + final byte[] contents = new byte[bufferSpace]; + byteBuffer.get(contents, 0, bufferSpace); + assertThat(contents).isEqualTo( + expected.substring(0, bufferSpace).getBytes(StandardCharsets.UTF_8)); + } + + private void validateKey( + final RocksIterator iterator, final ByteBuffer byteBuffer, final String key) { + validateByteBufferResult(iterator.key(byteBuffer), byteBuffer, key); + } + + private void validateValue( + final RocksIterator iterator, final ByteBuffer byteBuffer, final String value) { + validateByteBufferResult(iterator.value(byteBuffer), byteBuffer, value); + } + @Test public void rocksIterator() throws RocksDBException { - try (final Options options = new Options() - .setCreateIfMissing(true) - .setCreateMissingColumnFamilies(true); - final RocksDB db = RocksDB.open(options, - dbFolder.getRoot().getAbsolutePath())) { + try (final Options options = + new Options().setCreateIfMissing(true).setCreateMissingColumnFamilies(true); + final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { db.put("key1".getBytes(), "value1".getBytes()); db.put("key2".getBytes(), "value2".getBytes()); @@ -37,37 +58,20 @@ public class RocksIteratorTest { assertThat(iterator.key()).isEqualTo("key1".getBytes()); assertThat(iterator.value()).isEqualTo("value1".getBytes()); - ByteBuffer key = ByteBuffer.allocateDirect(2); - ByteBuffer value = ByteBuffer.allocateDirect(2); - assertThat(iterator.key(key)).isEqualTo(4); - assertThat(iterator.value(value)).isEqualTo(6); - - assertThat(key.position()).isEqualTo(0); - assertThat(key.limit()).isEqualTo(2); - assertThat(value.position()).isEqualTo(0); - assertThat(value.limit()).isEqualTo(2); - - byte[] tmp = new byte[2]; - key.get(tmp); - assertThat(tmp).isEqualTo("ke".getBytes()); - value.get(tmp); - assertThat(tmp).isEqualTo("va".getBytes()); - - key = ByteBuffer.allocateDirect(12); - value = ByteBuffer.allocateDirect(12); - assertThat(iterator.key(key)).isEqualTo(4); - assertThat(iterator.value(value)).isEqualTo(6); - assertThat(key.position()).isEqualTo(0); - assertThat(key.limit()).isEqualTo(4); - assertThat(value.position()).isEqualTo(0); - assertThat(value.limit()).isEqualTo(6); - - tmp = new byte[4]; - key.get(tmp); - assertThat(tmp).isEqualTo("key1".getBytes()); - tmp = new byte[6]; - value.get(tmp); - assertThat(tmp).isEqualTo("value1".getBytes()); + validateKey(iterator, ByteBuffer.allocateDirect(2), "key1"); + validateKey(iterator, ByteBuffer.allocateDirect(2), "key0"); + validateKey(iterator, ByteBuffer.allocateDirect(4), "key1"); + validateKey(iterator, ByteBuffer.allocateDirect(5), "key1"); + validateValue(iterator, ByteBuffer.allocateDirect(2), "value2"); + validateValue(iterator, ByteBuffer.allocateDirect(2), "vasicu"); + validateValue(iterator, ByteBuffer.allocateDirect(8), "value1"); + + validateKey(iterator, ByteBuffer.allocate(2), "key1"); + validateKey(iterator, ByteBuffer.allocate(2), "key0"); + validateKey(iterator, ByteBuffer.allocate(4), "key1"); + validateKey(iterator, ByteBuffer.allocate(5), "key1"); + validateValue(iterator, ByteBuffer.allocate(2), "value1"); + validateValue(iterator, ByteBuffer.allocate(8), "value1"); iterator.next(); assertThat(iterator.isValid()).isTrue(); @@ -87,24 +91,85 @@ public class RocksIteratorTest { assertThat(iterator.value()).isEqualTo("value2".getBytes()); iterator.status(); - key.clear(); - key.put("key1".getBytes()); - key.flip(); - iterator.seek(key); - assertThat(iterator.isValid()).isTrue(); - assertThat(iterator.value()).isEqualTo("value1".getBytes()); - assertThat(key.position()).isEqualTo(4); - assertThat(key.limit()).isEqualTo(4); - - key.clear(); - key.put("key2".getBytes()); - key.flip(); - iterator.seekForPrev(key); - assertThat(iterator.isValid()).isTrue(); - assertThat(iterator.value()).isEqualTo("value2".getBytes()); - assertThat(key.position()).isEqualTo(4); - assertThat(key.limit()).isEqualTo(4); + { + final ByteBuffer key = ByteBuffer.allocate(12); + key.put("key1".getBytes()).flip(); + iterator.seek(key); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); + assertThat(key.position()).isEqualTo(4); + assertThat(key.limit()).isEqualTo(4); + + validateValue(iterator, ByteBuffer.allocateDirect(12), "value1"); + validateValue(iterator, ByteBuffer.allocateDirect(4), "valu56"); + } + + { + final ByteBuffer key = ByteBuffer.allocate(12); + key.put("key2".getBytes()).flip(); + iterator.seekForPrev(key); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.value()).isEqualTo("value2".getBytes()); + assertThat(key.position()).isEqualTo(4); + assertThat(key.limit()).isEqualTo(4); + } + + { + final ByteBuffer key = ByteBuffer.allocate(12); + key.put("key1".getBytes()).flip(); + iterator.seek(key); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); + assertThat(key.position()).isEqualTo(4); + assertThat(key.limit()).isEqualTo(4); + } + + { + // Check offsets of slice byte buffers + final ByteBuffer key0 = ByteBuffer.allocate(24); + key0.put("key2key2".getBytes()); + final ByteBuffer key = key0.slice(); + key.put("key1".getBytes()).flip(); + iterator.seek(key); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); + assertThat(key.position()).isEqualTo(4); + assertThat(key.limit()).isEqualTo(4); + } + + { + // Check offsets of slice byte buffers + final ByteBuffer key0 = ByteBuffer.allocateDirect(24); + key0.put("key2key2".getBytes()); + final ByteBuffer key = key0.slice(); + key.put("key1".getBytes()).flip(); + iterator.seek(key); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); + assertThat(key.position()).isEqualTo(4); + assertThat(key.limit()).isEqualTo(4); + } + + { + final ByteBuffer key = ByteBuffer.allocate(12); + key.put("key2".getBytes()).flip(); + iterator.seekForPrev(key); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.value()).isEqualTo("value2".getBytes()); + assertThat(key.position()).isEqualTo(4); + assertThat(key.limit()).isEqualTo(4); + } } + } + } + + @Test + public void rocksIteratorSeekAndInsert() throws RocksDBException { + try (final Options options = + new Options().setCreateIfMissing(true).setCreateMissingColumnFamilies(true); + final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + db.put("key1".getBytes(), "value1".getBytes()); + db.put("key2".getBytes(), "value2".getBytes()); try (final RocksIterator iterator = db.newIterator()) { iterator.seek("key0".getBytes()); @@ -192,8 +257,8 @@ public class RocksIteratorTest { } // Test case: release iterator after custom CF close - ColumnFamilyDescriptor cfd1 = new ColumnFamilyDescriptor("cf1".getBytes()); - ColumnFamilyHandle cfHandle1 = db.createColumnFamily(cfd1); + final ColumnFamilyDescriptor cfd1 = new ColumnFamilyDescriptor("cf1".getBytes()); + final ColumnFamilyHandle cfHandle1 = db.createColumnFamily(cfd1); db.put(cfHandle1, "key1".getBytes(), "value1".getBytes()); try (final RocksIterator iterator = db.newIterator(cfHandle1)) { @@ -206,8 +271,8 @@ public class RocksIteratorTest { } // Test case: release iterator after custom CF drop & close - ColumnFamilyDescriptor cfd2 = new ColumnFamilyDescriptor("cf2".getBytes()); - ColumnFamilyHandle cfHandle2 = db.createColumnFamily(cfd2); + final ColumnFamilyDescriptor cfd2 = new ColumnFamilyDescriptor("cf2".getBytes()); + final ColumnFamilyHandle cfHandle2 = db.createColumnFamily(cfd2); db.put(cfHandle2, "key2".getBytes(), "value2".getBytes()); try (final RocksIterator iterator = db.newIterator(cfHandle2)) { diff --git a/java/src/test/java/org/rocksdb/SstFileReaderTest.java b/java/src/test/java/org/rocksdb/SstFileReaderTest.java index 0b841f420..e29df99f2 100644 --- a/java/src/test/java/org/rocksdb/SstFileReaderTest.java +++ b/java/src/test/java/org/rocksdb/SstFileReaderTest.java @@ -13,17 +13,21 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.rocksdb.util.BytewiseComparator; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.rocksdb.util.ByteBufferAllocator; +@RunWith(Parameterized.class) public class SstFileReaderTest { private static final String SST_FILE_NAME = "test.sst"; - class KeyValueWithOp { - KeyValueWithOp(String key, String value, OpType opType) { + static class KeyValueWithOp { + KeyValueWithOp(final String key, final String value, final OpType opType) { this.key = key; this.value = value; this.opType = opType; @@ -41,13 +45,23 @@ public class SstFileReaderTest { return opType; } - private String key; - private String value; - private OpType opType; + private final String key; + private final String value; + private final OpType opType; } @Rule public TemporaryFolder parentFolder = new TemporaryFolder(); + @Parameterized.Parameters(name = "{0}") + public static Iterable parameters() { + return Arrays.asList(new Object[][] { + {"direct", ByteBufferAllocator.DIRECT}, {"indirect", ByteBufferAllocator.HEAP}}); + } + + @Parameterized.Parameter(0) public String name; + + @Parameterized.Parameter(1) public ByteBufferAllocator byteBufferAllocator; + enum OpType { PUT, PUT_BYTES, MERGE, MERGE_BYTES, DELETE, DELETE_BYTES } private File newSstFile(final List keyValues) @@ -55,17 +69,17 @@ public class SstFileReaderTest { final EnvOptions envOptions = new EnvOptions(); final StringAppendOperator stringAppendOperator = new StringAppendOperator(); final Options options = new Options().setMergeOperator(stringAppendOperator); - SstFileWriter sstFileWriter; + final SstFileWriter sstFileWriter; sstFileWriter = new SstFileWriter(envOptions, options); final File sstFile = parentFolder.newFile(SST_FILE_NAME); try { sstFileWriter.open(sstFile.getAbsolutePath()); - for (KeyValueWithOp keyValue : keyValues) { - Slice keySlice = new Slice(keyValue.getKey()); - Slice valueSlice = new Slice(keyValue.getValue()); - byte[] keyBytes = keyValue.getKey().getBytes(); - byte[] valueBytes = keyValue.getValue().getBytes(); + for (final KeyValueWithOp keyValue : keyValues) { + final Slice keySlice = new Slice(keyValue.getKey()); + final Slice valueSlice = new Slice(keyValue.getValue()); + final byte[] keyBytes = keyValue.getKey().getBytes(); + final byte[] valueBytes = keyValue.getValue().getBytes(); switch (keyValue.getOpType()) { case PUT: sstFileWriter.put(keySlice, valueSlice); @@ -105,6 +119,8 @@ public class SstFileReaderTest { public void readSstFile() throws RocksDBException, IOException { final List keyValues = new ArrayList<>(); keyValues.add(new KeyValueWithOp("key1", "value1", OpType.PUT)); + keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT)); + keyValues.add(new KeyValueWithOp("key3", "value3", OpType.PUT)); final File sstFile = newSstFile(keyValues); try (final StringAppendOperator stringAppendOperator = new StringAppendOperator(); @@ -123,33 +139,84 @@ public class SstFileReaderTest { reader.verifyChecksum(); // Verify Table Properties - assertEquals(reader.getTableProperties().getNumEntries(), 1); + assertEquals(reader.getTableProperties().getNumEntries(), 3); // Check key and value assertThat(iterator.key()).isEqualTo("key1".getBytes()); assertThat(iterator.value()).isEqualTo("value1".getBytes()); - ByteBuffer direct = ByteBuffer.allocateDirect(128); - direct.put("key1".getBytes()).flip(); - iterator.seek(direct); - assertThat(direct.position()).isEqualTo(4); - assertThat(direct.limit()).isEqualTo(4); + final ByteBuffer byteBuffer = byteBufferAllocator.allocate(128); + byteBuffer.put("key1".getBytes()).flip(); + iterator.seek(byteBuffer); + assertThat(byteBuffer.position()).isEqualTo(4); + assertThat(byteBuffer.limit()).isEqualTo(4); + + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.key()).isEqualTo("key1".getBytes()); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); + + { + byteBuffer.clear(); + assertThat(iterator.key(byteBuffer)).isEqualTo("key1".getBytes().length); + final byte[] dst = new byte["key1".getBytes().length]; + byteBuffer.get(dst); + assertThat(new String(dst)).isEqualTo("key1"); + } + + { + byteBuffer.clear(); + byteBuffer.put("PREFIX".getBytes()); + final ByteBuffer slice = byteBuffer.slice(); + assertThat(iterator.key(byteBuffer)).isEqualTo("key1".getBytes().length); + final byte[] dst = new byte["key1".getBytes().length]; + slice.get(dst); + assertThat(new String(dst)).isEqualTo("key1"); + } + + { + byteBuffer.clear(); + assertThat(iterator.value(byteBuffer)).isEqualTo("value1".getBytes().length); + final byte[] dst = new byte["value1".getBytes().length]; + byteBuffer.get(dst); + assertThat(new String(dst)).isEqualTo("value1"); + } + byteBuffer.clear(); + byteBuffer.put("key1point5".getBytes()).flip(); + iterator.seek(byteBuffer); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.key()).isEqualTo("key2".getBytes()); + assertThat(iterator.value()).isEqualTo("value2".getBytes()); + + byteBuffer.clear(); + byteBuffer.put("key1point5".getBytes()).flip(); + iterator.seekForPrev(byteBuffer); assertThat(iterator.isValid()).isTrue(); assertThat(iterator.key()).isEqualTo("key1".getBytes()); assertThat(iterator.value()).isEqualTo("value1".getBytes()); - direct.clear(); - assertThat(iterator.key(direct)).isEqualTo("key1".getBytes().length); - byte[] dst = new byte["key1".getBytes().length]; - direct.get(dst); - assertThat(new String(dst)).isEqualTo("key1"); - - direct.clear(); - assertThat(iterator.value(direct)).isEqualTo("value1".getBytes().length); - dst = new byte["value1".getBytes().length]; - direct.get(dst); - assertThat(new String(dst)).isEqualTo("value1"); + byteBuffer.clear(); + byteBuffer.put("key2point5".getBytes()).flip(); + iterator.seek(byteBuffer); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.key()).isEqualTo("key3".getBytes()); + assertThat(iterator.value()).isEqualTo("value3".getBytes()); + + byteBuffer.clear(); + byteBuffer.put("key2point5".getBytes()).flip(); + iterator.seekForPrev(byteBuffer); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.key()).isEqualTo("key2".getBytes()); + assertThat(iterator.value()).isEqualTo("value2".getBytes()); + + byteBuffer.clear(); + byteBuffer.put("PREFIX".getBytes()); + final ByteBuffer slice = byteBuffer.slice(); + slice.put("key1point5".getBytes()).flip(); + iterator.seekForPrev(slice); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.key()).isEqualTo("key1".getBytes()); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); } } } diff --git a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java index 528c17d4d..b0a0cdc0e 100644 --- a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java +++ b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java @@ -20,6 +20,7 @@ import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.rocksdb.util.ByteBufferAllocator; public class WriteBatchWithIndexTest { @@ -192,6 +193,236 @@ public class WriteBatchWithIndexTest { } } + @Test + public void readYourOwnWritesCfIterDirectBB() throws RocksDBException { + readYourOwnWritesCfIterDirect(ByteBufferAllocator.DIRECT); + } + + @Test + public void readYourOwnWritesCfIterIndirectBB() throws RocksDBException { + readYourOwnWritesCfIterDirect(ByteBufferAllocator.HEAP); + } + + public void readYourOwnWritesCfIterDirect(final ByteBufferAllocator byteBufferAllocator) + throws RocksDBException { + final List cfNames = + Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY), + new ColumnFamilyDescriptor("new_cf".getBytes())); + + final List columnFamilyHandleList = new ArrayList<>(); + + // Test open database with column family names + try (final DBOptions options = + new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true); + final RocksDB db = RocksDB.open( + options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) { + final ColumnFamilyHandle newCf = columnFamilyHandleList.get(1); + + try { + final byte[] kv1 = "key1".getBytes(); + final byte[] vv1 = "value1".getBytes(); + final ByteBuffer k1 = byteBufferAllocator.allocate(12); + k1.put(kv1); + final byte[] kv2 = "key2".getBytes(); + final byte[] vv2 = "value2".getBytes(); + final ByteBuffer k2 = byteBufferAllocator.allocate(12); + k2.put(kv2); + + db.put(newCf, kv1, vv1); + db.put(newCf, kv2, vv2); + + try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true); + final ReadOptions readOptions = new ReadOptions(); + final RocksIterator base = db.newIterator(newCf, readOptions); + final RocksIterator it = wbwi.newIteratorWithBase(newCf, base, readOptions)) { + k1.flip(); + it.seek(k1); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv1); + assertThat(it.value()).isEqualTo(vv1); + + k2.flip(); + it.seek(k2); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv2); + assertThat(it.value()).isEqualTo(vv2); + + final byte[] kv1point5 = "key1point5".getBytes(); + final ByteBuffer k1point5 = byteBufferAllocator.allocate(12); + k1point5.put(kv1point5); + + k1point5.flip(); + it.seek(k1point5); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv2); + assertThat(it.value()).isEqualTo(vv2); + + k1point5.flip(); + it.seekForPrev(k1point5); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv1); + assertThat(it.value()).isEqualTo(vv1); + + // put data to the write batch and make sure we can read it. + final byte[] kv3 = "key3".getBytes(); + final ByteBuffer k3 = byteBufferAllocator.allocate(12); + k3.put(kv3); + final byte[] vv3 = "value3".getBytes(); + wbwi.put(newCf, kv3, vv3); + k3.flip(); + it.seek(k3); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv3); + assertThat(it.value()).isEqualTo(vv3); + + // update k2 in the write batch and check the value + final byte[] v2Other = "otherValue2".getBytes(); + wbwi.put(newCf, kv2, v2Other); + k2.flip(); + it.seek(k2); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv2); + assertThat(it.value()).isEqualTo(v2Other); + + // delete k1 and make sure we can read back the write + wbwi.delete(newCf, kv1); + k1.flip(); + it.seek(k1); + assertThat(it.key()).isNotEqualTo(kv1); + + // reinsert k1 and make sure we see the new value + final byte[] v1Other = "otherValue1".getBytes(); + wbwi.put(newCf, kv1, v1Other); + k1.flip(); + it.seek(k1); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv1); + assertThat(it.value()).isEqualTo(v1Other); + + // single remove k3 and make sure we can read back the write + wbwi.singleDelete(newCf, kv3); + k3.flip(); + it.seek(k3); + assertThat(it.isValid()).isEqualTo(false); + + // reinsert k3 and make sure we see the new value + final byte[] v3Other = "otherValue3".getBytes(); + wbwi.put(newCf, kv3, v3Other); + k3.flip(); + it.seek(k3); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv3); + assertThat(it.value()).isEqualTo(v3Other); + } + } finally { + for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) { + columnFamilyHandle.close(); + } + } + } + } + + @Test + public void readYourOwnWritesCfIterIndirect() throws RocksDBException { + final List cfNames = + Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY), + new ColumnFamilyDescriptor("new_cf".getBytes())); + + final List columnFamilyHandleList = new ArrayList<>(); + + // Test open database with column family names + try (final DBOptions options = + new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true); + final RocksDB db = RocksDB.open( + options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) { + final ColumnFamilyHandle newCf = columnFamilyHandleList.get(1); + + try { + final byte[] kv1 = "key1".getBytes(); + final byte[] vv1 = "value1".getBytes(); + final ByteBuffer k1 = ByteBuffer.allocate(12); + k1.put(kv1).flip(); + final byte[] kv2 = "key2".getBytes(); + final byte[] vv2 = "value2".getBytes(); + final ByteBuffer k2 = ByteBuffer.allocate(12); + k2.put(kv2).flip(); + + db.put(newCf, kv1, vv1); + db.put(newCf, kv2, vv2); + + try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true); + final ReadOptions readOptions = new ReadOptions(); + final RocksIterator base = db.newIterator(newCf, readOptions); + final RocksIterator it = wbwi.newIteratorWithBase(newCf, base, readOptions)) { + it.seek(k1); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv1); + assertThat(it.value()).isEqualTo(vv1); + + it.seek(k2); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv2); + assertThat(it.value()).isEqualTo(vv2); + + // put data to the write batch and make sure we can read it. + final byte[] kv3 = "key3".getBytes(); + final ByteBuffer k3 = ByteBuffer.allocate(12); + k3.put(kv3); + final byte[] vv3 = "value3".getBytes(); + wbwi.put(newCf, kv3, vv3); + k3.flip(); + it.seek(k3); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv3); + assertThat(it.value()).isEqualTo(vv3); + + // update k2 in the write batch and check the value + final byte[] v2Other = "otherValue2".getBytes(); + wbwi.put(newCf, kv2, v2Other); + k2.flip(); + it.seek(k2); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv2); + assertThat(it.value()).isEqualTo(v2Other); + + // delete k1 and make sure we can read back the write + wbwi.delete(newCf, kv1); + k1.flip(); + it.seek(k1); + assertThat(it.key()).isNotEqualTo(kv1); + + // reinsert k1 and make sure we see the new value + final byte[] v1Other = "otherValue1".getBytes(); + wbwi.put(newCf, kv1, v1Other); + k1.flip(); + it.seek(k1); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv1); + assertThat(it.value()).isEqualTo(v1Other); + + // single remove k3 and make sure we can read back the write + wbwi.singleDelete(newCf, kv3); + k3.flip(); + it.seek(k3); + assertThat(it.isValid()).isEqualTo(false); + + // reinsert k3 and make sure we see the new value + final byte[] v3Other = "otherValue3".getBytes(); + wbwi.put(newCf, kv3, v3Other); + k3.flip(); + it.seek(k3); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(kv3); + assertThat(it.value()).isEqualTo(v3Other); + } + } finally { + for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) { + columnFamilyHandle.close(); + } + } + } + } + @Test public void writeBatchWithIndex() throws RocksDBException { try (final Options options = new Options().setCreateIfMissing(true); @@ -220,10 +451,10 @@ public class WriteBatchWithIndexTest { public void write_writeBatchWithIndexDirect() throws RocksDBException { try (final Options options = new Options().setCreateIfMissing(true); final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { - ByteBuffer k1 = ByteBuffer.allocateDirect(16); - ByteBuffer v1 = ByteBuffer.allocateDirect(16); - ByteBuffer k2 = ByteBuffer.allocateDirect(16); - ByteBuffer v2 = ByteBuffer.allocateDirect(16); + final ByteBuffer k1 = ByteBuffer.allocateDirect(16); + final ByteBuffer v1 = ByteBuffer.allocateDirect(16); + final ByteBuffer k2 = ByteBuffer.allocateDirect(16); + final ByteBuffer v2 = ByteBuffer.allocateDirect(16); k1.put("key1".getBytes()).flip(); v1.put("value1".getBytes()).flip(); k2.put("key2".getBytes()).flip(); @@ -258,8 +489,6 @@ public class WriteBatchWithIndexTest { final String v3 = "value3"; final String k4 = "key4"; final String k5 = "key5"; - final String k6 = "key6"; - final String k7 = "key7"; final String v8 = "value8"; final byte[] k1b = k1.getBytes(UTF_8); final byte[] v1b = v1.getBytes(UTF_8); @@ -269,10 +498,11 @@ public class WriteBatchWithIndexTest { final byte[] v3b = v3.getBytes(UTF_8); final byte[] k4b = k4.getBytes(UTF_8); final byte[] k5b = k5.getBytes(UTF_8); - final byte[] k6b = k6.getBytes(UTF_8); - final byte[] k7b = k7.getBytes(UTF_8); final byte[] v8b = v8.getBytes(UTF_8); + final String k1point5 = "key1point5"; + final String k2point5 = "key2point5"; + // add put records wbwi.put(k1b, v1b); wbwi.put(k2b, v2b); @@ -303,9 +533,7 @@ public class WriteBatchWithIndexTest { try (final WBWIRocksIterator it = wbwi.newIterator()) { //direct access - seek to key offsets final int[] testOffsets = {2, 0, 3, 4, 1}; - - for (int i = 0; i < testOffsets.length; i++) { - final int testOffset = testOffsets[i]; + for (final int testOffset : testOffsets) { final byte[] key = toArray(expected[testOffset].getKey().data()); it.seek(key); @@ -313,13 +541,94 @@ public class WriteBatchWithIndexTest { final WBWIRocksIterator.WriteEntry entry = it.entry(); assertThat(entry).isEqualTo(expected[testOffset]); + } + + for (final int testOffset : testOffsets) { + final byte[] key = toArray(expected[testOffset].getKey().data()); + + // Direct buffer seek + final ByteBuffer db = expected[testOffset].getKey().data(); + it.seek(db); + assertThat(db.position()).isEqualTo(key.length); + assertThat(it.isValid()).isTrue(); + + final WBWIRocksIterator.WriteEntry entry = it.entry(); + assertThat(entry).isEqualTo(expected[testOffset]); + } + + for (final int testOffset : testOffsets) { + final byte[] key = toArray(expected[testOffset].getKey().data()); // Direct buffer seek - expected[testOffset].getKey().data().mark(); - ByteBuffer db = expected[testOffset].getKey().data(); + final ByteBuffer db = expected[testOffset].getKey().data(); + it.seekForPrev(db); + assertThat(db.position()).isEqualTo(key.length); + assertThat(it.isValid()).isTrue(); + + final WBWIRocksIterator.WriteEntry entry = it.entry(); + assertThat(entry).isEqualTo(expected[testOffset]); + } + + for (final int testOffset : testOffsets) { + final byte[] key = toArray(expected[testOffset].getKey().data()); + + // Indirect buffer seek + final ByteBuffer db = ByteBuffer.allocate(key.length); + System.arraycopy(key, 0, db.array(), 0, key.length); it.seek(db); assertThat(db.position()).isEqualTo(key.length); assertThat(it.isValid()).isTrue(); + + final WBWIRocksIterator.WriteEntry entry = it.entry(); + assertThat(entry).isEqualTo(expected[testOffset]); + } + + for (final int testOffset : testOffsets) { + final byte[] key = toArray(expected[testOffset].getKey().data()); + + // Indirect buffer seek for prev + final ByteBuffer db = ByteBuffer.allocate(key.length); + System.arraycopy(key, 0, db.array(), 0, key.length); + it.seekForPrev(db); + assertThat(db.position()).isEqualTo(key.length); + assertThat(it.isValid()).isTrue(); + + final WBWIRocksIterator.WriteEntry entry = it.entry(); + assertThat(entry).isEqualTo(expected[testOffset]); + } + + { + it.seekForPrev(k2point5.getBytes()); + assertThat(it.isValid()).isTrue(); + final WBWIRocksIterator.WriteEntry entry = it.entry(); + assertThat(entry).isEqualTo(expected[1]); + } + + { + it.seekForPrev(k1point5.getBytes()); + assertThat(it.isValid()).isTrue(); + final WBWIRocksIterator.WriteEntry entry = it.entry(); + assertThat(entry).isEqualTo(expected[0]); + } + + { + final ByteBuffer db = ByteBuffer.allocate(k2point5.length()); + db.put(k2point5.getBytes()); + db.flip(); + it.seekForPrev(db); + assertThat(it.isValid()).isTrue(); + final WBWIRocksIterator.WriteEntry entry = it.entry(); + assertThat(entry).isEqualTo(expected[1]); + } + + { + final ByteBuffer db = ByteBuffer.allocate(k1point5.length()); + db.put(k1point5.getBytes()); + db.flip(); + it.seekForPrev(db); + assertThat(it.isValid()).isTrue(); + final WBWIRocksIterator.WriteEntry entry = it.entry(); + assertThat(entry).isEqualTo(expected[0]); } //forward iterative access diff --git a/java/src/test/java/org/rocksdb/util/ByteBufferAllocator.java b/java/src/test/java/org/rocksdb/util/ByteBufferAllocator.java new file mode 100644 index 000000000..e94244424 --- /dev/null +++ b/java/src/test/java/org/rocksdb/util/ByteBufferAllocator.java @@ -0,0 +1,10 @@ +package org.rocksdb.util; + +import java.nio.ByteBuffer; + +public interface ByteBufferAllocator { + ByteBuffer allocate(int capacity); + + ByteBufferAllocator DIRECT = new DirectByteBufferAllocator(); + ByteBufferAllocator HEAP = new HeapByteBufferAllocator(); +} diff --git a/java/src/test/java/org/rocksdb/util/DirectByteBufferAllocator.java b/java/src/test/java/org/rocksdb/util/DirectByteBufferAllocator.java new file mode 100644 index 000000000..13955e73d --- /dev/null +++ b/java/src/test/java/org/rocksdb/util/DirectByteBufferAllocator.java @@ -0,0 +1,12 @@ +package org.rocksdb.util; + +import java.nio.ByteBuffer; + +public final class DirectByteBufferAllocator implements ByteBufferAllocator { + DirectByteBufferAllocator(){}; + + @Override + public ByteBuffer allocate(final int capacity) { + return ByteBuffer.allocateDirect(capacity); + } +} diff --git a/java/src/test/java/org/rocksdb/util/HeapByteBufferAllocator.java b/java/src/test/java/org/rocksdb/util/HeapByteBufferAllocator.java new file mode 100644 index 000000000..7dc2ded6d --- /dev/null +++ b/java/src/test/java/org/rocksdb/util/HeapByteBufferAllocator.java @@ -0,0 +1,12 @@ +package org.rocksdb.util; + +import java.nio.ByteBuffer; + +public final class HeapByteBufferAllocator implements ByteBufferAllocator { + HeapByteBufferAllocator(){}; + + @Override + public ByteBuffer allocate(final int capacity) { + return ByteBuffer.allocate(capacity); + } +}