diff --git a/java/Makefile b/java/Makefile index 42f465e10..97f0b0244 100644 --- a/java/Makefile +++ b/java/Makefile @@ -29,6 +29,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\ org.rocksdb.SkipListMemTableConfig\ org.rocksdb.Slice\ org.rocksdb.Statistics\ + org.rocksdb.TransactionLogIterator\ org.rocksdb.TtlDB\ org.rocksdb.VectorMemTableConfig\ org.rocksdb.StringAppendOperator\ @@ -81,6 +82,7 @@ JAVA_TESTS = org.rocksdb.test.BackupableDBOptionsTest\ org.rocksdb.test.SizeUnitTest\ org.rocksdb.test.SliceTest\ org.rocksdb.test.SnapshotTest\ + org.rocksdb.test.TransactionLogIteratorTest\ org.rocksdb.test.TtlDBTest\ org.rocksdb.test.StatisticsCollectorTest\ org.rocksdb.test.WriteBatchHandlerTest\ diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index ac02860e8..96032165e 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -1588,6 +1588,29 @@ public class RocksDB extends RocksObject { columnFamilyHandle.nativeHandle_); } + /** + *

Returns an iterator that is positioned at a write-batch containing + * seq_number. If the sequence number is non existent, it returns an iterator + * at the first available seq_no after the requested seq_no.

+ * + *

Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to + * use this api, else the WAL files will get + * cleared aggressively and the iterator might keep getting invalid before + * an update is read.

+ * + * @param sequenceNumber sequence number offset + * + * @return {@link org.rocksdb.TransactionLogIterator} instance. + * + * @throws org.rocksdb.RocksDBException if iterator cannot be retrieved + * from native-side. + */ + public TransactionLogIterator getUpdatesSince(long sequenceNumber) + throws RocksDBException { + return new TransactionLogIterator( + getUpdatesSince(nativeHandle_, sequenceNumber)); + } + /** * Private constructor. */ @@ -1730,6 +1753,8 @@ public class RocksDB extends RocksObject { private native void compactRange(long handle, byte[] begin, int beginLen, byte[] end, int endLen, boolean reduce_level, int target_level, int target_path_id, long cfHandle) throws RocksDBException; + private native long getUpdatesSince(long handle, long sequenceNumber) + throws RocksDBException; protected DBOptionsInterface options_; } diff --git a/java/org/rocksdb/TransactionLogIterator.java b/java/org/rocksdb/TransactionLogIterator.java new file mode 100644 index 000000000..8de61aa00 --- /dev/null +++ b/java/org/rocksdb/TransactionLogIterator.java @@ -0,0 +1,116 @@ +package org.rocksdb; + +/** + *

A TransactionLogIterator is used to iterate over the transactions in a db. + * One run of the iterator is continuous, i.e. the iterator will stop at the + * beginning of any gap in sequences.

+ */ +public class TransactionLogIterator extends RocksObject { + + /** + *

An iterator is either positioned at a WriteBatch + * or not valid. This method returns true if the iterator + * is valid. Can read data from a valid iterator.

+ * + * @return true if iterator position is valid. + */ + public boolean isValid() { + return isValid(nativeHandle_); + } + + /** + *

Moves the iterator to the next WriteBatch. + * REQUIRES: Valid() to be true.

+ */ + public void next() { + assert(isValid()); + next(nativeHandle_); + } + + /** + *

Throws RocksDBException if something went wrong.

+ * + * @throws org.rocksdb.RocksDBException if something went + * wrong in the underlying C++ code. + */ + public void status() throws RocksDBException { + status(nativeHandle_); + } + + /** + *

If iterator position is valid, return the current + * write_batch and the sequence number of the earliest + * transaction contained in the batch.

+ * + *

ONLY use if Valid() is true and status() is OK.

+ * + * @return {@link org.rocksdb.TransactionLogIterator.BatchResult} + * instance. + */ + public BatchResult getBatch() { + assert(isValid()); + return getBatch(nativeHandle_); + } + + /** + *

TransactionLogIterator constructor.

+ * + * @param nativeHandle address to native address. + */ + TransactionLogIterator(long nativeHandle) { + super(); + nativeHandle_ = nativeHandle; + } + + @Override protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + /** + *

BatchResult represents a data structure returned + * by a TransactionLogIterator containing a sequence + * number and a {@link WriteBatch} instance.

+ */ + public class BatchResult { + /** + *

Constructor of BatchResult class.

+ * + * @param sequenceNumber related to this BatchResult instance. + * @param nativeHandle to {@link org.rocksdb.WriteBatch} + * native instance. + */ + public BatchResult(long sequenceNumber, long nativeHandle) { + sequenceNumber_ = sequenceNumber; + writeBatch_ = new WriteBatch(nativeHandle); + } + + /** + *

Return sequence number related to this BatchResult.

+ * + * @return Sequence number. + */ + public long sequenceNumber() { + return sequenceNumber_; + } + + /** + *

Return contained {@link org.rocksdb.WriteBatch} + * instance

+ * + * @return {@link org.rocksdb.WriteBatch} instance. + */ + public WriteBatch writeBatch() { + return writeBatch_; + } + + private final long sequenceNumber_; + private final WriteBatch writeBatch_; + } + + private native void disposeInternal(long handle); + private native boolean isValid(long handle); + private native void next(long handle); + private native void status(long handle) + throws RocksDBException; + private native BatchResult getBatch(long handle); +} diff --git a/java/org/rocksdb/WriteBatch.java b/java/org/rocksdb/WriteBatch.java index 24133ec39..fd8b894cb 100644 --- a/java/org/rocksdb/WriteBatch.java +++ b/java/org/rocksdb/WriteBatch.java @@ -53,6 +53,12 @@ public class WriteBatch extends AbstractWriteBatch { iterate(handler.nativeHandle_); } + WriteBatch(long nativeHandle) { + super(); + disOwnNativeHandle(); + nativeHandle_ = nativeHandle; + } + @Override final native void disposeInternal(long handle); @Override final native int count0(); @Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen); diff --git a/java/org/rocksdb/test/TransactionLogIteratorTest.java b/java/org/rocksdb/test/TransactionLogIteratorTest.java new file mode 100644 index 000000000..2069e1200 --- /dev/null +++ b/java/org/rocksdb/test/TransactionLogIteratorTest.java @@ -0,0 +1,79 @@ +package org.rocksdb.test; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.*; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TransactionLogIteratorTest { + @ClassRule + public static final RocksMemoryResource rocksMemoryResource = + new RocksMemoryResource(); + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + @Test + public void transactionLogIterator() throws RocksDBException { + RocksDB db = null; + Options options = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true); + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + transactionLogIterator = db.getUpdatesSince(0); + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); + } + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } + + @Test + public void getBatch() throws RocksDBException { + RocksDB db = null; + Options options = null; + TransactionLogIterator transactionLogIterator = null; + try { + options = new Options(). + setCreateIfMissing(true). + setWalTtlSeconds(1000). + setWalSizeLimitMB(10); + + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + + for (int i = 0; i < 250; i++){ + db.put(String.valueOf(i).getBytes(), + String.valueOf(i).getBytes()); + } + db.flush(new FlushOptions().setWaitForFlush(true)); + transactionLogIterator = db.getUpdatesSince(0); + assertThat(transactionLogIterator.isValid()).isTrue(); + transactionLogIterator.status(); + + TransactionLogIterator.BatchResult batchResult = + transactionLogIterator.getBatch(); + assertThat(batchResult.sequenceNumber()).isEqualTo(1); + } finally { + if (transactionLogIterator != null) { + transactionLogIterator.dispose(); + } + if (db != null) { + db.close(); + } + if (options != null) { + options.dispose(); + } + } + } +} diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 1055f87fe..9f5b9446e 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -16,6 +17,7 @@ #include "rocksjni/portal.h" #include "rocksdb/db.h" #include "rocksdb/cache.h" +#include "rocksdb/types.h" ////////////////////////////////////////////////////////////////////////////// // rocksdb::DB::Open @@ -1598,3 +1600,25 @@ void Java_org_rocksdb_RocksDB_compactRange__J_3BI_3BIZIIJ( rocksdb_compactrange_helper(env, db, cf_handle, jbegin, jbegin_len, jend, jend_len, jreduce_level, jtarget_level, jtarget_path_id); } + +////////////////////////////////////////////////////////////////////////////// +// rocksdb::DB::GetUpdatesSince + +/* + * Class: org_rocksdb_RocksDB + * Method: getUpdatesSince + * Signature: (JJ)J + */ +jlong Java_org_rocksdb_RocksDB_getUpdatesSince(JNIEnv* env, + jobject jdb, jlong jdb_handle, jlong jsequence_number) { + auto db = reinterpret_cast(jdb_handle); + rocksdb::SequenceNumber sequence_number = + static_cast(jsequence_number); + std::unique_ptr iter; + rocksdb::Status s = db->GetUpdatesSince(sequence_number, &iter); + if (s.ok()) { + return reinterpret_cast(iter.release()); + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + return 0; +} diff --git a/java/rocksjni/transaction_log.cc b/java/rocksjni/transaction_log.cc new file mode 100644 index 000000000..28e387fe1 --- /dev/null +++ b/java/rocksjni/transaction_log.cc @@ -0,0 +1,79 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ rocksdb::Iterator methods from Java side. + +#include +#include +#include + +#include "include/org_rocksdb_TransactionLogIterator.h" +#include "rocksdb/transaction_log.h" +#include "rocksjni/portal.h" + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_TransactionLogIterator_disposeInternal( + JNIEnv* env, jobject jobj, jlong handle) { + auto* it = reinterpret_cast(handle); + delete it; +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: isValid + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_TransactionLogIterator_isValid( + JNIEnv* env, jobject jobj, jlong handle) { + return reinterpret_cast(handle)->Valid(); +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: next + * Signature: (J)V + */ +void Java_org_rocksdb_TransactionLogIterator_next( + JNIEnv* env, jobject jobj, jlong handle) { + reinterpret_cast(handle)->Next(); +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: status + * Signature: (J)V + */ +void Java_org_rocksdb_TransactionLogIterator_status( + JNIEnv* env, jobject jobj, jlong handle) { + rocksdb::Status s = reinterpret_cast< + rocksdb::TransactionLogIterator*>(handle)->status(); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +/* + * Class: org_rocksdb_TransactionLogIterator + * Method: getBatch + * Signature: (J)Lorg/rocksdb/TransactionLogIterator$BatchResult + */ +jobject Java_org_rocksdb_TransactionLogIterator_getBatch( + JNIEnv* env, jobject jobj, jlong handle) { + rocksdb::BatchResult batch_result = + reinterpret_cast(handle)->GetBatch(); + jclass jclazz = env->FindClass( + "org/rocksdb/TransactionLogIterator$BatchResult"); + assert(jclazz != nullptr); + jmethodID mid = env->GetMethodID( + jclazz, "", "(Lorg/rocksdb/TransactionLogIterator;JJ)V"); + assert(mid != nullptr); + return env->NewObject(jclazz, mid, jobj, + batch_result.sequence, batch_result.writeBatchPtr.release()); +}