From ec5add398c27b07595f583f1834da78b51490906 Mon Sep 17 00:00:00 2001 From: Tomasz Posluszny Date: Wed, 9 Sep 2020 12:42:54 -0700 Subject: [PATCH] Implement Java API for ConcurrentTaskLimiter class and compaction_thread_limiter field in ColumnFamilyOptions (#7347) Summary: as title Pull Request resolved: https://github.com/facebook/rocksdb/pull/7347 Test Plan: unit tests included Reviewed By: jay-zhuang Differential Revision: D23592552 Pulled By: pdillinger fbshipit-source-id: 1c3571b6f42bfd0cfd723ff49d01fbc02a1be45b --- java/CMakeLists.txt | 3 + java/Makefile | 3 + java/rocksjni/concurrent_task_limiter.cc | 90 +++++++++++++++++++ java/rocksjni/options.cc | 29 ++++++ .../java/org/rocksdb/ColumnFamilyOptions.java | 18 ++++ .../rocksdb/ColumnFamilyOptionsInterface.java | 18 ++++ .../org/rocksdb/ConcurrentTaskLimiter.java | 38 ++++++++ .../rocksdb/ConcurrentTaskLimiterImpl.java | 42 +++++++++ java/src/main/java/org/rocksdb/Options.java | 17 ++++ .../org/rocksdb/ColumnFamilyOptionsTest.java | 9 ++ .../rocksdb/ConcurrentTaskLimiterTest.java | 50 +++++++++++ .../test/java/org/rocksdb/OptionsTest.java | 9 ++ src.mk | 1 + 13 files changed, 327 insertions(+) create mode 100644 java/rocksjni/concurrent_task_limiter.cc create mode 100644 java/src/main/java/org/rocksdb/ConcurrentTaskLimiter.java create mode 100644 java/src/main/java/org/rocksdb/ConcurrentTaskLimiterImpl.java create mode 100644 java/src/test/java/org/rocksdb/ConcurrentTaskLimiterTest.java diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index cedf8910d..ebc1c1970 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -26,6 +26,7 @@ set(JNI_NATIVE_SOURCES rocksjni/comparator.cc rocksjni/comparatorjnicallback.cc rocksjni/compression_options.cc + rocksjni/concurrent_task_limiter.cc rocksjni/config_options.cc rocksjni/env.cc rocksjni/env_options.cc @@ -153,6 +154,8 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/InfoLogLevel.java src/main/java/org/rocksdb/IngestExternalFileOptions.java src/main/java/org/rocksdb/LevelMetaData.java + src/main/java/org/rocksdb/ConcurrentTaskLimiter.java + src/main/java/org/rocksdb/ConcurrentTaskLimiterImpl.java src/main/java/org/rocksdb/LiveFileMetaData.java src/main/java/org/rocksdb/LogFile.java src/main/java/org/rocksdb/Logger.java diff --git a/java/Makefile b/java/Makefile index 8b2dfba08..c391a9bd2 100644 --- a/java/Makefile +++ b/java/Makefile @@ -36,6 +36,8 @@ NATIVE_JAVA_CLASSES = \ org.rocksdb.HashLinkedListMemTableConfig\ org.rocksdb.HashSkipListMemTableConfig\ org.rocksdb.HdfsEnv\ + org.rocksdb.ConcurrentTaskLimiter\ + org.rocksdb.ConcurrentTaskLimiterImpl\ org.rocksdb.Logger\ org.rocksdb.LRUCache\ org.rocksdb.MemoryUsageType\ @@ -135,6 +137,7 @@ JAVA_TESTS = \ org.rocksdb.FlushTest\ org.rocksdb.InfoLogLevelTest\ org.rocksdb.KeyMayExistTest\ + org.rocksdb.ConcurrentTaskLimiterTest\ org.rocksdb.LoggerTest\ org.rocksdb.LRUCacheTest\ org.rocksdb.MemoryUtilTest\ diff --git a/java/rocksjni/concurrent_task_limiter.cc b/java/rocksjni/concurrent_task_limiter.cc new file mode 100644 index 000000000..ddcdda478 --- /dev/null +++ b/java/rocksjni/concurrent_task_limiter.cc @@ -0,0 +1,90 @@ +#include "rocksdb/concurrent_task_limiter.h" + +#include + +#include +#include + +#include "include/org_rocksdb_ConcurrentTaskLimiterImpl.h" +#include "rocksjni/portal.h" + +/* + * Class: org_rocksdb_ConcurrentTaskLimiterImpl + * Method: newConcurrentTaskLimiterImpl0 + * Signature: (Ljava/lang/String;I)J + */ +jlong Java_org_rocksdb_ConcurrentTaskLimiterImpl_newConcurrentTaskLimiterImpl0( + JNIEnv* env, jclass, jstring jname, jint limit) { + jboolean has_exception; + std::string name = + ROCKSDB_NAMESPACE::JniUtil::copyStdString(env, jname, &has_exception); + if (JNI_TRUE == has_exception) { + return 0; + } + + auto* ptr = new std::shared_ptr( + ROCKSDB_NAMESPACE::NewConcurrentTaskLimiter(name, limit)); + + return reinterpret_cast(ptr); +} + +/* + * Class: org_rocksdb_ConcurrentTaskLimiterImpl + * Method: name + * Signature: (J)Ljava/lang/String; + */ +jstring Java_org_rocksdb_ConcurrentTaskLimiterImpl_name(JNIEnv* env, jclass, + jlong handle) { + const auto& limiter = *reinterpret_cast< + std::shared_ptr*>(handle); + return ROCKSDB_NAMESPACE::JniUtil::toJavaString(env, &limiter->GetName()); +} + +/* + * Class: org_rocksdb_ConcurrentTaskLimiterImpl + * Method: setMaxOutstandingTask + * Signature: (JI)V + */ +void Java_org_rocksdb_ConcurrentTaskLimiterImpl_setMaxOutstandingTask( + JNIEnv*, jclass, jlong handle, jint max_outstanding_task) { + const auto& limiter = *reinterpret_cast< + std::shared_ptr*>(handle); + limiter->SetMaxOutstandingTask(static_cast(max_outstanding_task)); +} + +/* + * Class: org_rocksdb_ConcurrentTaskLimiterImpl + * Method: resetMaxOutstandingTask + * Signature: (J)V + */ +void Java_org_rocksdb_ConcurrentTaskLimiterImpl_resetMaxOutstandingTask( + JNIEnv*, jclass, jlong handle) { + const auto& limiter = *reinterpret_cast< + std::shared_ptr*>(handle); + limiter->ResetMaxOutstandingTask(); +} + +/* + * Class: org_rocksdb_ConcurrentTaskLimiterImpl + * Method: outstandingTask + * Signature: (J)I + */ +jint Java_org_rocksdb_ConcurrentTaskLimiterImpl_outstandingTask(JNIEnv*, jclass, + jlong handle) { + const auto& limiter = *reinterpret_cast< + std::shared_ptr*>(handle); + return static_cast(limiter->GetOutstandingTask()); +} + +/* + * Class: org_rocksdb_ConcurrentTaskLimiterImpl + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_ConcurrentTaskLimiterImpl_disposeInternal(JNIEnv*, + jobject, + jlong jhandle) { + auto* ptr = reinterpret_cast< + std::shared_ptr*>(jhandle); + delete ptr; // delete std::shared_ptr +} diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 4979621a2..6a175471f 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -1145,6 +1145,20 @@ void Java_org_rocksdb_Options_setSstPartitionerFactory(JNIEnv*, jobject, options->sst_partitioner_factory = *factory; } +/* + * Class: org_rocksdb_Options + * Method: setCompactionThreadLimiter + * Signature: (JJ)V + */ +void Java_org_rocksdb_Options_setCompactionThreadLimiter( + JNIEnv*, jclass, jlong jhandle, jlong jlimiter_handle) { + auto* options = reinterpret_cast(jhandle); + auto* limiter = reinterpret_cast< + std::shared_ptr*>( + jlimiter_handle); + options->compaction_thread_limiter = *limiter; +} + /* * Class: org_rocksdb_Options * Method: allowMmapReads @@ -3649,6 +3663,21 @@ void Java_org_rocksdb_ColumnFamilyOptions_setSstPartitionerFactory( options->sst_partitioner_factory.reset(factory); } +/* + * Class: org_rocksdb_ColumnFamilyOptions + * Method: setCompactionThreadLimiter + * Signature: (JJ)V + */ +void Java_org_rocksdb_ColumnFamilyOptions_setCompactionThreadLimiter( + JNIEnv*, jclass, jlong jhandle, jlong jlimiter_handle) { + auto* options = + reinterpret_cast(jhandle); + auto* limiter = reinterpret_cast< + std::shared_ptr*>( + jlimiter_handle); + options->compaction_thread_limiter = *limiter; +} + /* * Method: tableFactoryName * Signature: (J)Ljava/lang/String diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java index 53fc1af6e..727a104d9 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java @@ -52,6 +52,7 @@ public class ColumnFamilyOptions extends RocksObject this.compactionOptionsFIFO_ = other.compactionOptionsFIFO_; this.bottommostCompressionOptions_ = other.bottommostCompressionOptions_; this.compressionOptions_ = other.compressionOptions_; + this.compactionThreadLimiter_ = other.compactionThreadLimiter_; } /** @@ -851,6 +852,20 @@ public class ColumnFamilyOptions extends RocksObject return this; } + @Override + public ColumnFamilyOptions setCompactionThreadLimiter( + final ConcurrentTaskLimiter compactionThreadLimiter) { + setCompactionThreadLimiter(nativeHandle_, compactionThreadLimiter.nativeHandle_); + this.compactionThreadLimiter_ = compactionThreadLimiter; + return this; + } + + @Override + public ConcurrentTaskLimiter compactionThreadLimiter() { + assert (isOwningHandle()); + return this.compactionThreadLimiter_; + } + @Override public SstPartitionerFactory sstPartitionerFactory() { return sstPartitionerFactory_; @@ -1018,6 +1033,8 @@ public class ColumnFamilyOptions extends RocksObject final boolean forceConsistencyChecks); private native boolean forceConsistencyChecks(final long handle); private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle); + private static native void setCompactionThreadLimiter( + final long nativeHandle_, final long compactionThreadLimiterHandle); // instance variables // NOTE: If you add new member variables, please update the copy constructor above! @@ -1032,4 +1049,5 @@ public class ColumnFamilyOptions extends RocksObject private CompressionOptions bottommostCompressionOptions_; private CompressionOptions compressionOptions_; private SstPartitionerFactory sstPartitionerFactory_; + private ConcurrentTaskLimiter compactionThreadLimiter_; } diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java index 0a8224e19..13a3f8d2c 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java @@ -454,6 +454,24 @@ public interface ColumnFamilyOptionsInterface + * limit = 0 means no new task allowed.
+ * limit < 0 means no limitation. + * + * @param maxOutstandinsTask max concurrent tasks. + * @return the reference to the current instance of ConcurrentTaskLimiter. + */ + public abstract ConcurrentTaskLimiter setMaxOutstandingTask(final int maxOutstandinsTask); + + /** + * Reset to unlimited max concurrent task. + * + * @return the reference to the current instance of ConcurrentTaskLimiter. + */ + public abstract ConcurrentTaskLimiter resetMaxOutstandingTask(); + + /** + * Returns current outstanding task count. + * + * @return current outstanding task count. + */ + public abstract int outstandingTask(); +} diff --git a/java/src/main/java/org/rocksdb/ConcurrentTaskLimiterImpl.java b/java/src/main/java/org/rocksdb/ConcurrentTaskLimiterImpl.java new file mode 100644 index 000000000..0f6026dc2 --- /dev/null +++ b/java/src/main/java/org/rocksdb/ConcurrentTaskLimiterImpl.java @@ -0,0 +1,42 @@ +package org.rocksdb; + +public class ConcurrentTaskLimiterImpl extends ConcurrentTaskLimiter { + public ConcurrentTaskLimiterImpl(final String name, final int maxOutstandingTask) { + super(newConcurrentTaskLimiterImpl0(name, maxOutstandingTask)); + } + + @Override + public String name() { + assert (isOwningHandle()); + return name(nativeHandle_); + } + + @Override + public ConcurrentTaskLimiter setMaxOutstandingTask(final int maxOutstandingTask) { + assert (isOwningHandle()); + setMaxOutstandingTask(nativeHandle_, maxOutstandingTask); + return this; + } + + @Override + public ConcurrentTaskLimiter resetMaxOutstandingTask() { + assert (isOwningHandle()); + resetMaxOutstandingTask(nativeHandle_); + return this; + } + + @Override + public int outstandingTask() { + assert (isOwningHandle()); + return outstandingTask(nativeHandle_); + } + + private static native long newConcurrentTaskLimiterImpl0( + final String name, final int maxOutstandingTask); + private static native String name(final long handle); + private static native void setMaxOutstandingTask(final long handle, final int limit); + private static native void resetMaxOutstandingTask(final long handle); + private static native int outstandingTask(final long handle); + + @Override protected final native void disposeInternal(final long handle); +} diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index 3b9c4ebf7..3d4947a6f 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -91,6 +91,7 @@ public class Options extends RocksObject this.compressionOptions_ = other.compressionOptions_; this.rowCache_ = other.rowCache_; this.writeBufferManager_ = other.writeBufferManager_; + this.compactionThreadLimiter_ = other.compactionThreadLimiter_; } @Override @@ -1820,6 +1821,19 @@ public class Options extends RocksObject return sstPartitionerFactory_; } + @Override + public Options setCompactionThreadLimiter(final ConcurrentTaskLimiter compactionThreadLimiter) { + setCompactionThreadLimiter(nativeHandle_, compactionThreadLimiter.nativeHandle_); + this.compactionThreadLimiter_ = compactionThreadLimiter; + return this; + } + + @Override + public ConcurrentTaskLimiter compactionThreadLimiter() { + assert (isOwningHandle()); + return this.compactionThreadLimiter_; + } + private native static long newOptions(); private native static long newOptions(long dbOptHandle, long cfOptHandle); @@ -2191,6 +2205,8 @@ public class Options extends RocksObject final boolean atomicFlush); private native boolean atomicFlush(final long handle); private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle); + private static native void setCompactionThreadLimiter( + final long nativeHandle_, final long newLimiterHandle); // instance variables // NOTE: If you add new member variables, please update the copy constructor above! @@ -2210,4 +2226,5 @@ public class Options extends RocksObject private WalFilter walFilter_; private WriteBufferManager writeBufferManager_; private SstPartitionerFactory sstPartitionerFactory_; + private ConcurrentTaskLimiter compactionThreadLimiter_; } diff --git a/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java b/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java index df01ee60b..c598a30c2 100644 --- a/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java +++ b/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java @@ -643,4 +643,13 @@ public class ColumnFamilyOptionsTest { } } + @Test + public void compactionThreadLimiter() { + try (final ColumnFamilyOptions options = new ColumnFamilyOptions(); + final ConcurrentTaskLimiter compactionThreadLimiter = + new ConcurrentTaskLimiterImpl("name", 3)) { + options.setCompactionThreadLimiter(compactionThreadLimiter); + assertThat(options.compactionThreadLimiter()).isEqualTo(compactionThreadLimiter); + } + } } diff --git a/java/src/test/java/org/rocksdb/ConcurrentTaskLimiterTest.java b/java/src/test/java/org/rocksdb/ConcurrentTaskLimiterTest.java new file mode 100644 index 000000000..9bbf1f1ed --- /dev/null +++ b/java/src/test/java/org/rocksdb/ConcurrentTaskLimiterTest.java @@ -0,0 +1,50 @@ +package org.rocksdb; + +import static org.junit.Assert.assertEquals; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +public class ConcurrentTaskLimiterTest { + @ClassRule + public static final RocksNativeLibraryResource ROCKS_NATIVE_LIBRARY_RESOURCE = + new RocksNativeLibraryResource(); + + private static final String NAME = "name"; + + private ConcurrentTaskLimiter concurrentTaskLimiter; + + @Before + public void beforeTest() { + concurrentTaskLimiter = new ConcurrentTaskLimiterImpl(NAME, 3); + } + + @Test + public void name() { + assertEquals(NAME, concurrentTaskLimiter.name()); + } + + @Test + public void outstandingTask() { + assertEquals(0, concurrentTaskLimiter.outstandingTask()); + } + + @Test + public void setMaxOutstandingTask() { + assertEquals(concurrentTaskLimiter, concurrentTaskLimiter.setMaxOutstandingTask(4)); + assertEquals(0, concurrentTaskLimiter.outstandingTask()); + } + + @Test + public void resetMaxOutstandingTask() { + assertEquals(concurrentTaskLimiter, concurrentTaskLimiter.resetMaxOutstandingTask()); + assertEquals(0, concurrentTaskLimiter.outstandingTask()); + } + + @After + public void afterTest() { + concurrentTaskLimiter.close(); + } +} diff --git a/java/src/test/java/org/rocksdb/OptionsTest.java b/java/src/test/java/org/rocksdb/OptionsTest.java index b249f95fb..4990af441 100644 --- a/java/src/test/java/org/rocksdb/OptionsTest.java +++ b/java/src/test/java/org/rocksdb/OptionsTest.java @@ -1308,4 +1308,13 @@ public class OptionsTest { } } + @Test + public void compactionThreadLimiter() { + try (final Options options = new Options(); + final ConcurrentTaskLimiter compactionThreadLimiter = + new ConcurrentTaskLimiterImpl("name", 3)) { + options.setCompactionThreadLimiter(compactionThreadLimiter); + assertThat(options.compactionThreadLimiter()).isEqualTo(compactionThreadLimiter); + } + } } diff --git a/src.mk b/src.mk index a6c823b3b..1ec998dd6 100644 --- a/src.mk +++ b/src.mk @@ -531,6 +531,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/comparator.cc \ java/rocksjni/comparatorjnicallback.cc \ java/rocksjni/compression_options.cc \ + java/rocksjni/concurrent_task_limiter.cc \ java/rocksjni/config_options.cc \ java/rocksjni/env.cc \ java/rocksjni/env_options.cc \