RocksJava API - fix Transaction.multiGet() size limit, remove bogus EnsureLocalCapacity() calls (#10674)

Summary:
Resolves see https://github.com/facebook/rocksdb/issues/9006

Fixes 2 related issues with JNI local references in the RocksJava API.

1. Some instances of RocksJava API JNI code appear to have misunderstood the reason for `JNIEnv->EnsureLocalCapacity()` and are carrying out bogus checks which happen to fail with some larger parameter values (many column families in a single call, very long key names or values). Remove these checks and add some regression tests for the previous failures.

2. The helper for Transaction multiGet operations (`multiGet()`, `multiGetForUpdate()`,...) is limited in the number of keys it can `get()` for because it requires a corresponding number of live local references. Refactor the helper slightly, copying out the key contents within a loop so that the references don't have to exist at the same time.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10674

Reviewed By: ajkr

Differential Revision: D40515361

Pulled By: jay-zhuang

fbshipit-source-id: f1be0126181a698b3ad27c0945a39c54d950aa25
main
Alan Paxton 2 years ago committed by Facebook GitHub Bot
parent bf78380851
commit 17553bdd5e
  1. 1
      java/Makefile
  2. 14
      java/rocksjni/optimistic_transaction_db.cc
  3. 77
      java/rocksjni/transaction.cc
  4. 14
      java/rocksjni/transaction_db.cc
  5. 140
      java/src/test/java/org/rocksdb/MultiColumnRegressionTest.java
  6. 217
      java/src/test/java/org/rocksdb/MultiGetManyKeysTest.java
  7. 158
      java/src/test/java/org/rocksdb/PutMultiplePartsTest.java

@ -146,6 +146,7 @@ JAVA_TESTS = \
org.rocksdb.MemoryUtilTest\ org.rocksdb.MemoryUtilTest\
org.rocksdb.MemTableTest\ org.rocksdb.MemTableTest\
org.rocksdb.MergeTest\ org.rocksdb.MergeTest\
org.rocksdb.MultiColumnRegressionTest \
org.rocksdb.MultiGetManyKeysTest\ org.rocksdb.MultiGetManyKeysTest\
org.rocksdb.MultiGetTest\ org.rocksdb.MultiGetTest\
org.rocksdb.MixedOptionsTest\ org.rocksdb.MixedOptionsTest\

@ -63,12 +63,6 @@ Java_org_rocksdb_OptimisticTransactionDB_open__JLjava_lang_String_2_3_3B_3J(
std::vector<ROCKSDB_NAMESPACE::ColumnFamilyDescriptor> column_families; std::vector<ROCKSDB_NAMESPACE::ColumnFamilyDescriptor> column_families;
const jsize len_cols = env->GetArrayLength(jcolumn_names); const jsize len_cols = env->GetArrayLength(jcolumn_names);
if (len_cols > 0) { if (len_cols > 0) {
if (env->EnsureLocalCapacity(len_cols) != 0) {
// out of memory
env->ReleaseStringUTFChars(jdb_path, db_path);
return nullptr;
}
jlong* jco = env->GetLongArrayElements(jcolumn_options_handles, nullptr); jlong* jco = env->GetLongArrayElements(jcolumn_options_handles, nullptr);
if (jco == nullptr) { if (jco == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
@ -87,14 +81,6 @@ Java_org_rocksdb_OptimisticTransactionDB_open__JLjava_lang_String_2_3_3B_3J(
const jbyteArray jcn_ba = reinterpret_cast<jbyteArray>(jcn); const jbyteArray jcn_ba = reinterpret_cast<jbyteArray>(jcn);
const jsize jcf_name_len = env->GetArrayLength(jcn_ba); const jsize jcf_name_len = env->GetArrayLength(jcn_ba);
if (env->EnsureLocalCapacity(jcf_name_len) != 0) {
// out of memory
env->DeleteLocalRef(jcn);
env->ReleaseLongArrayElements(jcolumn_options_handles, jco, JNI_ABORT);
env->ReleaseStringUTFChars(jdb_path, db_path);
return nullptr;
}
jbyte* jcf_name = env->GetByteArrayElements(jcn_ba, nullptr); jbyte* jcf_name = env->GetByteArrayElements(jcn_ba, nullptr);
if (jcf_name == nullptr) { if (jcf_name == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError

@ -252,12 +252,6 @@ std::vector<ROCKSDB_NAMESPACE::ColumnFamilyHandle*> txn_column_families_helper(
if (jcolumn_family_handles != nullptr) { if (jcolumn_family_handles != nullptr) {
const jsize len_cols = env->GetArrayLength(jcolumn_family_handles); const jsize len_cols = env->GetArrayLength(jcolumn_family_handles);
if (len_cols > 0) { if (len_cols > 0) {
if (env->EnsureLocalCapacity(len_cols) != 0) {
// out of memory
*has_exception = JNI_TRUE;
return std::vector<ROCKSDB_NAMESPACE::ColumnFamilyHandle*>();
}
jlong* jcfh = env->GetLongArrayElements(jcolumn_family_handles, nullptr); jlong* jcfh = env->GetLongArrayElements(jcolumn_family_handles, nullptr);
if (jcfh == nullptr) { if (jcfh == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
@ -293,47 +287,48 @@ void free_parts(
} }
} }
void free_key_values(std::vector<jbyte*>& keys_to_free) {
for (auto& key : keys_to_free) {
delete[] key;
}
}
// TODO(AR) consider refactoring to share this between here and rocksjni.cc // TODO(AR) consider refactoring to share this between here and rocksjni.cc
// cf multi get // cf multi get
jobjectArray txn_multi_get_helper(JNIEnv* env, const FnMultiGet& fn_multi_get, jobjectArray txn_multi_get_helper(JNIEnv* env, const FnMultiGet& fn_multi_get,
const jlong& jread_options_handle, const jlong& jread_options_handle,
const jobjectArray& jkey_parts) { const jobjectArray& jkey_parts) {
const jsize len_key_parts = env->GetArrayLength(jkey_parts); const jsize len_key_parts = env->GetArrayLength(jkey_parts);
if (env->EnsureLocalCapacity(len_key_parts) != 0) {
// out of memory
return nullptr;
}
std::vector<ROCKSDB_NAMESPACE::Slice> key_parts; std::vector<ROCKSDB_NAMESPACE::Slice> key_parts;
std::vector<std::tuple<jbyteArray, jbyte*, jobject>> key_parts_to_free; std::vector<jbyte*> keys_to_free;
for (int i = 0; i < len_key_parts; i++) { for (int i = 0; i < len_key_parts; i++) {
const jobject jk = env->GetObjectArrayElement(jkey_parts, i); const jobject jk = env->GetObjectArrayElement(jkey_parts, i);
if (env->ExceptionCheck()) { if (env->ExceptionCheck()) {
// exception thrown: ArrayIndexOutOfBoundsException // exception thrown: ArrayIndexOutOfBoundsException
free_parts(env, key_parts_to_free); free_key_values(keys_to_free);
return nullptr; return nullptr;
} }
jbyteArray jk_ba = reinterpret_cast<jbyteArray>(jk); jbyteArray jk_ba = reinterpret_cast<jbyteArray>(jk);
const jsize len_key = env->GetArrayLength(jk_ba); const jsize len_key = env->GetArrayLength(jk_ba);
if (env->EnsureLocalCapacity(len_key) != 0) { jbyte* jk_val = new jbyte[len_key];
// out of memory
env->DeleteLocalRef(jk);
free_parts(env, key_parts_to_free);
return nullptr;
}
jbyte* jk_val = env->GetByteArrayElements(jk_ba, nullptr);
if (jk_val == nullptr) { if (jk_val == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
env->DeleteLocalRef(jk); env->DeleteLocalRef(jk);
free_parts(env, key_parts_to_free); free_key_values(keys_to_free);
jclass exception_cls = (env)->FindClass("java/lang/OutOfMemoryError");
(env)->ThrowNew(exception_cls,
"Insufficient Memory for CF handle array.");
return nullptr; return nullptr;
} }
env->GetByteArrayRegion(jk_ba, 0, len_key, jk_val);
ROCKSDB_NAMESPACE::Slice key_slice(reinterpret_cast<char*>(jk_val), ROCKSDB_NAMESPACE::Slice key_slice(reinterpret_cast<char*>(jk_val),
len_key); len_key);
key_parts.push_back(key_slice); key_parts.push_back(key_slice);
keys_to_free.push_back(jk_val);
key_parts_to_free.push_back(std::make_tuple(jk_ba, jk_val, jk)); env->DeleteLocalRef(jk);
} }
auto* read_options = auto* read_options =
@ -343,7 +338,7 @@ jobjectArray txn_multi_get_helper(JNIEnv* env, const FnMultiGet& fn_multi_get,
fn_multi_get(*read_options, key_parts, &value_parts); fn_multi_get(*read_options, key_parts, &value_parts);
// free up allocated byte arrays // free up allocated byte arrays
free_parts(env, key_parts_to_free); free_key_values(keys_to_free);
// prepare the results // prepare the results
const jclass jcls_ba = env->FindClass("[B"); const jclass jcls_ba = env->FindClass("[B");
@ -658,6 +653,20 @@ void txn_write_kv_parts_helper(JNIEnv* env,
auto value_parts = std::vector<ROCKSDB_NAMESPACE::Slice>(); auto value_parts = std::vector<ROCKSDB_NAMESPACE::Slice>();
auto jparts_to_free = std::vector<std::tuple<jbyteArray, jbyte*, jobject>>(); auto jparts_to_free = std::vector<std::tuple<jbyteArray, jbyte*, jobject>>();
// Since this is fundamentally a gather write at the RocksDB level,
// it seems wrong to refactor it by copying (gathering) keys and data here,
// in order to avoid the local reference limit.
// The user needs to be a aware that there is a limit to the number of parts
// which can be gathered.
if (env->EnsureLocalCapacity(jkey_parts_len + jvalue_parts_len) != 0) {
// no space for all the jobjects we store up
env->ExceptionClear();
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(
env, "Insufficient JNI local references for " +
std::to_string(jkey_parts_len) + " key/value parts");
return;
}
// convert java key_parts/value_parts byte[][] to Slice(s) // convert java key_parts/value_parts byte[][] to Slice(s)
for (jsize i = 0; i < jkey_parts_len; ++i) { for (jsize i = 0; i < jkey_parts_len; ++i) {
const jobject jobj_key_part = env->GetObjectArrayElement(jkey_parts, i); const jobject jobj_key_part = env->GetObjectArrayElement(jkey_parts, i);
@ -676,13 +685,6 @@ void txn_write_kv_parts_helper(JNIEnv* env,
const jbyteArray jba_key_part = reinterpret_cast<jbyteArray>(jobj_key_part); const jbyteArray jba_key_part = reinterpret_cast<jbyteArray>(jobj_key_part);
const jsize jkey_part_len = env->GetArrayLength(jba_key_part); const jsize jkey_part_len = env->GetArrayLength(jba_key_part);
if (env->EnsureLocalCapacity(jkey_part_len) != 0) {
// out of memory
env->DeleteLocalRef(jobj_value_part);
env->DeleteLocalRef(jobj_key_part);
free_parts(env, jparts_to_free);
return;
}
jbyte* jkey_part = env->GetByteArrayElements(jba_key_part, nullptr); jbyte* jkey_part = env->GetByteArrayElements(jba_key_part, nullptr);
if (jkey_part == nullptr) { if (jkey_part == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
@ -695,18 +697,9 @@ void txn_write_kv_parts_helper(JNIEnv* env,
const jbyteArray jba_value_part = const jbyteArray jba_value_part =
reinterpret_cast<jbyteArray>(jobj_value_part); reinterpret_cast<jbyteArray>(jobj_value_part);
const jsize jvalue_part_len = env->GetArrayLength(jba_value_part); const jsize jvalue_part_len = env->GetArrayLength(jba_value_part);
if (env->EnsureLocalCapacity(jvalue_part_len) != 0) {
// out of memory
env->DeleteLocalRef(jobj_value_part);
env->DeleteLocalRef(jobj_key_part);
env->ReleaseByteArrayElements(jba_key_part, jkey_part, JNI_ABORT);
free_parts(env, jparts_to_free);
return;
}
jbyte* jvalue_part = env->GetByteArrayElements(jba_value_part, nullptr); jbyte* jvalue_part = env->GetByteArrayElements(jba_value_part, nullptr);
if (jvalue_part == nullptr) { if (jvalue_part == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
env->ReleaseByteArrayElements(jba_value_part, jvalue_part, JNI_ABORT);
env->DeleteLocalRef(jobj_value_part); env->DeleteLocalRef(jobj_value_part);
env->DeleteLocalRef(jobj_key_part); env->DeleteLocalRef(jobj_key_part);
env->ReleaseByteArrayElements(jba_key_part, jkey_part, JNI_ABORT); env->ReleaseByteArrayElements(jba_key_part, jkey_part, JNI_ABORT);
@ -911,12 +904,6 @@ void txn_write_k_parts_helper(JNIEnv* env,
const jbyteArray jba_key_part = reinterpret_cast<jbyteArray>(jobj_key_part); const jbyteArray jba_key_part = reinterpret_cast<jbyteArray>(jobj_key_part);
const jsize jkey_part_len = env->GetArrayLength(jba_key_part); const jsize jkey_part_len = env->GetArrayLength(jba_key_part);
if (env->EnsureLocalCapacity(jkey_part_len) != 0) {
// out of memory
env->DeleteLocalRef(jobj_key_part);
free_parts(env, jkey_parts_to_free);
return;
}
jbyte* jkey_part = env->GetByteArrayElements(jba_key_part, nullptr); jbyte* jkey_part = env->GetByteArrayElements(jba_key_part, nullptr);
if (jkey_part == nullptr) { if (jkey_part == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError

@ -67,12 +67,6 @@ jlongArray Java_org_rocksdb_TransactionDB_open__JJLjava_lang_String_2_3_3B_3J(
} }
const jsize len_cols = env->GetArrayLength(jcolumn_names); const jsize len_cols = env->GetArrayLength(jcolumn_names);
if (env->EnsureLocalCapacity(len_cols) != 0) {
// out of memory
env->ReleaseStringUTFChars(jdb_path, db_path);
return nullptr;
}
jlong* jco = env->GetLongArrayElements(jcolumn_options_handles, nullptr); jlong* jco = env->GetLongArrayElements(jcolumn_options_handles, nullptr);
if (jco == nullptr) { if (jco == nullptr) {
// exception thrown: OutOfMemoryError // exception thrown: OutOfMemoryError
@ -99,14 +93,6 @@ jlongArray Java_org_rocksdb_TransactionDB_open__JJLjava_lang_String_2_3_3B_3J(
} }
const int jcf_name_len = env->GetArrayLength(jcn_ba); const int jcf_name_len = env->GetArrayLength(jcn_ba);
if (env->EnsureLocalCapacity(jcf_name_len) != 0) {
// out of memory
env->ReleaseByteArrayElements(jcn_ba, jcf_name, JNI_ABORT);
env->DeleteLocalRef(jcn);
env->ReleaseLongArrayElements(jcolumn_options_handles, jco, JNI_ABORT);
env->ReleaseStringUTFChars(jdb_path, db_path);
return nullptr;
}
const std::string cf_name(reinterpret_cast<char*>(jcf_name), jcf_name_len); const std::string cf_name(reinterpret_cast<char*>(jcf_name), jcf_name_len);
const ROCKSDB_NAMESPACE::ColumnFamilyOptions* cf_options = const ROCKSDB_NAMESPACE::ColumnFamilyOptions* cf_options =
reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jco[i]); reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyOptions*>(jco[i]);

@ -0,0 +1,140 @@
package org.rocksdb;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Test for changes made by
* <a link="https://github.com/facebook/rocksdb/issues/9006">transactional multiGet problem</a>
* the tests here were previously broken by the nonsense removed by that change.
*/
@RunWith(Parameterized.class)
public class MultiColumnRegressionTest {
@Parameterized.Parameters
public static List<Params> data() {
return Arrays.asList(new Params(3, 100), new Params(3, 1000000));
}
public static class Params {
final int numColumns;
final int keySize;
public Params(final int numColumns, final int keySize) {
this.numColumns = numColumns;
this.keySize = keySize;
}
}
@Rule public TemporaryFolder dbFolder = new TemporaryFolder();
private final Params params;
public MultiColumnRegressionTest(final Params params) {
this.params = params;
}
@Test
public void transactionDB() throws RocksDBException {
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
for (int i = 0; i < params.numColumns; i++) {
StringBuilder sb = new StringBuilder();
sb.append("cf" + i);
for (int j = 0; j < params.keySize; j++) sb.append("_cf");
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(sb.toString().getBytes()));
}
try (final Options opt = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) {
final List<ColumnFamilyHandle> columnFamilyHandles =
db.createColumnFamilies(columnFamilyDescriptors);
}
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
try (final TransactionDB tdb = TransactionDB.open(new DBOptions().setCreateIfMissing(true),
new TransactionDBOptions(), dbFolder.getRoot().getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles)) {
final WriteOptions writeOptions = new WriteOptions();
try (Transaction transaction = tdb.beginTransaction(writeOptions)) {
for (int i = 0; i < params.numColumns; i++) {
transaction.put(
columnFamilyHandles.get(i), ("key" + i).getBytes(), ("value" + (i - 7)).getBytes());
}
transaction.put("key".getBytes(), "value".getBytes());
transaction.commit();
}
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
columnFamilyHandle.close();
}
}
final List<ColumnFamilyHandle> columnFamilyHandles2 = new ArrayList<>();
try (final TransactionDB tdb = TransactionDB.open(new DBOptions().setCreateIfMissing(true),
new TransactionDBOptions(), dbFolder.getRoot().getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles2)) {
try (Transaction transaction = tdb.beginTransaction(new WriteOptions())) {
final ReadOptions readOptions = new ReadOptions();
for (int i = 0; i < params.numColumns; i++) {
final byte[] value =
transaction.get(columnFamilyHandles2.get(i), readOptions, ("key" + i).getBytes());
assertThat(value).isEqualTo(("value" + (i - 7)).getBytes());
}
transaction.commit();
}
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles2) {
columnFamilyHandle.close();
}
}
}
@Test
public void optimisticDB() throws RocksDBException {
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
for (int i = 0; i < params.numColumns; i++) {
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
}
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
try (final OptimisticTransactionDB otdb = OptimisticTransactionDB.open(
new DBOptions().setCreateIfMissing(true), dbFolder.getRoot().getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles)) {
try (Transaction transaction = otdb.beginTransaction(new WriteOptions())) {
for (int i = 0; i < params.numColumns; i++) {
transaction.put(
columnFamilyHandles.get(i), ("key" + i).getBytes(), ("value" + (i - 7)).getBytes());
}
transaction.put("key".getBytes(), "value".getBytes());
transaction.commit();
}
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
columnFamilyHandle.close();
}
}
final List<ColumnFamilyHandle> columnFamilyHandles2 = new ArrayList<>();
try (final OptimisticTransactionDB otdb = OptimisticTransactionDB.open(
new DBOptions().setCreateIfMissing(true), dbFolder.getRoot().getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles2)) {
try (Transaction transaction = otdb.beginTransaction(new WriteOptions())) {
final ReadOptions readOptions = new ReadOptions();
for (int i = 0; i < params.numColumns; i++) {
final byte[] value =
transaction.get(columnFamilyHandles2.get(i), readOptions, ("key" + i).getBytes());
assertThat(value).isEqualTo(("value" + (i - 7)).getBytes());
}
transaction.commit();
}
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles2) {
columnFamilyHandle.close();
}
}
}
}

@ -6,7 +6,6 @@ package org.rocksdb;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -18,53 +17,225 @@ import org.junit.runners.Parameterized;
public class MultiGetManyKeysTest { public class MultiGetManyKeysTest {
@Parameterized.Parameters @Parameterized.Parameters
public static List<Integer> data() { public static List<Integer> data() {
return Arrays.asList(3, 250, 60000, 70000, 150000, 750000); return Arrays.asList(2, 3, 250, 60000, 70000, 150000, 750000);
} }
@Rule public TemporaryFolder dbFolder = new TemporaryFolder(); @Rule public TemporaryFolder dbFolder = new TemporaryFolder();
private final int keySize; private final int numKeys;
public MultiGetManyKeysTest(final Integer keySize) { public MultiGetManyKeysTest(final Integer numKeys) {
this.keySize = keySize; this.numKeys = numKeys;
} }
/** /**
* Test for https://github.com/facebook/rocksdb/issues/8039 * Test for <a link="https://github.com/facebook/rocksdb/issues/8039">multiGet problem</a>
*/ */
@Test @Test
public void multiGetAsListLarge() throws RocksDBException { public void multiGetAsListLarge() throws RocksDBException {
final Random rand = new Random(); final List<byte[]> keys = generateRandomKeys(numKeys);
final List<byte[]> keys = new ArrayList<>(); final Map<Key, byte[]> keyValues = generateRandomKeyValues(keys, 10);
for (int i = 0; i < keySize; i++) { putKeysAndValues(keyValues);
final byte[] key = new byte[4];
rand.nextBytes(key);
keys.add(key);
}
try (final Options opt = new Options().setCreateIfMissing(true); try (final Options opt = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) {
final List<byte[]> values = db.multiGetAsList(keys); final List<byte[]> values = db.multiGetAsList(keys);
assertThat(values.size()).isEqualTo(keys.size()); assertKeysAndValues(keys, keyValues, values);
} }
} }
/**
* Test for <a link="https://github.com/facebook/rocksdb/issues/9006">transactional multiGet
* problem</a>
*/
@Test @Test
public void multiGetAsListCheckResults() throws RocksDBException { public void multiGetAsListLargeTransactional() throws RocksDBException {
try (final Options opt = new Options().setCreateIfMissing(true); final List<byte[]> keys = generateRandomKeys(numKeys);
final RocksDB db = RocksDB.open(opt, dbFolder.getRoot().getAbsolutePath())) { final Map<Key, byte[]> keyValues = generateRandomKeyValues(keys, 10);
putKeysAndValues(keyValues);
try (final Options options = new Options().setCreateIfMissing(true);
final TransactionDBOptions txnDbOptions = new TransactionDBOptions();
final TransactionDB txnDB =
TransactionDB.open(options, txnDbOptions, dbFolder.getRoot().getAbsolutePath())) {
try (final Transaction transaction = txnDB.beginTransaction(new WriteOptions())) {
final List<byte[]> values = transaction.multiGetAsList(new ReadOptions(), keys);
assertKeysAndValues(keys, keyValues, values);
}
}
}
/**
* Test for <a link="https://github.com/facebook/rocksdb/issues/9006">transactional multiGet
* problem</a>
*/
@Test
public void multiGetForUpdateAsListLargeTransactional() throws RocksDBException {
final List<byte[]> keys = generateRandomKeys(numKeys);
final Map<Key, byte[]> keyValues = generateRandomKeyValues(keys, 10);
putKeysAndValues(keyValues);
try (final Options options = new Options().setCreateIfMissing(true);
final TransactionDBOptions txnDbOptions = new TransactionDBOptions();
final TransactionDB txnDB =
TransactionDB.open(options, txnDbOptions, dbFolder.getRoot().getAbsolutePath())) {
try (final Transaction transaction = txnDB.beginTransaction(new WriteOptions())) {
final List<byte[]> values = transaction.multiGetForUpdateAsList(new ReadOptions(), keys);
assertKeysAndValues(keys, keyValues, values);
}
}
}
/**
* Test for <a link="https://github.com/facebook/rocksdb/issues/9006">transactional multiGet
* problem</a>
*/
@Test
public void multiGetAsListLargeTransactionalCF() throws RocksDBException {
final List<byte[]> keys = generateRandomKeys(numKeys);
final Map<Key, byte[]> keyValues = generateRandomKeyValues(keys, 10);
final ColumnFamilyDescriptor columnFamilyDescriptor =
new ColumnFamilyDescriptor("cfTest".getBytes());
putKeysAndValues(columnFamilyDescriptor, keyValues);
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
columnFamilyDescriptors.add(columnFamilyDescriptor);
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
try (final Options options = new Options().setCreateIfMissing(true);
final TransactionDBOptions txnDbOptions = new TransactionDBOptions();
final TransactionDB txnDB = TransactionDB.open(new DBOptions(options), txnDbOptions,
dbFolder.getRoot().getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles)) {
final List<ColumnFamilyHandle> columnFamilyHandlesForMultiGet = new ArrayList<>(numKeys);
for (int i = 0; i < numKeys; i++)
columnFamilyHandlesForMultiGet.add(columnFamilyHandles.get(0));
try (final Transaction transaction = txnDB.beginTransaction(new WriteOptions())) {
final List<byte[]> values =
transaction.multiGetAsList(new ReadOptions(), columnFamilyHandlesForMultiGet, keys);
assertKeysAndValues(keys, keyValues, values);
}
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
columnFamilyHandle.close();
}
}
}
/**
* Test for <a link="https://github.com/facebook/rocksdb/issues/9006">transactional multiGet
* problem</a>
*/
@Test
public void multiGetForUpdateAsListLargeTransactionalCF() throws RocksDBException {
final List<byte[]> keys = generateRandomKeys(numKeys);
final Map<Key, byte[]> keyValues = generateRandomKeyValues(keys, 10);
final ColumnFamilyDescriptor columnFamilyDescriptor =
new ColumnFamilyDescriptor("cfTest".getBytes());
putKeysAndValues(columnFamilyDescriptor, keyValues);
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
columnFamilyDescriptors.add(columnFamilyDescriptor);
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
try (final Options options = new Options().setCreateIfMissing(true);
final TransactionDBOptions txnDbOptions = new TransactionDBOptions();
final TransactionDB txnDB = TransactionDB.open(new DBOptions(options), txnDbOptions,
dbFolder.getRoot().getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles)) {
final List<ColumnFamilyHandle> columnFamilyHandlesForMultiGet = new ArrayList<>(numKeys);
for (int i = 0; i < numKeys; i++)
columnFamilyHandlesForMultiGet.add(columnFamilyHandles.get(0));
try (final Transaction transaction = txnDB.beginTransaction(new WriteOptions())) {
final List<byte[]> values = transaction.multiGetForUpdateAsList(
new ReadOptions(), columnFamilyHandlesForMultiGet, keys);
assertKeysAndValues(keys, keyValues, values);
}
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
columnFamilyHandle.close();
}
}
}
private List<byte[]> generateRandomKeys(final int numKeys) {
final Random rand = new Random();
final List<byte[]> keys = new ArrayList<>(); final List<byte[]> keys = new ArrayList<>();
for (int i = 0; i < keySize; i++) { for (int i = 0; i < numKeys; i++) {
byte[] key = ("key" + i + ":").getBytes(); final byte[] key = new byte[4];
rand.nextBytes(key);
keys.add(key); keys.add(key);
db.put(key, ("value" + i + ":").getBytes()); }
return keys;
} }
final List<byte[]> values = db.multiGetAsList(keys); private Map<Key, byte[]> generateRandomKeyValues(final List<byte[]> keys, final int percent) {
final Random rand = new Random();
final Map<Key, byte[]> keyValues = new HashMap<>();
for (int i = 0; i < numKeys; i++) {
if (rand.nextInt(100) < percent) {
final byte[] value = new byte[1024];
rand.nextBytes(value);
keyValues.put(new Key(keys.get(i)), value);
}
}
return keyValues;
}
private void putKeysAndValues(Map<Key, byte[]> keyValues) throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) {
for (Map.Entry<Key, byte[]> keyValue : keyValues.entrySet()) {
db.put(keyValue.getKey().get(), keyValue.getValue());
}
}
}
private void putKeysAndValues(ColumnFamilyDescriptor columnFamilyDescriptor,
Map<Key, byte[]> keyValues) throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
final ColumnFamilyHandle columnFamilyHandle =
db.createColumnFamily(columnFamilyDescriptor)) {
for (Map.Entry<Key, byte[]> keyValue : keyValues.entrySet()) {
db.put(columnFamilyHandle, keyValue.getKey().get(), keyValue.getValue());
}
}
}
private void assertKeysAndValues(
final List<byte[]> keys, final Map<Key, byte[]> keyValues, final List<byte[]> values) {
assertThat(values.size()).isEqualTo(keys.size()); assertThat(values.size()).isEqualTo(keys.size());
for (int i = 0; i < keySize; i++) { for (int i = 0; i < numKeys; i++) {
assertThat(values.get(i)).isEqualTo(("value" + i + ":").getBytes()); final Key key = new Key(keys.get(i));
final byte[] value = values.get(i);
if (keyValues.containsKey(key)) {
assertThat(value).isEqualTo(keyValues.get(key));
} else {
assertThat(value).isNull();
}
} }
} }
static private class Key {
private final byte[] bytes;
public Key(byte[] bytes) {
this.bytes = bytes;
}
public byte[] get() {
return this.bytes;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Key key = (Key) o;
return Arrays.equals(bytes, key.bytes);
}
@Override
public int hashCode() {
return Arrays.hashCode(bytes);
}
} }
} }

@ -0,0 +1,158 @@
package org.rocksdb;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class PutMultiplePartsTest {
@Parameterized.Parameters
public static List<Integer> data() {
return Arrays.asList(2, 3, 250, 20000);
}
@Rule public TemporaryFolder dbFolder = new TemporaryFolder();
private final int numParts;
public PutMultiplePartsTest(final Integer numParts) {
this.numParts = numParts;
}
@Test
public void putUntracked() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final TransactionDBOptions txnDbOptions = new TransactionDBOptions();
final TransactionDB txnDB =
TransactionDB.open(options, txnDbOptions, dbFolder.getRoot().getAbsolutePath())) {
try (final Transaction transaction = txnDB.beginTransaction(new WriteOptions())) {
final byte[][] keys = generateItems("key", ":", numParts);
final byte[][] values = generateItems("value", "", numParts);
transaction.putUntracked(keys, values);
transaction.commit();
}
txnDB.syncWal();
}
validateResults();
}
@Test
public void put() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final TransactionDBOptions txnDbOptions = new TransactionDBOptions();
final TransactionDB txnDB =
TransactionDB.open(options, txnDbOptions, dbFolder.getRoot().getAbsolutePath())) {
try (final Transaction transaction = txnDB.beginTransaction(new WriteOptions())) {
final byte[][] keys = generateItems("key", ":", numParts);
final byte[][] values = generateItems("value", "", numParts);
transaction.put(keys, values);
transaction.commit();
}
txnDB.syncWal();
}
validateResults();
}
@Test
public void putUntrackedCF() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final TransactionDBOptions txnDbOptions = new TransactionDBOptions();
final TransactionDB txnDB =
TransactionDB.open(options, txnDbOptions, dbFolder.getRoot().getAbsolutePath());
final ColumnFamilyHandle columnFamilyHandle =
txnDB.createColumnFamily(new ColumnFamilyDescriptor("cfTest".getBytes()))) {
try (final Transaction transaction = txnDB.beginTransaction(new WriteOptions())) {
final byte[][] keys = generateItems("key", ":", numParts);
final byte[][] values = generateItems("value", "", numParts);
transaction.putUntracked(columnFamilyHandle, keys, values);
transaction.commit();
}
txnDB.syncWal();
}
validateResultsCF();
}
@Test
public void putCF() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true);
final TransactionDBOptions txnDbOptions = new TransactionDBOptions();
final TransactionDB txnDB =
TransactionDB.open(options, txnDbOptions, dbFolder.getRoot().getAbsolutePath());
final ColumnFamilyHandle columnFamilyHandle =
txnDB.createColumnFamily(new ColumnFamilyDescriptor("cfTest".getBytes()))) {
try (final Transaction transaction = txnDB.beginTransaction(new WriteOptions())) {
final byte[][] keys = generateItems("key", ":", numParts);
final byte[][] values = generateItems("value", "", numParts);
transaction.put(columnFamilyHandle, keys, values);
transaction.commit();
}
txnDB.syncWal();
}
validateResultsCF();
}
private void validateResults() throws RocksDBException {
try (final RocksDB db = RocksDB.open(new Options(), dbFolder.getRoot().getAbsolutePath())) {
final List<byte[]> keys = generateItemsAsList("key", ":", numParts);
final byte[][] values = generateItems("value", "", numParts);
StringBuilder singleKey = new StringBuilder();
for (int i = 0; i < numParts; i++) {
singleKey.append(new String(keys.get(i), StandardCharsets.UTF_8));
}
final byte[] result = db.get(singleKey.toString().getBytes());
StringBuilder singleValue = new StringBuilder();
for (int i = 0; i < numParts; i++) {
singleValue.append(new String(values[i], StandardCharsets.UTF_8));
}
assertThat(result).isEqualTo(singleValue.toString().getBytes());
}
}
private void validateResultsCF() throws RocksDBException {
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("cfTest".getBytes()));
columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
try (final RocksDB db = RocksDB.open(new DBOptions(), dbFolder.getRoot().getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles)) {
final List<byte[]> keys = generateItemsAsList("key", ":", numParts);
final byte[][] values = generateItems("value", "", numParts);
StringBuilder singleKey = new StringBuilder();
for (int i = 0; i < numParts; i++) {
singleKey.append(new String(keys.get(i), StandardCharsets.UTF_8));
}
final byte[] result = db.get(columnFamilyHandles.get(0), singleKey.toString().getBytes());
StringBuilder singleValue = new StringBuilder();
for (int i = 0; i < numParts; i++) {
singleValue.append(new String(values[i], StandardCharsets.UTF_8));
}
assertThat(result).isEqualTo(singleValue.toString().getBytes());
}
}
private byte[][] generateItems(final String prefix, final String suffix, final int numItems) {
return generateItemsAsList(prefix, suffix, numItems).toArray(new byte[0][0]);
}
private List<byte[]> generateItemsAsList(
final String prefix, final String suffix, final int numItems) {
final List<byte[]> items = new ArrayList<>();
for (int i = 0; i < numItems; i++) {
items.add((prefix + i + suffix).getBytes());
}
return items;
}
}
Loading…
Cancel
Save