Java API - Implement GetFromBatch and GetFromBatchAndDB in WBWI

Summary:
Needed for working with `get` after `merge` on a WBWI.
Closes https://github.com/facebook/rocksdb/pull/1093

Differential Revision: D4137978

Pulled By: yhchiang

fbshipit-source-id: e18d50d
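
For context, a minimal sketch of the motivating `get`-after-`merge` case (not part of this change; the StringAppendOperator merge operator, the example class name and the database path are illustrative assumptions):

import org.rocksdb.*;

public class GetAfterMergeSketch {  // hypothetical example class
  public static void main(final String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (final Options options = new Options()
             .setCreateIfMissing(true)
             .setMergeOperator(new StringAppendOperator());  // assumed operator
         final RocksDB db = RocksDB.open(options, "/tmp/wbwi-example");  // illustrative path
         final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
         final ReadOptions readOptions = new ReadOptions()) {
      db.put("k".getBytes(), "a".getBytes());
      wbwi.merge("k".getBytes(), "b".getBytes());

      // getFromBatch alone cannot resolve the pending merge: it throws a
      // RocksDBException carrying a MergeInProgress status. getFromBatchAndDB
      // reads the base value from the DB and applies the merge operator.
      final byte[] value = wbwi.getFromBatchAndDB(db, readOptions, "k".getBytes());
      System.out.println(new String(value));  // "a,b" with the default ',' delimiter
    }
  }
}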
Branch: main
Author: Adam Retter, committed by Facebook Github Bot
Parent: 815f54afad
Commit: 24bceb0963

4 changed files:
  1. java/rocksjni/portal.h (+32)
  2. java/rocksjni/write_batch_with_index.cc (+81)
  3. java/src/main/java/org/rocksdb/WriteBatchWithIndex.java (+92)
  4. java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java (+71)
@@ -900,6 +900,38 @@ class JniUtil {
    env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);
  }
  /*
   * Helper for operations on a value
   * for example WriteBatchWithIndex->GetFromBatch
   */
  static jbyteArray v_op(
      std::function<rocksdb::Status(rocksdb::Slice, std::string*)> op,
      JNIEnv* env, jbyteArray jkey, jint jkey_len) {
    jboolean isCopy;
    jbyte* key = env->GetByteArrayElements(jkey, &isCopy);
    rocksdb::Slice key_slice(reinterpret_cast<char*>(key), jkey_len);

    std::string value;
    rocksdb::Status s = op(key_slice, &value);

    env->ReleaseByteArrayElements(jkey, key, JNI_ABORT);

    if (s.IsNotFound()) {
      return nullptr;
    }

    if (s.ok()) {
      jbyteArray jret_value =
          env->NewByteArray(static_cast<jsize>(value.size()));
      env->SetByteArrayRegion(jret_value, 0, static_cast<jsize>(value.size()),
          reinterpret_cast<const jbyte*>(value.c_str()));
      return jret_value;
    }

    rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
    return nullptr;
  }
};

}  // namespace rocksdb

@@ -273,6 +273,87 @@ jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(
  return reinterpret_cast<jlong>(iterator);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    getFromBatch
 * Signature: (JJ[BI)[B
 */
jbyteArray Java_org_rocksdb_WriteBatchWithIndex_getFromBatch__JJ_3BI(
    JNIEnv* env, jobject jobj, jlong jwbwi_handle, jlong jdbopt_handle,
    jbyteArray jkey, jint jkey_len) {
  auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
  auto* dbopt = reinterpret_cast<rocksdb::DBOptions*>(jdbopt_handle);

  auto getter =
      [&wbwi, &dbopt](const rocksdb::Slice& key, std::string* value) {
        return wbwi->GetFromBatch(*dbopt, key, value);
      };

  return rocksdb::JniUtil::v_op(getter, env, jkey, jkey_len);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    getFromBatch
 * Signature: (JJ[BIJ)[B
 */
jbyteArray Java_org_rocksdb_WriteBatchWithIndex_getFromBatch__JJ_3BIJ(
    JNIEnv* env, jobject jobj, jlong jwbwi_handle, jlong jdbopt_handle,
    jbyteArray jkey, jint jkey_len, jlong jcf_handle) {
  auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
  auto* dbopt = reinterpret_cast<rocksdb::DBOptions*>(jdbopt_handle);
  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);

  auto getter =
      [&wbwi, &cf_handle, &dbopt](const rocksdb::Slice& key,
          std::string* value) {
        return wbwi->GetFromBatch(cf_handle, *dbopt, key, value);
      };

  return rocksdb::JniUtil::v_op(getter, env, jkey, jkey_len);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    getFromBatchAndDB
 * Signature: (JJJ[BI)[B
 */
jbyteArray Java_org_rocksdb_WriteBatchWithIndex_getFromBatchAndDB__JJJ_3BI(
    JNIEnv* env, jobject jobj, jlong jwbwi_handle, jlong jdb_handle,
    jlong jreadopt_handle, jbyteArray jkey, jint jkey_len) {
  auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
  auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto* readopt = reinterpret_cast<rocksdb::ReadOptions*>(jreadopt_handle);

  auto getter =
      [&wbwi, &db, &readopt](const rocksdb::Slice& key, std::string* value) {
        return wbwi->GetFromBatchAndDB(db, *readopt, key, value);
      };

  return rocksdb::JniUtil::v_op(getter, env, jkey, jkey_len);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    getFromBatchAndDB
 * Signature: (JJJ[BIJ)[B
 */
jbyteArray Java_org_rocksdb_WriteBatchWithIndex_getFromBatchAndDB__JJJ_3BIJ(
    JNIEnv* env, jobject jobj, jlong jwbwi_handle, jlong jdb_handle,
    jlong jreadopt_handle, jbyteArray jkey, jint jkey_len, jlong jcf_handle) {
  auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
  auto* db = reinterpret_cast<rocksdb::DB*>(jdb_handle);
  auto* readopt = reinterpret_cast<rocksdb::ReadOptions*>(jreadopt_handle);
  auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);

  auto getter =
      [&wbwi, &db, &cf_handle, &readopt](const rocksdb::Slice& key,
          std::string* value) {
        return wbwi->GetFromBatchAndDB(db, *readopt, cf_handle, key, value);
      };

  return rocksdb::JniUtil::v_op(getter, env, jkey, jkey_len);
}
/*
 * Class:     org_rocksdb_WriteBatchWithIndex
 * Method:    disposeInternal

@@ -136,6 +136,88 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
        baseIterator);
  }
  /**
   * Similar to {@link RocksDB#get(ColumnFamilyHandle, byte[])} but will only
   * read the key from this batch.
   *
   * @param columnFamilyHandle The column family to retrieve the value from
   * @param options The database options to use
   * @param key The key to read the value for
   *
   * @return the value for the key from this batch, or null if the key
   *     is not found
   *
   * @throws RocksDBException if the batch does not have enough data to
   *     resolve a Merge operation; in that case the exception carries a
   *     MergeInProgress status
   */
  public byte[] getFromBatch(final ColumnFamilyHandle columnFamilyHandle,
      final DBOptions options, final byte[] key) throws RocksDBException {
    return getFromBatch(nativeHandle_, options.nativeHandle_,
        key, key.length, columnFamilyHandle.nativeHandle_);
  }
  /**
   * Similar to {@link RocksDB#get(byte[])} but will only
   * read the key from this batch.
   *
   * @param options The database options to use
   * @param key The key to read the value for
   *
   * @return the value for the key from this batch, or null if the key
   *     is not found
   *
   * @throws RocksDBException if the batch does not have enough data to
   *     resolve a Merge operation; in that case the exception carries a
   *     MergeInProgress status
   */
  public byte[] getFromBatch(final DBOptions options, final byte[] key)
      throws RocksDBException {
    return getFromBatch(nativeHandle_, options.nativeHandle_, key, key.length);
  }
  /**
   * Similar to {@link RocksDB#get(ColumnFamilyHandle, byte[])} but will also
   * read writes from this batch.
   *
   * This function will query both this batch and the DB and then merge
   * the results using the DB's merge operator (if the batch contains any
   * merge requests).
   *
   * Setting {@link ReadOptions#setSnapshot(Snapshot)} will affect what is
   * read from the DB but will NOT change which keys are read from the batch
   * (the keys in this batch do not yet belong to any snapshot and will be
   * fetched regardless).
   *
   * @param db The Rocks database to read from
   * @param columnFamilyHandle The column family to retrieve the value from
   * @param options The read options to use
   * @param key The key to read the value for
   *
   * @return the value for the key, or null if the key is not found in
   *     either the batch or the database
   *
   * @throws RocksDBException if the value for the key cannot be read
   */
  public byte[] getFromBatchAndDB(final RocksDB db,
      final ColumnFamilyHandle columnFamilyHandle, final ReadOptions options,
      final byte[] key) throws RocksDBException {
    return getFromBatchAndDB(nativeHandle_, db.nativeHandle_,
        options.nativeHandle_, key, key.length,
        columnFamilyHandle.nativeHandle_);
  }
  /**
   * Similar to {@link RocksDB#get(byte[])} but will also
   * read writes from this batch.
   *
   * This function will query both this batch and the DB and then merge
   * the results using the DB's merge operator (if the batch contains any
   * merge requests).
   *
   * Setting {@link ReadOptions#setSnapshot(Snapshot)} will affect what is
   * read from the DB but will NOT change which keys are read from the batch
   * (the keys in this batch do not yet belong to any snapshot and will be
   * fetched regardless).
   *
   * @param db The Rocks database to read from
   * @param options The read options to use
   * @param key The key to read the value for
   *
   * @return the value for the key, or null if the key is not found in
   *     either the batch or the database
   *
   * @throws RocksDBException if the value for the key cannot be read
   */
  public byte[] getFromBatchAndDB(final RocksDB db, final ReadOptions options,
      final byte[] key) throws RocksDBException {
    return getFromBatchAndDB(nativeHandle_, db.nativeHandle_,
        options.nativeHandle_, key, key.length);
  }
  @Override protected final native void disposeInternal(final long handle);
  @Override final native int count0(final long handle);
  @Override final native void put(final long handle, final byte[] key,
@@ -167,4 +249,14 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
  private native long iterator1(final long handle, final long cfHandle);
  private native long iteratorWithBase(final long handle,
      final long baseIteratorHandle, final long cfHandle);
  private native byte[] getFromBatch(final long handle, final long optHandle,
      final byte[] key, final int keyLen);
  private native byte[] getFromBatch(final long handle, final long optHandle,
      final byte[] key, final int keyLen, final long cfHandle);
  private native byte[] getFromBatchAndDB(final long handle,
      final long dbHandle, final long readOptHandle, final byte[] key,
      final int keyLen);
  private native byte[] getFromBatchAndDB(final long handle,
      final long dbHandle, final long readOptHandle, final byte[] key,
      final int keyLen, final long cfHandle);
}
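A hedged sketch of the snapshot caveat documented above, assuming an already-open db and wbwi, with k1 -> v1 already in the DB (keys and values are illustrative, not part of this change):

// A snapshot pins the DB side of getFromBatchAndDB only; writes in the
// batch are returned regardless of the snapshot.
final Snapshot snapshot = db.getSnapshot();
try (final ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot)) {
  db.put(k1, v2);                                // written after the snapshot
  wbwi.put(k3, v3);                              // written only to the batch

  wbwi.getFromBatchAndDB(db, readOptions, k1);   // v1: the pre-snapshot DB value
  wbwi.getFromBatchAndDB(db, readOptions, k3);   // v3: batch keys ignore the snapshot
} finally {
  db.releaseSnapshot(snapshot);
}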

@@ -307,6 +307,77 @@ public class WriteBatchWithIndexTest {
    }
  }
  @Test
  public void getFromBatch() throws RocksDBException {
    final byte[] k1 = "k1".getBytes();
    final byte[] k2 = "k2".getBytes();
    final byte[] k3 = "k3".getBytes();
    final byte[] k4 = "k4".getBytes();

    final byte[] v1 = "v1".getBytes();
    final byte[] v2 = "v2".getBytes();
    final byte[] v3 = "v3".getBytes();

    try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
         final DBOptions dbOptions = new DBOptions()) {
      wbwi.put(k1, v1);
      wbwi.put(k2, v2);
      wbwi.put(k3, v3);

      assertThat(wbwi.getFromBatch(dbOptions, k1)).isEqualTo(v1);
      assertThat(wbwi.getFromBatch(dbOptions, k2)).isEqualTo(v2);
      assertThat(wbwi.getFromBatch(dbOptions, k3)).isEqualTo(v3);
      assertThat(wbwi.getFromBatch(dbOptions, k4)).isNull();

      wbwi.remove(k2);

      assertThat(wbwi.getFromBatch(dbOptions, k2)).isNull();
    }
  }
  @Test
  public void getFromBatchAndDB() throws RocksDBException {
    final byte[] k1 = "k1".getBytes();
    final byte[] k2 = "k2".getBytes();
    final byte[] k3 = "k3".getBytes();
    final byte[] k4 = "k4".getBytes();

    final byte[] v1 = "v1".getBytes();
    final byte[] v2 = "v2".getBytes();
    final byte[] v3 = "v3".getBytes();
    final byte[] v4 = "v4".getBytes();

    try (final Options options = new Options().setCreateIfMissing(true);
         final RocksDB db = RocksDB.open(options,
             dbFolder.getRoot().getAbsolutePath())) {

      db.put(k1, v1);
      db.put(k2, v2);
      db.put(k4, v4);

      try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
           final DBOptions dbOptions = new DBOptions();
           final ReadOptions readOptions = new ReadOptions()) {

        assertThat(wbwi.getFromBatch(dbOptions, k1)).isNull();
        assertThat(wbwi.getFromBatch(dbOptions, k2)).isNull();
        assertThat(wbwi.getFromBatch(dbOptions, k4)).isNull();

        wbwi.put(k3, v3);

        assertThat(wbwi.getFromBatch(dbOptions, k3)).isEqualTo(v3);

        assertThat(wbwi.getFromBatchAndDB(db, readOptions, k1)).isEqualTo(v1);
        assertThat(wbwi.getFromBatchAndDB(db, readOptions, k2)).isEqualTo(v2);
        assertThat(wbwi.getFromBatchAndDB(db, readOptions, k3)).isEqualTo(v3);
        assertThat(wbwi.getFromBatchAndDB(db, readOptions, k4)).isEqualTo(v4);

        wbwi.remove(k4);

        assertThat(wbwi.getFromBatchAndDB(db, readOptions, k4)).isNull();
      }
    }
  }
  private byte[] toArray(final ByteBuffer buf) {
    final byte[] ary = new byte[buf.remaining()];
    buf.get(ary);
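For completeness, a hedged sketch of the column-family overloads, which the tests above do not exercise (the column family name is an illustrative assumption; db, wbwi, dbOptions, readOptions, k1 and v1 are as in the test above):

try (final ColumnFamilyHandle cfHandle = db.createColumnFamily(
         new ColumnFamilyDescriptor("new_cf".getBytes()))) {
  wbwi.put(cfHandle, k1, v1);                             // batch write to the column family
  wbwi.getFromBatch(cfHandle, dbOptions, k1);             // v1, read from the batch only
  wbwi.getFromBatchAndDB(db, cfHandle, readOptions, k1);  // v1, batch merged with the DB
}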
