Added further Java API options for controlling concurrent writes

main
Adam Retter 9 years ago committed by krad
parent ebdfe34cc4
commit d367555027
  1. 176
      java/rocksjni/options.cc
  2. 56
      java/src/main/java/org/rocksdb/DBOptions.java
  3. 104
      java/src/main/java/org/rocksdb/DBOptionsInterface.java
  4. 56
      java/src/main/java/org/rocksdb/Options.java
  5. 36
      java/src/test/java/org/rocksdb/DBOptionsTest.java
  6. 36
      java/src/test/java/org/rocksdb/OptionsTest.java

@ -1034,6 +1034,94 @@ void Java_org_rocksdb_Options_setBytesPerSync(
static_cast<int64_t>(bytes_per_sync);
}
/*
* Class: org_rocksdb_Options
* Method: setAllowConcurrentMemtableWrite
* Signature: (JZ)V
*/
void Java_org_rocksdb_Options_setAllowConcurrentMemtableWrite(
JNIEnv* env, jobject jobj, jlong jhandle, jboolean allow) {
reinterpret_cast<rocksdb::Options*>(jhandle)->
allow_concurrent_memtable_write = static_cast<bool>(allow);
}
/*
* Class: org_rocksdb_Options
* Method: allowConcurrentMemtableWrite
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_Options_allowConcurrentMemtableWrite(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::Options*>(jhandle)->
allow_concurrent_memtable_write;
}
/*
* Class: org_rocksdb_Options
* Method: setEnableWriteThreadAdaptiveYield
* Signature: (JZ)V
*/
void Java_org_rocksdb_Options_setEnableWriteThreadAdaptiveYield(
JNIEnv* env, jobject jobj, jlong jhandle, jboolean yield) {
reinterpret_cast<rocksdb::Options*>(jhandle)->
enable_write_thread_adaptive_yield = static_cast<bool>(yield);
}
/*
* Class: org_rocksdb_Options
* Method: enableWriteThreadAdaptiveYield
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_Options_enableWriteThreadAdaptiveYield(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::Options*>(jhandle)->
enable_write_thread_adaptive_yield;
}
/*
* Class: org_rocksdb_Options
* Method: setWriteThreadMaxYieldUsec
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setWriteThreadMaxYieldUsec(
JNIEnv* env, jobject jobject, jlong jhandle, jlong max) {
reinterpret_cast<rocksdb::Options*>(jhandle)->
write_thread_max_yield_usec = static_cast<int64_t>(max);
}
/*
* Class: org_rocksdb_Options
* Method: writeThreadMaxYieldUsec
* Signature: (J)J
*/
jlong Java_org_rocksdb_Options_writeThreadMaxYieldUsec(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::Options*>(jhandle)->
write_thread_max_yield_usec;
}
/*
* Class: org_rocksdb_Options
* Method: setWriteThreadSlowYieldUsec
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setWriteThreadSlowYieldUsec(
JNIEnv* env, jobject jobj, jlong jhandle, jlong slow) {
reinterpret_cast<rocksdb::Options*>(jhandle)->
write_thread_slow_yield_usec = static_cast<int64_t>(slow);
}
/*
* Class: org_rocksdb_Options
* Method: writeThreadSlowYieldUsec
* Signature: (J)J
*/
jlong Java_org_rocksdb_Options_writeThreadSlowYieldUsec(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::Options*>(jhandle)->
write_thread_slow_yield_usec;
}
/*
* Method: tableFactoryName
* Signature: (J)Ljava/lang/String
@ -4287,6 +4375,94 @@ jlong Java_org_rocksdb_DBOptions_bytesPerSync(
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->bytes_per_sync;
}
/*
* Class: org_rocksdb_DBOptions
* Method: setAllowConcurrentMemtableWrite
* Signature: (JZ)V
*/
void Java_org_rocksdb_DBOptions_setAllowConcurrentMemtableWrite(
JNIEnv* env, jobject jobj, jlong jhandle, jboolean allow) {
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
allow_concurrent_memtable_write = static_cast<bool>(allow);
}
/*
* Class: org_rocksdb_DBOptions
* Method: allowConcurrentMemtableWrite
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_DBOptions_allowConcurrentMemtableWrite(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
allow_concurrent_memtable_write;
}
/*
* Class: org_rocksdb_DBOptions
* Method: setEnableWriteThreadAdaptiveYield
* Signature: (JZ)V
*/
void Java_org_rocksdb_DBOptions_setEnableWriteThreadAdaptiveYield(
JNIEnv* env, jobject jobj, jlong jhandle, jboolean yield) {
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
enable_write_thread_adaptive_yield = static_cast<bool>(yield);
}
/*
* Class: org_rocksdb_DBOptions
* Method: enableWriteThreadAdaptiveYield
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_DBOptions_enableWriteThreadAdaptiveYield(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
enable_write_thread_adaptive_yield;
}
/*
* Class: org_rocksdb_DBOptions
* Method: setWriteThreadMaxYieldUsec
* Signature: (JJ)V
*/
void Java_org_rocksdb_DBOptions_setWriteThreadMaxYieldUsec(
JNIEnv* env, jobject jobject, jlong jhandle, jlong max) {
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
write_thread_max_yield_usec = static_cast<int64_t>(max);
}
/*
* Class: org_rocksdb_DBOptions
* Method: writeThreadMaxYieldUsec
* Signature: (J)J
*/
jlong Java_org_rocksdb_DBOptions_writeThreadMaxYieldUsec(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
write_thread_max_yield_usec;
}
/*
* Class: org_rocksdb_DBOptions
* Method: setWriteThreadSlowYieldUsec
* Signature: (JJ)V
*/
void Java_org_rocksdb_DBOptions_setWriteThreadSlowYieldUsec(
JNIEnv* env, jobject jobj, jlong jhandle, jlong slow) {
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
write_thread_slow_yield_usec = static_cast<int64_t>(slow);
}
/*
* Class: org_rocksdb_DBOptions
* Method: writeThreadSlowYieldUsec
* Signature: (J)J
*/
jlong Java_org_rocksdb_DBOptions_writeThreadSlowYieldUsec(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->
write_thread_slow_yield_usec;
}
//////////////////////////////////////////////////////////////////////////////
// rocksdb::WriteOptions

@ -558,6 +558,50 @@ public class DBOptions extends RocksObject implements DBOptionsInterface {
return bytesPerSync(nativeHandle_);
}
@Override
public void setAllowConcurrentMemtableWrite(
final boolean allowConcurrentMemtableWrite) {
setAllowConcurrentMemtableWrite(nativeHandle_,
allowConcurrentMemtableWrite);
}
@Override
public boolean allowConcurrentMemtableWrite() {
return allowConcurrentMemtableWrite(nativeHandle_);
}
@Override
public void setEnableWriteThreadAdaptiveYield(
final boolean enableWriteThreadAdaptiveYield) {
setEnableWriteThreadAdaptiveYield(nativeHandle_,
enableWriteThreadAdaptiveYield);
}
@Override
public boolean enableWriteThreadAdaptiveYield() {
return enableWriteThreadAdaptiveYield(nativeHandle_);
}
@Override
public void setWriteThreadMaxYieldUsec(final long writeThreadMaxYieldUsec) {
setWriteThreadMaxYieldUsec(nativeHandle_, writeThreadMaxYieldUsec);
}
@Override
public long writeThreadMaxYieldUsec() {
return writeThreadMaxYieldUsec(nativeHandle_);
}
@Override
public void setWriteThreadSlowYieldUsec(final long writeThreadSlowYieldUsec) {
setWriteThreadSlowYieldUsec(nativeHandle_, writeThreadSlowYieldUsec);
}
@Override
public long writeThreadSlowYieldUsec() {
return writeThreadSlowYieldUsec(nativeHandle_);
}
static final int DEFAULT_NUM_SHARD_BITS = -1;
/**
@ -668,6 +712,18 @@ public class DBOptions extends RocksObject implements DBOptionsInterface {
private native void setBytesPerSync(
long handle, long bytesPerSync);
private native long bytesPerSync(long handle);
private native void setAllowConcurrentMemtableWrite(long handle,
boolean allowConcurrentMemtableWrite);
private native boolean allowConcurrentMemtableWrite(long handle);
private native void setEnableWriteThreadAdaptiveYield(long handle,
boolean enableWriteThreadAdaptiveYield);
private native boolean enableWriteThreadAdaptiveYield(long handle);
private native void setWriteThreadMaxYieldUsec(long handle,
long writeThreadMaxYieldUsec);
private native long writeThreadMaxYieldUsec(long handle);
private native void setWriteThreadSlowYieldUsec(long handle,
long writeThreadSlowYieldUsec);
private native long writeThreadSlowYieldUsec(long handle);
int numShardBits_;
RateLimiterConfig rateLimiterConfig_;

@ -802,4 +802,108 @@ public interface DBOptionsInterface {
* @return size in bytes
*/
long bytesPerSync();
/**
* If true, allow multi-writers to update mem tables in parallel.
* Only some memtable factorys support concurrent writes; currently it
* is implemented only for SkipListFactory. Concurrent memtable writes
* are not compatible with inplace_update_support or filter_deletes.
* It is strongly recommended to set
* {@link #setEnableWriteThreadAdaptiveYield(boolean)} if you are going to use
* this feature.
* Default: false
*
* @param allowConcurrentMemtableWrite true to enable concurrent writes
* for the memtable
*/
void setAllowConcurrentMemtableWrite(boolean allowConcurrentMemtableWrite);
/**
* If true, allow multi-writers to update mem tables in parallel.
* Only some memtable factorys support concurrent writes; currently it
* is implemented only for SkipListFactory. Concurrent memtable writes
* are not compatible with inplace_update_support or filter_deletes.
* It is strongly recommended to set
* {@link #setEnableWriteThreadAdaptiveYield(boolean)} if you are going to use
* this feature.
* Default: false
*
* @return true if concurrent writes are enabled for the memtable
*/
boolean allowConcurrentMemtableWrite();
/**
* If true, threads synchronizing with the write batch group leader will
* wait for up to {@link #writeThreadMaxYieldUsec()} before blocking on a
* mutex. This can substantially improve throughput for concurrent workloads,
* regardless of whether {@link #allowConcurrentMemtableWrite()} is enabled.
* Default: false
*
* @param enableWriteThreadAdaptiveYield true to enable adaptive yield for the
* write threads
*/
void setEnableWriteThreadAdaptiveYield(
boolean enableWriteThreadAdaptiveYield);
/**
* If true, threads synchronizing with the write batch group leader will
* wait for up to {@link #writeThreadMaxYieldUsec()} before blocking on a
* mutex. This can substantially improve throughput for concurrent workloads,
* regardless of whether {@link #allowConcurrentMemtableWrite()} is enabled.
* Default: false
*
* @return true if adaptive yield is enabled
* for the writing threads
*/
boolean enableWriteThreadAdaptiveYield();
/**
* The maximum number of microseconds that a write operation will use
* a yielding spin loop to coordinate with other write threads before
* blocking on a mutex. (Assuming {@link #writeThreadSlowYieldUsec()} is
* set properly) increasing this value is likely to increase RocksDB
* throughput at the expense of increased CPU usage.
* Default: 100
*
* @param writeThreadMaxYieldUsec maximum number of microseconds
*/
void setWriteThreadMaxYieldUsec(long writeThreadMaxYieldUsec);
/**
* The maximum number of microseconds that a write operation will use
* a yielding spin loop to coordinate with other write threads before
* blocking on a mutex. (Assuming {@link #writeThreadSlowYieldUsec()} is
* set properly) increasing this value is likely to increase RocksDB
* throughput at the expense of increased CPU usage.
* Default: 100
*
* @return the maximum number of microseconds
*/
long writeThreadMaxYieldUsec();
/**
* The latency in microseconds after which a std::this_thread::yield
* call (sched_yield on Linux) is considered to be a signal that
* other processes or threads would like to use the current core.
* Increasing this makes writer threads more likely to take CPU
* by spinning, which will show up as an increase in the number of
* involuntary context switches.
* Default: 3
*
* @param writeThreadSlowYieldUsec the latency in microseconds
*/
void setWriteThreadSlowYieldUsec(long writeThreadSlowYieldUsec);
/**
* The latency in microseconds after which a std::this_thread::yield
* call (sched_yield on Linux) is considered to be a signal that
* other processes or threads would like to use the current core.
* Increasing this makes writer threads more likely to take CPU
* by spinning, which will show up as an increase in the number of
* involuntary context switches.
* Default: 3
*
* @return writeThreadSlowYieldUsec the latency in microseconds
*/
long writeThreadSlowYieldUsec();
}

@ -631,6 +631,50 @@ public class Options extends RocksObject
return this;
}
@Override
public void setAllowConcurrentMemtableWrite(
final boolean allowConcurrentMemtableWrite) {
setAllowConcurrentMemtableWrite(nativeHandle_,
allowConcurrentMemtableWrite);
}
@Override
public boolean allowConcurrentMemtableWrite() {
return allowConcurrentMemtableWrite(nativeHandle_);
}
@Override
public void setEnableWriteThreadAdaptiveYield(
final boolean enableWriteThreadAdaptiveYield) {
setEnableWriteThreadAdaptiveYield(nativeHandle_,
enableWriteThreadAdaptiveYield);
}
@Override
public boolean enableWriteThreadAdaptiveYield() {
return enableWriteThreadAdaptiveYield(nativeHandle_);
}
@Override
public void setWriteThreadMaxYieldUsec(final long writeThreadMaxYieldUsec) {
setWriteThreadMaxYieldUsec(nativeHandle_, writeThreadMaxYieldUsec);
}
@Override
public long writeThreadMaxYieldUsec() {
return writeThreadMaxYieldUsec(nativeHandle_);
}
@Override
public void setWriteThreadSlowYieldUsec(final long writeThreadSlowYieldUsec) {
setWriteThreadSlowYieldUsec(nativeHandle_, writeThreadSlowYieldUsec);
}
@Override
public long writeThreadSlowYieldUsec() {
return writeThreadSlowYieldUsec(nativeHandle_);
}
@Override
public Options setMemTableConfig(final MemTableConfig config) {
memTableConfig_ = config;
@ -1282,6 +1326,18 @@ public class Options extends RocksObject
private native void setBytesPerSync(
long handle, long bytesPerSync);
private native long bytesPerSync(long handle);
private native void setAllowConcurrentMemtableWrite(long handle,
boolean allowConcurrentMemtableWrite);
private native boolean allowConcurrentMemtableWrite(long handle);
private native void setEnableWriteThreadAdaptiveYield(long handle,
boolean enableWriteThreadAdaptiveYield);
private native boolean enableWriteThreadAdaptiveYield(long handle);
private native void setWriteThreadMaxYieldUsec(long handle,
long writeThreadMaxYieldUsec);
private native long writeThreadMaxYieldUsec(long handle);
private native void setWriteThreadSlowYieldUsec(long handle,
long writeThreadSlowYieldUsec);
private native long writeThreadSlowYieldUsec(long handle);
// CF native handles
private native void optimizeForPointLookup(long handle,
long blockCacheSizeMb);

@ -352,6 +352,42 @@ public class DBOptionsTest {
}
}
@Test
public void allowConcurrentMemtableWrite() {
try (final DBOptions opt = new DBOptions()) {
final boolean boolValue = rand.nextBoolean();
opt.setAllowConcurrentMemtableWrite(boolValue);
assertThat(opt.allowConcurrentMemtableWrite()).isEqualTo(boolValue);
}
}
@Test
public void enableWriteThreadAdaptiveYield() {
try (final DBOptions opt = new DBOptions()) {
final boolean boolValue = rand.nextBoolean();
opt.setEnableWriteThreadAdaptiveYield(boolValue);
assertThat(opt.enableWriteThreadAdaptiveYield()).isEqualTo(boolValue);
}
}
@Test
public void writeThreadMaxYieldUsec() {
try (final DBOptions opt = new DBOptions()) {
final long longValue = rand.nextLong();
opt.setWriteThreadMaxYieldUsec(longValue);
assertThat(opt.writeThreadMaxYieldUsec()).isEqualTo(longValue);
}
}
@Test
public void writeThreadSlowYieldUsec() {
try (final DBOptions opt = new DBOptions()) {
final long longValue = rand.nextLong();
opt.setWriteThreadSlowYieldUsec(longValue);
assertThat(opt.writeThreadSlowYieldUsec()).isEqualTo(longValue);
}
}
@Test
public void rateLimiterConfig() {
try(final DBOptions options = new DBOptions();

@ -661,6 +661,42 @@ public class OptionsTest {
}
}
@Test
public void allowConcurrentMemtableWrite() {
try (final Options opt = new Options()) {
final boolean boolValue = rand.nextBoolean();
opt.setAllowConcurrentMemtableWrite(boolValue);
assertThat(opt.allowConcurrentMemtableWrite()).isEqualTo(boolValue);
}
}
@Test
public void enableWriteThreadAdaptiveYield() {
try (final Options opt = new Options()) {
final boolean boolValue = rand.nextBoolean();
opt.setEnableWriteThreadAdaptiveYield(boolValue);
assertThat(opt.enableWriteThreadAdaptiveYield()).isEqualTo(boolValue);
}
}
@Test
public void writeThreadMaxYieldUsec() {
try (final Options opt = new Options()) {
final long longValue = rand.nextLong();
opt.setWriteThreadMaxYieldUsec(longValue);
assertThat(opt.writeThreadMaxYieldUsec()).isEqualTo(longValue);
}
}
@Test
public void writeThreadSlowYieldUsec() {
try (final Options opt = new Options()) {
final long longValue = rand.nextLong();
opt.setWriteThreadSlowYieldUsec(longValue);
assertThat(opt.writeThreadSlowYieldUsec()).isEqualTo(longValue);
}
}
@Test
public void env() {
try (final Options options = new Options();

Loading…
Cancel
Save