[RocksJava] Adjusted RateLimiter to 3.10.0 (#1368)

Summary:
- Deprecated RateLimiterConfig and GenericRateLimiterConfig
- Introduced RateLimiter

It is now possible to use all C++ related methods also in RocksJava.
A noteable method is setBytesPerSecond which can change the allowed
number of bytes per second at runtime.

Test Plan:
make rocksdbjava
make jtest

Reviewers: adamretter, yhchiang, ankgup87

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D35715
main
Adam Retter 8 years ago committed by Yueh-Hsuan Chiang
parent 37737c3a6b
commit 5cd28833a2
  1. 1
      java/Makefile
  2. 35
      java/rocksjni/options.cc
  3. 95
      java/rocksjni/ratelimiterjni.cc
  4. 14
      java/src/main/java/org/rocksdb/DBOptions.java
  5. 14
      java/src/main/java/org/rocksdb/DBOptionsInterface.java
  6. 2
      java/src/main/java/org/rocksdb/GenericRateLimiterConfig.java
  7. 14
      java/src/main/java/org/rocksdb/Options.java
  8. 119
      java/src/main/java/org/rocksdb/RateLimiter.java
  9. 3
      java/src/main/java/org/rocksdb/RateLimiterConfig.java
  10. 12
      java/src/test/java/org/rocksdb/DBOptionsTest.java
  11. 13
      java/src/test/java/org/rocksdb/OptionsTest.java

@ -25,6 +25,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
org.rocksdb.MergeOperator\
org.rocksdb.Options\
org.rocksdb.PlainTableConfig\
org.rocksdb.RateLimiter\
org.rocksdb.ReadOptions\
org.rocksdb.RemoveEmptyValueCompactionFilter\
org.rocksdb.RestoreOptions\

@ -700,15 +700,29 @@ void Java_org_rocksdb_Options_setMemTableFactory(
/*
* Class: org_rocksdb_Options
* Method: setRateLimiter
* Method: setOldRateLimiter
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setRateLimiter(
void Java_org_rocksdb_Options_setOldRateLimiter(
JNIEnv* env, jobject jobj, jlong jhandle, jlong jrate_limiter_handle) {
reinterpret_cast<rocksdb::Options*>(jhandle)->rate_limiter.reset(
reinterpret_cast<rocksdb::RateLimiter*>(jrate_limiter_handle));
}
/*
* Class: org_rocksdb_Options
* Method: setRateLimiter
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setRateLimiter(
JNIEnv* env, jobject jobj, jlong jhandle, jlong jrate_limiter_handle) {
std::shared_ptr<rocksdb::RateLimiter> *pRateLimiter =
reinterpret_cast<std::shared_ptr<rocksdb::RateLimiter> *>(
jrate_limiter_handle);
reinterpret_cast<rocksdb::Options*>(jhandle)->
rate_limiter = *pRateLimiter;
}
/*
* Class: org_rocksdb_Options
* Method: setLogger
@ -3587,15 +3601,28 @@ jboolean Java_org_rocksdb_DBOptions_paranoidChecks(
/*
* Class: org_rocksdb_DBOptions
* Method: setRateLimiter
* Method: setOldRateLimiter
* Signature: (JJ)V
*/
void Java_org_rocksdb_DBOptions_setRateLimiter(
void Java_org_rocksdb_DBOptions_setOldRateLimiter(
JNIEnv* env, jobject jobj, jlong jhandle, jlong jrate_limiter_handle) {
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->rate_limiter.reset(
reinterpret_cast<rocksdb::RateLimiter*>(jrate_limiter_handle));
}
/*
* Class: org_rocksdb_DBOptions
* Method: setRateLimiter
* Signature: (JJ)V
*/
void Java_org_rocksdb_DBOptions_setRateLimiter(
JNIEnv* env, jobject jobj, jlong jhandle, jlong jrate_limiter_handle) {
std::shared_ptr<rocksdb::RateLimiter> *pRateLimiter =
reinterpret_cast<std::shared_ptr<rocksdb::RateLimiter> *>(
jrate_limiter_handle);
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->rate_limiter = *pRateLimiter;
}
/*
* Class: org_rocksdb_DBOptions
* Method: setLogger

@ -7,6 +7,7 @@
#include "rocksjni/portal.h"
#include "include/org_rocksdb_GenericRateLimiterConfig.h"
#include "include/org_rocksdb_RateLimiter.h"
#include "rocksdb/rate_limiter.h"
/*
@ -22,3 +23,97 @@ jlong Java_org_rocksdb_GenericRateLimiterConfig_newRateLimiterHandle(
static_cast<int64_t>(jrefill_period_micros),
static_cast<int32_t>(jfairness)));
}
/*
* Class: org_rocksdb_RateLimiter
* Method: newRateLimiterHandle
* Signature: (JJI)J
*/
jlong Java_org_rocksdb_RateLimiter_newRateLimiterHandle(
JNIEnv* env, jclass jclazz, jlong jrate_bytes_per_second,
jlong jrefill_period_micros, jint jfairness) {
auto* rate_limiter = rocksdb::NewGenericRateLimiter(
static_cast<int64_t>(jrate_bytes_per_second),
static_cast<int64_t>(jrefill_period_micros),
static_cast<int32_t>(jfairness));
std::shared_ptr<rocksdb::RateLimiter> *ptr_sptr_rate_limiter =
new std::shared_ptr<rocksdb::RateLimiter>;
*ptr_sptr_rate_limiter = std::shared_ptr<rocksdb::RateLimiter>(rate_limiter);
return reinterpret_cast<jlong>(ptr_sptr_rate_limiter);
}
/*
* Class: org_rocksdb_RateLimiter
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_RateLimiter_disposeInternal(
JNIEnv* env, jobject jobj, jlong jhandle) {
std::shared_ptr<rocksdb::RateLimiter> *handle =
reinterpret_cast<std::shared_ptr<rocksdb::RateLimiter> *>(jhandle);
handle->reset();
delete handle;
}
/*
* Class: org_rocksdb_RateLimiter
* Method: setBytesPerSecond
* Signature: (JJ)V
*/
void Java_org_rocksdb_RateLimiter_setBytesPerSecond(
JNIEnv* env, jobject jobj, jlong handle,
jlong jbytes_per_second) {
reinterpret_cast<rocksdb::RateLimiter*>(
handle)->SetBytesPerSecond(jbytes_per_second);
}
/*
* Class: org_rocksdb_RateLimiter
* Method: request
* Signature: (JJ)V
*/
void Java_org_rocksdb_RateLimiter_request(
JNIEnv* env, jobject jobj, jlong handle,
jlong jbytes) {
reinterpret_cast<rocksdb::RateLimiter*>(
handle)->Request(jbytes,
rocksdb::Env::IO_TOTAL);
}
/*
* Class: org_rocksdb_RateLimiter
* Method: getSingleBurstBytes
* Signature: (J)J
*/
jlong Java_org_rocksdb_RateLimiter_getSingleBurstBytes(
JNIEnv* env, jobject jobj, jlong handle,
jlong jbytes) {
return reinterpret_cast<rocksdb::RateLimiter*>(
handle)->GetSingleBurstBytes();
}
/*
* Class: org_rocksdb_RateLimiter
* Method: getTotalBytesThrough
* Signature: (J)J
*/
jlong Java_org_rocksdb_RateLimiter_getTotalBytesThrough(
JNIEnv* env, jobject jobj, jlong handle,
jlong jbytes) {
return reinterpret_cast<rocksdb::RateLimiter*>(
handle)->GetTotalBytesThrough();
}
/*
* Class: org_rocksdb_RateLimiter
* Method: getTotalRequests
* Signature: (J)J
*/
jlong Java_org_rocksdb_RateLimiter_getTotalRequests(
JNIEnv* env, jobject jobj, jlong handle,
jlong jbytes) {
return reinterpret_cast<rocksdb::RateLimiter*>(
handle)->GetTotalRequests();
}

@ -139,7 +139,15 @@ public class DBOptions extends RocksObject implements DBOptionsInterface {
final RateLimiterConfig config) {
assert(isOwningHandle());
rateLimiterConfig_ = config;
setRateLimiter(nativeHandle_, config.newRateLimiterHandle());
setOldRateLimiter(nativeHandle_, config.newRateLimiterHandle());
return this;
}
@Override
public DBOptions setRateLimiter(final RateLimiter rateLimiter) {
assert(isOwningHandle());
rateLimiter_ = rateLimiter;
setRateLimiter(nativeHandle_, rateLimiter.nativeHandle_);
return this;
}
@ -642,6 +650,9 @@ public long delayedWriteRate(){
private native void setParanoidChecks(
long handle, boolean paranoidChecks);
private native boolean paranoidChecks(long handle);
@Deprecated
private native void setOldRateLimiter(long handle,
long rateLimiterHandle);
private native void setRateLimiter(long handle,
long rateLimiterHandle);
private native void setLogger(long handle,
@ -741,4 +752,5 @@ public long delayedWriteRate(){
int numShardBits_;
RateLimiterConfig rateLimiterConfig_;
RateLimiter rateLimiter_;
}

@ -125,9 +125,23 @@ public interface DBOptionsInterface {
*
* @param config rate limiter config.
* @return the instance of the current Object.
* @deprecated See: {@link #setRateLimiter(RateLimiter)}.
*/
@Deprecated
Object setRateLimiterConfig(RateLimiterConfig config);
/**
* Use to control write rate of flush and compaction. Flush has higher
* priority than compaction. Rate limiting is disabled if nullptr.
* Default: nullptr
*
* @param rateLimiter {@link org.rocksdb.RateLimiter} instance.
* @return the instance of the current Object.
*
* @since 3.10.0
*/
Object setRateLimiter(RateLimiter rateLimiter);
/**
* <p>Any internal progress/error information generated by
* the db will be written to the Logger if it is non-nullptr,

@ -9,7 +9,9 @@ package org.rocksdb;
* compaction.
*
* @see RateLimiterConfig
* @deprecated obsolete. See: {@link org.rocksdb.RateLimiter}.
*/
@Deprecated
public class GenericRateLimiterConfig extends RateLimiterConfig {
private static final long DEFAULT_REFILL_PERIOD_MICROS = (100 * 1000);
private static final int DEFAULT_FAIRNESS = 10;

@ -685,7 +685,15 @@ public class Options extends RocksObject
@Override
public Options setRateLimiterConfig(final RateLimiterConfig config) {
rateLimiterConfig_ = config;
setRateLimiter(nativeHandle_, config.newRateLimiterHandle());
setOldRateLimiter(nativeHandle_, config.newRateLimiterHandle());
return this;
}
@Override
public Options setRateLimiter(final RateLimiter rateLimiter) {
assert(isOwningHandle());
rateLimiter_ = rateLimiter;
setRateLimiter(nativeHandle_, rateLimiter.nativeHandle_);
return this;
}
@ -1218,6 +1226,9 @@ public class Options extends RocksObject
private native void setParanoidChecks(
long handle, boolean paranoidChecks);
private native boolean paranoidChecks(long handle);
@Deprecated
private native void setOldRateLimiter(long handle,
long rateLimiterHandle);
private native void setRateLimiter(long handle,
long rateLimiterHandle);
private native void setLogger(long handle,
@ -1458,5 +1469,6 @@ public class Options extends RocksObject
MemTableConfig memTableConfig_;
TableFormatConfig tableFormatConfig_;
RateLimiterConfig rateLimiterConfig_;
RateLimiter rateLimiter_;
AbstractComparator<? extends AbstractSlice<?>> comparator_;
}

@ -0,0 +1,119 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb;
/**
* RateLimiter, which is used to control write rate of flush and
* compaction.
*
* @since 3.10.0
*/
public class RateLimiter extends RocksObject {
private static final long DEFAULT_REFILL_PERIOD_MICROS = (100 * 1000);
private static final int DEFAULT_FAIRNESS = 10;
/**
* RateLimiter constructor
*
* @param rateBytesPerSecond this is the only parameter you want to set
* most of the time. It controls the total write rate of compaction
* and flush in bytes per second. Currently, RocksDB does not enforce
* rate limit for anything other than flush and compaction, e.g. write to WAL.
* @param refillPeriodMicros this controls how often tokens are refilled. For example,
* when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to
* 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to
* burstier writes while smaller value introduces more CPU overhead.
* The default should work for most cases.
* @param fairness RateLimiter accepts high-pri requests and low-pri requests.
* A low-pri request is usually blocked in favor of hi-pri request. Currently,
* RocksDB assigns low-pri to request from compaction and high-pri to request
* from flush. Low-pri requests can get blocked if flush requests come in
* continuously. This fairness parameter grants low-pri requests permission by
* fairness chance even though high-pri requests exist to avoid starvation.
* You should be good by leaving it at default 10.
*/
public RateLimiter(final long rateBytesPerSecond,
final long refillPeriodMicros, final int fairness) {
super(newRateLimiterHandle(rateBytesPerSecond,
refillPeriodMicros, fairness));
}
/**
* RateLimiter constructor
*
* @param rateBytesPerSecond this is the only parameter you want to set
* most of the time. It controls the total write rate of compaction
* and flush in bytes per second. Currently, RocksDB does not enforce
* rate limit for anything other than flush and compaction, e.g. write to WAL.
*/
public RateLimiter(final long rateBytesPerSecond) {
this(rateBytesPerSecond, DEFAULT_REFILL_PERIOD_MICROS, DEFAULT_FAIRNESS);
}
/**
* <p>This API allows user to dynamically change rate limiter's bytes per second.
* REQUIRED: bytes_per_second &gt; 0</p>
*
* @param bytesPerSecond bytes per second.
*/
public void setBytesPerSecond(final long bytesPerSecond) {
assert(isOwningHandle());
setBytesPerSecond(nativeHandle_, bytesPerSecond);
}
/**
* <p>Request for token to write bytes. If this request can not be satisfied,
* the call is blocked. Caller is responsible to make sure
* {@code bytes &lt; GetSingleBurstBytes()}.</p>
*
* @param bytes requested bytes.
*/
public void request(final long bytes) {
assert(isOwningHandle());
request(nativeHandle_, bytes);
}
/**
* <p>Max bytes can be granted in a single burst.</p>
*
* @return max bytes can be granted in a single burst.
*/
public long getSingleBurstBytes() {
assert(isOwningHandle());
return getSingleBurstBytes(nativeHandle_);
}
/**
* <p>Total bytes that go though rate limiter.</p>
*
* @return total bytes that go though rate limiter.
*/
public long getTotalBytesThrough() {
assert(isOwningHandle());
return getTotalBytesThrough(nativeHandle_);
}
/**
* <p>Total # of requests that go though rate limiter.</p>
*
* @return total # of requests that go though rate limiter.
*/
public long getTotalRequests() {
assert(isOwningHandle());
return getTotalRequests(nativeHandle_);
}
private static native long newRateLimiterHandle(final long rateBytesPerSecond,
final long refillPeriodMicros, final int fairness);
@Override protected final native void disposeInternal(final long handle);
private native void setBytesPerSecond(final long handle,
final long bytesPerSecond);
private native void request(final long handle, final long bytes);
private native long getSingleBurstBytes(final long handle);
private native long getTotalBytesThrough(final long handle);
private native long getTotalRequests(final long handle);
}

@ -7,7 +7,10 @@ package org.rocksdb;
/**
* Config for rate limiter, which is used to control write rate of flush and
* compaction.
*
* @deprecated obsolete. See: {@link org.rocksdb.RateLimiter}.
*/
@Deprecated
public abstract class RateLimiterConfig {
/**
* This function should only be called by

@ -402,6 +402,18 @@ public class DBOptionsTest {
}
}
@Test
public void rateLimiter() {
try(final DBOptions options = new DBOptions();
final DBOptions anotherOptions = new DBOptions()) {
final RateLimiter rateLimiter = new RateLimiter(1000, 100 * 1000, 1);
options.setRateLimiter(rateLimiter);
// Test with parameter initialization
anotherOptions.setRateLimiter(
new RateLimiter(1000));
}
}
@Test
public void statistics() {
try(final DBOptions options = new DBOptions()) {

@ -800,6 +800,19 @@ public class OptionsTest {
}
}
@Test
public void rateLimiter() {
try (final Options options = new Options();
final Options anotherOptions = new Options()) {
final RateLimiter rateLimiter =
new RateLimiter(1000, 100 * 1000, 1);
options.setRateLimiter(rateLimiter);
// Test with parameter initialization
anotherOptions.setRateLimiter(
new RateLimiter(1000));
}
}
@Test
public void shouldSetTestPrefixExtractor() {
try (final Options options = new Options()) {

Loading…
Cancel
Save