Add newer WBWI::NewIteratorWithBase functions to RocksJava (#6872)

Summary:
Exposes the `ReadOptions` arguments to `NewIteratorWithBase`.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6872

Reviewed By: ajkr

Differential Revision: D21725867

Pulled By: pdillinger

fbshipit-source-id: 4079ba590cc13ba7a6244ed91439d89c40a543b6
main
Adam Retter 5 years ago committed by Facebook GitHub Bot
parent 82a82c76e7
commit 9060e6fa79
  1. 20
      java/rocksjni/write_batch_with_index.cc
  2. 59
      java/src/main/java/org/rocksdb/WriteBatchWithIndex.java
  3. 91
      java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java

@ -533,20 +533,24 @@ jlong Java_org_rocksdb_WriteBatchWithIndex_iterator1(JNIEnv* /*env*/,
/* /*
* Class: org_rocksdb_WriteBatchWithIndex * Class: org_rocksdb_WriteBatchWithIndex
* Method: iteratorWithBase * Method: iteratorWithBase
* Signature: (JJJ)J * Signature: (JJJJ)J
*/ */
jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(JNIEnv* /*env*/, jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(
jobject /*jobj*/, JNIEnv*, jobject, jlong jwbwi_handle, jlong jcf_handle,
jlong jwbwi_handle, jlong jbase_iterator_handle, jlong jread_opts_handle) {
jlong jcf_handle,
jlong jbi_handle) {
auto* wbwi = auto* wbwi =
reinterpret_cast<ROCKSDB_NAMESPACE::WriteBatchWithIndex*>(jwbwi_handle); reinterpret_cast<ROCKSDB_NAMESPACE::WriteBatchWithIndex*>(jwbwi_handle);
auto* cf_handle = auto* cf_handle =
reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyHandle*>(jcf_handle); reinterpret_cast<ROCKSDB_NAMESPACE::ColumnFamilyHandle*>(jcf_handle);
auto* base_iterator = auto* base_iterator =
reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(jbi_handle); reinterpret_cast<ROCKSDB_NAMESPACE::Iterator*>(jbase_iterator_handle);
auto* iterator = wbwi->NewIteratorWithBase(cf_handle, base_iterator); ROCKSDB_NAMESPACE::ReadOptions* read_opts =
jread_opts_handle == 0
? nullptr
: reinterpret_cast<ROCKSDB_NAMESPACE::ReadOptions*>(
jread_opts_handle);
auto* iterator =
wbwi->NewIteratorWithBase(cf_handle, base_iterator, read_opts);
return reinterpret_cast<jlong>(iterator); return reinterpret_cast<jlong>(iterator);
} }

@ -131,11 +131,36 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
public RocksIterator newIteratorWithBase( public RocksIterator newIteratorWithBase(
final ColumnFamilyHandle columnFamilyHandle, final ColumnFamilyHandle columnFamilyHandle,
final RocksIterator baseIterator) { final RocksIterator baseIterator) {
RocksIterator iterator = new RocksIterator(baseIterator.parent_, return newIteratorWithBase(columnFamilyHandle, baseIterator, null);
iteratorWithBase( }
nativeHandle_, columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_));
/**
* Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
* as a delta and baseIterator as a base
*
* Updating write batch with the current key of the iterator is not safe.
* We strongly recommand users not to do it. It will invalidate the current
* key() and value() of the iterator. This invalidation happens even before
* the write batch update finishes. The state may recover after Next() is
* called.
*
* @param columnFamilyHandle The column family to iterate over
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @param readOptions the read options, or null
* @return An iterator which shows a view comprised of both the database
* point-in-time from baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(final ColumnFamilyHandle columnFamilyHandle,
final RocksIterator baseIterator, /* @Nullable */ final ReadOptions readOptions) {
final RocksIterator iterator = new RocksIterator(baseIterator.parent_,
iteratorWithBase(nativeHandle_, columnFamilyHandle.nativeHandle_,
baseIterator.nativeHandle_, readOptions == null ? 0 : readOptions.nativeHandle_));
// when the iterator is deleted it will also delete the baseIterator // when the iterator is deleted it will also delete the baseIterator
baseIterator.disOwnNativeHandle(); baseIterator.disOwnNativeHandle();
return iterator; return iterator;
} }
@ -151,7 +176,25 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* point-in-time from baseIterator and modifications made in this write batch.
*/ */
public RocksIterator newIteratorWithBase(final RocksIterator baseIterator) { public RocksIterator newIteratorWithBase(final RocksIterator baseIterator) {
return newIteratorWithBase(baseIterator.parent_.getDefaultColumnFamily(), baseIterator); return newIteratorWithBase(baseIterator.parent_.getDefaultColumnFamily(), baseIterator, null);
}
/**
* Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
* as a delta and baseIterator as a base. Operates on the default column
* family.
*
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @param readOptions the read options, or null
* @return An iterator which shows a view comprised of both the database
* point-in-timefrom baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(final RocksIterator baseIterator,
/* @Nullable */ final ReadOptions readOptions) {
return newIteratorWithBase(
baseIterator.parent_.getDefaultColumnFamily(), baseIterator, readOptions);
} }
/** /**
@ -200,7 +243,7 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* the results using the DB's merge operator (if the batch contains any * the results using the DB's merge operator (if the batch contains any
* merge requests). * merge requests).
* *
* Setting {@link ReadOptions#setSnapshot(long, long)} will affect what is * Setting {@link ReadOptions#setSnapshot(Snapshot)} will affect what is
* read from the DB but will NOT change which keys are read from the batch * read from the DB but will NOT change which keys are read from the batch
* (the keys in this batch do not yet belong to any snapshot and will be * (the keys in this batch do not yet belong to any snapshot and will be
* fetched regardless). * fetched regardless).
@ -230,7 +273,7 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* the results using the DB's merge operator (if the batch contains any * the results using the DB's merge operator (if the batch contains any
* merge requests). * merge requests).
* *
* Setting {@link ReadOptions#setSnapshot(long, long)} will affect what is * Setting {@link ReadOptions#setSnapshot(Snapshot)} will affect what is
* read from the DB but will NOT change which keys are read from the batch * read from the DB but will NOT change which keys are read from the batch
* (the keys in this batch do not yet belong to any snapshot and will be * (the keys in this batch do not yet belong to any snapshot and will be
* fetched regardless). * fetched regardless).
@ -303,8 +346,8 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
final boolean overwriteKey); final boolean overwriteKey);
private native long iterator0(final long handle); private native long iterator0(final long handle);
private native long iterator1(final long handle, final long cfHandle); private native long iterator1(final long handle, final long cfHandle);
private native long iteratorWithBase( private native long iteratorWithBase(final long handle, final long baseIteratorHandle,
final long handle, final long baseIteratorHandle, final long cfHandle); final long cfHandle, final long readOptionsHandle);
private native byte[] getFromBatch(final long handle, final long optHandle, private native byte[] getFromBatch(final long handle, final long optHandle,
final byte[] key, final int keyLen); final byte[] key, final int keyLen);
private native byte[] getFromBatch(final long handle, final long optHandle, private native byte[] getFromBatch(final long handle, final long optHandle,

@ -13,7 +13,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -101,6 +103,95 @@ public class WriteBatchWithIndexTest {
} }
} }
@Test
public void readYourOwnWritesCf() throws RocksDBException {
  // Open the database with the default column family plus a second,
  // explicitly-named one; all reads/writes below target the second CF.
  final List<ColumnFamilyDescriptor> cfNames =
      Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
          new ColumnFamilyDescriptor("new_cf".getBytes()));
  final List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<>();
  // Test open database with column family names
  try (final DBOptions options =
      new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
      final RocksDB db = RocksDB.open(
          options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) {
    final ColumnFamilyHandle newCf = columnFamilyHandleList.get(1);
    try {
      // Seed the DB (not the batch) with two key/value pairs.
      final byte[] k1 = "key1".getBytes();
      final byte[] v1 = "value1".getBytes();
      final byte[] k2 = "key2".getBytes();
      final byte[] v2 = "value2".getBytes();
      db.put(newCf, k1, v1);
      db.put(newCf, k2, v2);
      // Exercises the new newIteratorWithBase(cf, base, readOptions)
      // overload: the iterator merges the DB view with batch updates.
      try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
          final ReadOptions readOptions = new ReadOptions();
          final RocksIterator base = db.newIterator(newCf, readOptions);
          final RocksIterator it = wbwi.newIteratorWithBase(newCf, base, readOptions)) {
        // Baseline: with an empty batch the iterator sees the DB values.
        it.seek(k1);
        assertThat(it.isValid()).isTrue();
        assertThat(it.key()).isEqualTo(k1);
        assertThat(it.value()).isEqualTo(v1);
        it.seek(k2);
        assertThat(it.isValid()).isTrue();
        assertThat(it.key()).isEqualTo(k2);
        assertThat(it.value()).isEqualTo(v2);
        // put data to the write batch and make sure we can read it.
        final byte[] k3 = "key3".getBytes();
        final byte[] v3 = "value3".getBytes();
        wbwi.put(newCf, k3, v3);
        it.seek(k3);
        assertThat(it.isValid()).isTrue();
        assertThat(it.key()).isEqualTo(k3);
        assertThat(it.value()).isEqualTo(v3);
        // update k2 in the write batch and check the value
        final byte[] v2Other = "otherValue2".getBytes();
        wbwi.put(newCf, k2, v2Other);
        it.seek(k2);
        assertThat(it.isValid()).isTrue();
        assertThat(it.key()).isEqualTo(k2);
        assertThat(it.value()).isEqualTo(v2Other);
        // delete k1 and make sure we can read back the write
        wbwi.delete(newCf, k1);
        it.seek(k1);
        assertThat(it.key()).isNotEqualTo(k1);
        // reinsert k1 and make sure we see the new value
        final byte[] v1Other = "otherValue1".getBytes();
        wbwi.put(newCf, k1, v1Other);
        it.seek(k1);
        assertThat(it.isValid()).isTrue();
        assertThat(it.key()).isEqualTo(k1);
        assertThat(it.value()).isEqualTo(v1Other);
        // single remove k3 and make sure we can read back the write
        wbwi.singleDelete(newCf, k3);
        it.seek(k3);
        assertThat(it.isValid()).isEqualTo(false);
        // reinsert k3 and make sure we see the new value
        final byte[] v3Other = "otherValue3".getBytes();
        wbwi.put(newCf, k3, v3Other);
        it.seek(k3);
        assertThat(it.isValid()).isTrue();
        assertThat(it.key()).isEqualTo(k3);
        assertThat(it.value()).isEqualTo(v3Other);
      }
    } finally {
      // Column family handles are not closed by the outer try-with-resources;
      // release them explicitly.
      for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) {
        columnFamilyHandle.close();
      }
    }
  }
}
@Test @Test
public void writeBatchWithIndex() throws RocksDBException { public void writeBatchWithIndex() throws RocksDBException {
try (final Options options = new Options().setCreateIfMissing(true); try (final Options options = new Options().setCreateIfMissing(true);

Loading…
Cancel
Save