Transaction multiGet convert to list-based (#9522)

Summary:
Transaction multiGet convert to list-based.

RocksDB Java (non-transactional) has multiGetAsList() methods to expose multiGet(). These return a list of results. These methods replaced multiGet() methods returning an array of results, which were deprecated in Rocks 6 and are being removed in Rocks 7.

The transactional API still presents multiGet() methods returning arrays, so in Rocks 7 we replace these with multiGetAsList()methods and deprecate the multiGet() methods.

This does not require any changes to the supporting JNI/C++ code, only to the wrappers which present the Java API.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9522

Reviewed By: mrambacher

Differential Revision: D34114373

Pulled By: jay-zhuang

fbshipit-source-id: cb22d6095934d951b6aee4aed3e07923d3c18007
main
Alan Paxton 3 years ago committed by Facebook GitHub Bot
parent 479eb1aad6
commit eed71dfa82
  1. 172
      java/src/main/java/org/rocksdb/Transaction.java
  2. 99
      java/src/test/java/org/rocksdb/AbstractTransactionTest.java
  3. 130
      java/src/test/java/org/rocksdb/OptimisticTransactionTest.java
  4. 102
      java/src/test/java/org/rocksdb/TransactionTest.java

@ -5,6 +5,8 @@
package org.rocksdb;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
@ -310,7 +312,7 @@ public class Transaction extends RocksObject {
/**
* This function is similar to
* {@link RocksDB#multiGet(ReadOptions, List, List)} except it will
* {@link RocksDB#multiGetAsList} except it will
* also read pending changes in this transaction.
* Currently, this function will return Status::MergeInProgress if the most
* recent write to the queried key in this batch is a Merge.
@ -336,9 +338,10 @@ public class Transaction extends RocksObject {
* @throws IllegalArgumentException thrown if the size of passed keys is not
* equal to the amount of passed column family handles.
*/
@Deprecated
public byte[][] multiGet(final ReadOptions readOptions,
final List<ColumnFamilyHandle> columnFamilyHandles,
final byte[][] keys) throws RocksDBException {
final List<ColumnFamilyHandle> columnFamilyHandles, final byte[][] keys)
throws RocksDBException {
assert(isOwningHandle());
// Check if key size equals cfList size. If not a exception must be
// thrown. If not a Segmentation fault happens.
@ -360,7 +363,57 @@ public class Transaction extends RocksObject {
/**
* This function is similar to
* {@link RocksDB#multiGet(ReadOptions, List)} except it will
* {@link RocksDB#multiGetAsList(ReadOptions, List, List)} except it will
* also read pending changes in this transaction.
* Currently, this function will return Status::MergeInProgress if the most
* recent write to the queried key in this batch is a Merge.
*
* If {@link ReadOptions#snapshot()} is not set, the current version of the
* key will be read. Calling {@link #setSnapshot()} does not affect the
* version of the data returned.
*
* Note that setting {@link ReadOptions#setSnapshot(Snapshot)} will affect
* what is read from the DB but will NOT change which keys are read from this
* transaction (the keys in this transaction do not yet belong to any snapshot
* and will be fetched regardless).
*
* @param readOptions Read options.
* @param columnFamilyHandles {@link java.util.List} containing
* {@link org.rocksdb.ColumnFamilyHandle} instances.
* @param keys of keys for which values need to be retrieved.
*
* @return Array of values, one for each key
*
* @throws RocksDBException thrown if error happens in underlying
* native library.
* @throws IllegalArgumentException thrown if the size of passed keys is not
* equal to the amount of passed column family handles.
*/
public List<byte[]> multiGetAsList(final ReadOptions readOptions,
final List<ColumnFamilyHandle> columnFamilyHandles, final List<byte[]> keys)
throws RocksDBException {
assert (isOwningHandle());
// Check if key size equals cfList size. If not a exception must be
// thrown. If not a Segmentation fault happens.
if (keys.size() != columnFamilyHandles.size()) {
throw new IllegalArgumentException("For each key there must be a ColumnFamilyHandle.");
}
if (keys.size() == 0) {
return new ArrayList<>(0);
}
final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
final long[] cfHandles = new long[columnFamilyHandles.size()];
for (int i = 0; i < columnFamilyHandles.size(); i++) {
cfHandles[i] = columnFamilyHandles.get(i).nativeHandle_;
}
return Arrays.asList(multiGet(nativeHandle_, readOptions.nativeHandle_, keysArray, cfHandles));
}
/**
* This function is similar to
* {@link RocksDB#multiGetAsList} except it will
* also read pending changes in this transaction.
* Currently, this function will return Status::MergeInProgress if the most
* recent write to the queried key in this batch is a Merge.
@ -383,8 +436,9 @@ public class Transaction extends RocksObject {
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
public byte[][] multiGet(final ReadOptions readOptions,
final byte[][] keys) throws RocksDBException {
@Deprecated
public byte[][] multiGet(final ReadOptions readOptions, final byte[][] keys)
throws RocksDBException {
assert(isOwningHandle());
if(keys.length == 0) {
return new byte[0][0];
@ -394,6 +448,41 @@ public class Transaction extends RocksObject {
keys);
}
/**
* This function is similar to
* {@link RocksDB#multiGetAsList} except it will
* also read pending changes in this transaction.
* Currently, this function will return Status::MergeInProgress if the most
* recent write to the queried key in this batch is a Merge.
*
* If {@link ReadOptions#snapshot()} is not set, the current version of the
* key will be read. Calling {@link #setSnapshot()} does not affect the
* version of the data returned.
*
* Note that setting {@link ReadOptions#setSnapshot(Snapshot)} will affect
* what is read from the DB but will NOT change which keys are read from this
* transaction (the keys in this transaction do not yet belong to any snapshot
* and will be fetched regardless).
*
* @param readOptions Read options.=
* {@link org.rocksdb.ColumnFamilyHandle} instances.
* @param keys of keys for which values need to be retrieved.
*
* @return Array of values, one for each key
*
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
public List<byte[]> multiGetAsList(final ReadOptions readOptions, final List<byte[]> keys)
throws RocksDBException {
if (keys.size() == 0) {
return new ArrayList<>(0);
}
final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
return Arrays.asList(multiGet(nativeHandle_, readOptions.nativeHandle_, keysArray));
}
/**
* Read this key and ensure that this transaction will only
* be able to be committed if this key is not written outside this
@ -541,9 +630,10 @@ public class Transaction extends RocksObject {
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
@Deprecated
public byte[][] multiGetForUpdate(final ReadOptions readOptions,
final List<ColumnFamilyHandle> columnFamilyHandles,
final byte[][] keys) throws RocksDBException {
final List<ColumnFamilyHandle> columnFamilyHandles, final byte[][] keys)
throws RocksDBException {
assert(isOwningHandle());
// Check if key size equals cfList size. If not a exception must be
// thrown. If not a Segmentation fault happens.
@ -562,6 +652,43 @@ public class Transaction extends RocksObject {
keys, cfHandles);
}
/**
* A multi-key version of
* {@link #getForUpdate(ReadOptions, ColumnFamilyHandle, byte[], boolean)}.
*
*
* @param readOptions Read options.
* @param columnFamilyHandles {@link org.rocksdb.ColumnFamilyHandle}
* instances
* @param keys the keys to retrieve the values for.
*
* @return Array of values, one for each key
*
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
public List<byte[]> multiGetForUpdateAsList(final ReadOptions readOptions,
final List<ColumnFamilyHandle> columnFamilyHandles, final List<byte[]> keys)
throws RocksDBException {
assert (isOwningHandle());
// Check if key size equals cfList size. If not a exception must be
// thrown. If not a Segmentation fault happens.
if (keys.size() != columnFamilyHandles.size()) {
throw new IllegalArgumentException("For each key there must be a ColumnFamilyHandle.");
}
if (keys.size() == 0) {
return new ArrayList<>();
}
final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
final long[] cfHandles = new long[columnFamilyHandles.size()];
for (int i = 0; i < columnFamilyHandles.size(); i++) {
cfHandles[i] = columnFamilyHandles.get(i).nativeHandle_;
}
return Arrays.asList(
multiGetForUpdate(nativeHandle_, readOptions.nativeHandle_, keysArray, cfHandles));
}
/**
* A multi-key version of {@link #getForUpdate(ReadOptions, byte[], boolean)}.
*
@ -574,8 +701,9 @@ public class Transaction extends RocksObject {
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
public byte[][] multiGetForUpdate(final ReadOptions readOptions,
final byte[][] keys) throws RocksDBException {
@Deprecated
public byte[][] multiGetForUpdate(final ReadOptions readOptions, final byte[][] keys)
throws RocksDBException {
assert(isOwningHandle());
if(keys.length == 0) {
return new byte[0][0];
@ -585,6 +713,30 @@ public class Transaction extends RocksObject {
readOptions.nativeHandle_, keys);
}
/**
* A multi-key version of {@link #getForUpdate(ReadOptions, byte[], boolean)}.
*
*
* @param readOptions Read options.
* @param keys the keys to retrieve the values for.
*
* @return List of values, one for each key
*
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
public List<byte[]> multiGetForUpdateAsList(
final ReadOptions readOptions, final List<byte[]> keys) throws RocksDBException {
assert (isOwningHandle());
if (keys.size() == 0) {
return new ArrayList<>(0);
}
final byte[][] keysArray = keys.toArray(new byte[keys.size()][]);
return Arrays.asList(multiGetForUpdate(nativeHandle_, readOptions.nativeHandle_, keysArray));
}
/**
* Returns an iterator that will iterate on all keys in the default
* column family including both keys in the DB and uncommitted keys in this

@ -206,12 +206,8 @@ public abstract class AbstractTransactionTest {
@Test
public void multiGetPut_cf() throws RocksDBException {
final byte keys[][] = new byte[][] {
"key1".getBytes(UTF_8),
"key2".getBytes(UTF_8)};
final byte values[][] = new byte[][] {
"value1".getBytes(UTF_8),
"value2".getBytes(UTF_8)};
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
try(final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions();
@ -227,14 +223,31 @@ public abstract class AbstractTransactionTest {
}
}
@Test
public void multiGetPutAsList_cf() throws RocksDBException {
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
try (final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions();
final Transaction txn = dbContainer.beginTransaction()) {
final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
final List<ColumnFamilyHandle> cfList = Arrays.asList(testCf, testCf);
assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys)))
.containsExactly(null, null);
txn.put(testCf, keys[0], values[0]);
txn.put(testCf, keys[1], values[1]);
assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys)))
.containsExactly(values);
}
}
@Test
public void multiGetPut() throws RocksDBException {
final byte keys[][] = new byte[][] {
"key1".getBytes(UTF_8),
"key2".getBytes(UTF_8)};
final byte values[][] = new byte[][] {
"value1".getBytes(UTF_8),
"value2".getBytes(UTF_8)};
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
try(final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions();
@ -248,6 +261,22 @@ public abstract class AbstractTransactionTest {
}
}
@Test
public void multiGetPutAsList() throws RocksDBException {
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
try (final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions();
final Transaction txn = dbContainer.beginTransaction()) {
assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(null, null);
txn.put(keys[0], values[0]);
txn.put(keys[1], values[1]);
assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(values);
}
}
@Test
public void getForUpdate_cf() throws RocksDBException {
final byte k1[] = "key1".getBytes(UTF_8);
@ -495,6 +524,7 @@ public abstract class AbstractTransactionTest {
}
}
@Deprecated
@Test
public void multiGetPutUntracked_cf() throws RocksDBException {
final byte keys[][] = new byte[][] {
@ -518,14 +548,32 @@ public abstract class AbstractTransactionTest {
}
}
@Test
public void multiGetPutUntrackedAsList_cf() throws RocksDBException {
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
try (final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions();
final Transaction txn = dbContainer.beginTransaction()) {
final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
final List<ColumnFamilyHandle> cfList = Arrays.asList(testCf, testCf);
assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys)))
.containsExactly(null, null);
txn.putUntracked(testCf, keys[0], values[0]);
txn.putUntracked(testCf, keys[1], values[1]);
assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys)))
.containsExactly(values);
}
}
@Deprecated
@Test
public void multiGetPutUntracked() throws RocksDBException {
final byte keys[][] = new byte[][] {
"key1".getBytes(UTF_8),
"key2".getBytes(UTF_8)};
final byte values[][] = new byte[][] {
"value1".getBytes(UTF_8),
"value2".getBytes(UTF_8)};
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
try(final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions();
@ -538,6 +586,21 @@ public abstract class AbstractTransactionTest {
}
}
@Test
public void multiGetPutAsListUntracked() throws RocksDBException {
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
try (final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions();
final Transaction txn = dbContainer.beginTransaction()) {
assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(null, null);
txn.putUntracked(keys[0], values[0]);
txn.putUntracked(keys[1], values[1]);
assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(values);
}
}
@Test
public void mergeUntracked_cf() throws RocksDBException {
final byte[] k1 = "key1".getBytes(UTF_8);

@ -19,9 +19,9 @@ public class OptimisticTransactionTest extends AbstractTransactionTest {
@Test
public void getForUpdate_cf_conflict() throws RocksDBException {
final byte k1[] = "key1".getBytes(UTF_8);
final byte v1[] = "value1".getBytes(UTF_8);
final byte v12[] = "value12".getBytes(UTF_8);
final byte[] k1 = "key1".getBytes(UTF_8);
final byte[] v1 = "value1".getBytes(UTF_8);
final byte[] v12 = "value12".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
@ -57,9 +57,9 @@ public class OptimisticTransactionTest extends AbstractTransactionTest {
@Test
public void getForUpdate_conflict() throws RocksDBException {
final byte k1[] = "key1".getBytes(UTF_8);
final byte v1[] = "value1".getBytes(UTF_8);
final byte v12[] = "value12".getBytes(UTF_8);
final byte[] k1 = "key1".getBytes(UTF_8);
final byte[] v1 = "value1".getBytes(UTF_8);
final byte[] v12 = "value12".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
@ -92,14 +92,11 @@ public class OptimisticTransactionTest extends AbstractTransactionTest {
}
}
@Deprecated
@Test
public void multiGetForUpdate_cf_conflict() throws RocksDBException {
final byte keys[][] = new byte[][] {
"key1".getBytes(UTF_8),
"key2".getBytes(UTF_8)};
final byte values[][] = new byte[][] {
"value1".getBytes(UTF_8),
"value2".getBytes(UTF_8)};
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
final byte[] otherValue = "otherValue".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
@ -139,14 +136,54 @@ public class OptimisticTransactionTest extends AbstractTransactionTest {
}
}
@Test
public void multiGetAsListForUpdate_cf_conflict() throws RocksDBException {
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
final byte[] otherValue = "otherValue".getBytes(UTF_8);
try (final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
final List<ColumnFamilyHandle> cfList = Arrays.asList(testCf, testCf);
try (final Transaction txn = dbContainer.beginTransaction()) {
txn.put(testCf, keys[0], values[0]);
txn.put(testCf, keys[1], values[1]);
assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys)))
.containsExactly(values);
txn.commit();
}
try (final Transaction txn2 = dbContainer.beginTransaction()) {
try (final Transaction txn3 = dbContainer.beginTransaction()) {
assertThat(txn3.multiGetForUpdateAsList(readOptions, cfList, Arrays.asList(keys)))
.containsExactly(values);
// NOTE: txn2 updates k1, during txn3
txn2.put(testCf, keys[0], otherValue);
assertThat(txn2.get(testCf, readOptions, keys[0])).isEqualTo(otherValue);
txn2.commit();
try {
txn3.commit(); // should cause an exception!
} catch (final RocksDBException e) {
assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy);
return;
}
}
}
fail("Expected an exception for put after getForUpdate from conflicting"
+ "transactions");
}
}
@Deprecated
@Test
public void multiGetForUpdate_conflict() throws RocksDBException {
final byte keys[][] = new byte[][] {
"key1".getBytes(UTF_8),
"key2".getBytes(UTF_8)};
final byte values[][] = new byte[][] {
"value1".getBytes(UTF_8),
"value2".getBytes(UTF_8)};
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
final byte[] otherValue = "otherValue".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
@ -183,11 +220,50 @@ public class OptimisticTransactionTest extends AbstractTransactionTest {
}
}
@Test
public void multiGetasListForUpdate_conflict() throws RocksDBException {
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
final byte[] otherValue = "otherValue".getBytes(UTF_8);
try (final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
try (final Transaction txn = dbContainer.beginTransaction()) {
txn.put(keys[0], values[0]);
txn.put(keys[1], values[1]);
assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(values);
txn.commit();
}
try (final Transaction txn2 = dbContainer.beginTransaction()) {
try (final Transaction txn3 = dbContainer.beginTransaction()) {
assertThat(txn3.multiGetForUpdateAsList(readOptions, Arrays.asList(keys)))
.containsExactly(values);
// NOTE: txn2 updates k1, during txn3
txn2.put(keys[0], otherValue);
assertThat(txn2.get(readOptions, keys[0])).isEqualTo(otherValue);
txn2.commit();
try {
txn3.commit(); // should cause an exception!
} catch (final RocksDBException e) {
assertThat(e.getStatus().getCode()).isSameAs(Status.Code.Busy);
return;
}
}
}
fail("Expected an exception for put after getForUpdate from conflicting"
+ "transactions");
}
}
@Test
public void undoGetForUpdate_cf_conflict() throws RocksDBException {
final byte k1[] = "key1".getBytes(UTF_8);
final byte v1[] = "value1".getBytes(UTF_8);
final byte v12[] = "value12".getBytes(UTF_8);
final byte[] k1 = "key1".getBytes(UTF_8);
final byte[] v1 = "value1".getBytes(UTF_8);
final byte[] v12 = "value12".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
@ -220,9 +296,9 @@ public class OptimisticTransactionTest extends AbstractTransactionTest {
@Test
public void undoGetForUpdate_conflict() throws RocksDBException {
final byte k1[] = "key1".getBytes(UTF_8);
final byte v1[] = "value1".getBytes(UTF_8);
final byte v12[] = "value12".getBytes(UTF_8);
final byte[] k1 = "key1".getBytes(UTF_8);
final byte[] v1 = "value1".getBytes(UTF_8);
final byte[] v12 = "value12".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
@ -261,12 +337,10 @@ public class OptimisticTransactionTest extends AbstractTransactionTest {
try {
txn.setName(name);
fail("Optimistic transactions cannot be named.");
} catch(final RocksDBException e) {
assertThat(e.getStatus().getCode() == Status.Code.InvalidArgument);
return;
assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.InvalidArgument);
}
fail("Optimistic transactions cannot be named.");
}
}

@ -19,9 +19,9 @@ public class TransactionTest extends AbstractTransactionTest {
@Test
public void getForUpdate_cf_conflict() throws RocksDBException {
final byte k1[] = "key1".getBytes(UTF_8);
final byte v1[] = "value1".getBytes(UTF_8);
final byte v12[] = "value12".getBytes(UTF_8);
final byte[] k1 = "key1".getBytes(UTF_8);
final byte[] v1 = "value1".getBytes(UTF_8);
final byte[] v12 = "value12".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
@ -53,9 +53,9 @@ public class TransactionTest extends AbstractTransactionTest {
@Test
public void getForUpdate_conflict() throws RocksDBException {
final byte k1[] = "key1".getBytes(UTF_8);
final byte v1[] = "value1".getBytes(UTF_8);
final byte v12[] = "value12".getBytes(UTF_8);
final byte[] k1 = "key1".getBytes(UTF_8);
final byte[] v1 = "value1".getBytes(UTF_8);
final byte[] v12 = "value12".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
@ -86,12 +86,8 @@ public class TransactionTest extends AbstractTransactionTest {
@Test
public void multiGetForUpdate_cf_conflict() throws RocksDBException {
final byte keys[][] = new byte[][] {
"key1".getBytes(UTF_8),
"key2".getBytes(UTF_8)};
final byte values[][] = new byte[][] {
"value1".getBytes(UTF_8),
"value2".getBytes(UTF_8)};
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
final byte[] otherValue = "otherValue".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
@ -126,14 +122,49 @@ public class TransactionTest extends AbstractTransactionTest {
}
}
@Test
public void multiGetAsListForUpdate_cf_conflict() throws RocksDBException {
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
final byte[] otherValue = "otherValue".getBytes(UTF_8);
try (final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
final ColumnFamilyHandle testCf = dbContainer.getTestColumnFamily();
final List<ColumnFamilyHandle> cfList = Arrays.asList(testCf, testCf);
try (final Transaction txn = dbContainer.beginTransaction()) {
txn.put(testCf, keys[0], values[0]);
txn.put(testCf, keys[1], values[1]);
assertThat(txn.multiGetAsList(readOptions, cfList, Arrays.asList(keys)))
.containsExactly(values);
txn.commit();
}
try (final Transaction txn2 = dbContainer.beginTransaction()) {
try (final Transaction txn3 = dbContainer.beginTransaction()) {
assertThat(txn3.multiGetForUpdateAsList(readOptions, cfList, Arrays.asList(keys)))
.containsExactly(values);
// NOTE: txn2 updates k1, during txn3
try {
txn2.put(testCf, keys[0], otherValue); // should cause an exception!
} catch (final RocksDBException e) {
assertThat(e.getStatus().getCode()).isSameAs(Status.Code.TimedOut);
return;
}
}
}
fail("Expected an exception for put after getForUpdate from conflicting"
+ "transactions");
}
}
@Test
public void multiGetForUpdate_conflict() throws RocksDBException {
final byte keys[][] = new byte[][] {
"key1".getBytes(UTF_8),
"key2".getBytes(UTF_8)};
final byte values[][] = new byte[][] {
"value1".getBytes(UTF_8),
"value2".getBytes(UTF_8)};
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
final byte[] otherValue = "otherValue".getBytes(UTF_8);
try(final DBContainer dbContainer = startDb();
@ -150,6 +181,41 @@ public class TransactionTest extends AbstractTransactionTest {
assertThat(txn3.multiGetForUpdate(readOptions, keys))
.isEqualTo(values);
// NOTE: txn2 updates k1, during txn3
try {
txn2.put(keys[0], otherValue); // should cause an exception!
} catch (final RocksDBException e) {
assertThat(e.getStatus().getCode()).isSameAs(Status.Code.TimedOut);
return;
}
}
}
fail("Expected an exception for put after getForUpdate from conflicting"
+ "transactions");
}
}
@Test
public void multiGetAsListForUpdate_conflict() throws RocksDBException {
final byte[][] keys = new byte[][] {"key1".getBytes(UTF_8), "key2".getBytes(UTF_8)};
final byte[][] values = new byte[][] {"value1".getBytes(UTF_8), "value2".getBytes(UTF_8)};
final byte[] otherValue = "otherValue".getBytes(UTF_8);
try (final DBContainer dbContainer = startDb();
final ReadOptions readOptions = new ReadOptions()) {
try (final Transaction txn = dbContainer.beginTransaction()) {
txn.put(keys[0], values[0]);
txn.put(keys[1], values[1]);
assertThat(txn.multiGetAsList(readOptions, Arrays.asList(keys))).containsExactly(values);
txn.commit();
}
try (final Transaction txn2 = dbContainer.beginTransaction()) {
try (final Transaction txn3 = dbContainer.beginTransaction()) {
assertThat(txn3.multiGetForUpdateAsList(readOptions, Arrays.asList(keys)))
.containsExactly(values);
// NOTE: txn2 updates k1, during txn3
try {
txn2.put(keys[0], otherValue); // should cause an exception!

Loading…
Cancel
Save