From e412a426d6ce019fb12f094f64e7f105d227201d Mon Sep 17 00:00:00 2001 From: Tomas Kolda Date: Tue, 11 Feb 2020 14:45:15 -0800 Subject: [PATCH] JNI direct buffer support for basic operations (#2283) Summary: It is very useful to support direct ByteBuffers in Java. It allows to have zero memory copy and some serializers are using that directly so one do not need to create byte[] array for it. This change also contains some fixes for Windows JNI build. Pull Request resolved: https://github.com/facebook/rocksdb/pull/2283 Differential Revision: D19834971 Pulled By: pdillinger fbshipit-source-id: 44173aa02afc9836c5498c592fd1ea95b6086e8e --- HISTORY.md | 3 +- java/rocksjni/iterator.cc | 61 +++++++ java/rocksjni/portal.h | 79 ++++++++ java/rocksjni/rocksjni.cc | 166 +++++++++++++++++ java/rocksjni/sst_file_reader_iterator.cc | 59 +++++- java/rocksjni/sst_file_writerjni.cc | 33 ++++ java/rocksjni/write_batch.cc | 46 +++++ java/rocksjni/write_batch_with_index.cc | 57 ++++++ .../org/rocksdb/AbstractRocksIterator.java | 18 ++ .../java/org/rocksdb/AbstractWriteBatch.java | 40 ++++ java/src/main/java/org/rocksdb/RocksDB.java | 171 +++++++++++++++++- .../main/java/org/rocksdb/RocksIterator.java | 53 ++++++ .../org/rocksdb/RocksIteratorInterface.java | 25 +++ .../org/rocksdb/SstFileReaderIterator.java | 55 ++++++ .../main/java/org/rocksdb/SstFileWriter.java | 41 ++++- .../java/org/rocksdb/WBWIRocksIterator.java | 9 + .../src/main/java/org/rocksdb/WriteBatch.java | 9 + .../java/org/rocksdb/WriteBatchInterface.java | 48 +++++ .../java/org/rocksdb/WriteBatchWithIndex.java | 9 + .../test/java/org/rocksdb/RocksDBTest.java | 58 +++++- .../java/org/rocksdb/RocksIteratorTest.java | 56 +++++- .../java/org/rocksdb/SstFileReaderTest.java | 38 +++- .../java/org/rocksdb/SstFileWriterTest.java | 35 +++- .../test/java/org/rocksdb/WriteBatchTest.java | 47 ++++- .../org/rocksdb/WriteBatchWithIndexTest.java | 49 ++++- .../rocksdb/util/BytewiseComparatorTest.java | 10 + 26 files changed, 1232 insertions(+), 43 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index cd38270c3..f40dabf6c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,7 +1,8 @@ # Rocksdb Change Log ## Unreleased -### Public API Change +### Java API Changes * Major breaking changes to Java comparators, toward standardizing on ByteBuffer for performant, locale-neutral operations on keys (#6252). +* Added overloads of common API methods using direct ByteBuffers for keys and values (#2283). ### Bug Fixes * Fix incorrect results while block-based table uses kHashSearch, together with Prev()/SeekForPrev(). diff --git a/java/rocksjni/iterator.cc b/java/rocksjni/iterator.cc index 18daeb816..98883a352 100644 --- a/java/rocksjni/iterator.cc +++ b/java/rocksjni/iterator.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "include/org_rocksdb_RocksIterator.h" #include "rocksdb/iterator.h" @@ -102,6 +103,36 @@ void Java_org_rocksdb_RocksIterator_seek0(JNIEnv* env, jobject /*jobj*/, env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT); } +/* + * Class: org_rocksdb_RocksIterator + * Method: seekDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)V + */ +void Java_org_rocksdb_RocksIterator_seekDirect0(JNIEnv* env, jobject /*jobj*/, + jlong handle, jobject jtarget, + jint jtarget_off, + jint jtarget_len) { + auto* it = reinterpret_cast(handle); + auto seek = [&it](rocksdb::Slice& target_slice) { it->Seek(target_slice); }; + rocksdb::JniUtil::k_op_direct(seek, env, jtarget, jtarget_off, jtarget_len); +} + +/* + * Class: org_rocksdb_RocksIterator + * Method: seekForPrevDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)V + */ +void Java_org_rocksdb_RocksIterator_seekForPrevDirect0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jobject jtarget, + jint jtarget_off, jint jtarget_len) { + auto* it = reinterpret_cast(handle); + auto seekPrev = [&it](rocksdb::Slice& target_slice) { + it->SeekForPrev(target_slice); + }; + rocksdb::JniUtil::k_op_direct(seekPrev, env, jtarget, jtarget_off, + jtarget_len); +} + /* * Class: org_rocksdb_RocksIterator * Method: seekForPrev0 @@ -163,6 +194,21 @@ jbyteArray Java_org_rocksdb_RocksIterator_key0(JNIEnv* env, jobject /*jobj*/, return jkey; } +/* + * Class: org_rocksdb_RocksIterator + * Method: keyDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)I + */ +jint Java_org_rocksdb_RocksIterator_keyDirect0(JNIEnv* env, jobject /*jobj*/, + jlong handle, jobject jtarget, + jint jtarget_off, + jint jtarget_len) { + auto* it = reinterpret_cast(handle); + rocksdb::Slice key_slice = it->key(); + return rocksdb::JniUtil::copyToDirect(env, key_slice, jtarget, jtarget_off, + jtarget_len); +} + /* * Class: org_rocksdb_RocksIterator * Method: value0 @@ -184,3 +230,18 @@ jbyteArray Java_org_rocksdb_RocksIterator_value0(JNIEnv* env, jobject /*jobj*/, const_cast(reinterpret_cast(value_slice.data()))); return jkeyValue; } + +/* + * Class: org_rocksdb_RocksIterator + * Method: valueDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)I + */ +jint Java_org_rocksdb_RocksIterator_valueDirect0(JNIEnv* env, jobject /*jobj*/, + jlong handle, jobject jtarget, + jint jtarget_off, + jint jtarget_len) { + auto* it = reinterpret_cast(handle); + rocksdb::Slice value_slice = it->value(); + return rocksdb::JniUtil::copyToDirect(env, value_slice, jtarget, jtarget_off, + jtarget_len); +} diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 876474b16..b4f70f5f8 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -2233,6 +2233,85 @@ class JniUtil { return jpointers; } + + /* + * Helper for operations on a key and value + * for example WriteBatch->Put + * + * TODO(AR) could be extended to cover returning rocksdb::Status + * from `op` and used for RocksDB->Put etc. + */ + static void kv_op_direct( + std::function op, JNIEnv* env, + jobject jkey, jint jkey_off, jint jkey_len, jobject jval, jint jval_off, + jint jval_len) { + char* key = reinterpret_cast(env->GetDirectBufferAddress(jkey)); + if (key == nullptr || + env->GetDirectBufferCapacity(jkey) < (jkey_off + jkey_len)) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, "Invalid key argument"); + return; + } + + char* value = reinterpret_cast(env->GetDirectBufferAddress(jval)); + if (value == nullptr || + env->GetDirectBufferCapacity(jval) < (jval_off + jval_len)) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, "Invalid value argument"); + return; + } + + key += jkey_off; + value += jval_off; + + rocksdb::Slice key_slice(key, jkey_len); + rocksdb::Slice value_slice(value, jval_len); + + op(key_slice, value_slice); + } + + /* + * Helper for operations on a key and value + * for example WriteBatch->Delete + * + * TODO(AR) could be extended to cover returning rocksdb::Status + * from `op` and used for RocksDB->Delete etc. + */ + static void k_op_direct(std::function op, + JNIEnv* env, jobject jkey, jint jkey_off, + jint jkey_len) { + char* key = reinterpret_cast(env->GetDirectBufferAddress(jkey)); + if (key == nullptr || + env->GetDirectBufferCapacity(jkey) < (jkey_off + jkey_len)) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, "Invalid key argument"); + return; + } + + key += jkey_off; + + rocksdb::Slice key_slice(key, jkey_len); + + return op(key_slice); + } + + template + static jint copyToDirect(JNIEnv* env, T& source, jobject jtarget, + jint jtarget_off, jint jtarget_len) { + char* target = + reinterpret_cast(env->GetDirectBufferAddress(jtarget)); + if (target == nullptr || + env->GetDirectBufferCapacity(jtarget) < (jtarget_off + jtarget_len)) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, "Invalid target argument"); + return 0; + } + + target += jtarget_off; + + const jint cvalue_len = static_cast(source.size()); + const jint length = std::min(jtarget_len, cvalue_len); + + memcpy(target, source.data(), length); + + return cvalue_len; + } }; class MapJni : public JavaClass { diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 7f6d5ebfe..0342eef5f 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -575,6 +575,36 @@ void Java_org_rocksdb_RocksDB_put__JJ_3BII_3BIIJ( } } +/* + * Class: org_rocksdb_RocksDB + * Method: putDirect + * Signature: (JJLjava/nio/ByteBuffer;IILjava/nio/ByteBuffer;IIJ)V + */ +void Java_org_rocksdb_RocksDB_putDirect( + JNIEnv* env, jobject /*jdb*/, jlong jdb_handle, jlong jwrite_options_handle, + jobject jkey, jint jkey_off, jint jkey_len, jobject jval, jint jval_off, + jint jval_len, jlong jcf_handle) { + auto* db = reinterpret_cast(jdb_handle); + auto* write_options = + reinterpret_cast(jwrite_options_handle); + auto* cf_handle = reinterpret_cast(jcf_handle); + auto put = [&env, &db, &cf_handle, &write_options](rocksdb::Slice& key, + rocksdb::Slice& value) { + rocksdb::Status s; + if (cf_handle == nullptr) { + s = db->Put(*write_options, key, value); + } else { + s = db->Put(*write_options, cf_handle, key, value); + } + if (s.ok()) { + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + }; + rocksdb::JniUtil::kv_op_direct(put, env, jkey, jkey_off, jkey_len, jval, + jval_off, jval_len); +} + ////////////////////////////////////////////////////////////////////////////// // rocksdb::DB::Delete() @@ -868,6 +898,92 @@ void Java_org_rocksdb_RocksDB_deleteRange__J_3BII_3BII( jend_key, jend_key_off, jend_key_len); } +jint rocksdb_get_helper_direct( + JNIEnv* env, rocksdb::DB* db, const rocksdb::ReadOptions& read_options, + rocksdb::ColumnFamilyHandle* column_family_handle, jobject jkey, + jint jkey_off, jint jkey_len, jobject jval, jint jval_off, jint jval_len, + bool* has_exception) { + static const int kNotFound = -1; + static const int kStatusError = -2; + static const int kArgumentError = -3; + + char* key = reinterpret_cast(env->GetDirectBufferAddress(jkey)); + if (key == nullptr) { + rocksdb::RocksDBExceptionJni::ThrowNew( + env, + "Invalid key argument (argument is not a valid direct ByteBuffer)"); + *has_exception = true; + return kArgumentError; + } + if (env->GetDirectBufferCapacity(jkey) < (jkey_off + jkey_len)) { + rocksdb::RocksDBExceptionJni::ThrowNew( + env, + "Invalid key argument. Capacity is less than requested region (offset " + "+ length)."); + *has_exception = true; + return kArgumentError; + } + + char* value = reinterpret_cast(env->GetDirectBufferAddress(jval)); + if (value == nullptr) { + rocksdb::RocksDBExceptionJni::ThrowNew( + env, + "Invalid value argument (argument is not a valid direct ByteBuffer)"); + *has_exception = true; + return kArgumentError; + } + + if (env->GetDirectBufferCapacity(jval) < (jval_off + jval_len)) { + rocksdb::RocksDBExceptionJni::ThrowNew( + env, + "Invalid value argument. Capacity is less than requested region " + "(offset + length)."); + *has_exception = true; + return kArgumentError; + } + + key += jkey_off; + value += jval_off; + + rocksdb::Slice key_slice(key, jkey_len); + + // TODO(yhchiang): we might save one memory allocation here by adding + // a DB::Get() function which takes preallocated jbyte* as input. + std::string cvalue; + rocksdb::Status s; + if (column_family_handle != nullptr) { + s = db->Get(read_options, column_family_handle, key_slice, &cvalue); + } else { + // backwards compatibility + s = db->Get(read_options, key_slice, &cvalue); + } + + if (s.IsNotFound()) { + *has_exception = false; + return kNotFound; + } else if (!s.ok()) { + *has_exception = true; + // Here since we are throwing a Java exception from c++ side. + // As a result, c++ does not know calling this function will in fact + // throwing an exception. As a result, the execution flow will + // not stop here, and codes after this throw will still be + // executed. + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + + // Return a dummy const value to avoid compilation error, although + // java side might not have a chance to get the return value :) + return kStatusError; + } + + const jint cvalue_len = static_cast(cvalue.size()); + const jint length = std::min(jval_len, cvalue_len); + + memcpy(value, cvalue.c_str(), length); + + *has_exception = false; + return cvalue_len; +} + /* * Class: org_rocksdb_RocksDB * Method: deleteRange @@ -933,6 +1049,27 @@ void Java_org_rocksdb_RocksDB_deleteRange__JJ_3BII_3BIIJ( } } +/* + * Class: org_rocksdb_RocksDB + * Method: getDirect + * Signature: (JJLjava/nio/ByteBuffer;IILjava/nio/ByteBuffer;IIJ)I + */ +jint Java_org_rocksdb_RocksDB_getDirect(JNIEnv* env, jobject /*jdb*/, + jlong jdb_handle, jlong jropt_handle, + jobject jkey, jint jkey_off, + jint jkey_len, jobject jval, + jint jval_off, jint jval_len, + jlong jcf_handle) { + auto* db_handle = reinterpret_cast(jdb_handle); + auto* ro_opt = reinterpret_cast(jropt_handle); + auto* cf_handle = reinterpret_cast(jcf_handle); + bool has_exception = false; + return rocksdb_get_helper_direct( + env, db_handle, ro_opt == nullptr ? rocksdb::ReadOptions() : *ro_opt, + cf_handle, jkey, jkey_off, jkey_len, jval, jval_off, jval_len, + &has_exception); +} + ////////////////////////////////////////////////////////////////////////////// // rocksdb::DB::Merge @@ -1071,6 +1208,35 @@ jlong rocksdb_iterator_helper(rocksdb::DB* db, return reinterpret_cast(iterator); } +/* + * Class: org_rocksdb_RocksDB + * Method: deleteDirect + * Signature: (JJLjava/nio/ByteBuffer;IIJ)V + */ +void Java_org_rocksdb_RocksDB_deleteDirect(JNIEnv* env, jobject /*jdb*/, + jlong jdb_handle, + jlong jwrite_options, jobject jkey, + jint jkey_offset, jint jkey_len, + jlong jcf_handle) { + auto* db = reinterpret_cast(jdb_handle); + auto* write_options = + reinterpret_cast(jwrite_options); + auto* cf_handle = reinterpret_cast(jcf_handle); + auto remove = [&env, &db, &write_options, &cf_handle](rocksdb::Slice& key) { + rocksdb::Status s; + if (cf_handle == nullptr) { + s = db->Delete(*write_options, key); + } else { + s = db->Delete(*write_options, cf_handle, key); + } + if (s.ok()) { + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + }; + rocksdb::JniUtil::k_op_direct(remove, env, jkey, jkey_offset, jkey_len); +} + ////////////////////////////////////////////////////////////////////////////// // rocksdb::DB::Write /* diff --git a/java/rocksjni/sst_file_reader_iterator.cc b/java/rocksjni/sst_file_reader_iterator.cc index 4cbbf04bd..97ee89b99 100644 --- a/java/rocksjni/sst_file_reader_iterator.cc +++ b/java/rocksjni/sst_file_reader_iterator.cc @@ -188,4 +188,61 @@ jbyteArray Java_org_rocksdb_SstFileReaderIterator_value0(JNIEnv* env, jobject /* env->SetByteArrayRegion(jkeyValue, 0, static_cast(value_slice.size()), const_cast(reinterpret_cast(value_slice.data()))); return jkeyValue; -} \ No newline at end of file +} + +/* + * Class: org_rocksdb_SstFileReaderIterator + * Method: keyDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)I + */ +jint Java_org_rocksdb_SstFileReaderIterator_keyDirect0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jobject jtarget, + jint jtarget_off, jint jtarget_len) { + auto* it = reinterpret_cast(handle); + rocksdb::Slice key_slice = it->key(); + return rocksdb::JniUtil::copyToDirect(env, key_slice, jtarget, jtarget_off, + jtarget_len); +} + +/* + * Class: org_rocksdb_SstFileReaderIterator + * Method: valueDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)I + */ +jint Java_org_rocksdb_SstFileReaderIterator_valueDirect0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jobject jtarget, + jint jtarget_off, jint jtarget_len) { + auto* it = reinterpret_cast(handle); + rocksdb::Slice value_slice = it->value(); + return rocksdb::JniUtil::copyToDirect(env, value_slice, jtarget, jtarget_off, + jtarget_len); +} + +/* + * Class: org_rocksdb_SstFileReaderIterator + * Method: seekDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)V + */ +void Java_org_rocksdb_SstFileReaderIterator_seekDirect0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jobject jtarget, + jint jtarget_off, jint jtarget_len) { + auto* it = reinterpret_cast(handle); + auto seek = [&it](rocksdb::Slice& target_slice) { it->Seek(target_slice); }; + rocksdb::JniUtil::k_op_direct(seek, env, jtarget, jtarget_off, jtarget_len); +} + +/* + * Class: org_rocksdb_SstFileReaderIterator + * Method: seekForPrevDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)V + */ +void Java_org_rocksdb_SstFileReaderIterator_seekForPrevDirect0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jobject jtarget, + jint jtarget_off, jint jtarget_len) { + auto* it = reinterpret_cast(handle); + auto seekPrev = [&it](rocksdb::Slice& target_slice) { + it->SeekForPrev(target_slice); + }; + rocksdb::JniUtil::k_op_direct(seekPrev, env, jtarget, jtarget_off, + jtarget_len); +} diff --git a/java/rocksjni/sst_file_writerjni.cc b/java/rocksjni/sst_file_writerjni.cc index 033a45489..9f7afad04 100644 --- a/java/rocksjni/sst_file_writerjni.cc +++ b/java/rocksjni/sst_file_writerjni.cc @@ -137,6 +137,39 @@ void Java_org_rocksdb_SstFileWriter_put__J_3B_3B(JNIEnv *env, jobject /*jobj*/, } } +/* + * Class: org_rocksdb_SstFileWriter + * Method: putDirect + * Signature: (JLjava/nio/ByteBuffer;IILjava/nio/ByteBuffer;II)V + */ +void Java_org_rocksdb_SstFileWriter_putDirect(JNIEnv *env, jobject /*jdb*/, + jlong jdb_handle, jobject jkey, + jint jkey_off, jint jkey_len, + jobject jval, jint jval_off, + jint jval_len) { + auto *writer = reinterpret_cast(jdb_handle); + auto put = [&env, &writer](rocksdb::Slice &key, rocksdb::Slice &value) { + rocksdb::Status s = writer->Put(key, value); + if (s.ok()) { + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + }; + rocksdb::JniUtil::kv_op_direct(put, env, jkey, jkey_off, jkey_len, jval, + jval_off, jval_len); +} + +/* + * Class: org_rocksdb_SstFileWriter + * Method: fileSize + * Signature: (J)J + */ +jlong Java_org_rocksdb_SstFileWriter_fileSize(JNIEnv * /*env*/, jobject /*jdb*/, + jlong jdb_handle) { + auto *writer = reinterpret_cast(jdb_handle); + return static_cast(writer->FileSize()); +} + /* * Class: org_rocksdb_SstFileWriter * Method: merge diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc index c6d0b9072..a9b6981d5 100644 --- a/java/rocksjni/write_batch.cc +++ b/java/rocksjni/write_batch.cc @@ -192,6 +192,30 @@ void Java_org_rocksdb_WriteBatch_put__J_3BI_3BIJ( } } +/* + * Class: org_rocksdb_WriteBatch + * Method: putDirect + * Signature: (JLjava/nio/ByteBuffer;IILjava/nio/ByteBuffer;IIJ)V + */ +void Java_org_rocksdb_WriteBatch_putDirect(JNIEnv* env, jobject /*jobj*/, + jlong jwb_handle, jobject jkey, + jint jkey_offset, jint jkey_len, + jobject jval, jint jval_offset, + jint jval_len, jlong jcf_handle) { + auto* wb = reinterpret_cast(jwb_handle); + assert(wb != nullptr); + auto* cf_handle = reinterpret_cast(jcf_handle); + auto put = [&wb, &cf_handle](rocksdb::Slice& key, rocksdb::Slice& value) { + if (cf_handle == nullptr) { + wb->Put(key, value); + } else { + wb->Put(cf_handle, key, value); + } + }; + rocksdb::JniUtil::kv_op_direct(put, env, jkey, jkey_offset, jkey_len, jval, + jval_offset, jval_len); +} + /* * Class: org_rocksdb_WriteBatch * Method: merge @@ -320,6 +344,28 @@ void Java_org_rocksdb_WriteBatch_singleDelete__J_3BIJ(JNIEnv* env, jobject jobj, } } +/* + * Class: org_rocksdb_WriteBatch + * Method: removeDirect + * Signature: (JLjava/nio/ByteBuffer;IIJ)V + */ +void Java_org_rocksdb_WriteBatch_removeDirect(JNIEnv* env, jobject /*jobj*/, + jlong jwb_handle, jobject jkey, + jint jkey_offset, jint jkey_len, + jlong jcf_handle) { + auto* wb = reinterpret_cast(jwb_handle); + assert(wb != nullptr); + auto* cf_handle = reinterpret_cast(jcf_handle); + auto remove = [&wb, &cf_handle](rocksdb::Slice& key) { + if (cf_handle == nullptr) { + wb->Delete(key); + } else { + wb->Delete(cf_handle, key); + } + }; + rocksdb::JniUtil::k_op_direct(remove, env, jkey, jkey_offset, jkey_len); +} + /* * Class: org_rocksdb_WriteBatch * Method: deleteRange diff --git a/java/rocksjni/write_batch_with_index.cc b/java/rocksjni/write_batch_with_index.cc index f5e596e66..e5361da30 100644 --- a/java/rocksjni/write_batch_with_index.cc +++ b/java/rocksjni/write_batch_with_index.cc @@ -120,6 +120,29 @@ void Java_org_rocksdb_WriteBatchWithIndex_put__J_3BI_3BIJ( } } +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: putDirect + * Signature: (JLjava/nio/ByteBuffer;IILjava/nio/ByteBuffer;IIJ)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_putDirect( + JNIEnv* env, jobject /*jobj*/, jlong jwb_handle, jobject jkey, + jint jkey_offset, jint jkey_len, jobject jval, jint jval_offset, + jint jval_len, jlong jcf_handle) { + auto* wb = reinterpret_cast(jwb_handle); + assert(wb != nullptr); + auto* cf_handle = reinterpret_cast(jcf_handle); + auto put = [&wb, &cf_handle](rocksdb::Slice& key, rocksdb::Slice& value) { + if (cf_handle == nullptr) { + wb->Put(key, value); + } else { + wb->Put(cf_handle, key, value); + } + }; + rocksdb::JniUtil::kv_op_direct(put, env, jkey, jkey_offset, jkey_len, jval, + jval_offset, jval_len); +} + /* * Class: org_rocksdb_WriteBatchWithIndex * Method: merge @@ -247,6 +270,27 @@ void Java_org_rocksdb_WriteBatchWithIndex_singleDelete__J_3BIJ( } } +/* + * Class: org_rocksdb_WriteBatchWithIndex + * Method: removeDirect + * Signature: (JLjava/nio/ByteBuffer;IIJ)V + */ +void Java_org_rocksdb_WriteBatchWithIndex_removeDirect( + JNIEnv* env, jobject /*jobj*/, jlong jwb_handle, jobject jkey, + jint jkey_offset, jint jkey_len, jlong jcf_handle) { + auto* wb = reinterpret_cast(jwb_handle); + assert(wb != nullptr); + auto* cf_handle = reinterpret_cast(jcf_handle); + auto remove = [&wb, &cf_handle](rocksdb::Slice& key) { + if (cf_handle == nullptr) { + wb->Delete(key); + } else { + wb->Delete(cf_handle, key); + } + }; + rocksdb::JniUtil::k_op_direct(remove, env, jkey, jkey_offset, jkey_len); +} + /* * Class: org_rocksdb_WriteBatchWithIndex * Method: deleteRange @@ -640,6 +684,19 @@ void Java_org_rocksdb_WBWIRocksIterator_seek0(JNIEnv* env, jobject /*jobj*/, env->ReleaseByteArrayElements(jtarget, target, JNI_ABORT); } +/* + * Class: org_rocksdb_WBWIRocksIterator + * Method: seekDirect0 + * Signature: (JLjava/nio/ByteBuffer;II)V + */ +void Java_org_rocksdb_WBWIRocksIterator_seekDirect0( + JNIEnv* env, jobject /*jobj*/, jlong handle, jobject jtarget, + jint jtarget_off, jint jtarget_len) { + auto* it = reinterpret_cast(handle); + auto seek = [&it](rocksdb::Slice& target_slice) { it->Seek(target_slice); }; + rocksdb::JniUtil::k_op_direct(seek, env, jtarget, jtarget_off, jtarget_len); +} + /* * Class: org_rocksdb_WBWIRocksIterator * Method: seekForPrev0 diff --git a/java/src/main/java/org/rocksdb/AbstractRocksIterator.java b/java/src/main/java/org/rocksdb/AbstractRocksIterator.java index 2819b6c70..9e08f1465 100644 --- a/java/src/main/java/org/rocksdb/AbstractRocksIterator.java +++ b/java/src/main/java/org/rocksdb/AbstractRocksIterator.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + /** * Base class implementation for Rocks Iterators * in the Java API @@ -64,6 +66,20 @@ public abstract class AbstractRocksIterator

seekForPrev0(nativeHandle_, target, target.length); } + @Override + public void seek(ByteBuffer target) { + assert (isOwningHandle() && target.isDirect()); + seekDirect0(nativeHandle_, target, target.position(), target.remaining()); + target.position(target.limit()); + } + + @Override + public void seekForPrev(ByteBuffer target) { + assert (isOwningHandle() && target.isDirect()); + seekForPrevDirect0(nativeHandle_, target, target.position(), target.remaining()); + target.position(target.limit()); + } + @Override public void next() { assert (isOwningHandle()); @@ -104,5 +120,7 @@ public abstract class AbstractRocksIterator

abstract void prev0(long handle); abstract void seek0(long handle, byte[] target, int targetLen); abstract void seekForPrev0(long handle, byte[] target, int targetLen); + abstract void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); + abstract void seekForPrevDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); abstract void status0(long handle) throws RocksDBException; } diff --git a/java/src/main/java/org/rocksdb/AbstractWriteBatch.java b/java/src/main/java/org/rocksdb/AbstractWriteBatch.java index 9de0eb43c..1f81c99e3 100644 --- a/java/src/main/java/org/rocksdb/AbstractWriteBatch.java +++ b/java/src/main/java/org/rocksdb/AbstractWriteBatch.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + public abstract class AbstractWriteBatch extends RocksObject implements WriteBatchInterface { @@ -54,6 +56,24 @@ public abstract class AbstractWriteBatch extends RocksObject delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_); } + public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException { + assert key.isDirect() && value.isDirect(); + putDirect(nativeHandle_, key, key.position(), key.remaining(), value, value.position(), + value.remaining(), 0); + key.position(key.limit()); + value.position(value.limit()); + } + + @Override + public void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) + throws RocksDBException { + assert key.isDirect() && value.isDirect(); + putDirect(nativeHandle_, key, key.position(), key.remaining(), value, value.position(), + value.remaining(), columnFamilyHandle.nativeHandle_); + key.position(key.limit()); + value.position(value.limit()); + } + @Override public void delete(byte[] key) throws RocksDBException { delete(nativeHandle_, key, key.length); @@ -90,6 +110,19 @@ public abstract class AbstractWriteBatch extends RocksObject columnFamilyHandle.nativeHandle_); } + public void remove(ByteBuffer key) throws RocksDBException { + removeDirect(nativeHandle_, key, key.position(), key.remaining(), 0); + key.position(key.limit()); + } + + @Override + public void remove(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) + throws RocksDBException { + removeDirect( + nativeHandle_, key, key.position(), key.remaining(), columnFamilyHandle.nativeHandle_); + key.position(key.limit()); + } + @Override public void putLogData(byte[] blob) throws RocksDBException { putLogData(nativeHandle_, blob, blob.length); @@ -134,6 +167,10 @@ public abstract class AbstractWriteBatch extends RocksObject final byte[] value, final int valueLen, final long cfHandle) throws RocksDBException; + abstract void putDirect(final long handle, final ByteBuffer key, final int keyOffset, + final int keyLength, final ByteBuffer value, final int valueOffset, final int valueLength, + final long cfHandle) throws RocksDBException; + abstract void merge(final long handle, final byte[] key, final int keyLen, final byte[] value, final int valueLen) throws RocksDBException; @@ -153,6 +190,9 @@ public abstract class AbstractWriteBatch extends RocksObject abstract void singleDelete(final long handle, final byte[] key, final int keyLen, final long cfHandle) throws RocksDBException; + abstract void removeDirect(final long handle, final ByteBuffer key, final int keyOffset, + final int keyLength, final long cfHandle) throws RocksDBException; + abstract void deleteRange(final long handle, final byte[] beginKey, final int beginKeyLen, final byte[] endKey, final int endKeyLen) throws RocksDBException; diff --git a/java/src/main/java/org/rocksdb/RocksDB.java b/java/src/main/java/org/rocksdb/RocksDB.java index ba16c207d..338324b13 100644 --- a/java/src/main/java/org/rocksdb/RocksDB.java +++ b/java/src/main/java/org/rocksdb/RocksDB.java @@ -5,10 +5,14 @@ package org.rocksdb; -import java.util.*; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; - import org.rocksdb.util.Environment; /** @@ -754,6 +758,57 @@ public class RocksDB extends RocksObject { 0, value.length, columnFamilyHandle.nativeHandle_); } + /** + * Set the database entry for "key" to "value" for the specified + * column family. + * + * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle} + * instance + * @param writeOpts {@link org.rocksdb.WriteOptions} instance. + * @param key the specified key to be inserted. Position and limit is used. + * Supports direct buffer only. + * @param value the value associated with the specified key. Position and limit is used. + * Supports direct buffer only. + * + * throws IllegalArgumentException if column family is not present + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + * @see IllegalArgumentException + */ + public void put(final ColumnFamilyHandle columnFamilyHandle, final WriteOptions writeOpts, + final ByteBuffer key, final ByteBuffer value) throws RocksDBException { + assert key.isDirect() && value.isDirect(); + putDirect(nativeHandle_, writeOpts.nativeHandle_, key, key.position(), key.remaining(), value, + value.position(), value.remaining(), columnFamilyHandle.nativeHandle_); + key.position(key.limit()); + value.position(value.limit()); + } + + /** + * Set the database entry for "key" to "value". + * + * @param writeOpts {@link org.rocksdb.WriteOptions} instance. + * @param key the specified key to be inserted. Position and limit is used. + * Supports direct buffer only. + * @param value the value associated with the specified key. Position and limit is used. + * Supports direct buffer only. + * + * throws IllegalArgumentException if column family is not present + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + * @see IllegalArgumentException + */ + public void put(final WriteOptions writeOpts, final ByteBuffer key, final ByteBuffer value) + throws RocksDBException { + assert key.isDirect() && value.isDirect(); + putDirect(nativeHandle_, writeOpts.nativeHandle_, key, key.position(), key.remaining(), value, + value.position(), value.remaining(), 0); + key.position(key.limit()); + value.position(value.limit()); + } + /** * Set the database entry for "key" to "value" for the specified * column family. @@ -1016,6 +1071,70 @@ public class RocksDB extends RocksObject { columnFamilyHandle.nativeHandle_); } + /** + * Get the value associated with the specified key within column family. + * + * @param opt {@link org.rocksdb.ReadOptions} instance. + * @param key the key to retrieve the value. It is using position and limit. + * Supports direct buffer only. + * @param value the out-value to receive the retrieved value. + * It is using position and limit. Limit is set according to value size. + * Supports direct buffer only. + * @return The size of the actual value that matches the specified + * {@code key} in byte. If the return value is greater than the + * length of {@code value}, then it indicates that the size of the + * input buffer {@code value} is insufficient and partial result will + * be returned. RocksDB.NOT_FOUND will be returned if the value not + * found. + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public int get(final ReadOptions opt, final ByteBuffer key, final ByteBuffer value) + throws RocksDBException { + assert key.isDirect() && value.isDirect(); + int result = getDirect(nativeHandle_, opt.nativeHandle_, key, key.position(), key.remaining(), + value, value.position(), value.remaining(), 0); + if (result != NOT_FOUND) { + value.limit(Math.min(value.limit(), value.position() + result)); + } + key.position(key.limit()); + return result; + } + + /** + * Get the value associated with the specified key within column family. + * + * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle} + * instance + * @param opt {@link org.rocksdb.ReadOptions} instance. + * @param key the key to retrieve the value. It is using position and limit. + * Supports direct buffer only. + * @param value the out-value to receive the retrieved value. + * It is using position and limit. Limit is set according to value size. + * Supports direct buffer only. + * @return The size of the actual value that matches the specified + * {@code key} in byte. If the return value is greater than the + * length of {@code value}, then it indicates that the size of the + * input buffer {@code value} is insufficient and partial result will + * be returned. RocksDB.NOT_FOUND will be returned if the value not + * found. + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public int get(final ColumnFamilyHandle columnFamilyHandle, final ReadOptions opt, + final ByteBuffer key, final ByteBuffer value) throws RocksDBException { + assert key.isDirect() && value.isDirect(); + int result = getDirect(nativeHandle_, opt.nativeHandle_, key, key.position(), key.remaining(), + value, value.position(), value.remaining(), columnFamilyHandle.nativeHandle_); + if (result != NOT_FOUND) { + value.limit(Math.min(value.limit(), value.position() + result)); + } + key.position(key.limit()); + return result; + } + /** * Remove the database entry for {@code key}. Requires that the key exists * and was not overwritten. It is not an error if the key did not exist @@ -1360,6 +1479,46 @@ public class RocksDB extends RocksObject { key, offset, len, value, vOffset, vLen); } + /** + * Delete the database entry (if any) for "key". Returns OK on + * success, and a non-OK status on error. It is not an error if "key" + * did not exist in the database. + * + * @param writeOpt WriteOptions to be used with delete operation + * @param key Key to delete within database. It is using position and limit. + * Supports direct buffer only. + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public void delete(final WriteOptions writeOpt, final ByteBuffer key) throws RocksDBException { + assert key.isDirect(); + deleteDirect(nativeHandle_, writeOpt.nativeHandle_, key, key.position(), key.remaining(), 0); + key.position(key.limit()); + } + + /** + * Delete the database entry (if any) for "key". Returns OK on + * success, and a non-OK status on error. It is not an error if "key" + * did not exist in the database. + * + * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle} + * instance + * @param writeOpt WriteOptions to be used with delete operation + * @param key Key to delete within database. It is using position and limit. + * Supports direct buffer only. + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public void delete(final ColumnFamilyHandle columnFamilyHandle, final WriteOptions writeOpt, + final ByteBuffer key) throws RocksDBException { + assert key.isDirect(); + deleteDirect(nativeHandle_, writeOpt.nativeHandle_, key, key.position(), key.remaining(), + columnFamilyHandle.nativeHandle_); + key.position(key.limit()); + } + /** * Add merge operand for key/value pair. * @@ -4244,6 +4403,9 @@ public class RocksDB extends RocksObject { private native byte[][] keyMayExistFoundValue( final long handle, final long cfHandle, final long readOptHandle, final byte[] key, final int keyOffset, final int keyLength); + private native void putDirect(long handle, long writeOptHandle, ByteBuffer key, int keyOffset, + int keyLength, ByteBuffer value, int valueOffset, int valueLength, long cfHandle) + throws RocksDBException; private native long iterator(final long handle); private native long iterator(final long handle, final long readOptHandle); private native long iteratorCF(final long handle, final long cfHandle); @@ -4261,6 +4423,11 @@ public class RocksDB extends RocksObject { private native Map getMapProperty(final long nativeHandle, final long cfHandle, final String property, final int propertyLength) throws RocksDBException; + private native int getDirect(long handle, long readOptHandle, ByteBuffer key, int keyOffset, + int keyLength, ByteBuffer value, int valueOffset, int valueLength, long cfHandle) + throws RocksDBException; + private native void deleteDirect(long handle, long optHandle, ByteBuffer key, int keyOffset, + int keyLength, long cfHandle) throws RocksDBException; private native long getLongProperty(final long nativeHandle, final long cfHandle, final String property, final int propertyLength) throws RocksDBException; diff --git a/java/src/main/java/org/rocksdb/RocksIterator.java b/java/src/main/java/org/rocksdb/RocksIterator.java index 12c06f04e..94611cd7a 100644 --- a/java/src/main/java/org/rocksdb/RocksIterator.java +++ b/java/src/main/java/org/rocksdb/RocksIterator.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + /** *

An iterator that yields a sequence of key/value pairs from a source. * Multiple implementations are provided by this library. @@ -37,6 +39,28 @@ public class RocksIterator extends AbstractRocksIterator { return key0(nativeHandle_); } + /** + *

Return the key for the current entry. The underlying storage for + * the returned slice is valid only until the next modification of + * the iterator.

+ * + *

REQUIRES: {@link #isValid()}

+ * + * @param key the out-value to receive the retrieved key. + * It is using position and limit. Limit is set according to key size. + * Supports direct buffer only. + * @return The size of the actual key. If the return key is greater than the + * length of {@code key}, then it indicates that the size of the + * input buffer {@code key} is insufficient and partial result will + * be returned. + */ + public int key(ByteBuffer key) { + assert (isOwningHandle() && key.isDirect()); + int result = keyDirect0(nativeHandle_, key, key.position(), key.remaining()); + key.limit(Math.min(key.position() + result, key.limit())); + return result; + } + /** *

Return the value for the current entry. The underlying storage for * the returned slice is valid only until the next modification of @@ -50,6 +74,28 @@ public class RocksIterator extends AbstractRocksIterator { return value0(nativeHandle_); } + /** + *

Return the value for the current entry. The underlying storage for + * the returned slice is valid only until the next modification of + * the iterator.

+ * + *

REQUIRES: {@link #isValid()}

+ * + * @param value the out-value to receive the retrieved value. + * It is using position and limit. Limit is set according to value size. + * Supports direct buffer only. + * @return The size of the actual value. If the return value is greater than the + * length of {@code value}, then it indicates that the size of the + * input buffer {@code value} is insufficient and partial result will + * be returned. + */ + public int value(ByteBuffer value) { + assert (isOwningHandle() && value.isDirect()); + int result = valueDirect0(nativeHandle_, value, value.position(), value.remaining()); + value.limit(Math.min(value.position() + result, value.limit())); + return result; + } + @Override protected final native void disposeInternal(final long handle); @Override final native boolean isValid0(long handle); @Override final native void seekToFirst0(long handle); @@ -58,8 +104,15 @@ public class RocksIterator extends AbstractRocksIterator { @Override final native void prev0(long handle); @Override final native void seek0(long handle, byte[] target, int targetLen); @Override final native void seekForPrev0(long handle, byte[] target, int targetLen); + @Override + final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); + @Override + final native void seekForPrevDirect0( + long handle, ByteBuffer target, int targetOffset, int targetLen); @Override final native void status0(long handle) throws RocksDBException; private native byte[] key0(long handle); private native byte[] value0(long handle); + private native int keyDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen); + private native int valueDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen); } diff --git a/java/src/main/java/org/rocksdb/RocksIteratorInterface.java b/java/src/main/java/org/rocksdb/RocksIteratorInterface.java index a5a9eb88d..ddd2593c1 100644 --- a/java/src/main/java/org/rocksdb/RocksIteratorInterface.java +++ b/java/src/main/java/org/rocksdb/RocksIteratorInterface.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + /** *

Defines the interface for an Iterator which provides * access to data one entry at a time. Multiple implementations @@ -64,6 +66,29 @@ public interface RocksIteratorInterface { */ void seekForPrev(byte[] target); + /** + *

Position at the first entry in the source whose key is that or + * past target.

+ * + *

The iterator is valid after this call if the source contains + * a key that comes at or past target.

+ * + * @param target byte array describing a key or a + * key prefix to seek for. Supports direct buffer only. + */ + void seek(ByteBuffer target); + + /** + *

Position at the last key that is less than or equal to the target key.

+ * + *

The iterator is valid after this call if the source contains + * a key that comes at or past target.

+ * + * @param target byte array describing a key or a + * key prefix to seek for. Supports direct buffer only. + */ + void seekForPrev(ByteBuffer target); + /** *

Moves to the next entry in the source. After this call, Valid() is * true if the iterator was not positioned at the last entry in the source.

diff --git a/java/src/main/java/org/rocksdb/SstFileReaderIterator.java b/java/src/main/java/org/rocksdb/SstFileReaderIterator.java index d01b7a390..8ba39ba03 100644 --- a/java/src/main/java/org/rocksdb/SstFileReaderIterator.java +++ b/java/src/main/java/org/rocksdb/SstFileReaderIterator.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + /** *

An iterator that yields a sequence of key/value pairs from a source. * Multiple implementations are provided by this library. @@ -37,6 +39,28 @@ public class SstFileReaderIterator extends AbstractRocksIterator return key0(nativeHandle_); } + /** + *

Return the key for the current entry. The underlying storage for + * the returned slice is valid only until the next modification of + * the iterator.

+ * + *

REQUIRES: {@link #isValid()}

+ * + * @param key the out-value to receive the retrieved key. + * It is using position and limit. Limit is set according to key size. + * Supports direct buffer only. + * @return The size of the actual key. If the return key is greater than the + * length of {@code key}, then it indicates that the size of the + * input buffer {@code key} is insufficient and partial result will + * be returned. + */ + public int key(ByteBuffer key) { + assert (isOwningHandle() && key.isDirect()); + int result = keyDirect0(nativeHandle_, key, key.position(), key.remaining()); + key.limit(Math.min(key.position() + result, key.limit())); + return result; + } + /** *

Return the value for the current entry. The underlying storage for * the returned slice is valid only until the next modification of @@ -50,6 +74,28 @@ public class SstFileReaderIterator extends AbstractRocksIterator return value0(nativeHandle_); } + /** + *

Return the value for the current entry. The underlying storage for + * the returned slice is valid only until the next modification of + * the iterator.

+ * + *

REQUIRES: {@link #isValid()}

+ * + * @param value the out-value to receive the retrieved value. + * It is using position and limit. Limit is set according to value size. + * Supports direct buffer only. + * @return The size of the actual value. If the return value is greater than the + * length of {@code value}, then it indicates that the size of the + * input buffer {@code value} is insufficient and partial result will + * be returned. + */ + public int value(ByteBuffer value) { + assert (isOwningHandle() && value.isDirect()); + int result = valueDirect0(nativeHandle_, value, value.position(), value.remaining()); + value.limit(Math.min(value.position() + result, value.limit())); + return result; + } + @Override protected final native void disposeInternal(final long handle); @Override final native boolean isValid0(long handle); @Override final native void seekToFirst0(long handle); @@ -62,4 +108,13 @@ public class SstFileReaderIterator extends AbstractRocksIterator private native byte[] key0(long handle); private native byte[] value0(long handle); + + private native int keyDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen); + private native int valueDirect0(long handle, ByteBuffer buffer, int bufferOffset, int bufferLen); + + @Override + final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); + @Override + final native void seekForPrevDirect0( + long handle, ByteBuffer target, int targetOffset, int targetLen); } diff --git a/java/src/main/java/org/rocksdb/SstFileWriter.java b/java/src/main/java/org/rocksdb/SstFileWriter.java index 9e08d7553..6d9c559bf 100644 --- a/java/src/main/java/org/rocksdb/SstFileWriter.java +++ b/java/src/main/java/org/rocksdb/SstFileWriter.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + /** * SstFileWriter is used to create sst files that can be added to the * database later. All keys in files generated by SstFileWriter will have @@ -118,6 +120,23 @@ public class SstFileWriter extends RocksObject { put(nativeHandle_, key.getNativeHandle(), value.getNativeHandle()); } + /** + * Add a Put key with value to currently opened file. + * + * @param key the specified key to be inserted. + * @param value the value associated with the specified key. + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public void put(final ByteBuffer key, final ByteBuffer value) throws RocksDBException { + assert key.isDirect() && value.isDirect(); + putDirect(nativeHandle_, key, key.position(), key.remaining(), value, value.position(), + value.remaining()); + key.position(key.limit()); + value.position(value.limit()); + } + /** * Add a Put key with value to currently opened file. * @@ -127,10 +146,9 @@ public class SstFileWriter extends RocksObject { * @throws RocksDBException thrown if error happens in underlying * native library. */ -public void put(final byte[] key, final byte[] value) - throws RocksDBException { - put(nativeHandle_, key, value); -} + public void put(final byte[] key, final byte[] value) throws RocksDBException { + put(nativeHandle_, key, value); + } /** * Add a Merge key with value to currently opened file. @@ -223,6 +241,16 @@ public void put(final byte[] key, final byte[] value) finish(nativeHandle_); } + /** + * Return the current file size. + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public long fileSize() throws RocksDBException { + return fileSize(nativeHandle_); + } + private native static long newSstFileWriter( final long envOptionsHandle, final long optionsHandle, final long userComparatorHandle, final byte comparatorType); @@ -239,6 +267,11 @@ public void put(final byte[] key, final byte[] value) private native void put(final long handle, final byte[] key, final byte[] value) throws RocksDBException; + private native void putDirect(long handle, ByteBuffer key, int keyOffset, int keyLength, + ByteBuffer value, int valueOffset, int valueLength) throws RocksDBException; + + private native long fileSize(long handle) throws RocksDBException; + private native void merge(final long handle, final long keyHandle, final long valueHandle) throws RocksDBException; diff --git a/java/src/main/java/org/rocksdb/WBWIRocksIterator.java b/java/src/main/java/org/rocksdb/WBWIRocksIterator.java index 482351e99..60922ae4b 100644 --- a/java/src/main/java/org/rocksdb/WBWIRocksIterator.java +++ b/java/src/main/java/org/rocksdb/WBWIRocksIterator.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + public class WBWIRocksIterator extends AbstractRocksIterator { private final WriteEntry entry = new WriteEntry(); @@ -47,6 +49,8 @@ public class WBWIRocksIterator @Override final native void seek0(long handle, byte[] target, int targetLen); @Override final native void seekForPrev0(long handle, byte[] target, int targetLen); @Override final native void status0(long handle) throws RocksDBException; + @Override + final native void seekDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen); private native long[] entry1(final long handle); @@ -185,4 +189,9 @@ public class WBWIRocksIterator key.close(); } } + + @Override + void seekForPrevDirect0(long handle, ByteBuffer target, int targetOffset, int targetLen) { + throw new IllegalAccessError("Not implemented"); + } } diff --git a/java/src/main/java/org/rocksdb/WriteBatch.java b/java/src/main/java/org/rocksdb/WriteBatch.java index 5673a25ef..01dbe5a5a 100644 --- a/java/src/main/java/org/rocksdb/WriteBatch.java +++ b/java/src/main/java/org/rocksdb/WriteBatch.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + /** * WriteBatch holds a collection of updates to apply atomically to a DB. * @@ -223,6 +225,10 @@ public class WriteBatch extends AbstractWriteBatch { @Override final native void put(final long handle, final byte[] key, final int keyLen, final byte[] value, final int valueLen, final long cfHandle); + @Override + final native void putDirect(final long handle, final ByteBuffer key, final int keyOffset, + final int keyLength, final ByteBuffer value, final int valueOffset, final int valueLength, + final long cfHandle); @Override final native void merge(final long handle, final byte[] key, final int keyLen, final byte[] value, final int valueLen); @Override final native void merge(final long handle, final byte[] key, @@ -237,6 +243,9 @@ public class WriteBatch extends AbstractWriteBatch { @Override final native void singleDelete(final long handle, final byte[] key, final int keyLen, final long cfHandle) throws RocksDBException; @Override + final native void removeDirect(final long handle, final ByteBuffer key, final int keyOffset, + final int keyLength, final long cfHandle) throws RocksDBException; + @Override final native void deleteRange(final long handle, final byte[] beginKey, final int beginKeyLen, final byte[] endKey, final int endKeyLen); @Override diff --git a/java/src/main/java/org/rocksdb/WriteBatchInterface.java b/java/src/main/java/org/rocksdb/WriteBatchInterface.java index e0999e21b..1f1ddc4ad 100644 --- a/java/src/main/java/org/rocksdb/WriteBatchInterface.java +++ b/java/src/main/java/org/rocksdb/WriteBatchInterface.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + /** *

Defines the interface for a Write Batch which * holds a collection of updates to apply atomically to a DB.

@@ -40,6 +42,33 @@ public interface WriteBatchInterface { void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException; + /** + *

Store the mapping "key->value" within given column + * family.

+ * + * @param key the specified key to be inserted. It is using position and limit. + * Supports direct buffer only. + * @param value the value associated with the specified key. It is using position and limit. + * Supports direct buffer only. + * @throws RocksDBException + */ + void put(ByteBuffer key, ByteBuffer value) throws RocksDBException; + + /** + *

Store the mapping "key->value" within given column + * family.

+ * + * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle} + * instance + * @param key the specified key to be inserted. It is using position and limit. + * Supports direct buffer only. + * @param value the value associated with the specified key. It is using position and limit. + * Supports direct buffer only. + * @throws RocksDBException + */ + void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) + throws RocksDBException; + /** *

Merge "value" with the existing value of "key" in the database. * "key->merge(existing, value)"

@@ -156,6 +185,25 @@ public interface WriteBatchInterface { void singleDelete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key) throws RocksDBException; + /** + *

If column family contains a mapping for "key", erase it. Else do nothing.

+ * + * @param key Key to delete within database. It is using position and limit. + * Supports direct buffer only. + * @throws RocksDBException + */ + void remove(ByteBuffer key) throws RocksDBException; + + /** + *

If column family contains a mapping for "key", erase it. Else do nothing.

+ * + * @param columnFamilyHandle {@link ColumnFamilyHandle} instance + * @param key Key to delete within database. It is using position and limit. + * Supports direct buffer only. + * @throws RocksDBException + */ + void remove(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException; + /** * Removes the database entries in the range ["beginKey", "endKey"), i.e., * including "beginKey" and excluding "endKey". a non-OK status on error. It diff --git a/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java b/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java index 796fbb54f..57e4c2da5 100644 --- a/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java +++ b/java/src/main/java/org/rocksdb/WriteBatchWithIndex.java @@ -5,6 +5,8 @@ package org.rocksdb; +import java.nio.ByteBuffer; + /** * Similar to {@link org.rocksdb.WriteBatch} but with a binary searchable * index built for all the keys inserted. @@ -255,6 +257,10 @@ public class WriteBatchWithIndex extends AbstractWriteBatch { @Override final native void put(final long handle, final byte[] key, final int keyLen, final byte[] value, final int valueLen, final long cfHandle); + @Override + final native void putDirect(final long handle, final ByteBuffer key, final int keyOffset, + final int keyLength, final ByteBuffer value, final int valueOffset, final int valueLength, + final long cfHandle); @Override final native void merge(final long handle, final byte[] key, final int keyLen, final byte[] value, final int valueLen); @Override final native void merge(final long handle, final byte[] key, @@ -269,6 +275,9 @@ public class WriteBatchWithIndex extends AbstractWriteBatch { @Override final native void singleDelete(final long handle, final byte[] key, final int keyLen, final long cfHandle) throws RocksDBException; @Override + final native void removeDirect(final long handle, final ByteBuffer key, final int keyOffset, + final int keyLength, final long cfHandle) throws RocksDBException; + @Override final native void deleteRange(final long handle, final byte[] beginKey, final int beginKeyLen, final byte[] endKey, final int endKeyLen); @Override diff --git a/java/src/test/java/org/rocksdb/RocksDBTest.java b/java/src/test/java/org/rocksdb/RocksDBTest.java index 37f17e1a7..b4d96ed43 100644 --- a/java/src/test/java/org/rocksdb/RocksDBTest.java +++ b/java/src/test/java/org/rocksdb/RocksDBTest.java @@ -186,7 +186,7 @@ public class RocksDBTest { @Test public void put() throws RocksDBException { try (final RocksDB db = RocksDB.open(dbFolder.getRoot().getAbsolutePath()); - final WriteOptions opt = new WriteOptions()) { + final WriteOptions opt = new WriteOptions(); final ReadOptions optr = new ReadOptions()) { db.put("key1".getBytes(), "value".getBytes()); db.put(opt, "key2".getBytes(), "12345678".getBytes()); assertThat(db.get("key1".getBytes())).isEqualTo( @@ -194,6 +194,47 @@ public class RocksDBTest { assertThat(db.get("key2".getBytes())).isEqualTo( "12345678".getBytes()); + ByteBuffer key = ByteBuffer.allocateDirect(12); + ByteBuffer value = ByteBuffer.allocateDirect(12); + key.position(4); + key.put("key3".getBytes()); + key.position(4).limit(8); + value.position(4); + value.put("val3".getBytes()); + value.position(4).limit(8); + + db.put(opt, key, value); + + assertThat(key.position()).isEqualTo(8); + assertThat(key.limit()).isEqualTo(8); + + assertThat(value.position()).isEqualTo(8); + assertThat(value.limit()).isEqualTo(8); + + key.position(4); + + ByteBuffer result = ByteBuffer.allocateDirect(12); + assertThat(db.get(optr, key, result)).isEqualTo(4); + assertThat(result.position()).isEqualTo(0); + assertThat(result.limit()).isEqualTo(4); + assertThat(key.position()).isEqualTo(8); + assertThat(key.limit()).isEqualTo(8); + + byte[] tmp = new byte[4]; + result.get(tmp); + assertThat(tmp).isEqualTo("val3".getBytes()); + + key.position(4); + + result.clear().position(9); + assertThat(db.get(optr, key, result)).isEqualTo(4); + assertThat(result.position()).isEqualTo(9); + assertThat(result.limit()).isEqualTo(12); + assertThat(key.position()).isEqualTo(8); + assertThat(key.limit()).isEqualTo(8); + byte[] tmp2 = new byte[3]; + result.get(tmp2); + assertThat(tmp2).isEqualTo("val".getBytes()); // put Segment key3 = sliceSegment("key3"); @@ -473,16 +514,23 @@ public class RocksDBTest { final WriteOptions wOpt = new WriteOptions()) { db.put("key1".getBytes(), "value".getBytes()); db.put("key2".getBytes(), "12345678".getBytes()); + db.put("key3".getBytes(), "33".getBytes()); assertThat(db.get("key1".getBytes())).isEqualTo( "value".getBytes()); assertThat(db.get("key2".getBytes())).isEqualTo( "12345678".getBytes()); + assertThat(db.get("key3".getBytes())).isEqualTo("33".getBytes()); db.delete("key1".getBytes()); db.delete(wOpt, "key2".getBytes()); + ByteBuffer key = ByteBuffer.allocateDirect(16); + key.put("key3".getBytes()).flip(); + db.delete(wOpt, key); + assertThat(key.position()).isEqualTo(4); + assertThat(key.limit()).isEqualTo(4); + assertThat(db.get("key1".getBytes())).isNull(); assertThat(db.get("key2".getBytes())).isNull(); - Segment key3 = sliceSegment("key3"); Segment key4 = sliceSegment("key4"); db.put("key3".getBytes(), "key3 value".getBytes()); @@ -1103,9 +1151,11 @@ public class RocksDBTest { try (final RocksDB db = RocksDB.open(options, dbPath)) { db.put("key1".getBytes(), "value".getBytes()); } - assertThat(dbFolder.getRoot().exists()).isTrue(); + assertThat(dbFolder.getRoot().exists() && dbFolder.getRoot().listFiles().length != 0) + .isTrue(); RocksDB.destroyDB(dbPath, options); - assertThat(dbFolder.getRoot().exists()).isFalse(); + assertThat(dbFolder.getRoot().exists() && dbFolder.getRoot().listFiles().length != 0) + .isFalse(); } } diff --git a/java/src/test/java/org/rocksdb/RocksIteratorTest.java b/java/src/test/java/org/rocksdb/RocksIteratorTest.java index 96872e595..a8f773b57 100644 --- a/java/src/test/java/org/rocksdb/RocksIteratorTest.java +++ b/java/src/test/java/org/rocksdb/RocksIteratorTest.java @@ -4,13 +4,14 @@ // (found in the LICENSE.Apache file in the root directory). package org.rocksdb; +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.assertj.core.api.Assertions.assertThat; - public class RocksIteratorTest { @ClassRule @@ -35,6 +36,39 @@ public class RocksIteratorTest { assertThat(iterator.isValid()).isTrue(); assertThat(iterator.key()).isEqualTo("key1".getBytes()); assertThat(iterator.value()).isEqualTo("value1".getBytes()); + + ByteBuffer key = ByteBuffer.allocateDirect(2); + ByteBuffer value = ByteBuffer.allocateDirect(2); + assertThat(iterator.key(key)).isEqualTo(4); + assertThat(iterator.value(value)).isEqualTo(6); + + assertThat(key.position()).isEqualTo(0); + assertThat(key.limit()).isEqualTo(2); + assertThat(value.position()).isEqualTo(0); + assertThat(value.limit()).isEqualTo(2); + + byte[] tmp = new byte[2]; + key.get(tmp); + assertThat(tmp).isEqualTo("ke".getBytes()); + value.get(tmp); + assertThat(tmp).isEqualTo("va".getBytes()); + + key = ByteBuffer.allocateDirect(12); + value = ByteBuffer.allocateDirect(12); + assertThat(iterator.key(key)).isEqualTo(4); + assertThat(iterator.value(value)).isEqualTo(6); + assertThat(key.position()).isEqualTo(0); + assertThat(key.limit()).isEqualTo(4); + assertThat(value.position()).isEqualTo(0); + assertThat(value.limit()).isEqualTo(6); + + tmp = new byte[4]; + key.get(tmp); + assertThat(tmp).isEqualTo("key1".getBytes()); + tmp = new byte[6]; + value.get(tmp); + assertThat(tmp).isEqualTo("value1".getBytes()); + iterator.next(); assertThat(iterator.isValid()).isTrue(); assertThat(iterator.key()).isEqualTo("key2".getBytes()); @@ -52,6 +86,24 @@ public class RocksIteratorTest { assertThat(iterator.key()).isEqualTo("key2".getBytes()); assertThat(iterator.value()).isEqualTo("value2".getBytes()); iterator.status(); + + key.clear(); + key.put("key1".getBytes()); + key.flip(); + iterator.seek(key); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); + assertThat(key.position()).isEqualTo(4); + assertThat(key.limit()).isEqualTo(4); + + key.clear(); + key.put("key2".getBytes()); + key.flip(); + iterator.seekForPrev(key); + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.value()).isEqualTo("value2".getBytes()); + assertThat(key.position()).isEqualTo(4); + assertThat(key.limit()).isEqualTo(4); } try (final RocksIterator iterator = db.newIterator()) { diff --git a/java/src/test/java/org/rocksdb/SstFileReaderTest.java b/java/src/test/java/org/rocksdb/SstFileReaderTest.java index c0e3a73d8..0b841f420 100644 --- a/java/src/test/java/org/rocksdb/SstFileReaderTest.java +++ b/java/src/test/java/org/rocksdb/SstFileReaderTest.java @@ -5,19 +5,19 @@ package org.rocksdb; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.rocksdb.util.BytewiseComparator; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.util.BytewiseComparator; public class SstFileReaderTest { private static final String SST_FILE_NAME = "test.sst"; @@ -128,6 +128,28 @@ public class SstFileReaderTest { // Check key and value assertThat(iterator.key()).isEqualTo("key1".getBytes()); assertThat(iterator.value()).isEqualTo("value1".getBytes()); + + ByteBuffer direct = ByteBuffer.allocateDirect(128); + direct.put("key1".getBytes()).flip(); + iterator.seek(direct); + assertThat(direct.position()).isEqualTo(4); + assertThat(direct.limit()).isEqualTo(4); + + assertThat(iterator.isValid()).isTrue(); + assertThat(iterator.key()).isEqualTo("key1".getBytes()); + assertThat(iterator.value()).isEqualTo("value1".getBytes()); + + direct.clear(); + assertThat(iterator.key(direct)).isEqualTo("key1".getBytes().length); + byte[] dst = new byte["key1".getBytes().length]; + direct.get(dst); + assertThat(new String(dst)).isEqualTo("key1"); + + direct.clear(); + assertThat(iterator.value(direct)).isEqualTo("value1".getBytes().length); + dst = new byte["value1".getBytes().length]; + direct.get(dst); + assertThat(new String(dst)).isEqualTo("value1"); } } } diff --git a/java/src/test/java/org/rocksdb/SstFileWriterTest.java b/java/src/test/java/org/rocksdb/SstFileWriterTest.java index 6ed093e85..0a5506fc1 100644 --- a/java/src/test/java/org/rocksdb/SstFileWriterTest.java +++ b/java/src/test/java/org/rocksdb/SstFileWriterTest.java @@ -5,20 +5,20 @@ package org.rocksdb; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.rocksdb.util.BytewiseComparator; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.fail; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.util.BytewiseComparator; public class SstFileWriterTest { private static final String SST_FILE_NAME = "test.sst"; @@ -30,7 +30,7 @@ public class SstFileWriterTest { @Rule public TemporaryFolder parentFolder = new TemporaryFolder(); - enum OpType { PUT, PUT_BYTES, MERGE, MERGE_BYTES, DELETE, DELETE_BYTES} + enum OpType { PUT, PUT_BYTES, PUT_DIRECT, MERGE, MERGE_BYTES, DELETE, DELETE_BYTES } class KeyValueWithOp { KeyValueWithOp(String key, String value, OpType opType) { @@ -76,11 +76,18 @@ public class SstFileWriterTest { final File sstFile = parentFolder.newFile(SST_FILE_NAME); try { sstFileWriter.open(sstFile.getAbsolutePath()); + assertThat(sstFileWriter.fileSize()).isEqualTo(0); for (KeyValueWithOp keyValue : keyValues) { Slice keySlice = new Slice(keyValue.getKey()); Slice valueSlice = new Slice(keyValue.getValue()); byte[] keyBytes = keyValue.getKey().getBytes(); byte[] valueBytes = keyValue.getValue().getBytes(); + ByteBuffer keyDirect = ByteBuffer.allocateDirect(keyBytes.length); + keyDirect.put(keyBytes); + keyDirect.flip(); + ByteBuffer valueDirect = ByteBuffer.allocateDirect(valueBytes.length); + valueDirect.put(valueBytes); + valueDirect.flip(); switch (keyValue.getOpType()) { case PUT: sstFileWriter.put(keySlice, valueSlice); @@ -88,6 +95,13 @@ public class SstFileWriterTest { case PUT_BYTES: sstFileWriter.put(keyBytes, valueBytes); break; + case PUT_DIRECT: + sstFileWriter.put(keyDirect, valueDirect); + assertThat(keyDirect.position()).isEqualTo(keyBytes.length); + assertThat(keyDirect.limit()).isEqualTo(keyBytes.length); + assertThat(valueDirect.position()).isEqualTo(valueBytes.length); + assertThat(valueDirect.limit()).isEqualTo(valueBytes.length); + break; case MERGE: sstFileWriter.merge(keySlice, valueSlice); break; @@ -107,6 +121,7 @@ public class SstFileWriterTest { valueSlice.close(); } sstFileWriter.finish(); + assertThat(sstFileWriter.fileSize()).isGreaterThan(100); } finally { assertThat(sstFileWriter).isNotNull(); sstFileWriter.close(); @@ -152,7 +167,7 @@ public class SstFileWriterTest { public void ingestSstFile() throws RocksDBException, IOException { final List keyValues = new ArrayList<>(); keyValues.add(new KeyValueWithOp("key1", "value1", OpType.PUT)); - keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT)); + keyValues.add(new KeyValueWithOp("key2", "value2", OpType.PUT_DIRECT)); keyValues.add(new KeyValueWithOp("key3", "value3", OpType.PUT_BYTES)); keyValues.add(new KeyValueWithOp("key4", "value4", OpType.MERGE)); keyValues.add(new KeyValueWithOp("key5", "value5", OpType.MERGE_BYTES)); diff --git a/java/src/test/java/org/rocksdb/WriteBatchTest.java b/java/src/test/java/org/rocksdb/WriteBatchTest.java index 8e79ce2d0..f915c7dcb 100644 --- a/java/src/test/java/org/rocksdb/WriteBatchTest.java +++ b/java/src/test/java/org/rocksdb/WriteBatchTest.java @@ -8,6 +8,17 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. package org.rocksdb; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.rocksdb.util.CapturingWriteBatchHandler.Action.DELETE; +import static org.rocksdb.util.CapturingWriteBatchHandler.Action.DELETE_RANGE; +import static org.rocksdb.util.CapturingWriteBatchHandler.Action.LOG; +import static org.rocksdb.util.CapturingWriteBatchHandler.Action.MERGE; +import static org.rocksdb.util.CapturingWriteBatchHandler.Action.PUT; +import static org.rocksdb.util.CapturingWriteBatchHandler.Action.SINGLE_DELETE; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -16,10 +27,6 @@ import org.rocksdb.util.CapturingWriteBatchHandler; import org.rocksdb.util.CapturingWriteBatchHandler.Event; import org.rocksdb.util.WriteBatchGetter; -import static org.assertj.core.api.Assertions.assertThat; -import static org.rocksdb.util.CapturingWriteBatchHandler.Action.*; -import static java.nio.charset.StandardCharsets.UTF_8; - /** * This class mimics the db/write_batch_test.cc * in the c++ rocksdb library. @@ -77,6 +84,38 @@ public class WriteBatchTest { } } + @Test + public void multipleBatchOperationsDirect() + throws UnsupportedEncodingException, RocksDBException { + try (WriteBatch batch = new WriteBatch()) { + ByteBuffer key = ByteBuffer.allocateDirect(16); + ByteBuffer value = ByteBuffer.allocateDirect(16); + key.put("foo".getBytes("US-ASCII")).flip(); + value.put("bar".getBytes("US-ASCII")).flip(); + batch.put(key, value); + assertThat(key.position()).isEqualTo(3); + assertThat(key.limit()).isEqualTo(3); + assertThat(value.position()).isEqualTo(3); + assertThat(value.limit()).isEqualTo(3); + + key.clear(); + key.put("box".getBytes("US-ASCII")).flip(); + batch.remove(key); + assertThat(key.position()).isEqualTo(3); + assertThat(key.limit()).isEqualTo(3); + + batch.put("baz".getBytes("US-ASCII"), "boo".getBytes("US-ASCII")); + + WriteBatchTestInternalHelper.setSequence(batch, 100); + assertThat(WriteBatchTestInternalHelper.sequence(batch)).isNotNull().isEqualTo(100); + assertThat(batch.count()).isEqualTo(3); + assertThat(new String(getContents(batch), "US-ASCII")) + .isEqualTo("Put(baz, boo)@102" + + "Delete(box)@101" + + "Put(foo, bar)@100"); + } + } + @Test public void testAppendOperation() throws RocksDBException { diff --git a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java index 12032dd8f..b2204ed3e 100644 --- a/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java +++ b/java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java @@ -9,18 +9,16 @@ package org.rocksdb; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.Arrays; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.nio.ByteBuffer; -import java.util.Arrays; - -import static org.assertj.core.api.Assertions.assertThat; -import static java.nio.charset.StandardCharsets.UTF_8; - - public class WriteBatchWithIndexTest { @ClassRule @@ -127,6 +125,36 @@ public class WriteBatchWithIndexTest { } } + @Test + public void write_writeBatchWithIndexDirect() throws RocksDBException { + try (final Options options = new Options().setCreateIfMissing(true); + final RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath())) { + ByteBuffer k1 = ByteBuffer.allocateDirect(16); + ByteBuffer v1 = ByteBuffer.allocateDirect(16); + ByteBuffer k2 = ByteBuffer.allocateDirect(16); + ByteBuffer v2 = ByteBuffer.allocateDirect(16); + k1.put("key1".getBytes()).flip(); + v1.put("value1".getBytes()).flip(); + k2.put("key2".getBytes()).flip(); + v2.put("value2".getBytes()).flip(); + + try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex()) { + wbwi.put(k1, v1); + assertThat(k1.position()).isEqualTo(4); + assertThat(k1.limit()).isEqualTo(4); + assertThat(v1.position()).isEqualTo(6); + assertThat(v1.limit()).isEqualTo(6); + + wbwi.put(k2, v2); + + db.write(new WriteOptions(), wbwi); + } + + assertThat(db.get("key1".getBytes())).isEqualTo("value1".getBytes()); + assertThat(db.get("key2".getBytes())).isEqualTo("value2".getBytes()); + } + } + @Test public void iterator() throws RocksDBException { try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true)) { @@ -199,6 +227,13 @@ public class WriteBatchWithIndexTest { final WBWIRocksIterator.WriteEntry entry = it.entry(); assertThat(entry).isEqualTo(expected[testOffset]); + + // Direct buffer seek + expected[testOffset].getKey().data().mark(); + ByteBuffer db = expected[testOffset].getKey().data(); + it.seek(db); + assertThat(db.position()).isEqualTo(key.length); + assertThat(it.isValid()).isTrue(); } //forward iterative access diff --git a/java/src/test/java/org/rocksdb/util/BytewiseComparatorTest.java b/java/src/test/java/org/rocksdb/util/BytewiseComparatorTest.java index ff90a4b7a..2e2ddc543 100644 --- a/java/src/test/java/org/rocksdb/util/BytewiseComparatorTest.java +++ b/java/src/test/java/org/rocksdb/util/BytewiseComparatorTest.java @@ -505,5 +505,15 @@ public class BytewiseComparatorTest { return entries.get(offset).getValue(); } } + + @Override + public void seek(ByteBuffer target) { + throw new IllegalAccessError("Not implemented"); + } + + @Override + public void seekForPrev(ByteBuffer target) { + throw new IllegalAccessError("Not implemented"); + } } }