diff --git a/HISTORY.md b/HISTORY.md index 429c891db..ecd63516f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### New Features ### Public API Change +* Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate. ### Bug Fixes * Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls. diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index c1e2441bc..8db055890 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -208,8 +208,10 @@ class Transaction { // Read this key and ensure that this transaction will only // be able to be committed if this key is not written outside this // transaction after it has first been read (or after the snapshot if a - // snapshot is set in this transaction). The transaction behavior is the - // same regardless of whether the key exists or not. + // snapshot is set in this transaction and do_validate is true). If + // do_validate is false, ReadOptions::snapshot is expected to be nullptr so + // that GetForUpdate returns the latest committed value. The transaction + // behavior is the same regardless of whether the key exists or not. // // Note: Currently, this function will return Status::MergeInProgress // if the most recent write to the queried key in this batch is a Merge. 
@@ -234,27 +236,31 @@ class Transaction { virtual Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, - bool exclusive = true) = 0; + bool exclusive = true, + bool do_validate = true) = 0; // An overload of the above method that receives a PinnableSlice // For backward compatibility a default implementation is provided virtual Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val, - bool exclusive = true) { + bool exclusive = true, + const bool do_validate = true) { if (pinnable_val == nullptr) { std::string* null_str = nullptr; - return GetForUpdate(options, column_family, key, null_str, exclusive); + return GetForUpdate(options, column_family, key, null_str, exclusive, + do_validate); } else { auto s = GetForUpdate(options, column_family, key, - pinnable_val->GetSelf(), exclusive); + pinnable_val->GetSelf(), exclusive, do_validate); pinnable_val->PinSelf(); return s; } } virtual Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value, bool exclusive = true) = 0; + std::string* value, bool exclusive = true, + const bool do_validate = true) = 0; virtual std::vector MultiGetForUpdate( const ReadOptions& options, @@ -287,6 +293,9 @@ class Transaction { // functions in WriteBatch, but will also do conflict checking on the // keys being written. // + // assume_tracked=true expects the key to be already tracked. If valid then it + // skips ValidateSnapshot. Returns error otherwise. + // // If this Transaction was created on an OptimisticTransactionDB, these // functions should always return Status::OK(). // @@ -299,28 +308,33 @@ class Transaction { // (See max_write_buffer_number_to_maintain) // or other errors on unexpected failures. 
virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) = 0; + const Slice& value, const bool assume_tracked = false) = 0; virtual Status Put(const Slice& key, const Slice& value) = 0; virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) = 0; + const SliceParts& value, + const bool assume_tracked = false) = 0; virtual Status Put(const SliceParts& key, const SliceParts& value) = 0; virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) = 0; + const Slice& value, + const bool assume_tracked = false) = 0; virtual Status Merge(const Slice& key, const Slice& value) = 0; - virtual Status Delete(ColumnFamilyHandle* column_family, - const Slice& key) = 0; + virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) = 0; virtual Status Delete(const Slice& key) = 0; virtual Status Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) = 0; + const SliceParts& key, + const bool assume_tracked = false) = 0; virtual Status Delete(const SliceParts& key) = 0; virtual Status SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) = 0; + const Slice& key, + const bool assume_tracked = false) = 0; virtual Status SingleDelete(const Slice& key) = 0; virtual Status SingleDelete(ColumnFamilyHandle* column_family, - const SliceParts& key) = 0; + const SliceParts& key, + const bool assume_tracked = false) = 0; virtual Status SingleDelete(const SliceParts& key) = 0; // PutUntracked() will write a Put to the batch of operations to be committed diff --git a/java/rocksjni/transaction.cc b/java/rocksjni/transaction.cc index a29736df2..04eb654df 100644 --- a/java/rocksjni/transaction.cc +++ b/java/rocksjni/transaction.cc @@ -418,20 +418,20 @@ jobjectArray Java_org_rocksdb_Transaction_multiGet__JJ_3_3B( /* * Class: org_rocksdb_Transaction * Method: getForUpdate - * Signature: (JJ[BIJZ)[B + * 
Signature: (JJ[BIJZZ)[B */ -jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIJZ( +jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIJZZ( JNIEnv* env, jobject /*jobj*/, jlong jhandle, jlong jread_options_handle, jbyteArray jkey, jint jkey_part_len, jlong jcolumn_family_handle, - jboolean jexclusive) { + jboolean jexclusive, jboolean jdo_validate) { auto* column_family_handle = reinterpret_cast(jcolumn_family_handle); auto* txn = reinterpret_cast(jhandle); FnGet fn_get_for_update = std::bind( + const rocksdb::Slice&, std::string*, bool, bool)>( &rocksdb::Transaction::GetForUpdate, txn, _1, column_family_handle, _2, - _3, jexclusive); + _3, jexclusive, jdo_validate); return txn_get_helper(env, fn_get_for_update, jread_options_handle, jkey, jkey_part_len); } @@ -439,15 +439,17 @@ jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIJZ( /* * Class: org_rocksdb_Transaction * Method: getForUpdate - * Signature: (JJ[BIZ)[B + * Signature: (JJ[BIZZ)[B */ -jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIZ( +jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIZZ( JNIEnv* env, jobject /*jobj*/, jlong jhandle, jlong jread_options_handle, - jbyteArray jkey, jint jkey_part_len, jboolean jexclusive) { + jbyteArray jkey, jint jkey_part_len, jboolean jexclusive, + jboolean jdo_validate) { auto* txn = reinterpret_cast(jhandle); FnGet fn_get_for_update = std::bind( - &rocksdb::Transaction::GetForUpdate, txn, _1, _2, _3, jexclusive); + const rocksdb::ReadOptions&, const rocksdb::Slice&, std::string*, bool, + bool)>(&rocksdb::Transaction::GetForUpdate, txn, _1, _2, _3, jexclusive, + jdo_validate); return txn_get_helper(env, fn_get_for_update, jread_options_handle, jkey, jkey_part_len); } @@ -568,19 +570,20 @@ void txn_write_kv_helper(JNIEnv* env, const FnWriteKV& fn_write_kv, /* * Class: org_rocksdb_Transaction * Method: put - * Signature: (J[BI[BIJ)V + * Signature: (J[BI[BIJZ)V */ -void Java_org_rocksdb_Transaction_put__J_3BI_3BIJ( +void 
Java_org_rocksdb_Transaction_put__J_3BI_3BIJZ( JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey, jint jkey_part_len, jbyteArray jval, jint jval_len, - jlong jcolumn_family_handle) { + jlong jcolumn_family_handle, jboolean jassume_tracked) { auto* txn = reinterpret_cast(jhandle); auto* column_family_handle = reinterpret_cast(jcolumn_family_handle); FnWriteKV fn_put = std::bind(&rocksdb::Transaction::Put, txn, - column_family_handle, _1, _2); + const rocksdb::Slice&, bool)>(&rocksdb::Transaction::Put, txn, + column_family_handle, _1, _2, + jassume_tracked); txn_write_kv_helper(env, fn_put, jkey, jkey_part_len, jval, jval_len); } @@ -706,20 +709,21 @@ void txn_write_kv_parts_helper(JNIEnv* env, /* * Class: org_rocksdb_Transaction * Method: put - * Signature: (J[[BI[[BIJ)V + * Signature: (J[[BI[[BIJZ)V */ -void Java_org_rocksdb_Transaction_put__J_3_3BI_3_3BIJ( +void Java_org_rocksdb_Transaction_put__J_3_3BI_3_3BIJZ( JNIEnv* env, jobject /*jobj*/, jlong jhandle, jobjectArray jkey_parts, jint jkey_parts_len, jobjectArray jvalue_parts, jint jvalue_parts_len, - jlong jcolumn_family_handle) { + jlong jcolumn_family_handle, jboolean jassume_tracked) { auto* txn = reinterpret_cast(jhandle); auto* column_family_handle = reinterpret_cast(jcolumn_family_handle); FnWriteKVParts fn_put_parts = std::bind(&rocksdb::Transaction::Put, txn, - column_family_handle, _1, _2); + const rocksdb::SliceParts&, bool)>(&rocksdb::Transaction::Put, txn, + column_family_handle, _1, _2, + jassume_tracked); txn_write_kv_parts_helper(env, fn_put_parts, jkey_parts, jkey_parts_len, jvalue_parts, jvalue_parts_len); } @@ -744,19 +748,20 @@ void Java_org_rocksdb_Transaction_put__J_3_3BI_3_3BI( /* * Class: org_rocksdb_Transaction * Method: merge - * Signature: (J[BI[BIJ)V + * Signature: (J[BI[BIJZ)V */ -void Java_org_rocksdb_Transaction_merge__J_3BI_3BIJ( +void Java_org_rocksdb_Transaction_merge__J_3BI_3BIJZ( JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey, jint jkey_part_len, 
jbyteArray jval, jint jval_len, - jlong jcolumn_family_handle) { + jlong jcolumn_family_handle, jboolean jassume_tracked) { auto* txn = reinterpret_cast(jhandle); auto* column_family_handle = reinterpret_cast(jcolumn_family_handle); FnWriteKV fn_merge = std::bind(&rocksdb::Transaction::Merge, txn, - column_family_handle, _1, _2); + const rocksdb::Slice&, bool)>(&rocksdb::Transaction::Merge, txn, + column_family_handle, _1, _2, + jassume_tracked); txn_write_kv_helper(env, fn_merge, jkey, jkey_part_len, jval, jval_len); } @@ -803,18 +808,18 @@ void txn_write_k_helper(JNIEnv* env, const FnWriteK& fn_write_k, /* * Class: org_rocksdb_Transaction * Method: delete - * Signature: (J[BIJ)V + * Signature: (J[BIJZ)V */ -void Java_org_rocksdb_Transaction_delete__J_3BIJ(JNIEnv* env, jobject /*jobj*/, - jlong jhandle, jbyteArray jkey, - jint jkey_part_len, - jlong jcolumn_family_handle) { +void Java_org_rocksdb_Transaction_delete__J_3BIJZ( + JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey, + jint jkey_part_len, jlong jcolumn_family_handle, jboolean jassume_tracked) { auto* txn = reinterpret_cast(jhandle); auto* column_family_handle = reinterpret_cast(jcolumn_family_handle); FnWriteK fn_delete = std::bind( - &rocksdb::Transaction::Delete, txn, column_family_handle, _1); + rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&, bool)>( + &rocksdb::Transaction::Delete, txn, column_family_handle, _1, + jassume_tracked); txn_write_k_helper(env, fn_delete, jkey, jkey_part_len); } @@ -892,18 +897,20 @@ void txn_write_k_parts_helper(JNIEnv* env, /* * Class: org_rocksdb_Transaction * Method: delete - * Signature: (J[[BIJ)V + * Signature: (J[[BIJZ)V */ -void Java_org_rocksdb_Transaction_delete__J_3_3BIJ( +void Java_org_rocksdb_Transaction_delete__J_3_3BIJZ( JNIEnv* env, jobject /*jobj*/, jlong jhandle, jobjectArray jkey_parts, - jint jkey_parts_len, jlong jcolumn_family_handle) { + jint jkey_parts_len, jlong jcolumn_family_handle, + jboolean jassume_tracked) { auto* txn = 
reinterpret_cast(jhandle); auto* column_family_handle = reinterpret_cast(jcolumn_family_handle); FnWriteKParts fn_delete_parts = std::bind( - &rocksdb::Transaction::Delete, txn, column_family_handle, _1); + rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&, bool)>( + &rocksdb::Transaction::Delete, txn, column_family_handle, _1, + jassume_tracked); txn_write_k_parts_helper(env, fn_delete_parts, jkey_parts, jkey_parts_len); } @@ -926,18 +933,19 @@ void Java_org_rocksdb_Transaction_delete__J_3_3BI(JNIEnv* env, jobject /*jobj*/, /* * Class: org_rocksdb_Transaction * Method: singleDelete - * Signature: (J[BIJ)V + * Signature: (J[BIJZ)V */ -void Java_org_rocksdb_Transaction_singleDelete__J_3BIJ( +void Java_org_rocksdb_Transaction_singleDelete__J_3BIJZ( JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey, - jint jkey_part_len, jlong jcolumn_family_handle) { + jint jkey_part_len, jlong jcolumn_family_handle, jboolean jassume_tracked) { auto* txn = reinterpret_cast(jhandle); auto* column_family_handle = reinterpret_cast(jcolumn_family_handle); FnWriteK fn_single_delete = std::bind( - &rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1); + rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&, bool)>( + &rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1, + jassume_tracked); txn_write_k_helper(env, fn_single_delete, jkey, jkey_part_len); } @@ -961,18 +969,20 @@ void Java_org_rocksdb_Transaction_singleDelete__J_3BI(JNIEnv* env, /* * Class: org_rocksdb_Transaction * Method: singleDelete - * Signature: (J[[BIJ)V + * Signature: (J[[BIJZ)V */ -void Java_org_rocksdb_Transaction_singleDelete__J_3_3BIJ( +void Java_org_rocksdb_Transaction_singleDelete__J_3_3BIJZ( JNIEnv* env, jobject /*jobj*/, jlong jhandle, jobjectArray jkey_parts, - jint jkey_parts_len, jlong jcolumn_family_handle) { + jint jkey_parts_len, jlong jcolumn_family_handle, + jboolean jassume_tracked) { auto* txn = reinterpret_cast(jhandle); auto* column_family_handle = 
reinterpret_cast(jcolumn_family_handle); FnWriteKParts fn_single_delete_parts = std::bind( - &rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1); + rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&, bool)>( + &rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1, + jassume_tracked); txn_write_k_parts_helper(env, fn_single_delete_parts, jkey_parts, jkey_parts_len); } diff --git a/java/src/main/java/org/rocksdb/Transaction.java b/java/src/main/java/org/rocksdb/Transaction.java index c619bb105..96f1143d4 100644 --- a/java/src/main/java/org/rocksdb/Transaction.java +++ b/java/src/main/java/org/rocksdb/Transaction.java @@ -433,6 +433,33 @@ public class Transaction extends RocksObject { * @param key the key to retrieve the value for. * @param exclusive true if the transaction should have exclusive access to * the key, otherwise false for shared access. + * @param do_validate true if it should validate the snapshot before doing the read + * + * @return a byte array storing the value associated with the input key if + * any. null if it does not find the specified key. + * + * @throws RocksDBException thrown if error happens in underlying + * native library. + */ + public byte[] getForUpdate(final ReadOptions readOptions, + final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final boolean exclusive, + final boolean do_validate) throws RocksDBException { + assert (isOwningHandle()); + return getForUpdate(nativeHandle_, readOptions.nativeHandle_, key, key.length, + columnFamilyHandle.nativeHandle_, exclusive, do_validate); + } + + /** + * Same as + * {@link #getForUpdate(ReadOptions, ColumnFamilyHandle, byte[], boolean, boolean)} + * with do_validate=true. + * + * @param readOptions Read options. + * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle} + * instance + * @param key the key to retrieve the value for. 
+ * @param exclusive true if the transaction should have exclusive access to + * the key, otherwise false for shared access. * * @return a byte array storing the value associated with the input key if * any. null if it does not find the specified key. @@ -444,8 +471,8 @@ public class Transaction extends RocksObject { final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final boolean exclusive) throws RocksDBException { assert(isOwningHandle()); - return getForUpdate(nativeHandle_, readOptions.nativeHandle_, key, - key.length, columnFamilyHandle.nativeHandle_, exclusive); + return getForUpdate(nativeHandle_, readOptions.nativeHandle_, key, key.length, + columnFamilyHandle.nativeHandle_, exclusive, true /*do_validate*/); } /** @@ -495,8 +522,8 @@ public class Transaction extends RocksObject { public byte[] getForUpdate(final ReadOptions readOptions, final byte[] key, final boolean exclusive) throws RocksDBException { assert(isOwningHandle()); - return getForUpdate(nativeHandle_, readOptions.nativeHandle_, key, - key.length, exclusive); + return getForUpdate( + nativeHandle_, readOptions.nativeHandle_, key, key.length, exclusive, true /*do_validate*/); } /** @@ -635,11 +662,23 @@ public class Transaction extends RocksObject { * @throws RocksDBException when one of the TransactionalDB conditions * described above occurs, or in the case of an unexpected error */ + public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value, + final boolean assume_tracked) throws RocksDBException { + assert (isOwningHandle()); + put(nativeHandle_, key, key.length, value, value.length, columnFamilyHandle.nativeHandle_, + assume_tracked); + } + + /* + * Same as + * {@link #put(ColumnFamilyHandle, byte[], byte[], boolean)} + * with assume_tracked=false. 
+ */ public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value) throws RocksDBException { assert(isOwningHandle()); - put(nativeHandle_, key, key.length, value, value.length, - columnFamilyHandle.nativeHandle_); + put(nativeHandle_, key, key.length, value, value.length, columnFamilyHandle.nativeHandle_, + /*assume_tracked*/ false); } /** @@ -683,12 +722,24 @@ public class Transaction extends RocksObject { * @throws RocksDBException when one of the TransactionalDB conditions * described above occurs, or in the case of an unexpected error */ + public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts, + final byte[][] valueParts, final boolean assume_tracked) throws RocksDBException { + assert (isOwningHandle()); + put(nativeHandle_, keyParts, keyParts.length, valueParts, valueParts.length, + columnFamilyHandle.nativeHandle_, assume_tracked); + } + + /* + * Same as + * {@link #put(ColumnFamilyHandle, byte[][], byte[][], boolean)} + * with assume_tracked=false. 
+ */ public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts, final byte[][] valueParts) throws RocksDBException { assert(isOwningHandle()); put(nativeHandle_, keyParts, keyParts.length, valueParts, valueParts.length, - columnFamilyHandle.nativeHandle_); + columnFamilyHandle.nativeHandle_, /*assume_tracked*/ false); } //TODO(AR) refactor if we implement org.rocksdb.SliceParts in future @@ -733,11 +784,23 @@ public class Transaction extends RocksObject { * @throws RocksDBException when one of the TransactionalDB conditions * described above occurs, or in the case of an unexpected error */ + public void merge(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, + final byte[] value, final boolean assume_tracked) throws RocksDBException { + assert (isOwningHandle()); + merge(nativeHandle_, key, key.length, value, value.length, columnFamilyHandle.nativeHandle_, + assume_tracked); + } + + /* + * Same as + * {@link #merge(ColumnFamilyHandle, byte[], byte[], boolean)} + * with assume_tracked=false. 
+ */ public void merge(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value) throws RocksDBException { assert(isOwningHandle()); - merge(nativeHandle_, key, key.length, value, value.length, - columnFamilyHandle.nativeHandle_); + merge(nativeHandle_, key, key.length, value, value.length, columnFamilyHandle.nativeHandle_, + /*assume_tracked*/ false); } /** @@ -790,10 +853,22 @@ public class Transaction extends RocksObject { * @throws RocksDBException when one of the TransactionalDB conditions * described above occurs, or in the case of an unexpected error */ + public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, + final boolean assume_tracked) throws RocksDBException { + assert (isOwningHandle()); + delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_, assume_tracked); + } + + /* + * Same as + * {@link #delete(ColumnFamilyHandle, byte[], boolean)} + * with assume_tracked=false. + */ public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key) throws RocksDBException { assert(isOwningHandle()); - delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_); + delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_, + /*assume_tracked*/ false); } /** @@ -834,11 +909,23 @@ public class Transaction extends RocksObject { * @throws RocksDBException when one of the TransactionalDB conditions * described above occurs, or in the case of an unexpected error */ + public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts, + final boolean assume_tracked) throws RocksDBException { + assert (isOwningHandle()); + delete( + nativeHandle_, keyParts, keyParts.length, columnFamilyHandle.nativeHandle_, assume_tracked); + } + + /* + * Same as + * {@link #delete(ColumnFamilyHandle, byte[][], boolean)} + * with assume_tracked=false. 
+ */ public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts) throws RocksDBException { assert(isOwningHandle()); - delete(nativeHandle_, keyParts, keyParts.length, - columnFamilyHandle.nativeHandle_); + delete(nativeHandle_, keyParts, keyParts.length, columnFamilyHandle.nativeHandle_, + /*assume_tracked*/ false); } //TODO(AR) refactor if we implement org.rocksdb.SliceParts in future @@ -880,11 +967,23 @@ public class Transaction extends RocksObject { * described above occurs, or in the case of an unexpected error */ @Experimental("Performance optimization for a very specific workload") - public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, - final byte[] key) throws RocksDBException { + public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, + final boolean assume_tracked) throws RocksDBException { + assert (isOwningHandle()); + singleDelete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_, assume_tracked); + } + + /* + * Same as + * {@link #singleDelete(ColumnFamilyHandle, byte[], boolean)} + * with assume_tracked=false. 
+ */ + @Experimental("Performance optimization for a very specific workload") + public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key) + throws RocksDBException { assert(isOwningHandle()); - singleDelete(nativeHandle_, key, key.length, - columnFamilyHandle.nativeHandle_); + singleDelete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_, + /*assume_tracked*/ false); } /** @@ -927,11 +1026,24 @@ public class Transaction extends RocksObject { * described above occurs, or in the case of an unexpected error */ @Experimental("Performance optimization for a very specific workload") - public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, - final byte[][] keyParts) throws RocksDBException { + public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts, + final boolean assume_tracked) throws RocksDBException { + assert (isOwningHandle()); + singleDelete( + nativeHandle_, keyParts, keyParts.length, columnFamilyHandle.nativeHandle_, assume_tracked); + } + + /* + * Same as + * {@link #singleDelete(ColumnFamilyHandle, byte[][], boolean)} + * with assume_tracked=false. 
+ */ + @Experimental("Performance optimization for a very specific workload") + public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts) + throws RocksDBException { assert(isOwningHandle()); - singleDelete(nativeHandle_, keyParts, keyParts.length, - columnFamilyHandle.nativeHandle_); + singleDelete(nativeHandle_, keyParts, keyParts.length, columnFamilyHandle.nativeHandle_, + /*assume_tracked*/ false); } //TODO(AR) refactor if we implement org.rocksdb.SliceParts in future @@ -1642,13 +1754,12 @@ public class Transaction extends RocksObject { private native byte[][] multiGet(final long handle, final long readOptionsHandle, final byte[][] keys) throws RocksDBException; - private native byte[] getForUpdate(final long handle, - final long readOptionsHandle, final byte key[], final int keyLength, - final long columnFamilyHandle, final boolean exclusive) + private native byte[] getForUpdate(final long handle, final long readOptionsHandle, + final byte key[], final int keyLength, final long columnFamilyHandle, final boolean exclusive, + final boolean do_validate) throws RocksDBException; + private native byte[] getForUpdate(final long handle, final long readOptionsHandle, + final byte key[], final int keyLen, final boolean exclusive, final boolean do_validate) throws RocksDBException; - private native byte[] getForUpdate(final long handle, - final long readOptionsHandle, final byte key[], final int keyLen, - final boolean exclusive) throws RocksDBException; private native byte[][] multiGetForUpdate(final long handle, final long readOptionsHandle, final byte[][] keys, final long[] columnFamilyHandles) throws RocksDBException; @@ -1659,42 +1770,38 @@ public class Transaction extends RocksObject { final long readOptionsHandle); private native long getIterator(final long handle, final long readOptionsHandle, final long columnFamilyHandle); - private native void put(final long handle, final byte[] key, - final int keyLength, final byte[] 
value, final int valueLength, - final long columnFamilyHandle) throws RocksDBException; + private native void put(final long handle, final byte[] key, final int keyLength, + final byte[] value, final int valueLength, final long columnFamilyHandle, + final boolean assume_tracked) throws RocksDBException; private native void put(final long handle, final byte[] key, final int keyLength, final byte[] value, final int valueLength) throws RocksDBException; - private native void put(final long handle, final byte[][] keys, - final int keysLength, final byte[][] values, final int valuesLength, - final long columnFamilyHandle) throws RocksDBException; + private native void put(final long handle, final byte[][] keys, final int keysLength, + final byte[][] values, final int valuesLength, final long columnFamilyHandle, + final boolean assume_tracked) throws RocksDBException; private native void put(final long handle, final byte[][] keys, final int keysLength, final byte[][] values, final int valuesLength) throws RocksDBException; - private native void merge(final long handle, final byte[] key, - final int keyLength, final byte[] value, final int valueLength, - final long columnFamilyHandle) throws RocksDBException; + private native void merge(final long handle, final byte[] key, final int keyLength, + final byte[] value, final int valueLength, final long columnFamilyHandle, + final boolean assume_tracked) throws RocksDBException; private native void merge(final long handle, final byte[] key, final int keyLength, final byte[] value, final int valueLength) throws RocksDBException; - private native void delete(final long handle, final byte[] key, - final int keyLength, final long columnFamilyHandle) - throws RocksDBException; + private native void delete(final long handle, final byte[] key, final int keyLength, + final long columnFamilyHandle, final boolean assume_tracked) throws RocksDBException; private native void delete(final long handle, final byte[] key, final int keyLength) 
throws RocksDBException; - private native void delete(final long handle, final byte[][] keys, - final int keysLength, final long columnFamilyHandle) - throws RocksDBException; + private native void delete(final long handle, final byte[][] keys, final int keysLength, + final long columnFamilyHandle, final boolean assume_tracked) throws RocksDBException; private native void delete(final long handle, final byte[][] keys, final int keysLength) throws RocksDBException; - private native void singleDelete(final long handle, final byte[] key, - final int keyLength, final long columnFamilyHandle) - throws RocksDBException; + private native void singleDelete(final long handle, final byte[] key, final int keyLength, + final long columnFamilyHandle, final boolean assume_tracked) throws RocksDBException; private native void singleDelete(final long handle, final byte[] key, final int keyLength) throws RocksDBException; - private native void singleDelete(final long handle, final byte[][] keys, - final int keysLength, final long columnFamilyHandle) - throws RocksDBException; + private native void singleDelete(final long handle, final byte[][] keys, final int keysLength, + final long columnFamilyHandle, final boolean assume_tracked) throws RocksDBException; private native void singleDelete(final long handle, final byte[][] keys, final int keysLength) throws RocksDBException; private native void putUntracked(final long handle, final byte[] key, diff --git a/util/jemalloc_nodump_allocator.cc b/util/jemalloc_nodump_allocator.cc index 1db939b4f..3b7415460 100644 --- a/util/jemalloc_nodump_allocator.cc +++ b/util/jemalloc_nodump_allocator.cc @@ -134,7 +134,7 @@ Status NewJemallocNodumpAllocator( std::shared_ptr* memory_allocator) { *memory_allocator = nullptr; #ifndef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR - (void) options; + (void)options; return Status::NotSupported( "JemallocNodumpAllocator only available with jemalloc version >= 5 " "and MADV_DONTDUMP is available."); diff --git 
a/utilities/transactions/optimistic_transaction.cc b/utilities/transactions/optimistic_transaction.cc index 89d3226d5..48c9180ae 100644 --- a/utilities/transactions/optimistic_transaction.cc +++ b/utilities/transactions/optimistic_transaction.cc @@ -80,8 +80,11 @@ Status OptimisticTransaction::Rollback() { // 'exclusive' is unused for OptimisticTransaction. Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, - bool exclusive, bool untracked) { - if (untracked) { + bool exclusive, const bool do_validate, + const bool assume_tracked) { + assert(!assume_tracked); // not supported + (void)assume_tracked; + if (!do_validate) { return Status::OK(); } uint32_t cfh_id = GetColumnFamilyID(column_family); diff --git a/utilities/transactions/optimistic_transaction.h b/utilities/transactions/optimistic_transaction.h index 5a19489f2..445979b96 100644 --- a/utilities/transactions/optimistic_transaction.h +++ b/utilities/transactions/optimistic_transaction.h @@ -48,8 +48,8 @@ class OptimisticTransaction : public TransactionBaseImpl { protected: Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool read_only, bool exclusive, - bool untracked = false) override; + bool read_only, bool exclusive, const bool do_validate = true, + const bool assume_tracked = false) override; private: OptimisticTransactionDB* const txn_db_; diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index d895d9d93..0a0163ecc 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -515,7 +515,9 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch, // the snapshot time. 
Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, - bool exclusive, bool skip_validate) { + bool exclusive, const bool do_validate, + const bool assume_tracked) { + assert(!assume_tracked || !do_validate); Status s; if (UNLIKELY(skip_concurrency_control_)) { return s; @@ -559,7 +561,11 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, // any writes since this transaction's snapshot. // TODO(agiardullo): could optimize by supporting shared txn locks in the // future - if (skip_validate || snapshot_ == nullptr) { + if (!do_validate || snapshot_ == nullptr) { + if (assume_tracked && !previously_locked) { + s = Status::InvalidArgument( + "assume_tracked is set but it is not tracked yet"); + } // Need to remember the earliest sequence number that we know that this // key has not been modified after. This is useful if this same // transaction @@ -628,7 +634,7 @@ Status PessimisticTransaction::ValidateSnapshot( // Otherwise we have either // 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key // 2: snap_seq < tracked_at_seq: last time we lock the key was via - // skip_validate option which means we had skipped ValidateSnapshot. In both + // do_validate=false which means we had skipped ValidateSnapshot. In both // cases we should do ValidateSnapshot now. 
*tracked_at_seq = snap_seq; diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 145d561fb..d09c239ce 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -135,8 +135,8 @@ class PessimisticTransaction : public TransactionBaseImpl { Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool read_only, bool exclusive, - bool skip_validate = false) override; + bool read_only, bool exclusive, const bool do_validate = true, + const bool assume_tracked = false) override; void Clear() override; diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index ac459a256..23cc41be1 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -97,7 +97,8 @@ void TransactionBaseImpl::SetSnapshotIfNeeded() { Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, bool read_only, - bool exclusive, bool skip_validate) { + bool exclusive, const bool do_validate, + const bool assume_tracked) { size_t key_size = 0; for (int i = 0; i < key.num_parts; ++i) { key_size += key.parts[i].size(); @@ -110,7 +111,8 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, str.append(key.parts[i].data(), key.parts[i].size()); } - return TryLock(column_family, str, read_only, exclusive, skip_validate); + return TryLock(column_family, str, read_only, exclusive, do_validate, + assume_tracked); } void TransactionBaseImpl::SetSavePoint() { @@ -215,8 +217,15 @@ Status TransactionBaseImpl::Get(const ReadOptions& read_options, Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, - bool exclusive) { - Status s = TryLock(column_family, key, true /* 
read_only */, exclusive); + bool exclusive, + const bool do_validate) { + if (!do_validate && read_options.snapshot != nullptr) { + return Status::InvalidArgument( + "If do_validate is false then GetForUpdate with snapshot is not " + "defined."); + } + Status s = + TryLock(column_family, key, true /* read_only */, exclusive, do_validate); if (s.ok() && value != nullptr) { assert(value != nullptr); @@ -234,8 +243,15 @@ Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val, - bool exclusive) { - Status s = TryLock(column_family, key, true /* read_only */, exclusive); + bool exclusive, + const bool do_validate) { + if (!do_validate && read_options.snapshot != nullptr) { + return Status::InvalidArgument( + "If do_validate is false then GetForUpdate with snapshot is not " + "defined."); + } + Status s = + TryLock(column_family, key, true /* read_only */, exclusive, do_validate); if (s.ok() && pinnable_val != nullptr) { s = Get(read_options, column_family, key, pinnable_val); @@ -303,9 +319,11 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, } Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* exclusive */); + const Slice& key, const Slice& value, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, do_validate, assume_tracked); if (s.ok()) { s = GetBatchForWrite()->Put(column_family, key, value); @@ -318,10 +336,11 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, } Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, - const SliceParts& key, - const SliceParts& value) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* exclusive */); + const 
SliceParts& key, const SliceParts& value, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, do_validate, assume_tracked); if (s.ok()) { s = GetBatchForWrite()->Put(column_family, key, value); @@ -334,9 +353,11 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, } Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* exclusive */); + const Slice& key, const Slice& value, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, do_validate, assume_tracked); if (s.ok()) { s = GetBatchForWrite()->Merge(column_family, key, value); @@ -349,9 +370,11 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, } Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, - const Slice& key) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* exclusive */); + const Slice& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, do_validate, assume_tracked); if (s.ok()) { s = GetBatchForWrite()->Delete(column_family, key); @@ -364,9 +387,11 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, } Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* exclusive */); + const SliceParts& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, do_validate, assume_tracked); if (s.ok()) { s = 
GetBatchForWrite()->Delete(column_family, key); @@ -379,9 +404,11 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, } Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* exclusive */); + const Slice& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, do_validate, assume_tracked); if (s.ok()) { s = GetBatchForWrite()->SingleDelete(column_family, key); @@ -394,9 +421,11 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, } Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, - const SliceParts& key) { - Status s = - TryLock(column_family, key, false /* read_only */, true /* exclusive */); + const SliceParts& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + Status s = TryLock(column_family, key, false /* read_only */, + true /* exclusive */, do_validate, assume_tracked); if (s.ok()) { s = GetBatchForWrite()->SingleDelete(column_family, key); @@ -411,7 +440,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { Status s = TryLock(column_family, key, false /* read_only */, - true /* exclusive */, true /* skip_validate */); + true /* exclusive */, false /* do_validate */); if (s.ok()) { s = GetBatchForWrite()->Put(column_family, key, value); @@ -427,7 +456,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { Status s = TryLock(column_family, key, false /* read_only */, - true /* exclusive */, true /* skip_validate */); + true /* exclusive */, false /* do_validate */); if (s.ok()) { s = 
GetBatchForWrite()->Put(column_family, key, value); @@ -443,7 +472,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { Status s = TryLock(column_family, key, false /* read_only */, - true /* exclusive */, true /* skip_validate */); + true /* exclusive */, false /* do_validate */); if (s.ok()) { s = GetBatchForWrite()->Merge(column_family, key, value); @@ -458,7 +487,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, const Slice& key) { Status s = TryLock(column_family, key, false /* read_only */, - true /* exclusive */, true /* skip_validate */); + true /* exclusive */, false /* do_validate */); if (s.ok()) { s = GetBatchForWrite()->Delete(column_family, key); @@ -473,7 +502,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, const SliceParts& key) { Status s = TryLock(column_family, key, false /* read_only */, - true /* exclusive */, true /* skip_validate */); + true /* exclusive */, false /* do_validate */); if (s.ok()) { s = GetBatchForWrite()->Delete(column_family, key); @@ -488,7 +517,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::SingleDeleteUntracked( ColumnFamilyHandle* column_family, const Slice& key) { Status s = TryLock(column_family, key, false /* read_only */, - true /* exclusive */, true /* skip_validate */); + true /* exclusive */, false /* do_validate */); if (s.ok()) { s = GetBatchForWrite()->SingleDelete(column_family, key); diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 171e13588..9154b3274 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -36,11 +36,12 @@ class TransactionBaseImpl : 
public Transaction { // Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock // returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed. - // skip_validate will be true if called from PutUntracked, DeleteUntracked, or - // MergeUntracked. + // do_validate will be false if called from PutUntracked, DeleteUntracked, + // MergeUntracked, or GetForUpdate(do_validate=false) virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, bool read_only, bool exclusive, - bool skip_validate = false) = 0; + const bool do_validate = true, + const bool assume_tracked = false) = 0; void SetSavePoint() override; @@ -63,16 +64,19 @@ class TransactionBaseImpl : public Transaction { using Transaction::GetForUpdate; Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - std::string* value, bool exclusive) override; + std::string* value, bool exclusive, + const bool do_validate) override; Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - PinnableSlice* pinnable_val, bool exclusive) override; + PinnableSlice* pinnable_val, bool exclusive, + const bool do_validate) override; Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value, bool exclusive) override { + std::string* value, bool exclusive, + const bool do_validate) override { return GetForUpdate(options, db_->DefaultColumnFamily(), key, value, - exclusive); + exclusive, do_validate); } std::vector MultiGet( @@ -109,36 +113,38 @@ class TransactionBaseImpl : public Transaction { ColumnFamilyHandle* column_family) override; Status Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; + const Slice& value, const bool assume_tracked = false) override; Status Put(const Slice& key, const Slice& value) override { return Put(nullptr, key, value); } Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const 
SliceParts& value) override; + const SliceParts& value, + const bool assume_tracked = false) override; Status Put(const SliceParts& key, const SliceParts& value) override { return Put(nullptr, key, value); } Status Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; + const Slice& value, const bool assume_tracked = false) override; Status Merge(const Slice& key, const Slice& value) override { return Merge(nullptr, key, value); } - Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; + Status Delete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) override; Status Delete(const Slice& key) override { return Delete(nullptr, key); } - Status Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; + Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key, + const bool assume_tracked = false) override; Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } - Status SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) override; + Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) override; Status SingleDelete(const Slice& key) override { return SingleDelete(nullptr, key); } - Status SingleDelete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; + Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key, + const bool assume_tracked = false) override; Status SingleDelete(const SliceParts& key) override { return SingleDelete(nullptr, key); } @@ -335,7 +341,8 @@ class TransactionBaseImpl : public Transaction { std::shared_ptr snapshot_notifier_ = nullptr; Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, - bool read_only, bool exclusive, bool skip_validate = false); + bool read_only, bool exclusive, const bool do_validate = true, + const bool assume_tracked = false); 
WriteBatchBase* GetBatchForWrite(); void SetSnapshotInternal(const Snapshot* snapshot); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 0968b9a34..ec27d3e45 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -140,6 +140,61 @@ TEST_P(TransactionTest, SuccessTest) { delete txn; } +// The test clarifies the contract of do_validate and assume_tracked +// in GetForUpdate and Put/Merge/Delete +TEST_P(TransactionTest, AssumeExclusiveTracked) { + WriteOptions write_options; + ReadOptions read_options; + std::string value; + Status s; + TransactionOptions txn_options; + txn_options.lock_timeout = 1; + const bool EXCLUSIVE = true; + const bool DO_VALIDATE = true; + const bool ASSUME_LOCKED = true; + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn); + txn->SetSnapshot(); + + // commit a value after the snapshot is taken + ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar"))); + + // By default write should fail to the commit after our snapshot + s = txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE); + ASSERT_TRUE(s.IsBusy()); + // But the user could direct the db to skip validating the snapshot. 
The read + // value then should be the most recently committed + ASSERT_OK( + txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE, !DO_VALIDATE)); + ASSERT_EQ(value, "bar"); + + // Although ValidateSnapshot is skipped the key must have still got locked + s = db->Put(write_options, Slice("foo"), Slice("bar")); + ASSERT_TRUE(s.IsTimedOut()); + + // By default the write operations should fail due to the commit after the + // snapshot + s = txn->Put(Slice("foo"), Slice("bar1")); + ASSERT_TRUE(s.IsBusy()); + s = txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"), + !ASSUME_LOCKED); + ASSERT_TRUE(s.IsBusy()); + // But the user could direct the db that it already assumes exclusive lock on + // the key due to the previous GetForUpdate call. + ASSERT_OK(txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"), + ASSUME_LOCKED)); + ASSERT_OK(txn->Merge(db->DefaultColumnFamily(), Slice("foo"), Slice("bar2"), + ASSUME_LOCKED)); + ASSERT_OK( + txn->Delete(db->DefaultColumnFamily(), Slice("foo"), ASSUME_LOCKED)); + ASSERT_OK(txn->SingleDelete(db->DefaultColumnFamily(), Slice("foo"), + ASSUME_LOCKED)); + + txn->Rollback(); + delete txn; +} + // This test clarifies the contract of ValidateSnapshot TEST_P(TransactionTest, ValidateSnapshotTest) { for (bool with_2pc : {true, false}) { diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index d4efe8ff9..93a4bbe81 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -84,66 +84,72 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { } Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { + const Slice& key, const Slice& value, + const bool assume_tracked) { Status s = MaybeFlushWriteBatchToDB(); if (!s.ok()) { return s; } - return TransactionBaseImpl::Put(column_family, key, value); + return 
TransactionBaseImpl::Put(column_family, key, value, assume_tracked); } Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family, - const SliceParts& key, const SliceParts& value) { + const SliceParts& key, const SliceParts& value, + const bool assume_tracked) { Status s = MaybeFlushWriteBatchToDB(); if (!s.ok()) { return s; } - return TransactionBaseImpl::Put(column_family, key, value); + return TransactionBaseImpl::Put(column_family, key, value, assume_tracked); } Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { + const Slice& key, const Slice& value, + const bool assume_tracked) { Status s = MaybeFlushWriteBatchToDB(); if (!s.ok()) { return s; } - return TransactionBaseImpl::Merge(column_family, key, value); + return TransactionBaseImpl::Merge(column_family, key, value, assume_tracked); } Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family, - const Slice& key) { + const Slice& key, const bool assume_tracked) { Status s = MaybeFlushWriteBatchToDB(); if (!s.ok()) { return s; } - return TransactionBaseImpl::Delete(column_family, key); + return TransactionBaseImpl::Delete(column_family, key, assume_tracked); } Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) { + const SliceParts& key, + const bool assume_tracked) { Status s = MaybeFlushWriteBatchToDB(); if (!s.ok()) { return s; } - return TransactionBaseImpl::Delete(column_family, key); + return TransactionBaseImpl::Delete(column_family, key, assume_tracked); } Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) { + const Slice& key, + const bool assume_tracked) { Status s = MaybeFlushWriteBatchToDB(); if (!s.ok()) { return s; } - return TransactionBaseImpl::SingleDelete(column_family, key); + return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked); } Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family, - 
const SliceParts& key) { + const SliceParts& key, + const bool assume_tracked) { Status s = MaybeFlushWriteBatchToDB(); if (!s.ok()) { return s; } - return TransactionBaseImpl::SingleDelete(column_family, key); + return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked); } Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 84594070a..ac9702fc0 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -48,25 +48,31 @@ class WriteUnpreparedTxn : public WritePreparedTxn { using TransactionBaseImpl::Put; virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; + const Slice& value, + const bool assume_tracked = false) override; virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; + const SliceParts& value, + const bool assume_tracked = false) override; using TransactionBaseImpl::Merge; virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; + const Slice& value, + const bool assume_tracked = false) override; using TransactionBaseImpl::Delete; + virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) override; virtual Status Delete(ColumnFamilyHandle* column_family, - const Slice& key) override; - virtual Status Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; + const SliceParts& key, + const bool assume_tracked = false) override; using TransactionBaseImpl::SingleDelete; virtual Status SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) override; + const Slice& key, + const bool assume_tracked = false) override; virtual Status SingleDelete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; + const SliceParts& 
key, + const bool assume_tracked = false) override; virtual Status RebuildFromWriteBatch(WriteBatch*) override { // This function was only useful for recovering prepared transactions, but