From c1ec0b28eb6e93bc2799a68066cdc8147f92ba59 Mon Sep 17 00:00:00 2001 From: Alan Paxton Date: Wed, 15 Dec 2021 18:07:37 -0800 Subject: [PATCH] java / jni io_uring support (#9224) Summary: Existing multiGet() in java calls multi_get_helper() which then calls DB::std::vector MultiGet(). This doesn't take advantage of io_uring. This change adds another JNI level method that runs a parallel code path using the DB::void MultiGet(), using ByteBuffers at the JNI level. We call it multiGetDirect(). In addition to using the io_uring path, this code internally returns pinned slices which we can copy out of into our direct byte buffers; this should reduce the overall number of copies in the code path to/from Java. Some jmh benchmark runs (100k keys, 1000 key multiGet) suggest that for value sizes > 1k, we see about a 20% performance improvement, although performance is slightly reduced for small value sizes, there's a little bit more overhead in the JNI methods. Closes https://github.com/facebook/rocksdb/issues/8407 Pull Request resolved: https://github.com/facebook/rocksdb/pull/9224 Reviewed By: mrambacher Differential Revision: D32951754 Pulled By: jay-zhuang fbshipit-source-id: 1f70df7334be2b6c42a9c8f92725f67c71631690 --- java/CMakeLists.txt | 1 + java/jmh/README.md | 6 + java/jmh/pom.xml | 2 +- .../org/rocksdb/jmh/MultiGetBenchmarks.java | 100 +++- .../main/java/org/rocksdb/util/KVUtils.java | 18 +- java/rocksjni/rocksjni.cc | 306 +++++++++-- .../java/org/rocksdb/ByteBufferGetStatus.java | 44 ++ java/src/main/java/org/rocksdb/RocksDB.java | 166 +++++- .../test/java/org/rocksdb/MultiGetTest.java | 491 ++++++++++++++++++ .../test/java/org/rocksdb/util/TestUtil.java | 19 +- 10 files changed, 1083 insertions(+), 70 deletions(-) create mode 100644 java/src/main/java/org/rocksdb/ByteBufferGetStatus.java diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index f21c51b56..9afd42927 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -112,6 +112,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/BlockBasedTableConfig.java src/main/java/org/rocksdb/BloomFilter.java src/main/java/org/rocksdb/BuiltinComparator.java + src/main/java/org/rocksdb/ByteBufferGetStatus.java src/main/java/org/rocksdb/Cache.java src/main/java/org/rocksdb/CassandraCompactionFilter.java src/main/java/org/rocksdb/CassandraValueMergeOperator.java diff --git a/java/jmh/README.md b/java/jmh/README.md index f1ed0c686..1575ab517 100644 --- a/java/jmh/README.md +++ b/java/jmh/README.md @@ -6,6 +6,12 @@ These are micro-benchmarks for RocksJava functionality, using [JMH (Java Microbe **Note**: This uses a specific build of RocksDB that is set in the `` element of the `dependencies` section of the `pom.xml` file. If you are testing local changes you should build and install a SNAPSHOT version of rocksdbjni, and update the `pom.xml` of rocksdbjni-jmh file to test with this. +For instance, this is how to install the OSX jar you just built for 6.26.0 + +```bash +$ mvn install:install-file -Dfile=./java/target/rocksdbjni-6.26.0-SNAPSHOT-osx.jar -DgroupId=org.rocksdb -DartifactId=rocksdbjni -Dversion=6.26.0-SNAPSHOT -Dpackaging=jar +``` + ```bash $ mvn package ``` diff --git a/java/jmh/pom.xml b/java/jmh/pom.xml index 62671091c..26615da86 100644 --- a/java/jmh/pom.xml +++ b/java/jmh/pom.xml @@ -50,7 +50,7 @@ org.rocksdb rocksdbjni - 6.6.0-SNAPSHOT + 6.27.0-SNAPSHOT diff --git a/java/jmh/src/main/java/org/rocksdb/jmh/MultiGetBenchmarks.java b/java/jmh/src/main/java/org/rocksdb/jmh/MultiGetBenchmarks.java index 60a0de87f..c8c827444 100644 --- a/java/jmh/src/main/java/org/rocksdb/jmh/MultiGetBenchmarks.java +++ b/java/jmh/src/main/java/org/rocksdb/jmh/MultiGetBenchmarks.java @@ -6,23 +6,26 @@ */ package org.rocksdb.jmh; -import org.openjdk.jmh.annotations.*; -import org.rocksdb.*; -import org.rocksdb.util.FileUtils; +import static org.rocksdb.util.KVUtils.ba; +import static org.rocksdb.util.KVUtils.keys; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.rocksdb.*; +import org.rocksdb.util.FileUtils; -import static org.rocksdb.util.KVUtils.ba; -import static org.rocksdb.util.KVUtils.keys; - -@State(Scope.Benchmark) +@State(Scope.Thread) public class MultiGetBenchmarks { - @Param({ "no_column_family", "1_column_family", @@ -31,8 +34,7 @@ public class MultiGetBenchmarks { }) String columnFamilyTestType; - @Param("100000") - int keyCount; + @Param({"10000", "25000", "100000"}) int keyCount; @Param({ "10", @@ -42,6 +44,9 @@ public class MultiGetBenchmarks { }) int multiGetSize; + @Param({"16", "64", "250", "1000", "4000", "16000"}) int valueSize; + @Param({"16"}) int keySize; // big enough + Path dbDir; DBOptions options; int cfs = 0; // number of column families @@ -85,7 +90,8 @@ public class MultiGetBenchmarks { // store initial data for retrieving via get for (int i = 0; i < cfs; i++) { for (int j = 0; j < keyCount; j++) { - db.put(cfHandles[i], ba("key" + j), ba("value" + j)); + final byte[] paddedValue = Arrays.copyOf(ba("value" + j), valueSize); + db.put(cfHandles[i], ba("key" + j), paddedValue); } } @@ -149,10 +155,78 @@ public class MultiGetBenchmarks { } } + ByteBuffer keysBuffer; + ByteBuffer valuesBuffer; + + List valueBuffersList; + List keyBuffersList; + + @Setup + public void allocateSliceBuffers() { + keysBuffer = ByteBuffer.allocateDirect(keyCount * valueSize); + valuesBuffer = ByteBuffer.allocateDirect(keyCount * valueSize); + valueBuffersList = new ArrayList<>(); + keyBuffersList = new ArrayList<>(); + for (int i = 0; i < keyCount; i++) { + valueBuffersList.add(valuesBuffer.slice()); + valuesBuffer.position(i * valueSize); + keyBuffersList.add(keysBuffer.slice()); + keysBuffer.position(i * keySize); + } + } + + @TearDown + public void freeSliceBuffers() { + valueBuffersList.clear(); + } + @Benchmark public List multiGet10() throws RocksDBException { final int fromKeyIdx = next(multiGetSize, keyCount); - final List keys = keys(fromKeyIdx, fromKeyIdx + multiGetSize); - return db.multiGetAsList(keys); + if (fromKeyIdx >= 0) { + final List keys = keys(fromKeyIdx, fromKeyIdx + multiGetSize); + final List valueResults = db.multiGetAsList(keys); + for (final byte[] result : valueResults) { + if (result.length != valueSize) + throw new RuntimeException("Test valueSize assumption wrong"); + } + } + return new ArrayList<>(); + } + + @Benchmark + public List multiGetDirect10() throws RocksDBException { + final int fromKeyIdx = next(multiGetSize, keyCount); + if (fromKeyIdx >= 0) { + final List keys = keys(keyBuffersList, fromKeyIdx, fromKeyIdx + multiGetSize); + final List results = db.multiGetByteBuffers( + keys, valueBuffersList.subList(fromKeyIdx, fromKeyIdx + multiGetSize)); + for (final RocksDB.MultiGetInstance result : results) { + if (result.status.getCode() != Status.Code.Ok) + throw new RuntimeException("Test status assumption wrong"); + if (result.valueSize != valueSize) + throw new RuntimeException("Test valueSize assumption wrong"); + } + return results; + } + return new ArrayList<>(); + } + + public static void main(final String[] args) throws RunnerException { + final org.openjdk.jmh.runner.options.Options opt = + new OptionsBuilder() + .include(MultiGetBenchmarks.class.getSimpleName()) + .forks(1) + .jvmArgs("-ea") + .warmupIterations(1) + .measurementIterations(2) + .forks(2) + .param("columnFamilyTestType=", "1_column_family") + .param("multiGetSize=", "10", "1000") + .param("keyCount=", "1000") + .output("jmh_output") + .build(); + + new Runner(opt).run(); } } diff --git a/java/jmh/src/main/java/org/rocksdb/util/KVUtils.java b/java/jmh/src/main/java/org/rocksdb/util/KVUtils.java index 848de5d82..5077291c8 100644 --- a/java/jmh/src/main/java/org/rocksdb/util/KVUtils.java +++ b/java/jmh/src/main/java/org/rocksdb/util/KVUtils.java @@ -6,11 +6,12 @@ */ package org.rocksdb.util; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import static java.nio.charset.StandardCharsets.UTF_8; - public final class KVUtils { /** @@ -55,4 +56,17 @@ public final class KVUtils { } return keys; } + + public static List keys( + final List keyBuffers, final int from, final int to) { + final List keys = new ArrayList<>(to - from); + for (int i = from; i < to; i++) { + final ByteBuffer key = keyBuffers.get(i); + key.clear(); + key.put(ba("key" + i)); + key.flip(); + keys.add(key); + } + return keys; + } } diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 15019c132..f5f2efcf9 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -1685,16 +1685,18 @@ inline void multi_get_helper_release_keys(std::vector& keys_to_free) { } /** - * cf multi get + * @brief fill a native array of cf handles from java handles * - * @return byte[][] of values or nullptr if an exception occurs - */ -jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, - const ROCKSDB_NAMESPACE::ReadOptions& rOpt, - jobjectArray jkeys, jintArray jkey_offs, - jintArray jkey_lens, - jlongArray jcolumn_family_handles) { - std::vector cf_handles; + * @param env + * @param cf_handles to fill from the java variants + * @param jcolumn_family_handles + * @return true if the copy succeeds + * @return false if a JNI exception is generated + */ +inline bool cf_handles_from_jcf_handles( + JNIEnv* env, + std::vector& cf_handles, + jlongArray jcolumn_family_handles) { if (jcolumn_family_handles != nullptr) { const jsize len_cols = env->GetArrayLength(jcolumn_family_handles); @@ -1704,7 +1706,7 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, jclass exception_cls = (env)->FindClass("java/lang/OutOfMemoryError"); (env)->ThrowNew(exception_cls, "Insufficient Memory for CF handle array."); - return nullptr; + return false; } for (jsize i = 0; i < len_cols; i++) { @@ -1714,13 +1716,30 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, } env->ReleaseLongArrayElements(jcolumn_family_handles, jcfh, JNI_ABORT); } + return true; +} +/** + * @brief copy keys from JNI into vector of slices for Rocks API + * + * @param keys to instantiate + * @param jkeys + * @param jkey_offs + * @param jkey_lens + * @return true if the copy succeeds + * @return false if a JNI exception is raised + */ +inline bool keys_from_jkeys(JNIEnv* env, + std::vector& keys, + std::vector& keys_to_free, + jobjectArray jkeys, jintArray jkey_offs, + jintArray jkey_lens) { jint* jkey_off = env->GetIntArrayElements(jkey_offs, nullptr); if (jkey_off == nullptr) { // exception thrown: OutOfMemoryError jclass exception_cls = (env)->FindClass("java/lang/OutOfMemoryError"); (env)->ThrowNew(exception_cls, "Insufficient Memory for key offset array."); - return nullptr; + return false; } jint* jkey_len = env->GetIntArrayElements(jkey_lens, nullptr); @@ -1729,12 +1748,10 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, env->ReleaseIntArrayElements(jkey_offs, jkey_off, JNI_ABORT); jclass exception_cls = (env)->FindClass("java/lang/OutOfMemoryError"); (env)->ThrowNew(exception_cls, "Insufficient Memory for key length array."); - return nullptr; + return false; } const jsize len_keys = env->GetArrayLength(jkeys); - std::vector keys; - std::vector keys_to_free; for (jsize i = 0; i < len_keys; i++) { jobject jkey = env->GetObjectArrayElement(jkeys, i); if (env->ExceptionCheck()) { @@ -1745,7 +1762,7 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, jclass exception_cls = (env)->FindClass("java/lang/OutOfMemoryError"); (env)->ThrowNew(exception_cls, "Insufficient Memory for key object array."); - return nullptr; + return false; } jbyteArray jkey_ba = reinterpret_cast(jkey); @@ -1763,7 +1780,7 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, jclass exception_cls = (env)->FindClass("java/lang/ArrayIndexOutOfBoundsException"); (env)->ThrowNew(exception_cls, "Invalid byte array region index."); - return nullptr; + return false; } ROCKSDB_NAMESPACE::Slice key_slice(reinterpret_cast(key), len_key); @@ -1777,6 +1794,68 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, env->ReleaseIntArrayElements(jkey_lens, jkey_len, JNI_ABORT); env->ReleaseIntArrayElements(jkey_offs, jkey_off, JNI_ABORT); + return true; +} + +inline bool keys_from_bytebuffers(JNIEnv* env, + std::vector& keys, + jobjectArray jkeys, jintArray jkey_offs, + jintArray jkey_lens) { + jint* jkey_off = env->GetIntArrayElements(jkey_offs, nullptr); + if (jkey_off == nullptr) { + // exception thrown: OutOfMemoryError + jclass exception_cls = (env)->FindClass("java/lang/OutOfMemoryError"); + (env)->ThrowNew(exception_cls, "Insufficient Memory for key offset array."); + return false; + } + + jint* jkey_len = env->GetIntArrayElements(jkey_lens, nullptr); + if (jkey_len == nullptr) { + // exception thrown: OutOfMemoryError + env->ReleaseIntArrayElements(jkey_offs, jkey_off, JNI_ABORT); + jclass exception_cls = (env)->FindClass("java/lang/OutOfMemoryError"); + (env)->ThrowNew(exception_cls, "Insufficient Memory for key length array."); + return false; + } + + const jsize len_keys = env->GetArrayLength(jkeys); + for (jsize i = 0; i < len_keys; i++) { + jobject jkey = env->GetObjectArrayElement(jkeys, i); + if (env->ExceptionCheck()) { + // exception thrown: ArrayIndexOutOfBoundsException + return false; + } + char* key = reinterpret_cast(env->GetDirectBufferAddress(jkey)); + ROCKSDB_NAMESPACE::Slice key_slice(key + jkey_off[i], jkey_len[i]); + keys.push_back(key_slice); + + env->DeleteLocalRef(jkey); + } + return true; +} + +/** + * cf multi get + * + * @return byte[][] of values or nullptr if an + * exception occurs + */ +jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, + const ROCKSDB_NAMESPACE::ReadOptions& rOpt, + jobjectArray jkeys, jintArray jkey_offs, + jintArray jkey_lens, + jlongArray jcolumn_family_handles) { + std::vector cf_handles; + if (!cf_handles_from_jcf_handles(env, cf_handles, jcolumn_family_handles)) { + return nullptr; + } + + std::vector keys; + std::vector keys_to_free; + if (!keys_from_jkeys(env, keys, keys_to_free, jkeys, jkey_offs, jkey_lens)) { + return nullptr; + } + std::vector values; std::vector s; if (cf_handles.size() == 0) { @@ -1814,14 +1893,16 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, jentry_value, 0, static_cast(jvalue_len), const_cast(reinterpret_cast(value->c_str()))); if (env->ExceptionCheck()) { - // exception thrown: ArrayIndexOutOfBoundsException + // exception thrown: + // ArrayIndexOutOfBoundsException env->DeleteLocalRef(jentry_value); return nullptr; } env->SetObjectArrayElement(jresults, static_cast(i), jentry_value); if (env->ExceptionCheck()) { - // exception thrown: ArrayIndexOutOfBoundsException + // exception thrown: + // ArrayIndexOutOfBoundsException env->DeleteLocalRef(jentry_value); return nullptr; } @@ -1833,14 +1914,129 @@ jobjectArray multi_get_helper(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, return jresults; } +/** + * cf multi get + * + * fill supplied native buffers, or raise JNI + * exception on a problem + */ + +/** + * @brief multi_get_helper_direct for fast-path multiget (io_uring) on Linux + * + * @param env + * @param db + * @param rOpt read options + * @param jcolumn_family_handles 0, 1, or n column family handles + * @param jkeys + * @param jkey_offsets + * @param jkey_lengths + * @param jvalues byte buffers to receive values + * @param jvalue_sizes returned actual sizes of data values for keys + * @param jstatuses returned java RocksDB status values for per key + */ +void multi_get_helper_direct(JNIEnv* env, jobject, ROCKSDB_NAMESPACE::DB* db, + const ROCKSDB_NAMESPACE::ReadOptions& rOpt, + jlongArray jcolumn_family_handles, + jobjectArray jkeys, jintArray jkey_offsets, + jintArray jkey_lengths, jobjectArray jvalues, + jintArray jvalue_sizes, jobjectArray jstatuses) { + const jsize num_keys = env->GetArrayLength(jkeys); + + std::vector keys; + if (!keys_from_bytebuffers(env, keys, jkeys, jkey_offsets, jkey_lengths)) { + return; + } + + std::vector values(num_keys); + + std::vector cf_handles; + if (!cf_handles_from_jcf_handles(env, cf_handles, jcolumn_family_handles)) { + return; + } + + std::vector s(num_keys); + if (cf_handles.size() == 0) { + // we can use the more efficient call here + auto cf_handle = db->DefaultColumnFamily(); + db->MultiGet(rOpt, cf_handle, num_keys, keys.data(), values.data(), + s.data()); + } else if (cf_handles.size() == 1) { + // we can use the more efficient call here + auto cf_handle = cf_handles[0]; + db->MultiGet(rOpt, cf_handle, num_keys, keys.data(), values.data(), + s.data()); + } else { + // multiple CFs version + db->MultiGet(rOpt, num_keys, cf_handles.data(), keys.data(), values.data(), + s.data()); + } + + // prepare the results + jobjectArray jresults = ROCKSDB_NAMESPACE::ByteJni::new2dByteArray( + env, static_cast(s.size())); + if (jresults == nullptr) { + // exception occurred + jclass exception_cls = (env)->FindClass("java/lang/OutOfMemoryError"); + (env)->ThrowNew(exception_cls, "Insufficient Memory for results."); + return; + } + + std::vector value_size; + for (int i = 0; i < num_keys; i++) { + auto jstatus = ROCKSDB_NAMESPACE::StatusJni::construct(env, s[i]); + if (jstatus == nullptr) { + // exception in context + return; + } + env->SetObjectArrayElement(jstatuses, i, jstatus); + + if (s[i].ok()) { + jobject jvalue_bytebuf = env->GetObjectArrayElement(jvalues, i); + if (env->ExceptionCheck()) { + // ArrayIndexOutOfBoundsException is thrown + return; + } + jlong jvalue_capacity = env->GetDirectBufferCapacity(jvalue_bytebuf); + if (jvalue_capacity == -1) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew( + env, + "Invalid value(s) argument (argument is not a valid direct " + "ByteBuffer)"); + return; + } + void* jvalue_address = env->GetDirectBufferAddress(jvalue_bytebuf); + if (jvalue_address == nullptr) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew( + env, + "Invalid value(s) argument (argument is not a valid direct " + "ByteBuffer)"); + return; + } + + // record num returned, push back that number, which may be bigger then + // the ByteBuffer supplied. then copy as much as fits in the ByteBuffer. + value_size.push_back(static_cast(values[i].size())); + auto copy_bytes = + std::min(static_cast(values[i].size()), jvalue_capacity); + memcpy(jvalue_address, values[i].data(), copy_bytes); + } else { + // bad status for this + value_size.push_back(0); + } + } + + env->SetIntArrayRegion(jvalue_sizes, 0, num_keys, value_size.data()); +} + /* * Class: org_rocksdb_RocksDB * Method: multiGet * Signature: (J[[B[I[I)[[B */ jobjectArray Java_org_rocksdb_RocksDB_multiGet__J_3_3B_3I_3I( - JNIEnv* env, jobject jdb, jlong jdb_handle, - jobjectArray jkeys, jintArray jkey_offs, jintArray jkey_lens) { + JNIEnv* env, jobject jdb, jlong jdb_handle, jobjectArray jkeys, + jintArray jkey_offs, jintArray jkey_lens) { return multi_get_helper( env, jdb, reinterpret_cast(jdb_handle), ROCKSDB_NAMESPACE::ReadOptions(), jkeys, jkey_offs, jkey_lens, nullptr); @@ -1852,8 +2048,8 @@ jobjectArray Java_org_rocksdb_RocksDB_multiGet__J_3_3B_3I_3I( * Signature: (J[[B[I[I[J)[[B */ jobjectArray Java_org_rocksdb_RocksDB_multiGet__J_3_3B_3I_3I_3J( - JNIEnv* env, jobject jdb, jlong jdb_handle, - jobjectArray jkeys, jintArray jkey_offs, jintArray jkey_lens, + JNIEnv* env, jobject jdb, jlong jdb_handle, jobjectArray jkeys, + jintArray jkey_offs, jintArray jkey_lens, jlongArray jcolumn_family_handles) { return multi_get_helper(env, jdb, reinterpret_cast(jdb_handle), @@ -1890,38 +2086,60 @@ jobjectArray Java_org_rocksdb_RocksDB_multiGet__JJ_3_3B_3I_3I_3J( jkey_offs, jkey_lens, jcolumn_family_handles); } +/* + * Class: org_rocksdb_RocksDB + * Method: multiGet + * Signature: + * (JJ[J[Ljava/nio/ByteBuffer;[I[I[Ljava/nio/ByteBuffer;[I[Lorg/rocksdb/Status;)V + */ +void Java_org_rocksdb_RocksDB_multiGet__JJ_3J_3Ljava_nio_ByteBuffer_2_3I_3I_3Ljava_nio_ByteBuffer_2_3I_3Lorg_rocksdb_Status_2( + JNIEnv* env, jobject jdb, jlong jdb_handle, jlong jropt_handle, + jlongArray jcolumn_family_handles, jobjectArray jkeys, + jintArray jkey_offsets, jintArray jkey_lengths, jobjectArray jvalues, + jintArray jvalues_sizes, jobjectArray jstatus_objects) { + return multi_get_helper_direct( + env, jdb, reinterpret_cast(jdb_handle), + *reinterpret_cast(jropt_handle), + jcolumn_family_handles, jkeys, jkey_offsets, jkey_lengths, jvalues, + jvalues_sizes, jstatus_objects); +} +// private native void +// multiGet(final long dbHandle, final long rOptHandle, +// final long[] columnFamilyHandles, final ByteBuffer[] keysArray, +// final ByteBuffer[] valuesArray); + ////////////////////////////////////////////////////////////////////////////// // ROCKSDB_NAMESPACE::DB::KeyMayExist bool key_may_exist_helper(JNIEnv* env, jlong jdb_handle, jlong jcf_handle, - jlong jread_opts_handle, - jbyteArray jkey, jint jkey_offset, jint jkey_len, - bool* has_exception, std::string* value, bool* value_found) { + jlong jread_opts_handle, jbyteArray jkey, + jint jkey_offset, jint jkey_len, bool* has_exception, + std::string* value, bool* value_found) { auto* db = reinterpret_cast(jdb_handle); ROCKSDB_NAMESPACE::ColumnFamilyHandle* cf_handle; if (jcf_handle == 0) { cf_handle = db->DefaultColumnFamily(); - } else { - cf_handle = - reinterpret_cast(jcf_handle); - } - ROCKSDB_NAMESPACE::ReadOptions read_opts = - jread_opts_handle == 0 - ? ROCKSDB_NAMESPACE::ReadOptions() - : *(reinterpret_cast( - jread_opts_handle)); - - jbyte* key = new jbyte[jkey_len]; - env->GetByteArrayRegion(jkey, jkey_offset, jkey_len, key); - if (env->ExceptionCheck()) { - // exception thrown: ArrayIndexOutOfBoundsException - delete[] key; - *has_exception = true; - return false; + } else { + cf_handle = + reinterpret_cast(jcf_handle); + } + ROCKSDB_NAMESPACE::ReadOptions read_opts = + jread_opts_handle == 0 + ? ROCKSDB_NAMESPACE::ReadOptions() + : *(reinterpret_cast( + jread_opts_handle)); + + jbyte* key = new jbyte[jkey_len]; + env->GetByteArrayRegion(jkey, jkey_offset, jkey_len, key); + if (env->ExceptionCheck()) { + // exception thrown: ArrayIndexOutOfBoundsException + delete[] key; + *has_exception = true; + return false; } ROCKSDB_NAMESPACE::Slice key_slice(reinterpret_cast(key), jkey_len); - const bool exists = db->KeyMayExist( - read_opts, cf_handle, key_slice, value, value_found); + const bool exists = + db->KeyMayExist(read_opts, cf_handle, key_slice, value, value_found); // cleanup delete[] key; diff --git a/java/src/main/java/org/rocksdb/ByteBufferGetStatus.java b/java/src/main/java/org/rocksdb/ByteBufferGetStatus.java new file mode 100644 index 000000000..b836480af --- /dev/null +++ b/java/src/main/java/org/rocksdb/ByteBufferGetStatus.java @@ -0,0 +1,44 @@ +package org.rocksdb; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + * A ByteBuffer containing fetched data, together with a result for the fetch + * and the total size of the object fetched. + * + * Used for the individual results of + * {@link RocksDB#multiGetByteBuffers(List, List)} + * {@link RocksDB#multiGetByteBuffers(List, List, List)} + * {@link RocksDB#multiGetByteBuffers(ReadOptions, List, List)} + * {@link RocksDB#multiGetByteBuffers(ReadOptions, List, List, List)} + */ +public class ByteBufferGetStatus { + public final Status status; + public final int requiredSize; + public final ByteBuffer value; + + /** + * Constructor used for success status, when the value is contained in the buffer + * + * @param status the status of the request to fetch into the buffer + * @param requiredSize the size of the data, which may be bigger than the buffer + * @param value the buffer containing as much of the value as fits + */ + ByteBufferGetStatus(final Status status, final int requiredSize, final ByteBuffer value) { + this.status = status; + this.requiredSize = requiredSize; + this.value = value; + } + + /** + * Constructor used for a failure status, when no value is filled in + * + * @param status the status of the request to fetch into the buffer + */ + ByteBufferGetStatus(final Status status) { + this.status = status; + this.requiredSize = 0; + this.value = null; + } +} diff --git a/java/src/main/java/org/rocksdb/RocksDB.java b/java/src/main/java/org/rocksdb/RocksDB.java index be8ec02c4..211cb9c0d 100644 --- a/java/src/main/java/org/rocksdb/RocksDB.java +++ b/java/src/main/java/org/rocksdb/RocksDB.java @@ -31,14 +31,14 @@ public class RocksDB extends RocksObject { LOADED } - private static AtomicReference libraryLoaded - = new AtomicReference<>(LibraryState.NOT_LOADED); + private static final AtomicReference libraryLoaded = + new AtomicReference<>(LibraryState.NOT_LOADED); static { RocksDB.loadLibrary(); } - private List ownedColumnFamilyHandles = new ArrayList<>(); + private final List ownedColumnFamilyHandles = new ArrayList<>(); /** * Loads the necessary library files. @@ -2544,6 +2544,154 @@ public class RocksDB extends RocksObject { keysArray, keyOffsets, keyLengths, cfHandles)); } + /** + * Fetches a list of values for the given list of keys, all from the default column family. + * + * @param keys list of keys for which values need to be retrieved. + * @param values list of buffers to return retrieved values in + * @return list of number of bytes in DB for each requested key + * this can be more than the size of the corresponding buffer; then the buffer will be filled + * with the appropriate truncation of the database value. + * @throws RocksDBException if error happens in underlying native library. + * @throws IllegalArgumentException thrown if the number of passed keys and passed values + * do not match. + */ + public List multiGetByteBuffers( + final List keys, final List values) throws RocksDBException { + final ReadOptions readOptions = new ReadOptions(); + final List columnFamilyHandleList = new ArrayList<>(1); + columnFamilyHandleList.add(getDefaultColumnFamily()); + return multiGetByteBuffers(readOptions, columnFamilyHandleList, keys, values); + } + + /** + * Fetches a list of values for the given list of keys, all from the default column family. + * + * @param readOptions Read options + * @param keys list of keys for which values need to be retrieved. + * @param values list of buffers to return retrieved values in + * @throws RocksDBException if error happens in underlying native library. + * @throws IllegalArgumentException thrown if the number of passed keys and passed values + * do not match. + */ + public List multiGetByteBuffers(final ReadOptions readOptions, + final List keys, final List values) throws RocksDBException { + final List columnFamilyHandleList = new ArrayList<>(1); + columnFamilyHandleList.add(getDefaultColumnFamily()); + return multiGetByteBuffers(readOptions, columnFamilyHandleList, keys, values); + } + + /** + * Fetches a list of values for the given list of keys. + *

+ * Note: Every key needs to have a related column family name in + * {@code columnFamilyHandleList}. + *

+ * + * @param columnFamilyHandleList {@link java.util.List} containing + * {@link org.rocksdb.ColumnFamilyHandle} instances. + * @param keys list of keys for which values need to be retrieved. + * @param values list of buffers to return retrieved values in + * @throws RocksDBException if error happens in underlying native library. + * @throws IllegalArgumentException thrown if the number of passed keys, passed values and + * passed column family handles do not match. + */ + public List multiGetByteBuffers( + final List columnFamilyHandleList, final List keys, + final List values) throws RocksDBException { + final ReadOptions readOptions = new ReadOptions(); + return multiGetByteBuffers(readOptions, columnFamilyHandleList, keys, values); + } + + /** + * Fetches a list of values for the given list of keys. + *

+ * Note: Every key needs to have a related column family name in + * {@code columnFamilyHandleList}. + *

+ * + * @param readOptions Read options + * @param columnFamilyHandleList {@link java.util.List} containing + * {@link org.rocksdb.ColumnFamilyHandle} instances. + * @param keys list of keys for which values need to be retrieved. + * @param values list of buffers to return retrieved values in + * @throws RocksDBException if error happens in underlying native library. + * @throws IllegalArgumentException thrown if the number of passed keys, passed values and + * passed column family handles do not match. + */ + public List multiGetByteBuffers(final ReadOptions readOptions, + final List columnFamilyHandleList, final List keys, + final List values) throws RocksDBException { + assert (keys.size() != 0); + + // Check if key size equals cfList size. If not a exception must be + // thrown. If not a Segmentation fault happens. + if (keys.size() != columnFamilyHandleList.size() && columnFamilyHandleList.size() > 1) { + throw new IllegalArgumentException( + "Wrong number of ColumnFamilyHandle(s) supplied. Provide 0, 1, or as many as there are key/value(s)"); + } + + // Check if key size equals cfList size. If not a exception must be + // thrown. If not a Segmentation fault happens. + if (values.size() != keys.size()) { + throw new IllegalArgumentException("For each key there must be a corresponding value."); + } + + // TODO (AP) support indirect buffers + for (final ByteBuffer key : keys) { + if (!key.isDirect()) { + throw new IllegalArgumentException("All key buffers must be direct byte buffers"); + } + } + + // TODO (AP) support indirect buffers, though probably via a less efficient code path + for (final ByteBuffer value : values) { + if (!value.isDirect()) { + throw new IllegalArgumentException("All value buffers must be direct byte buffers"); + } + } + + final int numCFHandles = columnFamilyHandleList.size(); + final long[] cfHandles = new long[numCFHandles]; + for (int i = 0; i < numCFHandles; i++) { + cfHandles[i] = columnFamilyHandleList.get(i).nativeHandle_; + } + + final int numValues = keys.size(); + + final ByteBuffer[] keysArray = keys.toArray(new ByteBuffer[0]); + final int[] keyOffsets = new int[numValues]; + final int[] keyLengths = new int[numValues]; + for (int i = 0; i < numValues; i++) { + // TODO (AP) add keysArray[i].arrayOffset() if the buffer is indirect + // TODO (AP) because in that case we have to pass the array directly, + // so that the JNI C++ code will not know to compensate for the array offset + keyOffsets[i] = keysArray[i].position(); + keyLengths[i] = keysArray[i].limit(); + } + final ByteBuffer[] valuesArray = values.toArray(new ByteBuffer[0]); + final int[] valuesSizeArray = new int[numValues]; + final Status[] statusArray = new Status[numValues]; + + multiGet(nativeHandle_, readOptions.nativeHandle_, cfHandles, keysArray, keyOffsets, keyLengths, + valuesArray, valuesSizeArray, statusArray); + + final List results = new ArrayList<>(); + for (int i = 0; i < numValues; i++) { + final Status status = statusArray[i]; + if (status.getCode() == Status.Code.Ok) { + final ByteBuffer value = valuesArray[i]; + value.position(Math.min(valuesSizeArray[i], value.capacity())); + value.flip(); // prepare for read out + results.add(new ByteBufferGetStatus(status, valuesSizeArray[i], value)); + } else { + results.add(new ByteBufferGetStatus(status)); + } + } + + return results; + } + /** * If the key definitely does not exist in the database, then this method * returns false, otherwise it returns true if the key might exist. @@ -4841,6 +4989,12 @@ public class RocksDB extends RocksObject { private native byte[][] multiGet(final long dbHandle, final long rOptHandle, final byte[][] keys, final int[] keyOffsets, final int[] keyLengths, final long[] columnFamilyHandles); + + private native void multiGet(final long dbHandle, final long rOptHandle, + final long[] columnFamilyHandles, final ByteBuffer[] keysArray, final int[] keyOffsets, + final int[] keyLengths, final ByteBuffer[] valuesArray, final int[] valuesSizeArray, + final Status[] statusArray); + private native boolean keyMayExist( final long handle, final long cfHandle, final long readOptHandle, final byte[] key, final int keyOffset, final int keyLength); @@ -4887,9 +5041,9 @@ public class RocksDB extends RocksObject { private native long[] getApproximateSizes(final long nativeHandle, final long columnFamilyHandle, final long[] rangeSliceHandles, final byte includeFlags); - private final native long[] getApproximateMemTableStats( - final long nativeHandle, final long columnFamilyHandle, - final long rangeStartSliceHandle, final long rangeLimitSliceHandle); + private native long[] getApproximateMemTableStats(final long nativeHandle, + final long columnFamilyHandle, final long rangeStartSliceHandle, + final long rangeLimitSliceHandle); private native void compactRange(final long handle, /* @Nullable */ final byte[] begin, final int beginLen, /* @Nullable */ final byte[] end, final int endLen, diff --git a/java/src/test/java/org/rocksdb/MultiGetTest.java b/java/src/test/java/org/rocksdb/MultiGetTest.java index ce8212409..323a6b1f4 100644 --- a/java/src/test/java/org/rocksdb/MultiGetTest.java +++ b/java/src/test/java/org/rocksdb/MultiGetTest.java @@ -5,12 +5,16 @@ package org.rocksdb; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.rocksdb.util.TestUtil; public class MultiGetTest { @Rule public TemporaryFolder dbFolder = new TemporaryFolder(); @@ -31,4 +35,491 @@ public class MultiGetTest { assertThat(values.get(2)).isEqualTo("value3ForKey3".getBytes()); } } + + @Test + public void putNThenMultiGetDirect() throws RocksDBException { + try (final Options opt = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { + db.put("key1".getBytes(), "value1ForKey1".getBytes()); + db.put("key2".getBytes(), "value2ForKey2".getBytes()); + db.put("key3".getBytes(), "value3ForKey3".getBytes()); + + final List keys = new ArrayList<>(); + keys.add(ByteBuffer.allocateDirect(12).put("key1".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key2".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key3".getBytes())); + // Java8 and lower flip() returns Buffer not ByteBuffer, so can't chain above /\/\ + for (final ByteBuffer key : keys) { + key.flip(); + } + final List values = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + values.add(ByteBuffer.allocateDirect(24)); + } + + { + final List results = db.multiGetByteBuffers(keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.Ok); + + assertThat(results.get(0).requiredSize).isEqualTo("value1ForKey1".getBytes().length); + assertThat(results.get(1).requiredSize).isEqualTo("value2ForKey2".getBytes().length); + assertThat(results.get(2).requiredSize).isEqualTo("value3ForKey3".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(0).value)) + .isEqualTo("value1ForKey1".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(1).value)) + .isEqualTo("value2ForKey2".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(2).value)) + .isEqualTo("value3ForKey3".getBytes()); + } + + { + final List results = + db.multiGetByteBuffers(new ReadOptions(), keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.Ok); + + assertThat(results.get(0).requiredSize).isEqualTo("value1ForKey1".getBytes().length); + assertThat(results.get(1).requiredSize).isEqualTo("value2ForKey2".getBytes().length); + assertThat(results.get(2).requiredSize).isEqualTo("value3ForKey3".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(0).value)) + .isEqualTo("value1ForKey1".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(1).value)) + .isEqualTo("value2ForKey2".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(2).value)) + .isEqualTo("value3ForKey3".getBytes()); + } + } + } + + @Test + public void putNThenMultiGetDirectSliced() throws RocksDBException { + try (final Options opt = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { + db.put("key1".getBytes(), "value1ForKey1".getBytes()); + db.put("key2".getBytes(), "value2ForKey2".getBytes()); + db.put("key3".getBytes(), "value3ForKey3".getBytes()); + + final List keys = new ArrayList<>(); + keys.add(ByteBuffer.allocateDirect(12).put("key2".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key3".getBytes())); + keys.add( + ByteBuffer.allocateDirect(12).put("prefix1".getBytes()).slice().put("key1".getBytes())); + // Java8 and lower flip() returns Buffer not ByteBuffer, so can't chain above /\/\ + for (final ByteBuffer key : keys) { + key.flip(); + } + final List values = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + values.add(ByteBuffer.allocateDirect(24)); + } + + { + final List results = db.multiGetByteBuffers(keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.Ok); + + assertThat(results.get(1).requiredSize).isEqualTo("value2ForKey2".getBytes().length); + assertThat(results.get(2).requiredSize).isEqualTo("value3ForKey3".getBytes().length); + assertThat(results.get(0).requiredSize).isEqualTo("value1ForKey1".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(0).value)) + .isEqualTo("value2ForKey2".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(1).value)) + .isEqualTo("value3ForKey3".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(2).value)) + .isEqualTo("value1ForKey1".getBytes()); + } + } + } + + @Test + public void putNThenMultiGetDirectBadValuesArray() throws RocksDBException { + try (final Options opt = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { + db.put("key1".getBytes(), "value1ForKey1".getBytes()); + db.put("key2".getBytes(), "value2ForKey2".getBytes()); + db.put("key3".getBytes(), "value3ForKey3".getBytes()); + + final List keys = new ArrayList<>(); + keys.add(ByteBuffer.allocateDirect(12).put("key1".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key2".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key3".getBytes())); + // Java8 and lower flip() returns Buffer not ByteBuffer, so can't chain above /\/\ + for (final ByteBuffer key : keys) { + key.flip(); + } + + { + final List values = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + values.add(ByteBuffer.allocateDirect(24)); + } + + values.remove(0); + + try { + db.multiGetByteBuffers(keys, values); + fail("Expected exception when not enough value ByteBuffers supplied"); + } catch (final IllegalArgumentException e) { + assertThat(e.getMessage()).contains("For each key there must be a corresponding value"); + } + } + + { + final List values = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + values.add(ByteBuffer.allocateDirect(24)); + } + + values.add(ByteBuffer.allocateDirect(24)); + + try { + db.multiGetByteBuffers(keys, values); + fail("Expected exception when too many value ByteBuffers supplied"); + } catch (final IllegalArgumentException e) { + assertThat(e.getMessage()).contains("For each key there must be a corresponding value"); + } + } + } + } + + @Test + public void putNThenMultiGetDirectShortValueBuffers() throws RocksDBException { + try (final Options opt = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { + db.put("key1".getBytes(), "value1ForKey1".getBytes()); + db.put("key2".getBytes(), "value2ForKey2".getBytes()); + db.put("key3".getBytes(), "value3ForKey3".getBytes()); + + final List keys = new ArrayList<>(); + keys.add(ByteBuffer.allocateDirect(12).put("key1".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key2".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key3".getBytes())); + // Java8 and lower flip() returns Buffer not ByteBuffer, so can't chain above /\/\ + for (final ByteBuffer key : keys) { + key.flip(); + } + + { + final List values = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + values.add(ByteBuffer.allocateDirect(4)); + } + + final List statii = db.multiGetByteBuffers(keys, values); + assertThat(statii.size()).isEqualTo(values.size()); + for (final ByteBufferGetStatus status : statii) { + assertThat(status.status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(status.requiredSize).isEqualTo("value3ForKey3".getBytes().length); + final ByteBuffer expected = + ByteBuffer.allocateDirect(24).put(Arrays.copyOf("valueX".getBytes(), 4)); + expected.flip(); + assertThat(status.value).isEqualTo(expected); + } + } + } + } + + @Test + public void putNThenMultiGetDirectNondefaultCF() throws RocksDBException { + try (final Options opt = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { + final List cfDescriptors = new ArrayList<>(0); + cfDescriptors.add(new ColumnFamilyDescriptor("cf0".getBytes())); + cfDescriptors.add(new ColumnFamilyDescriptor("cf1".getBytes())); + cfDescriptors.add(new ColumnFamilyDescriptor("cf2".getBytes())); + + final List cf = db.createColumnFamilies(cfDescriptors); + + db.put(cf.get(0), "key1".getBytes(), "value1ForKey1".getBytes()); + db.put(cf.get(0), "key2".getBytes(), "value2ForKey2".getBytes()); + db.put(cf.get(0), "key3".getBytes(), "value3ForKey3".getBytes()); + + final List keys = new ArrayList<>(); + keys.add(ByteBuffer.allocateDirect(12).put("key1".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key2".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key3".getBytes())); + // Java8 and lower flip() returns Buffer not ByteBuffer, so can't chain above /\/\ + for (final ByteBuffer key : keys) { + key.flip(); + } + final List values = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + values.add(ByteBuffer.allocateDirect(24)); + } + + { + final List results = db.multiGetByteBuffers(keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.NotFound); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.NotFound); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.NotFound); + } + + { + final List columnFamilyHandles = new ArrayList<>(); + columnFamilyHandles.add(cf.get(0)); + final List results = + db.multiGetByteBuffers(columnFamilyHandles, keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.Ok); + + assertThat(results.get(0).requiredSize).isEqualTo("value1ForKey1".getBytes().length); + assertThat(results.get(1).requiredSize).isEqualTo("value2ForKey2".getBytes().length); + assertThat(results.get(2).requiredSize).isEqualTo("value3ForKey3".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(0).value)) + .isEqualTo("value1ForKey1".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(1).value)) + .isEqualTo("value2ForKey2".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(2).value)) + .isEqualTo("value3ForKey3".getBytes()); + } + + { + final List columnFamilyHandles = new ArrayList<>(); + columnFamilyHandles.add(cf.get(0)); + columnFamilyHandles.add(cf.get(0)); + columnFamilyHandles.add(cf.get(0)); + final List results = + db.multiGetByteBuffers(columnFamilyHandles, keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.Ok); + + assertThat(results.get(0).requiredSize).isEqualTo("value1ForKey1".getBytes().length); + assertThat(results.get(1).requiredSize).isEqualTo("value2ForKey2".getBytes().length); + assertThat(results.get(2).requiredSize).isEqualTo("value3ForKey3".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(0).value)) + .isEqualTo("value1ForKey1".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(1).value)) + .isEqualTo("value2ForKey2".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(2).value)) + .isEqualTo("value3ForKey3".getBytes()); + } + } + } + + @Test + public void putNThenMultiGetDirectCFParams() throws RocksDBException { + try (final Options opt = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { + db.put("key1".getBytes(), "value1ForKey1".getBytes()); + db.put("key2".getBytes(), "value2ForKey2".getBytes()); + db.put("key3".getBytes(), "value3ForKey3".getBytes()); + + final List columnFamilyHandles = new ArrayList<>(); + columnFamilyHandles.add(db.getDefaultColumnFamily()); + columnFamilyHandles.add(db.getDefaultColumnFamily()); + + final List keys = new ArrayList<>(); + keys.add(ByteBuffer.allocateDirect(12).put("key1".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key2".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key3".getBytes())); + // Java8 and lower flip() returns Buffer not ByteBuffer, so can't chain above /\/\ + for (final ByteBuffer key : keys) { + key.flip(); + } + final List values = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + values.add(ByteBuffer.allocateDirect(24)); + } + try { + db.multiGetByteBuffers(columnFamilyHandles, keys, values); + fail("Expected exception when 2 column families supplied"); + } catch (final IllegalArgumentException e) { + assertThat(e.getMessage()).contains("Wrong number of ColumnFamilyHandle(s) supplied"); + } + + columnFamilyHandles.clear(); + columnFamilyHandles.add(db.getDefaultColumnFamily()); + final List results = + db.multiGetByteBuffers(columnFamilyHandles, keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.Ok); + + assertThat(results.get(0).requiredSize).isEqualTo("value1ForKey1".getBytes().length); + assertThat(results.get(1).requiredSize).isEqualTo("value2ForKey2".getBytes().length); + assertThat(results.get(2).requiredSize).isEqualTo("value3ForKey3".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(0).value)).isEqualTo("value1ForKey1".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(1).value)).isEqualTo("value2ForKey2".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(2).value)).isEqualTo("value3ForKey3".getBytes()); + } + } + + @Test + public void putNThenMultiGetDirectMixedCF() throws RocksDBException { + try (final Options opt = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { + final List cfDescriptors = new ArrayList<>(); + cfDescriptors.add(new ColumnFamilyDescriptor("cf0".getBytes())); + cfDescriptors.add(new ColumnFamilyDescriptor("cf1".getBytes())); + cfDescriptors.add(new ColumnFamilyDescriptor("cf2".getBytes())); + cfDescriptors.add(new ColumnFamilyDescriptor("cf3".getBytes())); + + final List cf = db.createColumnFamilies(cfDescriptors); + + db.put(cf.get(1), "key1".getBytes(), "value1ForKey1".getBytes()); + db.put("key2".getBytes(), "value2ForKey2".getBytes()); + db.put(cf.get(3), "key3".getBytes(), "value3ForKey3".getBytes()); + + final List keys = new ArrayList<>(); + keys.add(ByteBuffer.allocateDirect(12).put("key1".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key2".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key3".getBytes())); + // Java8 and lower flip() returns Buffer not ByteBuffer, so can't chain above /\/\ + for (final ByteBuffer key : keys) { + key.flip(); + } + final List values = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + values.add(ByteBuffer.allocateDirect(24)); + } + + { + final List columnFamilyHandles = new ArrayList<>(); + columnFamilyHandles.add(db.getDefaultColumnFamily()); + + final List results = + db.multiGetByteBuffers(columnFamilyHandles, keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.NotFound); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.NotFound); + + assertThat(results.get(1).requiredSize).isEqualTo("value2ForKey2".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(1).value)) + .isEqualTo("value2ForKey2".getBytes()); + } + + { + final List columnFamilyHandles = new ArrayList<>(); + columnFamilyHandles.add(cf.get(1)); + + final List results = + db.multiGetByteBuffers(columnFamilyHandles, keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.NotFound); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.NotFound); + + assertThat(results.get(0).requiredSize).isEqualTo("value2ForKey2".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(0).value)) + .isEqualTo("value1ForKey1".getBytes()); + } + + { + final List columnFamilyHandles = new ArrayList<>(); + columnFamilyHandles.add(cf.get(1)); + columnFamilyHandles.add(db.getDefaultColumnFamily()); + columnFamilyHandles.add(cf.get(3)); + + final List results = + db.multiGetByteBuffers(columnFamilyHandles, keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.Ok); + + assertThat(results.get(0).requiredSize).isEqualTo("value1ForKey1".getBytes().length); + assertThat(results.get(1).requiredSize).isEqualTo("value2ForKey2".getBytes().length); + assertThat(results.get(2).requiredSize).isEqualTo("value3ForKey3".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(0).value)) + .isEqualTo("value1ForKey1".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(1).value)) + .isEqualTo("value2ForKey2".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(2).value)) + .isEqualTo("value3ForKey3".getBytes()); + } + + { + final List columnFamilyHandles = new ArrayList<>(); + columnFamilyHandles.add(db.getDefaultColumnFamily()); + columnFamilyHandles.add(cf.get(1)); + columnFamilyHandles.add(cf.get(3)); + + final List results = + db.multiGetByteBuffers(columnFamilyHandles, keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.NotFound); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.NotFound); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.Ok); + + assertThat(results.get(2).requiredSize).isEqualTo("value3ForKey3".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(2).value)) + .isEqualTo("value3ForKey3".getBytes()); + } + } + } + + @Test + public void putNThenMultiGetDirectTruncateCF() throws RocksDBException { + try (final Options opt = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { + final List cfDescriptors = new ArrayList<>(); + cfDescriptors.add(new ColumnFamilyDescriptor("cf0".getBytes())); + + final List cf = db.createColumnFamilies(cfDescriptors); + + db.put(cf.get(0), "key1".getBytes(), "value1ForKey1".getBytes()); + db.put(cf.get(0), "key2".getBytes(), "value2ForKey2WithLotsOfTrailingGarbage".getBytes()); + db.put(cf.get(0), "key3".getBytes(), "value3ForKey3".getBytes()); + + final List keys = new ArrayList<>(); + keys.add(ByteBuffer.allocateDirect(12).put("key1".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key2".getBytes())); + keys.add(ByteBuffer.allocateDirect(12).put("key3".getBytes())); + // Java8 and lower flip() returns Buffer not ByteBuffer, so can't chain above /\/\ + for (final ByteBuffer key : keys) { + key.flip(); + } + final List values = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + values.add(ByteBuffer.allocateDirect(24)); + } + + { + final List columnFamilyHandles = new ArrayList<>(); + columnFamilyHandles.add(cf.get(0)); + final List results = + db.multiGetByteBuffers(columnFamilyHandles, keys, values); + + assertThat(results.get(0).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(1).status.getCode()).isEqualTo(Status.Code.Ok); + assertThat(results.get(2).status.getCode()).isEqualTo(Status.Code.Ok); + + assertThat(results.get(0).requiredSize).isEqualTo("value1ForKey1".getBytes().length); + assertThat(results.get(1).requiredSize) + .isEqualTo("value2ForKey2WithLotsOfTrailingGarbage".getBytes().length); + assertThat(results.get(2).requiredSize).isEqualTo("value3ForKey3".getBytes().length); + + assertThat(TestUtil.bufferBytes(results.get(0).value)) + .isEqualTo("value1ForKey1".getBytes()); + assertThat(TestUtil.bufferBytes(results.get(1).value)) + .isEqualTo("valu e2Fo rKey 2Wit hLot sOfT".replace(" ", "").getBytes()); + assertThat(TestUtil.bufferBytes(results.get(2).value)) + .isEqualTo("value3ForKey3".getBytes()); + } + } + } } diff --git a/java/src/test/java/org/rocksdb/util/TestUtil.java b/java/src/test/java/org/rocksdb/util/TestUtil.java index 57347b084..e4f490c8e 100644 --- a/java/src/test/java/org/rocksdb/util/TestUtil.java +++ b/java/src/test/java/org/rocksdb/util/TestUtil.java @@ -5,14 +5,14 @@ package org.rocksdb.util; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.nio.ByteBuffer; +import java.util.Random; import org.rocksdb.CompactionPriority; import org.rocksdb.Options; import org.rocksdb.WALRecoveryMode; -import java.util.Random; - -import static java.nio.charset.StandardCharsets.UTF_8; - /** * General test utilities. */ @@ -58,4 +58,15 @@ public class TestUtil { random.nextBytes(str); return str; } + + /** + * Copy a {@link ByteBuffer} into an array for shorthand ease of test coding + * @param byteBuffer the buffer to copy + * @return a {@link byte[]} containing the same bytes as the input + */ + public static byte[] bufferBytes(final ByteBuffer byteBuffer) { + final byte[] result = new byte[byteBuffer.limit()]; + byteBuffer.get(result); + return result; + } }