From 9ab269ab393f9119f48b4be6a816ad20cc661376 Mon Sep 17 00:00:00 2001 From: Adam Retter Date: Wed, 3 Feb 2016 19:13:03 +0000 Subject: [PATCH] Threaded tests for WriteBatch --- java/Makefile | 1 + .../org/rocksdb/WriteBatchThreadedTest.java | 104 ++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 java/src/test/java/org/rocksdb/WriteBatchThreadedTest.java diff --git a/java/Makefile b/java/Makefile index abc8f73ee..bffca4b27 100644 --- a/java/Makefile +++ b/java/Makefile @@ -99,6 +99,7 @@ JAVA_TESTS = org.rocksdb.BackupableDBOptionsTest\ org.rocksdb.StatisticsCollectorTest\ org.rocksdb.WriteBatchHandlerTest\ org.rocksdb.WriteBatchTest\ + org.rocksdb.WriteBatchThreadedTest\ org.rocksdb.WriteOptionsTest\ org.rocksdb.WriteBatchWithIndexTest diff --git a/java/src/test/java/org/rocksdb/WriteBatchThreadedTest.java b/java/src/test/java/org/rocksdb/WriteBatchThreadedTest.java new file mode 100644 index 000000000..ab38c475f --- /dev/null +++ b/java/src/test/java/org/rocksdb/WriteBatchThreadedTest.java @@ -0,0 +1,104 @@ +// Copyright (c) 2016, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +package org.rocksdb; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.*; + +@RunWith(Parameterized.class) +public class WriteBatchThreadedTest { + + @Parameters(name = "WriteBatchThreadedTest(threadCount={0})") + public static Iterable data() { + return Arrays.asList(new Integer[]{1, 10, 50, 100}); + } + + @Parameter + public int threadCount; + + @Rule + public TemporaryFolder dbFolder = new TemporaryFolder(); + + RocksDB db; + + @Before + public void setUp() throws Exception { + RocksDB.loadLibrary(); + final Options options = new Options() + .setCreateIfMissing(true) + .setIncreaseParallelism(32); + db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + assert (db != null); + } + + @After + public void tearDown() throws Exception { + if (db != null) { + db.close(); + } + } + + @Test + public void threadedWrites() throws InterruptedException, ExecutionException { + final List> callables = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + final int offset = i * 100; + callables.add(new Callable() { + @Override + public Void call() throws RocksDBException { + final WriteBatch wb = new WriteBatch(); + for (int i = offset; i < offset + 100; i++) { + wb.put(ByteBuffer.allocate(4).putInt(i).array(), + "parallel rocks test".getBytes()); + } + db.write(new WriteOptions(), wb); + + return null; + } + }); + } + + //submit the callables + final ExecutorService executorService = + Executors.newFixedThreadPool(threadCount); + try { + final ExecutorCompletionService completionService = + new ExecutorCompletionService<>(executorService); + final Set> futures = new HashSet<>(); + for (final Callable callable : callables) { + futures.add(completionService.submit(callable)); + } + + while (futures.size() > 0) { + final Future future = completionService.take(); + futures.remove(future); + + try { + future.get(); + } catch (final ExecutionException e) { + for (final Future f : futures) { + f.cancel(true); + } + + throw e; + } + } + } finally { + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.SECONDS); + } + } +}