diff --git a/java/rocksjni/write_batch_with_index.cc b/java/rocksjni/write_batch_with_index.cc index cfe2cc8ae..021eb385c 100644 --- a/java/rocksjni/write_batch_with_index.cc +++ b/java/rocksjni/write_batch_with_index.cc @@ -533,20 +533,24 @@ jlong Java_org_rocksdb_WriteBatchWithIndex_iterator1(JNIEnv* /*env*/, /* * Class: org_rocksdb_WriteBatchWithIndex * Method: iteratorWithBase - * Signature: (JJJ)J + * Signature: (JJJJ)J */ -jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(JNIEnv* /*env*/, - jobject /*jobj*/, - jlong jwbwi_handle, - jlong jcf_handle, - jlong jbi_handle) { +jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase( + JNIEnv*, jobject, jlong jwbwi_handle, jlong jcf_handle, + jlong jbase_iterator_handle, jlong jread_opts_handle) { auto* wbwi = reinterpret_cast(jwbwi_handle); auto* cf_handle = reinterpret_cast(jcf_handle); auto* base_iterator = - reinterpret_cast(jbi_handle); - auto* iterator = wbwi->NewIteratorWithBase(cf_handle, base_iterator); + reinterpret_cast(jbase_iterator_handle); + ROCKSDB_NAMESPACE::ReadOptions* read_opts = + jread_opts_handle == 0 + ? 
nullptr + : reinterpret_cast( + jread_opts_handle); + auto* iterator = + wbwi->NewIteratorWithBase(cf_handle, base_iterator, read_opts); + return reinterpret_cast(iterator); } diff --git a/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java b/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java index 3831f85ba..e5b8ba011 100644 --- a/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java +++ b/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java @@ -131,11 +131,36 @@ public class WriteBatchWithIndex extends AbstractWriteBatch { public RocksIterator newIteratorWithBase( final ColumnFamilyHandle columnFamilyHandle, final RocksIterator baseIterator) { - RocksIterator iterator = new RocksIterator(baseIterator.parent_, - iteratorWithBase( - nativeHandle_, columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_)); + return newIteratorWithBase(columnFamilyHandle, baseIterator, null); + } + + /** + * Provides Read-Your-Own-Writes like functionality by + * creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator} + * as a delta and baseIterator as a base + * + * Updating write batch with the current key of the iterator is not safe. + * We strongly recommend users not to do it. It will invalidate the current + * key() and value() of the iterator. This invalidation happens even before + * the write batch update finishes. The state may recover after Next() is + * called. + * + * @param columnFamilyHandle The column family to iterate over + * @param baseIterator The base iterator, + * e.g. {@link org.rocksdb.RocksDB#newIterator()} + * @param readOptions the read options, or null + * @return An iterator which shows a view comprised of both the database + * point-in-time from baseIterator and modifications made in this write batch. 
+ */ + public RocksIterator newIteratorWithBase(final ColumnFamilyHandle columnFamilyHandle, + final RocksIterator baseIterator, /* @Nullable */ final ReadOptions readOptions) { + final RocksIterator iterator = new RocksIterator(baseIterator.parent_, + iteratorWithBase(nativeHandle_, columnFamilyHandle.nativeHandle_, + baseIterator.nativeHandle_, readOptions == null ? 0 : readOptions.nativeHandle_)); + // when the iterator is deleted it will also delete the baseIterator baseIterator.disOwnNativeHandle(); + return iterator; } @@ -151,7 +176,25 @@ public class WriteBatchWithIndex extends AbstractWriteBatch { * point-in-timefrom baseIterator and modifications made in this write batch. */ public RocksIterator newIteratorWithBase(final RocksIterator baseIterator) { - return newIteratorWithBase(baseIterator.parent_.getDefaultColumnFamily(), baseIterator); + return newIteratorWithBase(baseIterator.parent_.getDefaultColumnFamily(), baseIterator, null); + } + + /** + * Provides Read-Your-Own-Writes like functionality by + * creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator} + * as a delta and baseIterator as a base. Operates on the default column + * family. + * + * @param baseIterator The base iterator, + * e.g. {@link org.rocksdb.RocksDB#newIterator()} + * @param readOptions the read options, or null + * @return An iterator which shows a view comprised of both the database + * point-in-time from baseIterator and modifications made in this write batch. + */ + public RocksIterator newIteratorWithBase(final RocksIterator baseIterator, + /* @Nullable */ final ReadOptions readOptions) { + return newIteratorWithBase( + baseIterator.parent_.getDefaultColumnFamily(), baseIterator, readOptions); + } /** @@ -200,7 +243,7 @@ public class WriteBatchWithIndex extends AbstractWriteBatch { * the results using the DB's merge operator (if the batch contains any * merge requests). 
* - * Setting {@link ReadOptions#setSnapshot(long, long)} will affect what is + * Setting {@link ReadOptions#setSnapshot(Snapshot)} will affect what is * read from the DB but will NOT change which keys are read from the batch * (the keys in this batch do not yet belong to any snapshot and will be * fetched regardless). @@ -230,7 +273,7 @@ public class WriteBatchWithIndex extends AbstractWriteBatch { * the results using the DB's merge operator (if the batch contains any * merge requests). * - * Setting {@link ReadOptions#setSnapshot(long, long)} will affect what is + * Setting {@link ReadOptions#setSnapshot(Snapshot)} will affect what is * read from the DB but will NOT change which keys are read from the batch * (the keys in this batch do not yet belong to any snapshot and will be * fetched regardless). @@ -303,8 +346,8 @@ public class WriteBatchWithIndex extends AbstractWriteBatch { final boolean overwriteKey); private native long iterator0(final long handle); private native long iterator1(final long handle, final long cfHandle); - private native long iteratorWithBase( - final long handle, final long baseIteratorHandle, final long cfHandle); + private native long iteratorWithBase(final long handle, final long baseIteratorHandle, + final long cfHandle, final long readOptionsHandle); private native byte[] getFromBatch(final long handle, final long optHandle, final byte[] key, final int keyLen); private native byte[] getFromBatch(final long handle, final long optHandle, diff --git a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java index 01eb652f1..38074be38 100644 --- a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java +++ b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java @@ -13,7 +13,9 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; import java.nio.ByteBuffer; +import java.util.ArrayList; import 
java.util.Arrays; +import java.util.List; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -101,6 +103,95 @@ public class WriteBatchWithIndexTest { } } + @Test + public void readYourOwnWritesCf() throws RocksDBException { + final List cfNames = + Arrays.asList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY), + new ColumnFamilyDescriptor("new_cf".getBytes())); + + final List columnFamilyHandleList = new ArrayList<>(); + + // Test open database with column family names + try (final DBOptions options = + new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true); + final RocksDB db = RocksDB.open( + options, dbFolder.getRoot().getAbsolutePath(), cfNames, columnFamilyHandleList)) { + final ColumnFamilyHandle newCf = columnFamilyHandleList.get(1); + + try { + final byte[] k1 = "key1".getBytes(); + final byte[] v1 = "value1".getBytes(); + final byte[] k2 = "key2".getBytes(); + final byte[] v2 = "value2".getBytes(); + + db.put(newCf, k1, v1); + db.put(newCf, k2, v2); + + try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true); + final ReadOptions readOptions = new ReadOptions(); + final RocksIterator base = db.newIterator(newCf, readOptions); + final RocksIterator it = wbwi.newIteratorWithBase(newCf, base, readOptions)) { + it.seek(k1); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(k1); + assertThat(it.value()).isEqualTo(v1); + + it.seek(k2); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(k2); + assertThat(it.value()).isEqualTo(v2); + + // put data to the write batch and make sure we can read it. 
+ final byte[] k3 = "key3".getBytes(); + final byte[] v3 = "value3".getBytes(); + wbwi.put(newCf, k3, v3); + it.seek(k3); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(k3); + assertThat(it.value()).isEqualTo(v3); + + // update k2 in the write batch and check the value + final byte[] v2Other = "otherValue2".getBytes(); + wbwi.put(newCf, k2, v2Other); + it.seek(k2); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(k2); + assertThat(it.value()).isEqualTo(v2Other); + + // delete k1 and make sure we can read back the write + wbwi.delete(newCf, k1); + it.seek(k1); + assertThat(it.key()).isNotEqualTo(k1); + + // reinsert k1 and make sure we see the new value + final byte[] v1Other = "otherValue1".getBytes(); + wbwi.put(newCf, k1, v1Other); + it.seek(k1); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(k1); + assertThat(it.value()).isEqualTo(v1Other); + + // single remove k3 and make sure we can read back the write + wbwi.singleDelete(newCf, k3); + it.seek(k3); + assertThat(it.isValid()).isEqualTo(false); + + // reinsert k3 and make sure we see the new value + final byte[] v3Other = "otherValue3".getBytes(); + wbwi.put(newCf, k3, v3Other); + it.seek(k3); + assertThat(it.isValid()).isTrue(); + assertThat(it.key()).isEqualTo(k3); + assertThat(it.value()).isEqualTo(v3Other); + } + } finally { + for (final ColumnFamilyHandle columnFamilyHandle : columnFamilyHandleList) { + columnFamilyHandle.close(); + } + } + } + } + @Test public void writeBatchWithIndex() throws RocksDBException { try (final Options options = new Options().setCreateIfMissing(true);