[RocksJava] GetUpdatesSince support

main
fyrz 10 years ago
parent d3a736761b
commit 9a456fba20
  1. 2
      java/Makefile
  2. 25
      java/org/rocksdb/RocksDB.java
  3. 116
      java/org/rocksdb/TransactionLogIterator.java
  4. 6
      java/org/rocksdb/WriteBatch.java
  5. 79
      java/org/rocksdb/test/TransactionLogIteratorTest.java
  6. 24
      java/rocksjni/rocksjni.cc
  7. 79
      java/rocksjni/transaction_log.cc

@ -29,6 +29,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractComparator\
org.rocksdb.SkipListMemTableConfig\
org.rocksdb.Slice\
org.rocksdb.Statistics\
org.rocksdb.TransactionLogIterator\
org.rocksdb.TtlDB\
org.rocksdb.VectorMemTableConfig\
org.rocksdb.StringAppendOperator\
@ -81,6 +82,7 @@ JAVA_TESTS = org.rocksdb.test.BackupableDBOptionsTest\
org.rocksdb.test.SizeUnitTest\
org.rocksdb.test.SliceTest\
org.rocksdb.test.SnapshotTest\
org.rocksdb.test.TransactionLogIteratorTest\
org.rocksdb.test.TtlDBTest\
org.rocksdb.test.StatisticsCollectorTest\
org.rocksdb.test.WriteBatchHandlerTest\

@ -1588,6 +1588,29 @@ public class RocksDB extends RocksObject {
columnFamilyHandle.nativeHandle_);
}
/**
* <p>Returns an iterator that is positioned at a write-batch containing
* seq_number. If the sequence number is non existent, it returns an iterator
* at the first available seq_no after the requested seq_no.</p>
*
* <p>Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to
* use this api, else the WAL files will get
* cleared aggressively and the iterator might keep getting invalid before
* an update is read.</p>
*
* @param sequenceNumber sequence number offset
*
* @return {@link org.rocksdb.TransactionLogIterator} instance.
*
* @throws org.rocksdb.RocksDBException if iterator cannot be retrieved
* from native-side.
*/
public TransactionLogIterator getUpdatesSince(long sequenceNumber)
throws RocksDBException {
return new TransactionLogIterator(
getUpdatesSince(nativeHandle_, sequenceNumber));
}
/**
* Private constructor.
*/
@ -1730,6 +1753,8 @@ public class RocksDB extends RocksObject {
private native void compactRange(long handle, byte[] begin, int beginLen, byte[] end,
int endLen, boolean reduce_level, int target_level, int target_path_id,
long cfHandle) throws RocksDBException;
private native long getUpdatesSince(long handle, long sequenceNumber)
throws RocksDBException;
protected DBOptionsInterface options_;
}

@ -0,0 +1,116 @@
package org.rocksdb;
/**
* <p>A TransactionLogIterator is used to iterate over the transactions in a db.
* One run of the iterator is continuous, i.e. the iterator will stop at the
* beginning of any gap in sequences.</p>
*/
public class TransactionLogIterator extends RocksObject {
/**
* <p>An iterator is either positioned at a WriteBatch
* or not valid. This method returns true if the iterator
* is valid. Can read data from a valid iterator.</p>
*
* @return true if iterator position is valid.
*/
public boolean isValid() {
return isValid(nativeHandle_);
}
/**
* <p>Moves the iterator to the next WriteBatch.
* <strong>REQUIRES</strong>: Valid() to be true.</p>
*/
public void next() {
assert(isValid());
next(nativeHandle_);
}
/**
* <p>Throws RocksDBException if something went wrong.</p>
*
* @throws org.rocksdb.RocksDBException if something went
* wrong in the underlying C++ code.
*/
public void status() throws RocksDBException {
status(nativeHandle_);
}
/**
* <p>If iterator position is valid, return the current
* write_batch and the sequence number of the earliest
* transaction contained in the batch.</p>
*
* <p>ONLY use if Valid() is true and status() is OK.</p>
*
* @return {@link org.rocksdb.TransactionLogIterator.BatchResult}
* instance.
*/
public BatchResult getBatch() {
assert(isValid());
return getBatch(nativeHandle_);
}
/**
* <p>TransactionLogIterator constructor.</p>
*
* @param nativeHandle address to native address.
*/
TransactionLogIterator(long nativeHandle) {
super();
nativeHandle_ = nativeHandle;
}
@Override protected void disposeInternal() {
disposeInternal(nativeHandle_);
}
/**
* <p>BatchResult represents a data structure returned
* by a TransactionLogIterator containing a sequence
* number and a {@link WriteBatch} instance.</p>
*/
public class BatchResult {
/**
* <p>Constructor of BatchResult class.</p>
*
* @param sequenceNumber related to this BatchResult instance.
* @param nativeHandle to {@link org.rocksdb.WriteBatch}
* native instance.
*/
public BatchResult(long sequenceNumber, long nativeHandle) {
sequenceNumber_ = sequenceNumber;
writeBatch_ = new WriteBatch(nativeHandle);
}
/**
* <p>Return sequence number related to this BatchResult.</p>
*
* @return Sequence number.
*/
public long sequenceNumber() {
return sequenceNumber_;
}
/**
* <p>Return contained {@link org.rocksdb.WriteBatch}
* instance</p>
*
* @return {@link org.rocksdb.WriteBatch} instance.
*/
public WriteBatch writeBatch() {
return writeBatch_;
}
private final long sequenceNumber_;
private final WriteBatch writeBatch_;
}
private native void disposeInternal(long handle);
private native boolean isValid(long handle);
private native void next(long handle);
private native void status(long handle)
throws RocksDBException;
private native BatchResult getBatch(long handle);
}

@ -53,6 +53,12 @@ public class WriteBatch extends AbstractWriteBatch {
iterate(handler.nativeHandle_);
}
WriteBatch(long nativeHandle) {
super();
disOwnNativeHandle();
nativeHandle_ = nativeHandle;
}
@Override final native void disposeInternal(long handle);
@Override final native int count0();
@Override final native void put(byte[] key, int keyLen, byte[] value, int valueLen);

@ -0,0 +1,79 @@
package org.rocksdb.test;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.*;
import static org.assertj.core.api.Assertions.assertThat;
public class TransactionLogIteratorTest {
@ClassRule
public static final RocksMemoryResource rocksMemoryResource =
new RocksMemoryResource();
@Rule
public TemporaryFolder dbFolder = new TemporaryFolder();
@Test
public void transactionLogIterator() throws RocksDBException {
RocksDB db = null;
Options options = null;
TransactionLogIterator transactionLogIterator = null;
try {
options = new Options().
setCreateIfMissing(true);
db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
transactionLogIterator = db.getUpdatesSince(0);
} finally {
if (transactionLogIterator != null) {
transactionLogIterator.dispose();
}
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
}
}
@Test
public void getBatch() throws RocksDBException {
RocksDB db = null;
Options options = null;
TransactionLogIterator transactionLogIterator = null;
try {
options = new Options().
setCreateIfMissing(true).
setWalTtlSeconds(1000).
setWalSizeLimitMB(10);
db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
for (int i = 0; i < 250; i++){
db.put(String.valueOf(i).getBytes(),
String.valueOf(i).getBytes());
}
db.flush(new FlushOptions().setWaitForFlush(true));
transactionLogIterator = db.getUpdatesSince(0);
assertThat(transactionLogIterator.isValid()).isTrue();
transactionLogIterator.status();
TransactionLogIterator.BatchResult batchResult =
transactionLogIterator.getBatch();
assertThat(batchResult.sequenceNumber()).isEqualTo(1);
} finally {
if (transactionLogIterator != null) {
transactionLogIterator.dispose();
}
if (db != null) {
db.close();
}
if (options != null) {
options.dispose();
}
}
}
}

@ -9,6 +9,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <jni.h>
#include <memory>
#include <string>
#include <vector>
@ -16,6 +17,7 @@
#include "rocksjni/portal.h"
#include "rocksdb/db.h"
#include "rocksdb/cache.h"
#include "rocksdb/types.h"
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::Open
@ -1598,3 +1600,25 @@ void Java_org_rocksdb_RocksDB_compactRange__J_3BI_3BIZIIJ(
rocksdb_compactrange_helper(env, db, cf_handle, jbegin, jbegin_len,
jend, jend_len, jreduce_level, jtarget_level, jtarget_path_id);
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::DB::GetUpdatesSince
/*
* Class: org_rocksdb_RocksDB
* Method: getUpdatesSince
* Signature: (JJ)J
*/
jlong Java_org_rocksdb_RocksDB_getUpdatesSince(JNIEnv* env,
jobject jdb, jlong jdb_handle, jlong jsequence_number) {
auto db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
rocksdb::SequenceNumber sequence_number =
static_cast<rocksdb::SequenceNumber>(jsequence_number);
std::unique_ptr<rocksdb::TransactionLogIterator> iter;
rocksdb::Status s = db->GetUpdatesSince(sequence_number, &iter);
if (s.ok()) {
return reinterpret_cast<jlong>(iter.release());
}
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
return 0;
}

@ -0,0 +1,79 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file implements the "bridge" between Java and C++ and enables
// calling c++ rocksdb::Iterator methods from Java side.
#include <stdio.h>
#include <stdlib.h>
#include <jni.h>
#include "include/org_rocksdb_TransactionLogIterator.h"
#include "rocksdb/transaction_log.h"
#include "rocksjni/portal.h"
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_TransactionLogIterator_disposeInternal(
JNIEnv* env, jobject jobj, jlong handle) {
auto* it = reinterpret_cast<rocksdb::TransactionLogIterator*>(handle);
delete it;
}
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: isValid
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_TransactionLogIterator_isValid(
JNIEnv* env, jobject jobj, jlong handle) {
return reinterpret_cast<rocksdb::TransactionLogIterator*>(handle)->Valid();
}
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: next
* Signature: (J)V
*/
void Java_org_rocksdb_TransactionLogIterator_next(
JNIEnv* env, jobject jobj, jlong handle) {
reinterpret_cast<rocksdb::TransactionLogIterator*>(handle)->Next();
}
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: status
* Signature: (J)V
*/
void Java_org_rocksdb_TransactionLogIterator_status(
JNIEnv* env, jobject jobj, jlong handle) {
rocksdb::Status s = reinterpret_cast<
rocksdb::TransactionLogIterator*>(handle)->status();
if (!s.ok()) {
rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
}
}
/*
* Class: org_rocksdb_TransactionLogIterator
* Method: getBatch
* Signature: (J)Lorg/rocksdb/TransactionLogIterator$BatchResult
*/
jobject Java_org_rocksdb_TransactionLogIterator_getBatch(
JNIEnv* env, jobject jobj, jlong handle) {
rocksdb::BatchResult batch_result =
reinterpret_cast<rocksdb::TransactionLogIterator*>(handle)->GetBatch();
jclass jclazz = env->FindClass(
"org/rocksdb/TransactionLogIterator$BatchResult");
assert(jclazz != nullptr);
jmethodID mid = env->GetMethodID(
jclazz, "<init>", "(Lorg/rocksdb/TransactionLogIterator;JJ)V");
assert(mid != nullptr);
return env->NewObject(jclazz, mid, jobj,
batch_result.sequence, batch_result.writeBatchPtr.release());
}
Loading…
Cancel
Save