diff --git a/java/org/rocksdb/Filter.java b/java/org/rocksdb/Filter.java index 0de392ac6..d16dedc69 100644 --- a/java/org/rocksdb/Filter.java +++ b/java/org/rocksdb/Filter.java @@ -20,7 +20,7 @@ public abstract class Filter { /** * Deletes underlying C++ filter pointer. */ - public synchronized void dispose() { + protected synchronized void dispose() { if(nativeHandle_ != 0) { dispose0(nativeHandle_); } diff --git a/java/org/rocksdb/Options.java b/java/org/rocksdb/Options.java index 0c5e401b9..cfb3c4a3f 100644 --- a/java/org/rocksdb/Options.java +++ b/java/org/rocksdb/Options.java @@ -146,15 +146,21 @@ public class Options { /** * Use the specified filter policy to reduce disk reads. + * + * Note that the caller should not dispose the input filter as + * Options.dispose() will dispose this filter. + * * @param Filter policy java instance. * @return the instance of the current Options. * @see RocksDB.open() */ public Options setFilter(Filter filter) { assert(isInitialized()); - setFilter0(nativeHandle_, filter); + setFilterHandle(nativeHandle_, filter.nativeHandle_); + filter_ = filter; return this; } + private native void setFilterHandle(long optHandle, long filterHandle); /* * Disable compaction triggered by seek. @@ -786,7 +792,8 @@ public class Options { long handle, int limit); /** - * The following two fields affect how archived logs will be deleted. + * WalTtlSeconds() and walSizeLimitMB() affect how archived logs + * will be deleted. * 1. If both set to 0, logs will be deleted asap and will not get into * the archive. * 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0, @@ -800,6 +807,7 @@ public class Options { * checks will be performed with ttl being first. * * @return the wal-ttl seconds + * @see walSizeLimitMB() */ public long walTtlSeconds() { assert(isInitialized()); @@ -808,7 +816,8 @@ public class Options { private native long walTtlSeconds(long handle); /** - * The following two fields affect how archived logs will be deleted. + * WalTtlSeconds() and walSizeLimitMB() affect how archived logs + * will be deleted. * 1. If both set to 0, logs will be deleted asap and will not get into * the archive. * 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0, @@ -823,13 +832,64 @@ public class Options { * * @param walTtlSeconds the ttl seconds * @return the reference to the current option. + * @see setWalSizeLimitMB() */ - public Options setWALTtlSeconds(long walTtlSeconds) { + public Options setWalTtlSeconds(long walTtlSeconds) { assert(isInitialized()); - setWALTtlSeconds(nativeHandle_, walTtlSeconds); + setWalTtlSeconds(nativeHandle_, walTtlSeconds); return this; } - private native void setWALTtlSeconds(long handle, long walTtlSeconds); + private native void setWalTtlSeconds(long handle, long walTtlSeconds); + + /** + * WalTtlSeconds() and walSizeLimitMB() affect how archived logs + * will be deleted. + * 1. If both set to 0, logs will be deleted asap and will not get into + * the archive. + * 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0, + * WAL files will be checked every 10 min and if total size is greater + * then WAL_size_limit_MB, they will be deleted starting with the + * earliest until size_limit is met. All empty files will be deleted. + * 3. If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then + * WAL files will be checked every WAL_ttl_secondsi / 2 and those that + * are older than WAL_ttl_seconds will be deleted. + * 4. If both are not 0, WAL files will be checked every 10 min and both + * checks will be performed with ttl being first. + * + * @return size limit in mega-bytes. + * @see walSizeLimitMB() + */ + public long walSizeLimitMB() { + assert(isInitialized()); + return walSizeLimitMB(nativeHandle_); + } + private native long walSizeLimitMB(long handle); + + /** + * WalTtlSeconds() and walSizeLimitMB() affect how archived logs + * will be deleted. + * 1. If both set to 0, logs will be deleted asap and will not get into + * the archive. + * 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0, + * WAL files will be checked every 10 min and if total size is greater + * then WAL_size_limit_MB, they will be deleted starting with the + * earliest until size_limit is met. All empty files will be deleted. + * 3. If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then + * WAL files will be checked every WAL_ttl_secondsi / 2 and those that + * are older than WAL_ttl_seconds will be deleted. + * 4. If both are not 0, WAL files will be checked every 10 min and both + * checks will be performed with ttl being first. + * + * @param sizeLimitMB size limit in mega-bytes. + * @return the reference to the current option. + * @see setWalSizeLimitMB() + */ + public Options setWalSizeLimitMB(long sizeLimitMB) { + assert(isInitialized()); + setWalSizeLimitMB(nativeHandle_, sizeLimitMB); + return this; + } + private native void setWalSizeLimitMB(long handle, long sizeLimitMB); /** * Number of bytes to preallocate (via fallocate) the manifest @@ -2298,8 +2358,7 @@ public class Options { private native void useFixedLengthPrefixExtractor( long handle, int prefixLength); - private native void setFilter0(long optHandle, Filter fp); - long nativeHandle_; long cacheSize_; + Filter filter_; } diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index 0b78450d5..d267171bb 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -36,6 +36,7 @@ public class RocksDB { // the c++ one. Options options = new Options(); db.open(options.nativeHandle_, options.cacheSize_, path); + db.transferCppRawPointersOwnership(options); options.dispose(); return db; } @@ -46,8 +47,12 @@ public class RocksDB { */ public static RocksDB open(Options options, String path) throws RocksDBException { + // when non-default Options is used, keeping an Options reference + // in RocksDB can prevent Java to GC during the life-time of + // the currently-created RocksDB. RocksDB db = new RocksDB(); db.open(options.nativeHandle_, options.cacheSize_, path); + db.transferCppRawPointersOwnershipFrom(options); return db; } @@ -195,6 +200,17 @@ public class RocksDB { nativeHandle_ = 0; } + /** + * Transfer the ownership of all c++ raw-pointers from Options + * to RocksDB to ensure the life-time of those raw-pointers + * will be at least as long as the life-time of any RocksDB + * that uses these raw-pointers. + */ + protected void transferCppRawPointersOwnershipFrom(Options opt) { + filter_ = opt.filter_; + opt.filter_ = null; + } + // native methods protected native void open( long optionsHandle, long cacheSize, String path) throws RocksDBException; @@ -227,4 +243,5 @@ public class RocksDB { protected native void close0(); protected long nativeHandle_; + protected Filter filter_; } diff --git a/java/org/rocksdb/benchmark/DbBenchmark.java b/java/org/rocksdb/benchmark/DbBenchmark.java index 0106413cf..4b0e59936 100644 --- a/java/org/rocksdb/benchmark/DbBenchmark.java +++ b/java/org/rocksdb/benchmark/DbBenchmark.java @@ -54,6 +54,10 @@ class Stats { StringBuilder message_; boolean excludeFromMerge_; + // TODO(yhchiang): use the following arguments: + // (Long)Flag.stats_interval + // (Integer)Flag.stats_per_interval + Stats(int id) { id_ = id; nextReport_ = 100; @@ -163,6 +167,7 @@ public class DbBenchmark { } abstract class BenchmarkTask implements Callable { + // TODO(yhchiang): use (Integer)Flag.perf_level. public BenchmarkTask( int tid, long randSeed, long numEntries, long keyRange) { tid_ = tid; @@ -311,13 +316,73 @@ public class DbBenchmark { } } + class WriteUniqueRandomTask extends WriteTask { + static final int MAX_BUFFER_SIZE = 10000000; + public WriteUniqueRandomTask( + int tid, long randSeed, long numEntries, long keyRange, + WriteOptions writeOpt, long entriesPerBatch) { + super(tid, randSeed, numEntries, keyRange, + writeOpt, entriesPerBatch); + initRandomKeySequence(); + } + public WriteUniqueRandomTask( + int tid, long randSeed, long numEntries, long keyRange, + WriteOptions writeOpt, long entriesPerBatch, + long maxWritesPerSecond) { + super(tid, randSeed, numEntries, keyRange, + writeOpt, entriesPerBatch, + maxWritesPerSecond); + initRandomKeySequence(); + } + @Override protected void getKey(byte[] key, long id, long range) { + generateKeyFromLong(key, nextUniqueRandom()); + } + + protected void initRandomKeySequence() { + bufferSize_ = MAX_BUFFER_SIZE; + if (bufferSize_ > keyRange_) { + bufferSize_ = (int) keyRange_; + } + currentKeyCount_ = bufferSize_; + keyBuffer_ = new long[MAX_BUFFER_SIZE]; + for (int k = 0; k < bufferSize_; ++k) { + keyBuffer_[k] = k; + } + } + + /** + * Semi-randomly return the next unique key. It is guaranteed to be + * fully random if keyRange_ <= MAX_BUFFER_SIZE. + */ + long nextUniqueRandom() { + if (bufferSize_ == 0) { + System.err.println("bufferSize_ == 0."); + return 0; + } + int r = rand_.nextInt(bufferSize_); + // randomly pick one from the keyBuffer + long randKey = keyBuffer_[r]; + if (currentKeyCount_ < keyRange_) { + // if we have not yet inserted all keys, insert next new key to [r]. + keyBuffer_[r] = currentKeyCount_++; + } else { + // move the last element to [r] and decrease the size by 1. + keyBuffer_[r] = keyBuffer_[--bufferSize_]; + } + return randKey; + } + + int bufferSize_; + long currentKeyCount_; + long[] keyBuffer_; + } + class ReadRandomTask extends BenchmarkTask { public ReadRandomTask( int tid, long randSeed, long numEntries, long keyRange) { super(tid, randSeed, numEntries, keyRange); } @Override public void runTask() throws RocksDBException { - stats_.found_ = 0; byte[] key = new byte[keySize_]; byte[] value = new byte[valueSize_]; for (long i = 0; i < numEntries_; i++) { @@ -338,18 +403,22 @@ public class DbBenchmark { class ReadSequentialTask extends BenchmarkTask { public ReadSequentialTask( - int tid, long randSeed, long numEntries, long keyRange, long initId) { + int tid, long randSeed, long numEntries, long keyRange) { super(tid, randSeed, numEntries, keyRange); - initId_ = initId; } @Override public void runTask() throws RocksDBException { - // make sure we have enough things to read in sequential - if (numEntries_ > keyRange_ - initId_) { - numEntries_ = keyRange_ - initId_; + org.rocksdb.Iterator iter = db_.newIterator(); + long i; + for (iter.seekToFirst(), i = 0; + iter.isValid() && i < numEntries_; + iter.next(), ++i) { + stats_.found_++; + stats_.finishedSingleOp(iter.key().length + iter.value().length); + if (isFinished()) { + return; + } } - throw new UnsupportedOperationException(); } - private long initId_; } public DbBenchmark(Map flags) throws Exception { @@ -360,22 +429,33 @@ public class DbBenchmark { flags.get(Flag.num) : flags.get(Flag.reads)); keySize_ = (Integer) flags.get(Flag.key_size); valueSize_ = (Integer) flags.get(Flag.value_size); - writeBufferSize_ = (Integer) flags.get(Flag.write_buffer_size) > 0 ? - (Integer) flags.get(Flag.write_buffer_size) : 0; compressionRatio_ = (Double) flags.get(Flag.compression_ratio); useExisting_ = (Boolean) flags.get(Flag.use_existing_db); randSeed_ = (Long) flags.get(Flag.seed); databaseDir_ = (String) flags.get(Flag.db); writesPerSeconds_ = (Integer) flags.get(Flag.writes_per_second); cacheSize_ = (Long) flags.get(Flag.cache_size); - gen_ = new RandomGenerator(compressionRatio_); + gen_ = new RandomGenerator(randSeed_, compressionRatio_); memtable_ = (String) flags.get(Flag.memtablerep); maxWriteBufferNumber_ = (Integer) flags.get(Flag.max_write_buffer_number); prefixSize_ = (Integer) flags.get(Flag.prefix_size); keysPerPrefix_ = (Integer) flags.get(Flag.keys_per_prefix); hashBucketCount_ = (Long) flags.get(Flag.hash_bucket_count); usePlainTable_ = (Boolean) flags.get(Flag.use_plain_table); + flags_ = flags; finishLock_ = new Object(); + // options.setPrefixSize((Integer)flags_.get(Flag.prefix_size)); + // options.setKeysPerPrefix((Long)flags_.get(Flag.keys_per_prefix)); + } + + private void prepareReadOptions(ReadOptions options) { + options.setVerifyChecksums((Boolean)flags_.get(Flag.verify_checksum)); + options.setTailing((Boolean)flags_.get(Flag.use_tailing_iterator)); + } + + private void prepareWriteOptions(WriteOptions options) { + options.setSync((Boolean)flags_.get(Flag.sync)); + options.setDisableWAL((Boolean)flags_.get(Flag.disable_wal)); } private void prepareOptions(Options options) { @@ -405,9 +485,119 @@ public class DbBenchmark { options.memTableFactoryName()); } if (usePlainTable_) { - options.setSstFormatConfig( + options.setTableFormatConfig( new PlainTableConfig().setKeySize(keySize_)); } + options.setMaxWriteBufferNumber( + (Integer)flags_.get(Flag.max_write_buffer_number)); + options.setMaxBackgroundCompactions( + (Integer)flags_.get(Flag.max_background_compactions)); + options.setMaxBackgroundFlushes( + (Integer)flags_.get(Flag.max_background_flushes)); + options.setCacheSize( + (Long)flags_.get(Flag.cache_size)); + options.setBlockSize( + (Integer)flags_.get(Flag.block_size)); + options.setMaxOpenFiles( + (Integer)flags_.get(Flag.open_files)); + options.setCreateIfMissing( + !(Boolean)flags_.get(Flag.use_existing_db)); + options.setTableCacheRemoveScanCountLimit( + (Integer)flags_.get(Flag.cache_remove_scan_count_limit)); + options.setDisableDataSync( + (Boolean)flags_.get(Flag.disable_data_sync)); + options.setUseFsync( + (Boolean)flags_.get(Flag.use_fsync)); + options.setWalDir( + (String)flags_.get(Flag.wal_dir)); + options.setDisableSeekCompaction( + (Boolean)flags_.get(Flag.disable_seek_compaction)); + options.setDeleteObsoleteFilesPeriodMicros( + (Long)flags_.get(Flag.delete_obsolete_files_period_micros)); + options.setTableCacheNumshardbits( + (Integer)flags_.get(Flag.table_cache_numshardbits)); + options.setAllowMmapReads( + (Boolean)flags_.get(Flag.mmap_read)); + options.setAllowMmapWrites( + (Boolean)flags_.get(Flag.mmap_write)); + options.setAdviseRandomOnOpen( + (Boolean)flags_.get(Flag.advise_random_on_open)); + options.setUseAdaptiveMutex( + (Boolean)flags_.get(Flag.use_adaptive_mutex)); + options.setBytesPerSync( + (Long)flags_.get(Flag.bytes_per_sync)); + options.setBloomLocality( + (Integer)flags_.get(Flag.bloom_locality)); + options.setMinWriteBufferNumberToMerge( + (Integer)flags_.get(Flag.min_write_buffer_number_to_merge)); + options.setMemtablePrefixBloomBits( + (Integer)flags_.get(Flag.memtable_bloom_bits)); + options.setNumLevels( + (Integer)flags_.get(Flag.num_levels)); + options.setTargetFileSizeBase( + (Integer)flags_.get(Flag.target_file_size_base)); + options.setTargetFileSizeMultiplier( + (Integer)flags_.get(Flag.target_file_size_multiplier)); + options.setMaxBytesForLevelBase( + (Integer)flags_.get(Flag.max_bytes_for_level_base)); + options.setMaxBytesForLevelMultiplier( + (Integer)flags_.get(Flag.max_bytes_for_level_multiplier)); + options.setLevelZeroStopWritesTrigger( + (Integer)flags_.get(Flag.level0_stop_writes_trigger)); + options.setLevelZeroSlowdownWritesTrigger( + (Integer)flags_.get(Flag.level0_slowdown_writes_trigger)); + options.setLevelZeroFileNumCompactionTrigger( + (Integer)flags_.get(Flag.level0_file_num_compaction_trigger)); + options.setSoftRateLimit( + (Double)flags_.get(Flag.soft_rate_limit)); + options.setHardRateLimit( + (Double)flags_.get(Flag.hard_rate_limit)); + options.setRateLimitDelayMaxMilliseconds( + (Integer)flags_.get(Flag.rate_limit_delay_max_milliseconds)); + options.setMaxGrandparentOverlapFactor( + (Integer)flags_.get(Flag.max_grandparent_overlap_factor)); + options.setDisableAutoCompactions( + (Boolean)flags_.get(Flag.disable_auto_compactions)); + options.setSourceCompactionFactor( + (Integer)flags_.get(Flag.source_compaction_factor)); + options.setFilterDeletes( + (Boolean)flags_.get(Flag.filter_deletes)); + options.setMaxSuccessiveMerges( + (Integer)flags_.get(Flag.max_successive_merges)); + options.setWalTtlSeconds((Long)flags_.get(Flag.wal_ttl_seconds)); + options.setWalSizeLimitMB((Long)flags_.get(Flag.wal_size_limit_MB)); + int bloomBits = (Integer)flags_.get(Flag.bloom_bits); + if (bloomBits > 0) { + // Internally, options will keep a reference to this BloomFilter. + // This will disallow Java to GC this BloomFilter. In addition, + // options.dispose() will release the c++ object of this BloomFilter. + // As a result, the caller should not directly call + // BloomFilter.dispose(). + options.setFilter(new BloomFilter(bloomBits)); + } + /* TODO(yhchiang): enable the following parameters + options.setCompressionType((String)flags_.get(Flag.compression_type)); + options.setCompressionLevel((Integer)flags_.get(Flag.compression_level)); + options.setMinLevelToCompress((Integer)flags_.get(Flag.min_level_to_compress)); + options.setHdfs((String)flags_.get(Flag.hdfs)); // env + options.setCacheNumshardbits((Integer)flags_.get(Flag.cache_numshardbits)); + options.setStatistics((Boolean)flags_.get(Flag.statistics)); + options.setUniversalSizeRatio( + (Integer)flags_.get(Flag.universal_size_ratio)); + options.setUniversalMinMergeWidth( + (Integer)flags_.get(Flag.universal_min_merge_width)); + options.setUniversalMaxMergeWidth( + (Integer)flags_.get(Flag.universal_max_merge_width)); + options.setUniversalMaxSizeAmplificationPercent( + (Integer)flags_.get(Flag.universal_max_size_amplification_percent)); + options.setUniversalCompressionSizePercent( + (Integer)flags_.get(Flag.universal_compression_size_percent)); + // TODO(yhchiang): add RocksDB.openForReadOnly() to enable Flag.readonly + // TODO(yhchiang): enable Flag.merge_operator by switch + options.setAccessHintOnCompactionStart( + (String)flags_.get(Flag.compaction_fadvice)); + // available values of fadvice are "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" for fadvice + */ } private void run() throws RocksDBException { @@ -424,6 +614,9 @@ public class DbBenchmark { List> tasks = new ArrayList>(); List> bgTasks = new ArrayList>(); WriteOptions writeOpt = new WriteOptions(); + prepareWriteOptions(writeOpt); + ReadOptions readOpt = new ReadOptions(); + prepareReadOptions(readOpt); int currentTaskId = 0; boolean known = true; @@ -436,6 +629,9 @@ public class DbBenchmark { } else if (benchmark.equals("fillrandom")) { tasks.add(new WriteRandomTask( currentTaskId++, randSeed_, num_, num_, writeOpt, 1)); + } else if (benchmark.equals("filluniquerandom")) { + tasks.add(new WriteUniqueRandomTask( + currentTaskId++, randSeed_, num_, num_, writeOpt, 1)); } else if (benchmark.equals("fillsync")) { writeOpt.setSync(true); tasks.add(new WriteRandomTask( @@ -444,13 +640,12 @@ public class DbBenchmark { } else if (benchmark.equals("readseq")) { for (int t = 0; t < threadNum_; ++t) { tasks.add(new ReadSequentialTask( - currentTaskId++, randSeed_, reads_ / threadNum_, - num_, (num_ / threadNum_) * t)); + currentTaskId++, randSeed_, reads_, num_)); } } else if (benchmark.equals("readrandom")) { for (int t = 0; t < threadNum_; ++t) { tasks.add(new ReadRandomTask( - currentTaskId++, randSeed_, reads_ / threadNum_, num_)); + currentTaskId++, randSeed_, reads_, num_)); } } else if (benchmark.equals("readwhilewriting")) { WriteTask writeTask = new WriteRandomTask( @@ -508,6 +703,7 @@ public class DbBenchmark { } } writeOpt.dispose(); + readOpt.dispose(); } options.dispose(); db_.close(); @@ -573,7 +769,7 @@ public class DbBenchmark { System.out.printf( "%-16s : %11.5f micros/op; %6.1f MB/s; %d / %d task(s) finished.\n", - benchmark, elapsedSeconds * 1e6 / num_, + benchmark, elapsedSeconds * 1e6 / stats.done_, (stats.bytes_ / 1048576.0) / elapsedSeconds, taskFinishedCount, concurrentThreads); } @@ -616,14 +812,13 @@ public class DbBenchmark { static void printHelp() { System.out.println("usage:"); for (Flag flag : Flag.values()) { - System.out.format(" --%s%n %s%n", + System.out.format(" --%s%n\t%s%n", flag.name(), flag.desc()); if (flag.getDefaultValue() != null) { - System.out.format(" DEFAULT: %s%n", + System.out.format("\tDEFAULT: %s%n", flag.getDefaultValue().toString()); } - System.out.println(""); } } @@ -677,30 +872,28 @@ public class DbBenchmark { "\t\tfillseq -- write N values in sequential key order in async mode.\n" + "\t\tfillrandom -- write N values in random key order in async mode.\n" + "\t\tfillbatch -- write N/1000 batch where each batch has 1000 values\n" + - "\t\t in random key order in sync mode.\n" + + "\t\t in random key order in sync mode.\n" + "\t\tfillsync -- write N/100 values in random key order in sync mode.\n" + "\t\tfill100K -- write N/1000 100K values in random order in async mode.\n" + "\t\treadseq -- read N times sequentially.\n" + "\t\treadrandom -- read N times in random order.\n" + "\t\treadhot -- read N times in random order from 1% section of DB.\n" + "\t\treadwhilewriting -- measure the read performance of multiple readers\n" + - "\t\t with a bg single writer. The write rate of the bg\n" + - "\t\t is capped by --writes_per_second.\n" + + "\t\t with a bg single writer. The write rate of the bg\n" + + "\t\t is capped by --writes_per_second.\n" + "\tMeta Operations:\n" + "\t\tdelete -- delete DB") { @Override public Object parseValue(String value) { return new ArrayList(Arrays.asList(value.split(","))); } }, - compression_ratio(0.5d, "Arrange to generate values that shrink to this fraction of\n" + - "\ttheir original size after compression") { + "\ttheir original size after compression.") { @Override public Object parseValue(String value) { return Double.parseDouble(value); } }, - use_existing_db(false, "If true, do not destroy the existing database. If you set this\n" + "\tflag and also specify a benchmark that wants a fresh database,\n" + @@ -709,51 +902,43 @@ public class DbBenchmark { return Boolean.parseBoolean(value); } }, - num(1000000, "Number of key/values to place in database.") { @Override public Object parseValue(String value) { return Integer.parseInt(value); } }, - threads(1, "Number of concurrent threads to run.") { @Override public Object parseValue(String value) { return Integer.parseInt(value); } }, - reads(null, "Number of read operations to do. If negative, do --nums reads.") { - @Override - public Object parseValue(String value) { + @Override public Object parseValue(String value) { return Integer.parseInt(value); } }, - key_size(16, "The size of each key in bytes.") { @Override public Object parseValue(String value) { return Integer.parseInt(value); } }, - value_size(100, "The size of each value in bytes.") { @Override public Object parseValue(String value) { return Integer.parseInt(value); } }, - write_buffer_size(4 << 20, "Number of bytes to buffer in memtable before compacting\n" + "\t(initialized to default value by 'main'.)") { @Override public Object parseValue(String value) { - return Integer.parseInt(value); + return Long.parseLong(value); } }, - max_write_buffer_number(2, "The number of in-memory memtables. Each memtable is of size\n" + "\twrite_buffer_size.") { @@ -761,14 +946,12 @@ public class DbBenchmark { return Integer.parseInt(value); } }, - prefix_size(0, "Controls the prefix size for HashSkipList, HashLinkedList,\n" + "\tand plain table.") { @Override public Object parseValue(String value) { return Integer.parseInt(value); } }, - keys_per_prefix(0, "Controls the average number of keys generated\n" + "\tper prefix, 0 means no special handling of the prefix,\n" + "\ti.e. use the prefix comes with the generated random number.") { @@ -776,7 +959,6 @@ public class DbBenchmark { return Integer.parseInt(value); } }, - memtablerep("skip_list", "The memtable format. Available options are\n" + "\tskip_list,\n" + @@ -787,7 +969,6 @@ public class DbBenchmark { return value; } }, - hash_bucket_count(SizeUnit.MB, "The number of hash buckets used in the hash-bucket-based\n" + "\tmemtables. Memtables that currently support this argument are\n" + @@ -796,7 +977,6 @@ public class DbBenchmark { return Long.parseLong(value); } }, - writes_per_second(10000, "The write-rate of the background writer used in the\n" + "\t`readwhilewriting` benchmark. Non-positive number indicates\n" + @@ -805,14 +985,12 @@ public class DbBenchmark { return Integer.parseInt(value); } }, - use_plain_table(false, "Use plain-table sst format.") { @Override public Object parseValue(String value) { return Boolean.parseBoolean(value); } }, - cache_size(-1L, "Number of bytes to use as a cache of uncompressed data.\n" + "\tNegative means use default settings.") { @@ -820,15 +998,445 @@ public class DbBenchmark { return Long.parseLong(value); } }, - seed(0L, "Seed base for random number generators.") { @Override public Object parseValue(String value) { return Long.parseLong(value); } }, - - + num_levels(7, + "The total number of levels.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + numdistinct(1000, + "Number of distinct keys to use. Used in RandomWithVerify to\n" + + "\tread/write on fewer keys so that gets are more likely to find the\n" + + "\tkey and puts are more likely to update the same key.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + merge_keys(-1, + "Number of distinct keys to use for MergeRandom and\n" + + "\tReadRandomMergeRandom.\n" + + "\tIf negative, there will be FLAGS_num keys.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + bloom_locality(0,"Control bloom filter probes locality.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + duration(0,"Time in seconds for the random-ops tests to run.\n" + + "\tWhen 0 then num & reads determine the test duration.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + num_multi_db(0, + "Number of DBs used in the benchmark. 0 means single DB.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + histogram(false,"Print histogram of operation timings.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + min_write_buffer_number_to_merge( + defaultOptions_.minWriteBufferNumberToMerge(), + "The minimum number of write buffers that will be merged together\n" + + "\tbefore writing to storage. This is cheap because it is an\n" + + "\tin-memory merge. If this feature is not enabled, then all these\n" + + "\twrite buffers are flushed to L0 as separate files and this\n" + + "\tincreases read amplification because a get request has to check\n" + + "\tin all of these files. Also, an in-memory merge may result in\n" + + "\twriting less data to storage if there are duplicate records\n" + + "\tin each of these individual write buffers.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + max_background_compactions( + defaultOptions_.maxBackgroundCompactions(), + "The maximum number of concurrent background compactions\n" + + "\tthat can occur in parallel.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + max_background_flushes( + defaultOptions_.maxBackgroundFlushes(), + "The maximum number of concurrent background flushes\n" + + "\tthat can occur in parallel.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + /* TODO(yhchiang): enable the following + compaction_style((int32_t) defaultOptions_.compactionStyle(), + "style of compaction: level-based vs universal.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + },*/ + universal_size_ratio(0, + "Percentage flexibility while comparing file size\n" + + "\t(for universal compaction only).") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + universal_min_merge_width(0,"The minimum number of files in a\n" + + "\tsingle compaction run (for universal compaction only).") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + universal_max_merge_width(0,"The max number of files to compact\n" + + "\tin universal style compaction.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + universal_max_size_amplification_percent(0, + "The max size amplification for universal style compaction.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + universal_compression_size_percent(-1, + "The percentage of the database to compress for universal\n" + + "\tcompaction. -1 means compress everything.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + block_size(defaultOptions_.blockSize(), + "Number of bytes in a block.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + compressed_cache_size(-1, + "Number of bytes to use as a cache of compressed data.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + open_files(defaultOptions_.maxOpenFiles(), + "Maximum number of files to keep open at the same time\n" + + "\t(use default if == 0)") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + bloom_bits(-1,"Bloom filter bits per key. Negative means\n" + + "\tuse default settings.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + memtable_bloom_bits(0,"Bloom filter bits per key for memtable.\n" + + "\tNegative means no bloom filter.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + cache_numshardbits(-1,"Number of shards for the block cache\n" + + "\tis 2 ** cache_numshardbits. Negative means use default settings.\n" + + "\tThis is applied only if FLAGS_cache_size is non-negative.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + cache_remove_scan_count_limit(32,"") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + verify_checksum(false,"Verify checksum for every block read\n" + + "\tfrom storage.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + statistics(false,"Database statistics.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + writes(-1,"Number of write operations to do. If negative, do\n" + + "\t--num reads.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + sync(false,"Sync all writes to disk.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + disable_data_sync(false,"If true, do not wait until data is\n" + + "\tsynced to disk.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + use_fsync(false,"If true, issue fsync instead of fdatasync.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + disable_wal(false,"If true, do not write WAL for write.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + wal_dir("", "If not empty, use the given dir for WAL.") { + @Override public Object parseValue(String value) { + return value; + } + }, + target_file_size_base(2 * 1048576,"Target file size at level-1") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + target_file_size_multiplier(1, + "A multiplier to compute target level-N file size (N >= 2)") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + max_bytes_for_level_base(10 * 1048576, + "Max bytes for level-1") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + max_bytes_for_level_multiplier(10, + "A multiplier to compute max bytes for level-N (N >= 2)") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + level0_stop_writes_trigger(12,"Number of files in level-0\n" + + "\tthat will trigger put stop.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + level0_slowdown_writes_trigger(8,"Number of files in level-0\n" + + "\tthat will slow down writes.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + level0_file_num_compaction_trigger(4,"Number of files in level-0\n" + + "\twhen compactions start.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + readwritepercent(90,"Ratio of reads to reads/writes (expressed\n" + + "\tas percentage) for the ReadRandomWriteRandom workload. The\n" + + "\tdefault value 90 means 90% operations out of all reads and writes\n" + + "\toperations are reads. In other words, 9 gets for every 1 put.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + mergereadpercent(70,"Ratio of merges to merges&reads (expressed\n" + + "\tas percentage) for the ReadRandomMergeRandom workload. The\n" + + "\tdefault value 70 means 70% out of all read and merge operations\n" + + "\tare merges. In other words, 7 merges for every 3 gets.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + deletepercent(2,"Percentage of deletes out of reads/writes/\n" + + "\tdeletes (used in RandomWithVerify only). RandomWithVerify\n" + + "\tcalculates writepercent as (100 - FLAGS_readwritepercent -\n" + + "\tdeletepercent), so deletepercent must be smaller than (100 -\n" + + "\tFLAGS_readwritepercent)") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + disable_seek_compaction(false,"Option to disable compaction\n" + + "\ttriggered by read.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + delete_obsolete_files_period_micros(0,"Option to delete\n" + + "\tobsolete files periodically. 0 means that obsolete files are\n" + + "\tdeleted after every compaction run.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + compression_level(-1, + "Compression level. For zlib this should be -1 for the\n" + + "\tdefault level, or between 0 and 9.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + min_level_to_compress(-1,"If non-negative, compression starts\n" + + "\tfrom this level. Levels with number < min_level_to_compress are\n" + + "\tnot compressed. Otherwise, apply compression_type to\n" + + "\tall levels.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + table_cache_numshardbits(4,"") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + stats_interval(0,"Stats are reported every N operations when\n" + + "\tthis is greater than zero. When 0 the interval grows over time.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + stats_per_interval(0,"Reports additional stats per interval when\n" + + "\tthis is greater than 0.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + perf_level(0,"Level of perf collection.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + soft_rate_limit(0.0,"") { + @Override public Object parseValue(String value) { + return Double.parseDouble(value); + } + }, + hard_rate_limit(0.0,"When not equal to 0 this make threads\n" + + "\tsleep at each stats reporting interval until the compaction\n" + + "\tscore for all levels is less than or equal to this value.") { + @Override public Object parseValue(String value) { + return Double.parseDouble(value); + } + }, + rate_limit_delay_max_milliseconds(1000, + "When hard_rate_limit is set then this is the max time a put will\n" + + "\tbe stalled.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + max_grandparent_overlap_factor(10,"Control maximum bytes of\n" + + "\toverlaps in grandparent (i.e., level+2) before we stop building a\n" + + "\tsingle file in a level->level+1 compaction.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + readonly(false,"Run read only benchmarks.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + disable_auto_compactions(false,"Do not auto trigger compactions.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + source_compaction_factor(1,"Cap the size of data in level-K for\n" + + "\ta compaction run that compacts Level-K with Level-(K+1) (for\n" + + "\tK >= 1)") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, + wal_ttl_seconds(0L,"Set the TTL for the WAL Files in seconds.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + wal_size_limit_MB(0L,"Set the size limit for the WAL Files\n" + + "\tin MB.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + /* TODO(yhchiang): enable the following + bufferedio(rocksdb::EnvOptions().use_os_buffer, + "Allow buffered io using OS buffers.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + */ + mmap_read(false, + "Allow reads to occur via mmap-ing files.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + mmap_write(false, + "Allow writes to occur via mmap-ing files.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + advise_random_on_open(defaultOptions_.adviseRandomOnOpen(), + "Advise random access on table file open.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + compaction_fadvice("NORMAL", + "Access pattern advice when a file is compacted.") { + @Override public Object parseValue(String value) { + return value; + } + }, + use_tailing_iterator(false, + "Use tailing iterator to access a series of keys instead of get.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + use_adaptive_mutex(defaultOptions_.useAdaptiveMutex(), + "Use adaptive mutex.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + bytes_per_sync(defaultOptions_.bytesPerSync(), + "Allows OS to incrementally sync files to disk while they are\n" + + "\tbeing written, in the background. Issue one request for every\n" + + "\tbytes_per_sync written. 0 turns it off.") { + @Override public Object parseValue(String value) { + return Long.parseLong(value); + } + }, + filter_deletes(false," On true, deletes use bloom-filter and drop\n" + + "\tthe delete if key not present.") { + @Override public Object parseValue(String value) { + return Boolean.parseBoolean(value); + } + }, + max_successive_merges(0,"Maximum number of successive merge\n" + + "\toperations on a key in the memtable.") { + @Override public Object parseValue(String value) { + return Integer.parseInt(value); + } + }, db("/tmp/rocksdbjni-bench", "Use the db with the following name.") { @Override public Object parseValue(String value) { @@ -859,25 +1467,23 @@ public class DbBenchmark { private final byte[] data_; private int dataLength_; private int position_; + Random rand_; - private RandomGenerator(double compressionRatio) { + private RandomGenerator(long seed, double compressionRatio) { // We use a limited amount of data over and over again and ensure // that it is larger than the compression window (32KB), and also // large enough to serve all typical value sizes we want to write. - Random rand = new Random(301); + rand_ = new Random(seed); dataLength_ = 1048576 + 100; data_ = new byte[dataLength_]; // TODO(yhchiang): mimic test::CompressibleString? for (int i = 0; i < dataLength_; ++i) { - data_[i] = (byte) (' ' + rand.nextInt(95)); + data_[i] = (byte) (' ' + rand_.nextInt(95)); } } private byte[] generate(int length) { - if (position_ + length > data_.length) { - position_ = 0; - assert (length < data_.length); - } + position_ = rand_.nextInt(data_.length - length); return Arrays.copyOfRange(data_, position_, position_ + length); } } @@ -911,7 +1517,6 @@ public class DbBenchmark { long startTime_; // memtable related - final int writeBufferSize_; final int maxWriteBufferNumber_; final int prefixSize_; final int keysPerPrefix_; @@ -923,4 +1528,8 @@ public class DbBenchmark { Object finishLock_; boolean isFinished_; + Map flags_; + // as the scope of a static member equals to the scope of the problem, + // we let its c++ pointer to be disposed in its finalizer. + static Options defaultOptions_ = new Options(); } diff --git a/java/org/rocksdb/test/OptionsTest.java b/java/org/rocksdb/test/OptionsTest.java index 5971d96a5..cd3ba785d 100644 --- a/java/org/rocksdb/test/OptionsTest.java +++ b/java/org/rocksdb/test/OptionsTest.java @@ -123,9 +123,9 @@ public class OptionsTest { assert(opt.tableCacheRemoveScanCountLimit() == intValue); } - { // WALTtlSeconds test + { // WalTtlSeconds test long longValue = rand.nextLong(); - opt.setWALTtlSeconds(longValue); + opt.setWalTtlSeconds(longValue); assert(opt.walTtlSeconds() == longValue); } diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 6cbe1c27f..a05a74e7a 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -122,13 +122,13 @@ jlong Java_org_rocksdb_Options_statisticsPtr( /* * Class: org_rocksdb_Options - * Method: setFilter0 + * Method: setFilterHandle * Signature: (JJ)V */ -void Java_org_rocksdb_Options_setFilter0( - JNIEnv* env, jobject jobj, jlong jopt_handle, jobject jfp) { +void Java_org_rocksdb_Options_setFilterHandle( + JNIEnv* env, jobject jobj, jlong jopt_handle, jlong jfilter_handle) { reinterpret_cast(jopt_handle)->filter_policy = - rocksdb::FilterJni::getHandle(env, jfp); + reinterpret_cast(jfilter_handle); } /* @@ -602,15 +602,36 @@ jlong Java_org_rocksdb_Options_walTtlSeconds( /* * Class: org_rocksdb_Options - * Method: setWALTtlSeconds + * Method: setWalTtlSeconds * Signature: (JJ)V */ -void Java_org_rocksdb_Options_setWALTtlSeconds( +void Java_org_rocksdb_Options_setWalTtlSeconds( JNIEnv* env, jobject jobj, jlong jhandle, jlong WAL_ttl_seconds) { reinterpret_cast(jhandle)->WAL_ttl_seconds = static_cast(WAL_ttl_seconds); } +/* + * Class: org_rocksdb_Options + * Method: walTtlSeconds + * Signature: (J)J + */ +jlong Java_org_rocksdb_Options_walSizeLimitMB( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle)->WAL_size_limit_MB; +} + +/* + * Class: org_rocksdb_Options + * Method: setWalSizeLimitMB + * Signature: (JJ)V + */ +void Java_org_rocksdb_Options_setWalSizeLimitMB( + JNIEnv* env, jobject jobj, jlong jhandle, jlong WAL_size_limit_MB) { + reinterpret_cast(jhandle)->WAL_size_limit_MB = + static_cast(WAL_size_limit_MB); +} + /* * Class: org_rocksdb_Options * Method: manifestPreallocationSize