Extend Transaction::GetForUpdate with do_validate (#4680)

Summary:
Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot (if there is any) before doing the read. After the read it also returns the latest value (expects the ReadOptions::snapshot to be nullptr). This allows RocksDB applications to use GetForUpdate similarly to how InnoDB does. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_exclusive_tracked with default value of false. It true it indicates that call is assumed to be after a ::GetForUpdate(do_validate=false).
The Java APIs are accordingly updated.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4680

Differential Revision: D13068508

Pulled By: maysamyabandeh

fbshipit-source-id: f0b59db28f7f6a078b60844d902057140765e67d
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent 1d679e35fd
commit b878f93c70
  1. 1
      HISTORY.md
  2. 44
      include/rocksdb/utilities/transaction.h
  3. 104
      java/rocksjni/transaction.cc
  4. 201
      java/src/main/java/org/rocksdb/Transaction.java
  5. 2
      util/jemalloc_nodump_allocator.cc
  6. 7
      utilities/transactions/optimistic_transaction.cc
  7. 4
      utilities/transactions/optimistic_transaction.h
  8. 12
      utilities/transactions/pessimistic_transaction.cc
  9. 4
      utilities/transactions/pessimistic_transaction.h
  10. 97
      utilities/transactions/transaction_base.cc
  11. 43
      utilities/transactions/transaction_base.h
  12. 55
      utilities/transactions/transaction_test.cc
  13. 34
      utilities/transactions/write_unprepared_txn.cc
  14. 22
      utilities/transactions/write_unprepared_txn.h

@ -3,6 +3,7 @@
### New Features
### Public API Change
* Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate.
### Bug Fixes
* Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls.

@ -208,8 +208,10 @@ class Transaction {
// Read this key and ensure that this transaction will only
// be able to be committed if this key is not written outside this
// transaction after it has first been read (or after the snapshot if a
// snapshot is set in this transaction). The transaction behavior is the
// same regardless of whether the key exists or not.
// snapshot is set in this transaction and do_validate is true). If
// do_validate is false, ReadOptions::snapshot is expected to be nullptr so
// that GetForUpdate returns the latest committed value. The transaction
// behavior is the same regardless of whether the key exists or not.
//
// Note: Currently, this function will return Status::MergeInProgress
// if the most recent write to the queried key in this batch is a Merge.
@ -234,27 +236,31 @@ class Transaction {
virtual Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value,
bool exclusive = true) = 0;
bool exclusive = true,
bool do_validate = true) = 0;
// An overload of the above method that receives a PinnableSlice
// For backward compatibility a default implementation is provided
virtual Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val,
bool exclusive = true) {
bool exclusive = true,
const bool do_validate = true) {
if (pinnable_val == nullptr) {
std::string* null_str = nullptr;
return GetForUpdate(options, column_family, key, null_str, exclusive);
return GetForUpdate(options, column_family, key, null_str, exclusive,
do_validate);
} else {
auto s = GetForUpdate(options, column_family, key,
pinnable_val->GetSelf(), exclusive);
pinnable_val->GetSelf(), exclusive, do_validate);
pinnable_val->PinSelf();
return s;
}
}
virtual Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value, bool exclusive = true) = 0;
std::string* value, bool exclusive = true,
const bool do_validate = true) = 0;
virtual std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
@ -287,6 +293,9 @@ class Transaction {
// functions in WriteBatch, but will also do conflict checking on the
// keys being written.
//
// assume_tracked=false expects the key be already tracked. If valid then it
// skips ValidateSnapshot. Returns error otherwise.
//
// If this Transaction was created on an OptimisticTransactionDB, these
// functions should always return Status::OK().
//
@ -299,28 +308,33 @@ class Transaction {
// (See max_write_buffer_number_to_maintain)
// or other errors on unexpected failures.
virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
const Slice& value, const bool assume_tracked = false) = 0;
virtual Status Put(const Slice& key, const Slice& value) = 0;
virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) = 0;
const SliceParts& value,
const bool assume_tracked = false) = 0;
virtual Status Put(const SliceParts& key, const SliceParts& value) = 0;
virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
const Slice& value,
const bool assume_tracked = false) = 0;
virtual Status Merge(const Slice& key, const Slice& value) = 0;
virtual Status Delete(ColumnFamilyHandle* column_family,
const Slice& key) = 0;
virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
const bool assume_tracked = false) = 0;
virtual Status Delete(const Slice& key) = 0;
virtual Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) = 0;
const SliceParts& key,
const bool assume_tracked = false) = 0;
virtual Status Delete(const SliceParts& key) = 0;
virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) = 0;
const Slice& key,
const bool assume_tracked = false) = 0;
virtual Status SingleDelete(const Slice& key) = 0;
virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) = 0;
const SliceParts& key,
const bool assume_tracked = false) = 0;
virtual Status SingleDelete(const SliceParts& key) = 0;
// PutUntracked() will write a Put to the batch of operations to be committed

@ -418,20 +418,20 @@ jobjectArray Java_org_rocksdb_Transaction_multiGet__JJ_3_3B(
/*
* Class: org_rocksdb_Transaction
* Method: getForUpdate
* Signature: (JJ[BIJZ)[B
* Signature: (JJ[BIJZZ)[B
*/
jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIJZ(
jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIJZZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jlong jread_options_handle,
jbyteArray jkey, jint jkey_part_len, jlong jcolumn_family_handle,
jboolean jexclusive) {
jboolean jexclusive, jboolean jdo_validate) {
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
FnGet fn_get_for_update = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
const rocksdb::ReadOptions&, rocksdb::ColumnFamilyHandle*,
const rocksdb::Slice&, std::string*, bool)>(
const rocksdb::Slice&, std::string*, bool, bool)>(
&rocksdb::Transaction::GetForUpdate, txn, _1, column_family_handle, _2,
_3, jexclusive);
_3, jexclusive, jdo_validate);
return txn_get_helper(env, fn_get_for_update, jread_options_handle, jkey,
jkey_part_len);
}
@ -439,15 +439,17 @@ jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIJZ(
/*
* Class: org_rocksdb_Transaction
* Method: getForUpdate
* Signature: (JJ[BIZ)[B
* Signature: (JJ[BIZZ)[B
*/
jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIZ(
jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIZZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jlong jread_options_handle,
jbyteArray jkey, jint jkey_part_len, jboolean jexclusive) {
jbyteArray jkey, jint jkey_part_len, jboolean jexclusive,
jboolean jdo_validate) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
FnGet fn_get_for_update = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
const rocksdb::ReadOptions&, const rocksdb::Slice&, std::string*, bool)>(
&rocksdb::Transaction::GetForUpdate, txn, _1, _2, _3, jexclusive);
const rocksdb::ReadOptions&, const rocksdb::Slice&, std::string*, bool,
bool)>(&rocksdb::Transaction::GetForUpdate, txn, _1, _2, _3, jexclusive,
jdo_validate);
return txn_get_helper(env, fn_get_for_update, jread_options_handle, jkey,
jkey_part_len);
}
@ -568,19 +570,20 @@ void txn_write_kv_helper(JNIEnv* env, const FnWriteKV& fn_write_kv,
/*
* Class: org_rocksdb_Transaction
* Method: put
* Signature: (J[BI[BIJ)V
* Signature: (J[BI[BIJZ)V
*/
void Java_org_rocksdb_Transaction_put__J_3BI_3BIJ(
void Java_org_rocksdb_Transaction_put__J_3BI_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey,
jint jkey_part_len, jbyteArray jval, jint jval_len,
jlong jcolumn_family_handle) {
jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKV fn_put = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&,
const rocksdb::Slice&)>(&rocksdb::Transaction::Put, txn,
column_family_handle, _1, _2);
const rocksdb::Slice&, bool)>(&rocksdb::Transaction::Put, txn,
column_family_handle, _1, _2,
jassume_tracked);
txn_write_kv_helper(env, fn_put, jkey, jkey_part_len, jval, jval_len);
}
@ -706,20 +709,21 @@ void txn_write_kv_parts_helper(JNIEnv* env,
/*
* Class: org_rocksdb_Transaction
* Method: put
* Signature: (J[[BI[[BIJ)V
* Signature: (J[[BI[[BIJZ)V
*/
void Java_org_rocksdb_Transaction_put__J_3_3BI_3_3BIJ(
void Java_org_rocksdb_Transaction_put__J_3_3BI_3_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jobjectArray jkey_parts,
jint jkey_parts_len, jobjectArray jvalue_parts, jint jvalue_parts_len,
jlong jcolumn_family_handle) {
jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKVParts fn_put_parts =
std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&,
const rocksdb::SliceParts&)>(&rocksdb::Transaction::Put, txn,
column_family_handle, _1, _2);
const rocksdb::SliceParts&, bool)>(&rocksdb::Transaction::Put, txn,
column_family_handle, _1, _2,
jassume_tracked);
txn_write_kv_parts_helper(env, fn_put_parts, jkey_parts, jkey_parts_len,
jvalue_parts, jvalue_parts_len);
}
@ -744,19 +748,20 @@ void Java_org_rocksdb_Transaction_put__J_3_3BI_3_3BI(
/*
* Class: org_rocksdb_Transaction
* Method: merge
* Signature: (J[BI[BIJ)V
* Signature: (J[BI[BIJZ)V
*/
void Java_org_rocksdb_Transaction_merge__J_3BI_3BIJ(
void Java_org_rocksdb_Transaction_merge__J_3BI_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey,
jint jkey_part_len, jbyteArray jval, jint jval_len,
jlong jcolumn_family_handle) {
jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKV fn_merge = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&,
const rocksdb::Slice&)>(&rocksdb::Transaction::Merge, txn,
column_family_handle, _1, _2);
const rocksdb::Slice&, bool)>(&rocksdb::Transaction::Merge, txn,
column_family_handle, _1, _2,
jassume_tracked);
txn_write_kv_helper(env, fn_merge, jkey, jkey_part_len, jval, jval_len);
}
@ -803,18 +808,18 @@ void txn_write_k_helper(JNIEnv* env, const FnWriteK& fn_write_k,
/*
* Class: org_rocksdb_Transaction
* Method: delete
* Signature: (J[BIJ)V
* Signature: (J[BIJZ)V
*/
void Java_org_rocksdb_Transaction_delete__J_3BIJ(JNIEnv* env, jobject /*jobj*/,
jlong jhandle, jbyteArray jkey,
jint jkey_part_len,
jlong jcolumn_family_handle) {
void Java_org_rocksdb_Transaction_delete__J_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey,
jint jkey_part_len, jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteK fn_delete = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&)>(
&rocksdb::Transaction::Delete, txn, column_family_handle, _1);
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&, bool)>(
&rocksdb::Transaction::Delete, txn, column_family_handle, _1,
jassume_tracked);
txn_write_k_helper(env, fn_delete, jkey, jkey_part_len);
}
@ -892,18 +897,20 @@ void txn_write_k_parts_helper(JNIEnv* env,
/*
* Class: org_rocksdb_Transaction
* Method: delete
* Signature: (J[[BIJ)V
* Signature: (J[[BIJZ)V
*/
void Java_org_rocksdb_Transaction_delete__J_3_3BIJ(
void Java_org_rocksdb_Transaction_delete__J_3_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jobjectArray jkey_parts,
jint jkey_parts_len, jlong jcolumn_family_handle) {
jint jkey_parts_len, jlong jcolumn_family_handle,
jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKParts fn_delete_parts =
std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&)>(
&rocksdb::Transaction::Delete, txn, column_family_handle, _1);
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&, bool)>(
&rocksdb::Transaction::Delete, txn, column_family_handle, _1,
jassume_tracked);
txn_write_k_parts_helper(env, fn_delete_parts, jkey_parts, jkey_parts_len);
}
@ -926,18 +933,19 @@ void Java_org_rocksdb_Transaction_delete__J_3_3BI(JNIEnv* env, jobject /*jobj*/,
/*
* Class: org_rocksdb_Transaction
* Method: singleDelete
* Signature: (J[BIJ)V
* Signature: (J[BIJZ)V
*/
void Java_org_rocksdb_Transaction_singleDelete__J_3BIJ(
void Java_org_rocksdb_Transaction_singleDelete__J_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey,
jint jkey_part_len, jlong jcolumn_family_handle) {
jint jkey_part_len, jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteK fn_single_delete =
std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&)>(
&rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1);
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&, bool)>(
&rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1,
jassume_tracked);
txn_write_k_helper(env, fn_single_delete, jkey, jkey_part_len);
}
@ -961,18 +969,20 @@ void Java_org_rocksdb_Transaction_singleDelete__J_3BI(JNIEnv* env,
/*
* Class: org_rocksdb_Transaction
* Method: singleDelete
* Signature: (J[[BIJ)V
* Signature: (J[[BIJZ)V
*/
void Java_org_rocksdb_Transaction_singleDelete__J_3_3BIJ(
void Java_org_rocksdb_Transaction_singleDelete__J_3_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jobjectArray jkey_parts,
jint jkey_parts_len, jlong jcolumn_family_handle) {
jint jkey_parts_len, jlong jcolumn_family_handle,
jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKParts fn_single_delete_parts =
std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&)>(
&rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1);
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&, bool)>(
&rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1,
jassume_tracked);
txn_write_k_parts_helper(env, fn_single_delete_parts, jkey_parts,
jkey_parts_len);
}

@ -433,6 +433,33 @@ public class Transaction extends RocksObject {
* @param key the key to retrieve the value for.
* @param exclusive true if the transaction should have exclusive access to
* the key, otherwise false for shared access.
* @param do_validate true if it should validate the snapshot before doing the read
*
* @return a byte array storing the value associated with the input key if
* any. null if it does not find the specified key.
*
* @throws RocksDBException thrown if error happens in underlying
* native library.
*/
public byte[] getForUpdate(final ReadOptions readOptions,
final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final boolean exclusive,
final boolean do_validate) throws RocksDBException {
assert (isOwningHandle());
return getForUpdate(nativeHandle_, readOptions.nativeHandle_, key, key.length,
columnFamilyHandle.nativeHandle_, exclusive, do_validate);
}
/**
* Same as
* {@link #getForUpdate(ReadOptions, ColumnFamilyHandle, byte[], boolean, boolean)}
* with do_validate=true.
*
* @param readOptions Read options.
* @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
* instance
* @param key the key to retrieve the value for.
* @param exclusive true if the transaction should have exclusive access to
* the key, otherwise false for shared access.
*
* @return a byte array storing the value associated with the input key if
* any. null if it does not find the specified key.
@ -444,8 +471,8 @@ public class Transaction extends RocksObject {
final ColumnFamilyHandle columnFamilyHandle, final byte[] key,
final boolean exclusive) throws RocksDBException {
assert(isOwningHandle());
return getForUpdate(nativeHandle_, readOptions.nativeHandle_, key,
key.length, columnFamilyHandle.nativeHandle_, exclusive);
return getForUpdate(nativeHandle_, readOptions.nativeHandle_, key, key.length,
columnFamilyHandle.nativeHandle_, exclusive, true /*do_validate*/);
}
/**
@ -495,8 +522,8 @@ public class Transaction extends RocksObject {
public byte[] getForUpdate(final ReadOptions readOptions, final byte[] key,
final boolean exclusive) throws RocksDBException {
assert(isOwningHandle());
return getForUpdate(nativeHandle_, readOptions.nativeHandle_, key,
key.length, exclusive);
return getForUpdate(
nativeHandle_, readOptions.nativeHandle_, key, key.length, exclusive, true /*do_validate*/);
}
/**
@ -635,11 +662,23 @@ public class Transaction extends RocksObject {
* @throws RocksDBException when one of the TransactionalDB conditions
* described above occurs, or in the case of an unexpected error
*/
public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key, final byte[] value,
final boolean assume_tracked) throws RocksDBException {
assert (isOwningHandle());
put(nativeHandle_, key, key.length, value, value.length, columnFamilyHandle.nativeHandle_,
assume_tracked);
}
/*
* Same as
* {@link #put(ColumnFamilyHandle, byte[], byte[], boolean)}
* with assume_tracked=false.
*/
public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[] key,
final byte[] value) throws RocksDBException {
assert(isOwningHandle());
put(nativeHandle_, key, key.length, value, value.length,
columnFamilyHandle.nativeHandle_);
put(nativeHandle_, key, key.length, value, value.length, columnFamilyHandle.nativeHandle_,
/*assume_tracked*/ false);
}
/**
@ -683,12 +722,24 @@ public class Transaction extends RocksObject {
* @throws RocksDBException when one of the TransactionalDB conditions
* described above occurs, or in the case of an unexpected error
*/
public void put(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts,
final byte[][] valueParts, final boolean assume_tracked) throws RocksDBException {
assert (isOwningHandle());
put(nativeHandle_, keyParts, keyParts.length, valueParts, valueParts.length,
columnFamilyHandle.nativeHandle_, assume_tracked);
}
/*
* Same as
* {@link #put(ColumnFamilyHandle, byte[][], byte[][], boolean)}
* with assume_tracked=false.
*/
public void put(final ColumnFamilyHandle columnFamilyHandle,
final byte[][] keyParts, final byte[][] valueParts)
throws RocksDBException {
assert(isOwningHandle());
put(nativeHandle_, keyParts, keyParts.length, valueParts, valueParts.length,
columnFamilyHandle.nativeHandle_);
columnFamilyHandle.nativeHandle_, /*assume_tracked*/ false);
}
//TODO(AR) refactor if we implement org.rocksdb.SliceParts in future
@ -733,11 +784,23 @@ public class Transaction extends RocksObject {
* @throws RocksDBException when one of the TransactionalDB conditions
* described above occurs, or in the case of an unexpected error
*/
public void merge(final ColumnFamilyHandle columnFamilyHandle, final byte[] key,
final byte[] value, final boolean assume_tracked) throws RocksDBException {
assert (isOwningHandle());
merge(nativeHandle_, key, key.length, value, value.length, columnFamilyHandle.nativeHandle_,
assume_tracked);
}
/*
* Same as
* {@link #merge(ColumnFamilyHandle, byte[], byte[], boolean)}
* with assume_tracked=false.
*/
public void merge(final ColumnFamilyHandle columnFamilyHandle,
final byte[] key, final byte[] value) throws RocksDBException {
assert(isOwningHandle());
merge(nativeHandle_, key, key.length, value, value.length,
columnFamilyHandle.nativeHandle_);
merge(nativeHandle_, key, key.length, value, value.length, columnFamilyHandle.nativeHandle_,
/*assume_tracked*/ false);
}
/**
@ -790,10 +853,22 @@ public class Transaction extends RocksObject {
* @throws RocksDBException when one of the TransactionalDB conditions
* described above occurs, or in the case of an unexpected error
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key,
final boolean assume_tracked) throws RocksDBException {
assert (isOwningHandle());
delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_, assume_tracked);
}
/*
* Same as
* {@link #delete(ColumnFamilyHandle, byte[], boolean)}
* with assume_tracked=false.
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle,
final byte[] key) throws RocksDBException {
assert(isOwningHandle());
delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_);
delete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_,
/*assume_tracked*/ false);
}
/**
@ -834,11 +909,23 @@ public class Transaction extends RocksObject {
* @throws RocksDBException when one of the TransactionalDB conditions
* described above occurs, or in the case of an unexpected error
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts,
final boolean assume_tracked) throws RocksDBException {
assert (isOwningHandle());
delete(
nativeHandle_, keyParts, keyParts.length, columnFamilyHandle.nativeHandle_, assume_tracked);
}
/*
* Same as
* {@link #delete(ColumnFamilyHandle, byte[][], boolean)}
* with assume_tracked=false.
*/
public void delete(final ColumnFamilyHandle columnFamilyHandle,
final byte[][] keyParts) throws RocksDBException {
assert(isOwningHandle());
delete(nativeHandle_, keyParts, keyParts.length,
columnFamilyHandle.nativeHandle_);
delete(nativeHandle_, keyParts, keyParts.length, columnFamilyHandle.nativeHandle_,
/*assume_tracked*/ false);
}
//TODO(AR) refactor if we implement org.rocksdb.SliceParts in future
@ -880,11 +967,23 @@ public class Transaction extends RocksObject {
* described above occurs, or in the case of an unexpected error
*/
@Experimental("Performance optimization for a very specific workload")
public void singleDelete(final ColumnFamilyHandle columnFamilyHandle,
final byte[] key) throws RocksDBException {
public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key,
final boolean assume_tracked) throws RocksDBException {
assert (isOwningHandle());
singleDelete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_, assume_tracked);
}
/*
* Same as
* {@link #singleDelete(ColumnFamilyHandle, byte[], boolean)}
* with assume_tracked=false.
*/
@Experimental("Performance optimization for a very specific workload")
public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, final byte[] key)
throws RocksDBException {
assert(isOwningHandle());
singleDelete(nativeHandle_, key, key.length,
columnFamilyHandle.nativeHandle_);
singleDelete(nativeHandle_, key, key.length, columnFamilyHandle.nativeHandle_,
/*assume_tracked*/ false);
}
/**
@ -927,11 +1026,24 @@ public class Transaction extends RocksObject {
* described above occurs, or in the case of an unexpected error
*/
@Experimental("Performance optimization for a very specific workload")
public void singleDelete(final ColumnFamilyHandle columnFamilyHandle,
final byte[][] keyParts) throws RocksDBException {
public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts,
final boolean assume_tracked) throws RocksDBException {
assert (isOwningHandle());
singleDelete(
nativeHandle_, keyParts, keyParts.length, columnFamilyHandle.nativeHandle_, assume_tracked);
}
/*
* Same as
* {@link #singleDelete(ColumnFamilyHandle, byte[][], boolean)}
* with assume_tracked=false.
*/
@Experimental("Performance optimization for a very specific workload")
public void singleDelete(final ColumnFamilyHandle columnFamilyHandle, final byte[][] keyParts)
throws RocksDBException {
assert(isOwningHandle());
singleDelete(nativeHandle_, keyParts, keyParts.length,
columnFamilyHandle.nativeHandle_);
singleDelete(nativeHandle_, keyParts, keyParts.length, columnFamilyHandle.nativeHandle_,
/*assume_tracked*/ false);
}
//TODO(AR) refactor if we implement org.rocksdb.SliceParts in future
@ -1642,13 +1754,12 @@ public class Transaction extends RocksObject {
private native byte[][] multiGet(final long handle,
final long readOptionsHandle, final byte[][] keys)
throws RocksDBException;
private native byte[] getForUpdate(final long handle,
final long readOptionsHandle, final byte key[], final int keyLength,
final long columnFamilyHandle, final boolean exclusive)
private native byte[] getForUpdate(final long handle, final long readOptionsHandle,
final byte key[], final int keyLength, final long columnFamilyHandle, final boolean exclusive,
final boolean do_validate) throws RocksDBException;
private native byte[] getForUpdate(final long handle, final long readOptionsHandle,
final byte key[], final int keyLen, final boolean exclusive, final boolean do_validate)
throws RocksDBException;
private native byte[] getForUpdate(final long handle,
final long readOptionsHandle, final byte key[], final int keyLen,
final boolean exclusive) throws RocksDBException;
private native byte[][] multiGetForUpdate(final long handle,
final long readOptionsHandle, final byte[][] keys,
final long[] columnFamilyHandles) throws RocksDBException;
@ -1659,42 +1770,38 @@ public class Transaction extends RocksObject {
final long readOptionsHandle);
private native long getIterator(final long handle,
final long readOptionsHandle, final long columnFamilyHandle);
private native void put(final long handle, final byte[] key,
final int keyLength, final byte[] value, final int valueLength,
final long columnFamilyHandle) throws RocksDBException;
private native void put(final long handle, final byte[] key, final int keyLength,
final byte[] value, final int valueLength, final long columnFamilyHandle,
final boolean assume_tracked) throws RocksDBException;
private native void put(final long handle, final byte[] key,
final int keyLength, final byte[] value, final int valueLength)
throws RocksDBException;
private native void put(final long handle, final byte[][] keys,
final int keysLength, final byte[][] values, final int valuesLength,
final long columnFamilyHandle) throws RocksDBException;
private native void put(final long handle, final byte[][] keys, final int keysLength,
final byte[][] values, final int valuesLength, final long columnFamilyHandle,
final boolean assume_tracked) throws RocksDBException;
private native void put(final long handle, final byte[][] keys,
final int keysLength, final byte[][] values, final int valuesLength)
throws RocksDBException;
private native void merge(final long handle, final byte[] key,
final int keyLength, final byte[] value, final int valueLength,
final long columnFamilyHandle) throws RocksDBException;
private native void merge(final long handle, final byte[] key, final int keyLength,
final byte[] value, final int valueLength, final long columnFamilyHandle,
final boolean assume_tracked) throws RocksDBException;
private native void merge(final long handle, final byte[] key,
final int keyLength, final byte[] value, final int valueLength)
throws RocksDBException;
private native void delete(final long handle, final byte[] key,
final int keyLength, final long columnFamilyHandle)
throws RocksDBException;
private native void delete(final long handle, final byte[] key, final int keyLength,
final long columnFamilyHandle, final boolean assume_tracked) throws RocksDBException;
private native void delete(final long handle, final byte[] key,
final int keyLength) throws RocksDBException;
private native void delete(final long handle, final byte[][] keys,
final int keysLength, final long columnFamilyHandle)
throws RocksDBException;
private native void delete(final long handle, final byte[][] keys, final int keysLength,
final long columnFamilyHandle, final boolean assume_tracked) throws RocksDBException;
private native void delete(final long handle, final byte[][] keys,
final int keysLength) throws RocksDBException;
private native void singleDelete(final long handle, final byte[] key,
final int keyLength, final long columnFamilyHandle)
throws RocksDBException;
private native void singleDelete(final long handle, final byte[] key, final int keyLength,
final long columnFamilyHandle, final boolean assume_tracked) throws RocksDBException;
private native void singleDelete(final long handle, final byte[] key,
final int keyLength) throws RocksDBException;
private native void singleDelete(final long handle, final byte[][] keys,
final int keysLength, final long columnFamilyHandle)
throws RocksDBException;
private native void singleDelete(final long handle, final byte[][] keys, final int keysLength,
final long columnFamilyHandle, final boolean assume_tracked) throws RocksDBException;
private native void singleDelete(final long handle, final byte[][] keys,
final int keysLength) throws RocksDBException;
private native void putUntracked(final long handle, final byte[] key,

@ -134,7 +134,7 @@ Status NewJemallocNodumpAllocator(
std::shared_ptr<MemoryAllocator>* memory_allocator) {
*memory_allocator = nullptr;
#ifndef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
(void) options;
(void)options;
return Status::NotSupported(
"JemallocNodumpAllocator only available with jemalloc version >= 5 "
"and MADV_DONTDUMP is available.");

@ -80,8 +80,11 @@ Status OptimisticTransaction::Rollback() {
// 'exclusive' is unused for OptimisticTransaction.
Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only,
bool exclusive, bool untracked) {
if (untracked) {
bool exclusive, const bool do_validate,
const bool assume_tracked) {
assert(!assume_tracked); // not supported
(void)assume_tracked;
if (!do_validate) {
return Status::OK();
}
uint32_t cfh_id = GetColumnFamilyID(column_family);

@ -48,8 +48,8 @@ class OptimisticTransaction : public TransactionBaseImpl {
protected:
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool exclusive,
bool untracked = false) override;
bool read_only, bool exclusive, const bool do_validate = true,
const bool assume_tracked = false) override;
private:
OptimisticTransactionDB* const txn_db_;

@ -515,7 +515,9 @@ Status PessimisticTransaction::LockBatch(WriteBatch* batch,
// the snapshot time.
Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool read_only,
bool exclusive, bool skip_validate) {
bool exclusive, const bool do_validate,
const bool assume_tracked) {
assert(!assume_tracked || !do_validate);
Status s;
if (UNLIKELY(skip_concurrency_control_)) {
return s;
@ -559,7 +561,11 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
// any writes since this transaction's snapshot.
// TODO(agiardullo): could optimize by supporting shared txn locks in the
// future
if (skip_validate || snapshot_ == nullptr) {
if (!do_validate || snapshot_ == nullptr) {
if (assume_tracked && !previously_locked) {
s = Status::InvalidArgument(
"assume_tracked is set but it is not tracked yet");
}
// Need to remember the earliest sequence number that we know that this
// key has not been modified after. This is useful if this same
// transaction
@ -628,7 +634,7 @@ Status PessimisticTransaction::ValidateSnapshot(
// Otherwise we have either
// 1: tracked_at_seq == kMaxSequenceNumber, i.e., first time tracking the key
// 2: snap_seq < tracked_at_seq: last time we lock the key was via
// skip_validate option which means we had skipped ValidateSnapshot. In both
// do_validate=false which means we had skipped ValidateSnapshot. In both
// cases we should do ValidateSnapshot now.
*tracked_at_seq = snap_seq;

@ -135,8 +135,8 @@ class PessimisticTransaction : public TransactionBaseImpl {
Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock);
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool exclusive,
bool skip_validate = false) override;
bool read_only, bool exclusive, const bool do_validate = true,
const bool assume_tracked = false) override;
void Clear() override;

@ -97,7 +97,8 @@ void TransactionBaseImpl::SetSnapshotIfNeeded() {
Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
const SliceParts& key, bool read_only,
bool exclusive, bool skip_validate) {
bool exclusive, const bool do_validate,
const bool assume_tracked) {
size_t key_size = 0;
for (int i = 0; i < key.num_parts; ++i) {
key_size += key.parts[i].size();
@ -110,7 +111,8 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
str.append(key.parts[i].data(), key.parts[i].size());
}
return TryLock(column_family, str, read_only, exclusive, skip_validate);
return TryLock(column_family, str, read_only, exclusive, do_validate,
assume_tracked);
}
void TransactionBaseImpl::SetSavePoint() {
@ -215,8 +217,15 @@ Status TransactionBaseImpl::Get(const ReadOptions& read_options,
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value,
bool exclusive) {
Status s = TryLock(column_family, key, true /* read_only */, exclusive);
bool exclusive,
const bool do_validate) {
if (!do_validate && read_options.snapshot != nullptr) {
return Status::InvalidArgument(
"If do_validate is false then GetForUpdate with snapshot is not "
"defined.");
}
Status s =
TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
if (s.ok() && value != nullptr) {
assert(value != nullptr);
@ -234,8 +243,15 @@ Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key,
PinnableSlice* pinnable_val,
bool exclusive) {
Status s = TryLock(column_family, key, true /* read_only */, exclusive);
bool exclusive,
const bool do_validate) {
if (!do_validate && read_options.snapshot != nullptr) {
return Status::InvalidArgument(
"If do_validate is false then GetForUpdate with snapshot is not "
"defined.");
}
Status s =
TryLock(column_family, key, true /* read_only */, exclusive, do_validate);
if (s.ok() && pinnable_val != nullptr) {
s = Get(read_options, column_family, key, pinnable_val);
@ -303,9 +319,11 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
}
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
const Slice& key, const Slice& value,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, do_validate, assume_tracked);
if (s.ok()) {
s = GetBatchForWrite()->Put(column_family, key, value);
@ -318,10 +336,11 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
}
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
const SliceParts& key, const SliceParts& value,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, do_validate, assume_tracked);
if (s.ok()) {
s = GetBatchForWrite()->Put(column_family, key, value);
@ -334,9 +353,11 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
}
Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
const Slice& key, const Slice& value,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, do_validate, assume_tracked);
if (s.ok()) {
s = GetBatchForWrite()->Merge(column_family, key, value);
@ -349,9 +370,11 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
}
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
const Slice& key,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, do_validate, assume_tracked);
if (s.ok()) {
s = GetBatchForWrite()->Delete(column_family, key);
@ -364,9 +387,11 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
}
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
const SliceParts& key,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, do_validate, assume_tracked);
if (s.ok()) {
s = GetBatchForWrite()->Delete(column_family, key);
@ -379,9 +404,11 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
}
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
const Slice& key,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, do_validate, assume_tracked);
if (s.ok()) {
s = GetBatchForWrite()->SingleDelete(column_family, key);
@ -394,9 +421,11 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
}
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s =
TryLock(column_family, key, false /* read_only */, true /* exclusive */);
const SliceParts& key,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, do_validate, assume_tracked);
if (s.ok()) {
s = GetBatchForWrite()->SingleDelete(column_family, key);
@ -411,7 +440,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* skip_validate */);
true /* exclusive */, false /* do_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Put(column_family, key, value);
@ -427,7 +456,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* skip_validate */);
true /* exclusive */, false /* do_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Put(column_family, key, value);
@ -443,7 +472,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
const Slice& key,
const Slice& value) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* skip_validate */);
true /* exclusive */, false /* do_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Merge(column_family, key, value);
@ -458,7 +487,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* skip_validate */);
true /* exclusive */, false /* do_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Delete(column_family, key);
@ -473,7 +502,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* skip_validate */);
true /* exclusive */, false /* do_validate */);
if (s.ok()) {
s = GetBatchForWrite()->Delete(column_family, key);
@ -488,7 +517,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
Status TransactionBaseImpl::SingleDeleteUntracked(
ColumnFamilyHandle* column_family, const Slice& key) {
Status s = TryLock(column_family, key, false /* read_only */,
true /* exclusive */, true /* skip_validate */);
true /* exclusive */, false /* do_validate */);
if (s.ok()) {
s = GetBatchForWrite()->SingleDelete(column_family, key);

@ -36,11 +36,12 @@ class TransactionBaseImpl : public Transaction {
// Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock
// returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed.
// skip_validate will be true if called from PutUntracked, DeleteUntracked, or
// MergeUntracked.
// do_validate will be false if called from PutUntracked, DeleteUntracked,
// MergeUntracked, or GetForUpdate(do_validate=false)
virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool read_only, bool exclusive,
bool skip_validate = false) = 0;
const bool do_validate = true,
const bool assume_tracked = false) = 0;
void SetSavePoint() override;
@ -63,16 +64,19 @@ class TransactionBaseImpl : public Transaction {
using Transaction::GetForUpdate;
Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool exclusive) override;
std::string* value, bool exclusive,
const bool do_validate) override;
Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool exclusive) override;
PinnableSlice* pinnable_val, bool exclusive,
const bool do_validate) override;
Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value, bool exclusive) override {
std::string* value, bool exclusive,
const bool do_validate) override {
return GetForUpdate(options, db_->DefaultColumnFamily(), key, value,
exclusive);
exclusive, do_validate);
}
std::vector<Status> MultiGet(
@ -109,36 +113,38 @@ class TransactionBaseImpl : public Transaction {
ColumnFamilyHandle* column_family) override;
Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
const Slice& value, const bool assume_tracked = false) override;
Status Put(const Slice& key, const Slice& value) override {
return Put(nullptr, key, value);
}
Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
const SliceParts& value,
const bool assume_tracked = false) override;
Status Put(const SliceParts& key, const SliceParts& value) override {
return Put(nullptr, key, value);
}
Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
const Slice& value, const bool assume_tracked = false) override;
Status Merge(const Slice& key, const Slice& value) override {
return Merge(nullptr, key, value);
}
Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
const bool assume_tracked = false) override;
Status Delete(const Slice& key) override { return Delete(nullptr, key); }
Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
const bool assume_tracked = false) override;
Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) override;
Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
const bool assume_tracked = false) override;
Status SingleDelete(const Slice& key) override {
return SingleDelete(nullptr, key);
}
Status SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
const bool assume_tracked = false) override;
Status SingleDelete(const SliceParts& key) override {
return SingleDelete(nullptr, key);
}
@ -335,7 +341,8 @@ class TransactionBaseImpl : public Transaction {
std::shared_ptr<TransactionNotifier> snapshot_notifier_ = nullptr;
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
bool read_only, bool exclusive, bool skip_validate = false);
bool read_only, bool exclusive, const bool do_validate = true,
const bool assume_tracked = false);
WriteBatchBase* GetBatchForWrite();
void SetSnapshotInternal(const Snapshot* snapshot);

@ -140,6 +140,61 @@ TEST_P(TransactionTest, SuccessTest) {
delete txn;
}
// The test clarifies the contract of do_validate and assume_tracked
// in GetForUpdate and Put/Merge/Delete
TEST_P(TransactionTest, AssumeExclusiveTracked) {
WriteOptions write_options;
ReadOptions read_options;
std::string value;
Status s;
TransactionOptions txn_options;
txn_options.lock_timeout = 1;
const bool EXCLUSIVE = true;
const bool DO_VALIDATE = true;
const bool ASSUME_LOCKED = true;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txn);
txn->SetSnapshot();
// commit a value after the snapshot is taken
ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
// By default write should fail to the commit after our snapshot
s = txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE);
ASSERT_TRUE(s.IsBusy());
// But the user could direct the db to skip validating the snapshot. The read
// value then should be the most recently committed
ASSERT_OK(
txn->GetForUpdate(read_options, "foo", &value, EXCLUSIVE, !DO_VALIDATE));
ASSERT_EQ(value, "bar");
// Although ValidateSnapshot is skipped the key must have still got locked
s = db->Put(write_options, Slice("foo"), Slice("bar"));
ASSERT_TRUE(s.IsTimedOut());
// By default the write operations should fail due to the commit after the
// snapshot
s = txn->Put(Slice("foo"), Slice("bar1"));
ASSERT_TRUE(s.IsBusy());
s = txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
!ASSUME_LOCKED);
ASSERT_TRUE(s.IsBusy());
// But the user could direct the db that it already assumes exclusive lock on
// the key due to the previous GetForUpdate call.
ASSERT_OK(txn->Put(db->DefaultColumnFamily(), Slice("foo"), Slice("bar1"),
ASSUME_LOCKED));
ASSERT_OK(txn->Merge(db->DefaultColumnFamily(), Slice("foo"), Slice("bar2"),
ASSUME_LOCKED));
ASSERT_OK(
txn->Delete(db->DefaultColumnFamily(), Slice("foo"), ASSUME_LOCKED));
ASSERT_OK(txn->SingleDelete(db->DefaultColumnFamily(), Slice("foo"),
ASSUME_LOCKED));
txn->Rollback();
delete txn;
}
// This test clarifies the contract of ValidateSnapshot
TEST_P(TransactionTest, ValidateSnapshotTest) {
for (bool with_2pc : {true, false}) {

@ -84,66 +84,72 @@ void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
}
Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
const Slice& key, const Slice& value,
const bool assume_tracked) {
Status s = MaybeFlushWriteBatchToDB();
if (!s.ok()) {
return s;
}
return TransactionBaseImpl::Put(column_family, key, value);
return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
}
Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value) {
const SliceParts& key, const SliceParts& value,
const bool assume_tracked) {
Status s = MaybeFlushWriteBatchToDB();
if (!s.ok()) {
return s;
}
return TransactionBaseImpl::Put(column_family, key, value);
return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
}
Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
const Slice& key, const Slice& value,
const bool assume_tracked) {
Status s = MaybeFlushWriteBatchToDB();
if (!s.ok()) {
return s;
}
return TransactionBaseImpl::Merge(column_family, key, value);
return TransactionBaseImpl::Merge(column_family, key, value, assume_tracked);
}
Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
const Slice& key, const bool assume_tracked) {
Status s = MaybeFlushWriteBatchToDB();
if (!s.ok()) {
return s;
}
return TransactionBaseImpl::Delete(column_family, key);
return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
}
Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
const SliceParts& key,
const bool assume_tracked) {
Status s = MaybeFlushWriteBatchToDB();
if (!s.ok()) {
return s;
}
return TransactionBaseImpl::Delete(column_family, key);
return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
}
Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) {
const Slice& key,
const bool assume_tracked) {
Status s = MaybeFlushWriteBatchToDB();
if (!s.ok()) {
return s;
}
return TransactionBaseImpl::SingleDelete(column_family, key);
return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
}
Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
const SliceParts& key,
const bool assume_tracked) {
Status s = MaybeFlushWriteBatchToDB();
if (!s.ok()) {
return s;
}
return TransactionBaseImpl::SingleDelete(column_family, key);
return TransactionBaseImpl::SingleDelete(column_family, key, assume_tracked);
}
Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {

@ -48,25 +48,31 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
using TransactionBaseImpl::Put;
virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
const Slice& value,
const bool assume_tracked = false) override;
virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
const SliceParts& value,
const bool assume_tracked = false) override;
using TransactionBaseImpl::Merge;
virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
const Slice& value,
const bool assume_tracked = false) override;
using TransactionBaseImpl::Delete;
virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
const bool assume_tracked = false) override;
virtual Status Delete(ColumnFamilyHandle* column_family,
const Slice& key) override;
virtual Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
const SliceParts& key,
const bool assume_tracked = false) override;
using TransactionBaseImpl::SingleDelete;
virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) override;
const Slice& key,
const bool assume_tracked = false) override;
virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
const SliceParts& key,
const bool assume_tracked = false) override;
virtual Status RebuildFromWriteBatch(WriteBatch*) override {
// This function was only useful for recovering prepared transactions, but

Loading…
Cancel
Save