Support user-defined timestamps in write-committed txns (#9629)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9629

Pessimistic transactions use pessimistic concurrency control, i.e. locking. Keys are
locked upon first operation that writes the key or has the intention of writing. For example,
`PessimisticTransaction::Put()`, `PessimisticTransaction::Delete()`,
`PessimisticTransaction::SingleDelete()` will write to or delete a key, while
`PessimisticTransaction::GetForUpdate()` is used by application to indicate
to RocksDB that the transaction has the intention of performing write operation later
in the same transaction.
Pessimistic transactions support two-phase commit (2PC). A transaction can be
`Prepared()`'ed and then `Commit()`. The prepare phase is similar to a promise: once
`Prepare()` succeeds, the transaction has acquired the necessary resources to commit.
The resources include locks, persistence of WAL, etc.
Write-committed transaction is the default pessimistic transaction implementation. In
RocksDB write-committed transaction, `Prepare()` will write data to the WAL as a prepare
section. `Commit()` will write a commit marker to the WAL and then write data to the
memtables. While writing to the memtables, different keys in the transaction's write batch
will be assigned different sequence numbers in ascending order.
Until commit/rollback, the transaction holds locks on the keys so that no other transaction
can write to the same keys. Furthermore, the keys' sequence numbers represent the order
in which they are committed and should be made visible. This is convenient for us to
implement support for user-defined timestamps.
Since column families with and without timestamps can co-exist in the same database,
a transaction may or may not involve timestamps. Based on this observation, we add two
optional members to each `PessimisticTransaction`, `read_timestamp_` and
`commit_timestamp_`. If no key in the transaction's write batch has timestamp, then
setting these two variables do not have any effect. For the rest of this commit, we discuss
only the cases when these two variables are meaningful.

read_timestamp_ is used mainly for validation, and should be set before first call to
`GetForUpdate()`. Otherwise, the latter will return non-ok status. `GetForUpdate()` calls
`TryLock()` that can verify if another transaction has written the same key since
`read_timestamp_` till this call to `GetForUpdate()`. If another transaction has indeed
written the same key, then validation fails, and RocksDB allows this transaction to
refine `read_timestamp_` by increasing it. Note that a transaction can still use `Get()`
with a different timestamp to read, but the result of the read should not be used to
determine data that will be written later.

commit_timestamp_ must be set after finishing writing and before transaction commit.
This applies to both 2PC and non-2PC cases. In the case of 2PC, it's usually set after
prepare phase succeeds.

We currently require that the commit timestamp be chosen after all keys are locked. This
means we disallow the `TransactionDB`-level APIs if user-defined timestamp is used
by the transaction. Specifically, calling `PessimisticTransactionDB::Put()`,
`PessimisticTransactionDB::Delete()`, `PessimisticTransactionDB::SingleDelete()`,
etc. will return non-ok status because they specify timestamps before locking the keys.
Users are also prompted to use the `Transaction` APIs when they receive the non-ok status.

Reviewed By: ltamasi

Differential Revision: D31822445

fbshipit-source-id: b82abf8e230216dc89cc519564a588224a88fd43
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent ca0ef54f16
commit 3b6dc049f7
  1. 1
      CMakeLists.txt
  2. 1
      HISTORY.md
  3. 3
      Makefile
  4. 6
      TARGETS
  5. 13
      db/db_impl/db_impl_write.cc
  6. 1
      src.mk
  7. 360
      utilities/transactions/pessimistic_transaction.cc
  8. 69
      utilities/transactions/pessimistic_transaction.h
  9. 34
      utilities/transactions/pessimistic_transaction_db.cc
  10. 33
      utilities/transactions/pessimistic_transaction_db.h
  11. 5
      utilities/transactions/transaction_base.h
  12. 22
      utilities/transactions/transaction_test.cc
  13. 44
      utilities/transactions/transaction_test.h
  14. 515
      utilities/transactions/write_committed_transaction_ts_test.cc

@ -1334,6 +1334,7 @@ if(WITH_TESTS)
utilities/transactions/optimistic_transaction_test.cc utilities/transactions/optimistic_transaction_test.cc
utilities/transactions/transaction_test.cc utilities/transactions/transaction_test.cc
utilities/transactions/lock/point/point_lock_manager_test.cc utilities/transactions/lock/point/point_lock_manager_test.cc
utilities/transactions/write_committed_transaction_ts_test.cc
utilities/transactions/write_prepared_transaction_test.cc utilities/transactions/write_prepared_transaction_test.cc
utilities/transactions/write_unprepared_transaction_test.cc utilities/transactions/write_unprepared_transaction_test.cc
utilities/transactions/lock/range/range_locking_test.cc utilities/transactions/lock/range/range_locking_test.cc

@ -2,6 +2,7 @@
## Unreleased ## Unreleased
### New Features ### New Features
* Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp. * Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp.
* Add support for user-defined timestamps to write-committed transaction without API change. The `TransactionDB` layer APIs do not allow timestamps because we require that all user-defined-timestamps-aware operations go through the `Transaction` APIs.
* Added BlobDB options to `ldb` * Added BlobDB options to `ldb`
* `BlockBasedTableOptions::detect_filter_construct_corruption` can now be dynamically configured using `DB::SetOptions`. * `BlockBasedTableOptions::detect_filter_construct_corruption` can now be dynamically configured using `DB::SetOptions`.

@ -1866,6 +1866,9 @@ point_lock_manager_test: utilities/transactions/lock/point/point_lock_manager_te
transaction_test: $(OBJ_DIR)/utilities/transactions/transaction_test.o $(TEST_LIBRARY) $(LIBRARY) transaction_test: $(OBJ_DIR)/utilities/transactions/transaction_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(AM_LINK)
write_committed_transaction_ts_test: $(OBJ_DIR)/utilities/transactions/write_committed_transaction_ts_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
write_prepared_transaction_test: $(OBJ_DIR)/utilities/transactions/write_prepared_transaction_test.o $(TEST_LIBRARY) $(LIBRARY) write_prepared_transaction_test: $(OBJ_DIR)/utilities/transactions/write_prepared_transaction_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(AM_LINK)

@ -6120,6 +6120,12 @@ cpp_unittest_wrapper(name="write_callback_test",
extra_compiler_flags=[]) extra_compiler_flags=[])
cpp_unittest_wrapper(name="write_committed_transaction_ts_test",
srcs=["utilities/transactions/write_committed_transaction_ts_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="write_controller_test", cpp_unittest_wrapper(name="write_controller_test",
srcs=["db/write_controller_test.cc"], srcs=["db/write_controller_test.cc"],
deps=[":rocksdb_test_lib"], deps=[":rocksdb_test_lib"],

@ -130,7 +130,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
assert(!seq_per_batch_ || batch_cnt != 0); assert(!seq_per_batch_ || batch_cnt != 0);
if (my_batch == nullptr) { if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!"); return Status::Corruption("Batch is nullptr!");
} else if (WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { } else if (!disable_memtable &&
WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) {
// If writing to memtable, then we require the caller to set/update the
// timestamps for the keys in the write batch.
// Otherwise, it means we are just writing to the WAL, and we allow
// timestamps unset for the keys in the write batch. This can happen if we
// use TransactionDB with write-committed policy, and we currently do not
// support user-defined timestamp with other policies.
// In the prepare phase, a transaction can write the batch to the WAL
// without inserting to memtable. The keys in the batch do not have to be
// assigned timestamps because they will be used only during recovery if
// there is a commit marker which includes their commit timestamp.
return Status::InvalidArgument("write batch must have timestamp(s) set"); return Status::InvalidArgument("write batch must have timestamp(s) set");
} else if (write_options.rate_limiter_priority != Env::IO_TOTAL && } else if (write_options.rate_limiter_priority != Env::IO_TOTAL &&
write_options.rate_limiter_priority != Env::IO_USER) { write_options.rate_limiter_priority != Env::IO_USER) {

@ -584,6 +584,7 @@ TEST_MAIN_SOURCES = \
utilities/transactions/lock/point/point_lock_manager_test.cc \ utilities/transactions/lock/point/point_lock_manager_test.cc \
utilities/transactions/write_prepared_transaction_test.cc \ utilities/transactions/write_prepared_transaction_test.cc \
utilities/transactions/write_unprepared_transaction_test.cc \ utilities/transactions/write_unprepared_transaction_test.cc \
utilities/transactions/write_committed_transaction_ts_test.cc \
utilities/ttl/ttl_test.cc \ utilities/ttl/ttl_test.cc \
utilities/util_merge_operators_test.cc \ utilities/util_merge_operators_test.cc \
utilities/write_batch_with_index/write_batch_with_index_test.cc \ utilities/write_batch_with_index/write_batch_with_index_test.cc \

@ -25,6 +25,7 @@
#include "util/string_util.h" #include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/transaction_util.h" #include "utilities/transactions/transaction_util.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -135,6 +136,274 @@ WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
const TransactionOptions& txn_options) const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options) {} : PessimisticTransaction(txn_db, write_options, txn_options) {}
Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value,
bool exclusive, const bool do_validate) {
return GetForUpdateImpl(read_options, column_family, key, value, exclusive,
do_validate);
}
Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key,
PinnableSlice* pinnable_val,
bool exclusive, const bool do_validate) {
return GetForUpdateImpl(read_options, column_family, key, pinnable_val,
exclusive, do_validate);
}
template <typename TValue>
inline Status WriteCommittedTxn::GetForUpdateImpl(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, TValue* value, bool exclusive, const bool do_validate) {
column_family =
column_family ? column_family : db_impl_->DefaultColumnFamily();
assert(column_family);
if (!read_options.timestamp) {
const Comparator* const ucmp = column_family->GetComparator();
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
if (0 == ts_sz) {
return TransactionBaseImpl::GetForUpdate(read_options, column_family, key,
value, exclusive, do_validate);
}
} else {
Status s = db_impl_->FailIfTsSizesMismatch(column_family,
*(read_options.timestamp));
if (!s.ok()) {
return s;
}
}
if (!do_validate) {
return Status::InvalidArgument(
"If do_validate is false then GetForUpdate with read_timestamp is not "
"defined.");
} else if (kMaxTxnTimestamp == read_timestamp_) {
return Status::InvalidArgument("read_timestamp must be set for validation");
}
if (!read_options.timestamp) {
ReadOptions read_opts_copy = read_options;
char ts_buf[sizeof(kMaxTxnTimestamp)];
EncodeFixed64(ts_buf, read_timestamp_);
Slice ts(ts_buf, sizeof(ts_buf));
read_opts_copy.timestamp = &ts;
return TransactionBaseImpl::GetForUpdate(read_opts_copy, column_family, key,
value, exclusive, do_validate);
}
assert(read_options.timestamp);
const char* const ts_buf = read_options.timestamp->data();
assert(read_options.timestamp->size() == sizeof(kMaxTxnTimestamp));
TxnTimestamp ts = DecodeFixed64(ts_buf);
if (ts != read_timestamp_) {
return Status::InvalidArgument("Must read from the same read_timestamp");
}
return TransactionBaseImpl::GetForUpdate(read_options, column_family, key,
value, exclusive, do_validate);
}
Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
return Operate(column_family, key, do_validate, assume_tracked,
[column_family, &key, &value, this]() {
Status s =
GetBatchForWrite()->Put(column_family, key, value);
if (s.ok()) {
++num_puts_;
}
return s;
});
}
Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
return Operate(column_family, key, do_validate, assume_tracked,
[column_family, &key, &value, this]() {
Status s =
GetBatchForWrite()->Put(column_family, key, value);
if (s.ok()) {
++num_puts_;
}
return s;
});
}
Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
return Operate(
column_family, key, /*do_validate=*/false,
/*assume_tracked=*/false, [column_family, &key, &value, this]() {
Status s = GetBatchForWrite()->Put(column_family, key, value);
if (s.ok()) {
++num_puts_;
}
return s;
});
}
Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
return Operate(
column_family, key, /*do_validate=*/false,
/*assume_tracked=*/false, [column_family, &key, &value, this]() {
Status s = GetBatchForWrite()->Put(column_family, key, value);
if (s.ok()) {
++num_puts_;
}
return s;
});
}
Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family,
const Slice& key, const bool assume_tracked) {
const bool do_validate = !assume_tracked;
return Operate(column_family, key, do_validate, assume_tracked,
[column_family, &key, this]() {
Status s = GetBatchForWrite()->Delete(column_family, key);
if (s.ok()) {
++num_deletes_;
}
return s;
});
}
Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
return Operate(column_family, key, do_validate, assume_tracked,
[column_family, &key, this]() {
Status s = GetBatchForWrite()->Delete(column_family, key);
if (s.ok()) {
++num_deletes_;
}
return s;
});
}
Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) {
return Operate(column_family, key, /*do_validate=*/false,
/*assume_tracked=*/false, [column_family, &key, this]() {
Status s = GetBatchForWrite()->Delete(column_family, key);
if (s.ok()) {
++num_deletes_;
}
return s;
});
}
Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) {
return Operate(column_family, key, /*do_validate=*/false,
/*assume_tracked=*/false, [column_family, &key, this]() {
Status s = GetBatchForWrite()->Delete(column_family, key);
if (s.ok()) {
++num_deletes_;
}
return s;
});
}
Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
return Operate(column_family, key, do_validate, assume_tracked,
[column_family, &key, this]() {
Status s =
GetBatchForWrite()->SingleDelete(column_family, key);
if (s.ok()) {
++num_deletes_;
}
return s;
});
}
Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
return Operate(column_family, key, do_validate, assume_tracked,
[column_family, &key, this]() {
Status s =
GetBatchForWrite()->SingleDelete(column_family, key);
if (s.ok()) {
++num_deletes_;
}
return s;
});
}
Status WriteCommittedTxn::SingleDeleteUntracked(
ColumnFamilyHandle* column_family, const Slice& key) {
return Operate(column_family, key, /*do_validate=*/false,
/*assume_tracked=*/false, [column_family, &key, this]() {
Status s =
GetBatchForWrite()->SingleDelete(column_family, key);
if (s.ok()) {
++num_deletes_;
}
return s;
});
}
Status WriteCommittedTxn::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value,
const bool assume_tracked) {
const bool do_validate = !assume_tracked;
return Operate(column_family, key, do_validate, assume_tracked,
[column_family, &key, &value, this]() {
Status s =
GetBatchForWrite()->Merge(column_family, key, value);
if (s.ok()) {
++num_merges_;
}
return s;
});
}
template <typename TKey, typename TOperation>
Status WriteCommittedTxn::Operate(ColumnFamilyHandle* column_family,
const TKey& key, const bool do_validate,
const bool assume_tracked,
TOperation&& operation) {
Status s;
if constexpr (std::is_same_v<Slice, TKey>) {
s = TryLock(column_family, key, /*read_only=*/false, /*exclusive=*/true,
do_validate, assume_tracked);
} else if constexpr (std::is_same_v<SliceParts, TKey>) {
std::string key_buf;
Slice contiguous_key(key, &key_buf);
s = TryLock(column_family, contiguous_key, /*read_only=*/false,
/*exclusive=*/true, do_validate, assume_tracked);
}
if (!s.ok()) {
return s;
}
column_family =
column_family ? column_family : db_impl_->DefaultColumnFamily();
assert(column_family);
const Comparator* const ucmp = column_family->GetComparator();
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
if (ts_sz > 0) {
assert(ts_sz == sizeof(TxnTimestamp));
if (!IndexingEnabled()) {
cfs_with_ts_tracked_when_indexing_disabled_.insert(
column_family->GetID());
}
}
return operation();
}
Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) { Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) {
if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) { if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) {
return Status::InvalidArgument( return Status::InvalidArgument(
@ -154,6 +423,17 @@ Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) {
} }
Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
if (batch && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
// CommitBatch() needs to lock the keys in the batch.
// However, the application also needs to specify the timestamp for the
// keys in batch before calling this API.
// This means timestamp order may violate the order of locking, thus
// violate the sequence number order for the same user key.
// Therefore, we disallow this operation for now.
return Status::NotSupported(
"Batch to commit includes timestamp assigned before locking");
}
std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create()); std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create());
Status s = LockBatch(batch, keys_to_unlock.get()); Status s = LockBatch(batch, keys_to_unlock.get());
@ -369,9 +649,41 @@ Status PessimisticTransaction::Commit() {
} }
Status WriteCommittedTxn::CommitWithoutPrepareInternal() { Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
WriteBatchWithIndex* wbwi = GetWriteBatch();
assert(wbwi);
WriteBatch* wb = wbwi->GetWriteBatch();
assert(wb);
const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb);
if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) {
return Status::InvalidArgument("Must assign a commit timestamp");
}
if (needs_ts) {
assert(commit_timestamp_ != kMaxTxnTimestamp);
char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
EncodeFixed64(commit_ts_buf, commit_timestamp_);
Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf));
Status s =
wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t {
auto cf_iter = cfs_with_ts_tracked_when_indexing_disabled_.find(cf);
if (cf_iter != cfs_with_ts_tracked_when_indexing_disabled_.end()) {
return sizeof(kMaxTxnTimestamp);
}
const Comparator* ucmp =
WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
return ucmp ? ucmp->timestamp_size()
: std::numeric_limits<uint64_t>::max();
});
if (!s.ok()) {
return s;
}
}
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
auto s = auto s =
db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(), db_impl_->WriteImpl(write_options_, wb,
/*callback*/ nullptr, /*log_used*/ nullptr, /*callback*/ nullptr, /*log_used*/ nullptr,
/*log_ref*/ 0, /*disable_memtable*/ false, &seq_used); /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used);
assert(!s.ok() || seq_used != kMaxSequenceNumber); assert(!s.ok() || seq_used != kMaxSequenceNumber);
@ -394,11 +706,46 @@ Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
} }
Status WriteCommittedTxn::CommitInternal() { Status WriteCommittedTxn::CommitInternal() {
WriteBatchWithIndex* wbwi = GetWriteBatch();
assert(wbwi);
WriteBatch* wb = wbwi->GetWriteBatch();
assert(wb);
const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb);
if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) {
return Status::InvalidArgument("Must assign a commit timestamp");
}
// We take the commit-time batch and append the Commit marker. // We take the commit-time batch and append the Commit marker.
// The Memtable will ignore the Commit marker in non-recovery mode // The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch(); WriteBatch* working_batch = GetCommitTimeWriteBatch();
auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
assert(s.ok()); Status s;
if (!needs_ts) {
s = WriteBatchInternal::MarkCommit(working_batch, name_);
} else {
assert(commit_timestamp_ != kMaxTxnTimestamp);
char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
EncodeFixed64(commit_ts_buf, commit_timestamp_);
Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf));
s = WriteBatchInternal::MarkCommitWithTimestamp(working_batch, name_,
commit_ts);
if (s.ok()) {
s = wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t {
if (cfs_with_ts_tracked_when_indexing_disabled_.find(cf) !=
cfs_with_ts_tracked_when_indexing_disabled_.end()) {
return sizeof(kMaxTxnTimestamp);
}
const Comparator* ucmp =
WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf);
return ucmp ? ucmp->timestamp_size()
: std::numeric_limits<uint64_t>::max();
});
}
}
if (!s.ok()) {
return s;
}
// any operations appended to this working_batch will be ignored from WAL // any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint(); working_batch->MarkWalTerminationPoint();
@ -406,8 +753,7 @@ Status WriteCommittedTxn::CommitInternal() {
// insert prepared batch into Memtable only skipping WAL. // insert prepared batch into Memtable only skipping WAL.
// Memtable will ignore BeginPrepare/EndPrepare markers // Memtable will ignore BeginPrepare/EndPrepare markers
// in non recovery mode and simply insert the values // in non recovery mode and simply insert the values
s = WriteBatchInternal::Append(working_batch, s = WriteBatchInternal::Append(working_batch, wb);
GetWriteBatch()->GetWriteBatch());
assert(s.ok()); assert(s.ok());
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
@ -489,6 +835,10 @@ Status PessimisticTransaction::RollbackToSavePoint() {
// On success, caller should unlock keys_to_unlock // On success, caller should unlock keys_to_unlock
Status PessimisticTransaction::LockBatch(WriteBatch* batch, Status PessimisticTransaction::LockBatch(WriteBatch* batch,
LockTracker* keys_to_unlock) { LockTracker* keys_to_unlock) {
if (!batch) {
return Status::InvalidArgument("batch is nullptr");
}
class Handler : public WriteBatch::Handler { class Handler : public WriteBatch::Handler {
public: public:
// Sorted map of column_family_id to sorted set of keys. // Sorted map of column_family_id to sorted set of keys.

@ -223,10 +223,73 @@ class WriteCommittedTxn : public PessimisticTransaction {
~WriteCommittedTxn() override {} ~WriteCommittedTxn() override {}
using TransactionBaseImpl::GetForUpdate;
Status GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool exclusive,
const bool do_validate) override;
Status GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* pinnable_val, bool exclusive,
const bool do_validate) override;
using TransactionBaseImpl::Put;
// `key` does NOT include timestamp even when it's enabled.
Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value, const bool assume_tracked = false) override;
Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value,
const bool assume_tracked = false) override;
using TransactionBaseImpl::PutUntracked;
Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
using TransactionBaseImpl::Delete;
// `key` does NOT include timestamp even when it's enabled.
Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
const bool assume_tracked = false) override;
Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
const bool assume_tracked = false) override;
using TransactionBaseImpl::DeleteUntracked;
Status DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) override;
Status DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
using TransactionBaseImpl::SingleDelete;
// `key` does NOT include timestamp even when it's enabled.
Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
const bool assume_tracked = false) override;
Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
const bool assume_tracked = false) override;
using TransactionBaseImpl::SingleDeleteUntracked;
Status SingleDeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) override;
using TransactionBaseImpl::Merge;
Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value, const bool assume_tracked = false) override;
Status SetReadTimestampForValidation(TxnTimestamp ts) override; Status SetReadTimestampForValidation(TxnTimestamp ts) override;
Status SetCommitTimestamp(TxnTimestamp ts) override; Status SetCommitTimestamp(TxnTimestamp ts) override;
private: private:
template <typename TValue>
Status GetForUpdateImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
TValue* value, bool exclusive,
const bool do_validate);
template <typename TKey, typename TOperation>
Status Operate(ColumnFamilyHandle* column_family, const TKey& key,
const bool do_validate, const bool assume_tracked,
TOperation&& operation);
Status PrepareInternal() override; Status PrepareInternal() override;
Status CommitWithoutPrepareInternal() override; Status CommitWithoutPrepareInternal() override;
@ -236,6 +299,12 @@ class WriteCommittedTxn : public PessimisticTransaction {
Status CommitInternal() override; Status CommitInternal() override;
Status RollbackInternal() override; Status RollbackInternal() override;
// Column families that enable timestamps and whose data are written when
// indexing_enabled_ is false. If a key is written when indexing_enabled_ is
// true, then the corresponding column family is not added to cfs_with_ts
// even if it enables timestamp.
std::unordered_set<uint32_t> cfs_with_ts_tracked_when_indexing_disabled_;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -86,8 +86,10 @@ Status PessimisticTransactionDB::VerifyCFOptions(
<< " timestamp size is " << ts_sz << " bytes"; << " timestamp size is " << ts_sz << " bytes";
return Status::InvalidArgument(oss.str()); return Status::InvalidArgument(oss.str());
} }
// TODO: Update this check once timestamp is supported. if (txn_db_options_.write_policy != WRITE_COMMITTED) {
return Status::NotSupported("Transaction DB does not support timestamp"); return Status::NotSupported("Only WriteCommittedTxn supports timestamp");
}
return Status::OK();
} }
Status PessimisticTransactionDB::Initialize( Status PessimisticTransactionDB::Initialize(
@ -442,7 +444,10 @@ Transaction* PessimisticTransactionDB::BeginInternalTransaction(
Status PessimisticTransactionDB::Put(const WriteOptions& options, Status PessimisticTransactionDB::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) { const Slice& key, const Slice& val) {
Status s; Status s = FailIfCfEnablesTs(this, column_family);
if (!s.ok()) {
return s;
}
Transaction* txn = BeginInternalTransaction(options); Transaction* txn = BeginInternalTransaction(options);
txn->DisableIndexing(); txn->DisableIndexing();
@ -463,7 +468,10 @@ Status PessimisticTransactionDB::Put(const WriteOptions& options,
Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
Status s; Status s = FailIfCfEnablesTs(this, column_family);
if (!s.ok()) {
return s;
}
Transaction* txn = BeginInternalTransaction(wopts); Transaction* txn = BeginInternalTransaction(wopts);
txn->DisableIndexing(); txn->DisableIndexing();
@ -485,7 +493,10 @@ Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts, Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
Status s; Status s = FailIfCfEnablesTs(this, column_family);
if (!s.ok()) {
return s;
}
Transaction* txn = BeginInternalTransaction(wopts); Transaction* txn = BeginInternalTransaction(wopts);
txn->DisableIndexing(); txn->DisableIndexing();
@ -507,7 +518,10 @@ Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
Status PessimisticTransactionDB::Merge(const WriteOptions& options, Status PessimisticTransactionDB::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) { const Slice& key, const Slice& value) {
Status s; Status s = FailIfCfEnablesTs(this, column_family);
if (!s.ok()) {
return s;
}
Transaction* txn = BeginInternalTransaction(options); Transaction* txn = BeginInternalTransaction(options);
txn->DisableIndexing(); txn->DisableIndexing();
@ -533,6 +547,10 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts,
Status WriteCommittedTxnDB::Write(const WriteOptions& opts, Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
WriteBatch* updates) { WriteBatch* updates) {
Status s = FailIfBatchHasTs(updates);
if (!s.ok()) {
return s;
}
if (txn_db_options_.skip_concurrency_control) { if (txn_db_options_.skip_concurrency_control) {
return db_impl_->Write(opts, updates); return db_impl_->Write(opts, updates);
} else { } else {
@ -543,6 +561,10 @@ Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
Status WriteCommittedTxnDB::Write( Status WriteCommittedTxnDB::Write(
const WriteOptions& opts, const WriteOptions& opts,
const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) { const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
Status s = FailIfBatchHasTs(updates);
if (!s.ok()) {
return s;
}
if (optimizations.skip_concurrency_control) { if (optimizations.skip_concurrency_control) {
return db_impl_->Write(opts, updates); return db_impl_->Write(opts, updates);
} else { } else {

@ -155,6 +155,11 @@ class PessimisticTransactionDB : public TransactionDB {
std::shared_ptr<Logger> info_log_; std::shared_ptr<Logger> info_log_;
const TransactionDBOptions txn_db_options_; const TransactionDBOptions txn_db_options_;
static Status FailIfBatchHasTs(const WriteBatch* wb);
static Status FailIfCfEnablesTs(const DB* db,
const ColumnFamilyHandle* column_family);
void ReinitializeTransaction( void ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options, Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options = TransactionOptions()); const TransactionOptions& txn_options = TransactionOptions());
@ -175,11 +180,12 @@ class PessimisticTransactionDB : public TransactionDB {
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
Transaction* BeginInternalTransaction(const WriteOptions& options);
std::shared_ptr<LockManager> lock_manager_; std::shared_ptr<LockManager> lock_manager_;
// Must be held when adding/dropping column families. // Must be held when adding/dropping column families.
InstrumentedMutex column_family_mutex_; InstrumentedMutex column_family_mutex_;
Transaction* BeginInternalTransaction(const WriteOptions& options);
// Used to ensure that no locks are stolen from an expirable transaction // Used to ensure that no locks are stolen from an expirable transaction
// that has started a commit. Only transactions with an expiration time // that has started a commit. Only transactions with an expiration time
@ -224,5 +230,30 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB {
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
}; };
inline Status PessimisticTransactionDB::FailIfBatchHasTs(
const WriteBatch* batch) {
if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) {
return Status::NotSupported(
"Writes with timestamp must go through transaction API instead of "
"TransactionDB.");
}
return Status::OK();
}
inline Status PessimisticTransactionDB::FailIfCfEnablesTs(
const DB* db, const ColumnFamilyHandle* column_family) {
assert(db);
column_family = column_family ? column_family : db->DefaultColumnFamily();
assert(column_family);
const Comparator* const ucmp = column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
return Status::NotSupported(
"Write operation with user timestamp must go through the transaction "
"API instead of TransactionDB.");
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -220,6 +220,8 @@ class TransactionBaseImpl : public Transaction {
void EnableIndexing() override { indexing_enabled_ = true; } void EnableIndexing() override { indexing_enabled_ = true; }
bool IndexingEnabled() const { return indexing_enabled_; }
uint64_t GetElapsedTime() const override; uint64_t GetElapsedTime() const override;
uint64_t GetNumPuts() const override; uint64_t GetNumPuts() const override;
@ -278,6 +280,8 @@ class TransactionBaseImpl : public Transaction {
assert(s.ok()); assert(s.ok());
} }
WriteBatchBase* GetBatchForWrite();
DB* db_; DB* db_;
DBImpl* dbimpl_; DBImpl* dbimpl_;
@ -365,7 +369,6 @@ class TransactionBaseImpl : public Transaction {
bool read_only, bool exclusive, const bool do_validate = true, bool read_only, bool exclusive, const bool do_validate = true,
const bool assume_tracked = false); const bool assume_tracked = false);
WriteBatchBase* GetBatchForWrite();
void SetSnapshotInternal(const Snapshot* snapshot); void SetSnapshotInternal(const Snapshot* snapshot);
}; };

@ -6421,12 +6421,18 @@ TEST_P(TransactionTest, OpenAndEnableU64Timestamp) {
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
{ {
ColumnFamilyHandle* cfh = nullptr; ColumnFamilyHandle* cfh = nullptr;
ASSERT_TRUE( const Status s = db->CreateColumnFamily(cf_opts, test_cf_name, &cfh);
db->CreateColumnFamily(cf_opts, test_cf_name, &cfh).IsNotSupported()); if (txn_db_options.write_policy == WRITE_COMMITTED) {
ASSERT_OK(s);
delete cfh;
} else {
ASSERT_TRUE(s.IsNotSupported());
assert(!cfh);
}
} }
// Bypass transaction db layer. // Bypass transaction db layer.
{ if (txn_db_options.write_policy != WRITE_COMMITTED) {
DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB()); DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
assert(db_impl); assert(db_impl);
ColumnFamilyHandle* cfh = nullptr; ColumnFamilyHandle* cfh = nullptr;
@ -6439,7 +6445,15 @@ TEST_P(TransactionTest, OpenAndEnableU64Timestamp) {
cf_descs.emplace_back(kDefaultColumnFamilyName, options); cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts); cf_descs.emplace_back(test_cf_name, cf_opts);
std::vector<ColumnFamilyHandle*> handles; std::vector<ColumnFamilyHandle*> handles;
ASSERT_TRUE(ReOpenNoDelete(cf_descs, &handles).IsNotSupported()); const Status s = ReOpenNoDelete(cf_descs, &handles);
if (txn_db_options.write_policy == WRITE_COMMITTED) {
ASSERT_OK(s);
for (auto* h : handles) {
delete h;
}
} else {
ASSERT_TRUE(s.IsNotSupported());
}
} }
} }

@ -170,12 +170,12 @@ class TransactionTestBase : public ::testing::Test {
txn_db_options.write_policy == WRITE_PREPARED; txn_db_options.write_policy == WRITE_PREPARED;
Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db, Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db,
use_seq_per_batch, use_batch_per_txn); use_seq_per_batch, use_batch_per_txn);
StackableDB* stackable_db = new StackableDB(root_db); auto stackable_db = std::make_unique<StackableDB>(root_db);
if (s.ok()) { if (s.ok()) {
assert(root_db != nullptr); assert(root_db != nullptr);
// If WrapStackableDB() returns non-ok, then stackable_db is already // If WrapStackableDB() returns non-ok, then stackable_db is already
// deleted within WrapStackableDB(). // deleted within WrapStackableDB().
s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options, s = TransactionDB::WrapStackableDB(stackable_db.release(), txn_db_options,
compaction_enabled_cf_indices, compaction_enabled_cf_indices,
*handles, &db); *handles, &db);
} }
@ -522,4 +522,44 @@ class MySQLStyleTransactionTest
const bool with_slow_threads_; const bool with_slow_threads_;
}; };
class WriteCommittedTxnWithTsTest
: public TransactionTestBase,
public ::testing::WithParamInterface<std::tuple<bool, bool, bool>> {
public:
WriteCommittedTxnWithTsTest()
: TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
WRITE_COMMITTED, kOrderedWrite) {}
~WriteCommittedTxnWithTsTest() override {
for (auto* h : handles_) {
delete h;
}
}
Status GetFromDb(ReadOptions read_opts, ColumnFamilyHandle* column_family,
const Slice& key, TxnTimestamp ts, std::string* value) {
std::string ts_buf;
PutFixed64(&ts_buf, ts);
Slice ts_slc = ts_buf;
read_opts.timestamp = &ts_slc;
assert(db);
return db->Get(read_opts, column_family, key, value);
}
Transaction* NewTxn(WriteOptions write_opts, TransactionOptions txn_opts) {
assert(db);
auto* txn = db->BeginTransaction(write_opts, txn_opts);
assert(txn);
const bool enable_indexing = std::get<2>(GetParam());
if (enable_indexing) {
txn->EnableIndexing();
} else {
txn->DisableIndexing();
}
return txn;
}
protected:
std::vector<ColumnFamilyHandle*> handles_{};
};
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,515 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "utilities/merge_operators.h"
#ifndef ROCKSDB_LITE
#include "test_util/testutil.h"
#include "utilities/transactions/transaction_test.h"
namespace ROCKSDB_NAMESPACE {
INSTANTIATE_TEST_CASE_P(
DBAsBaseDB, WriteCommittedTxnWithTsTest,
::testing::Values(std::make_tuple(false, /*two_write_queue=*/false,
/*enable_indexing=*/false),
std::make_tuple(false, /*two_write_queue=*/true,
/*enable_indexing=*/false),
std::make_tuple(false, /*two_write_queue=*/false,
/*enable_indexing=*/true),
std::make_tuple(false, /*two_write_queue=*/true,
/*enable_indexing=*/true)));
INSTANTIATE_TEST_CASE_P(
DBAsStackableDB, WriteCommittedTxnWithTsTest,
::testing::Values(std::make_tuple(true, /*two_write_queue=*/false,
/*enable_indexing=*/false),
std::make_tuple(true, /*two_write_queue=*/true,
/*enable_indexing=*/false),
std::make_tuple(true, /*two_write_queue=*/false,
/*enable_indexing=*/true),
std::make_tuple(true, /*two_write_queue=*/true,
/*enable_indexing=*/true)));
TEST_P(WriteCommittedTxnWithTsTest, SanityChecks) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_opts;
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn);
ASSERT_OK(txn->Put(handles_[1], "foo", "value"));
ASSERT_TRUE(txn->Commit().IsInvalidArgument());
auto* pessimistic_txn =
static_cast_with_check<PessimisticTransaction>(txn.get());
ASSERT_TRUE(
pessimistic_txn->CommitBatch(/*batch=*/nullptr).IsInvalidArgument());
{
WriteBatchWithIndex* wbwi = txn->GetWriteBatch();
assert(wbwi);
WriteBatch* wb = wbwi->GetWriteBatch();
assert(wb);
// Write a key to the batch for nonexisting cf.
ASSERT_OK(WriteBatchInternal::Put(wb, /*column_family_id=*/10, /*key=*/"",
/*value=*/""));
}
ASSERT_OK(txn->SetCommitTimestamp(20));
ASSERT_TRUE(txn->Commit().IsInvalidArgument());
txn.reset();
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn1);
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Put(handles_[1], "foo", "value"));
{
WriteBatchWithIndex* wbwi = txn1->GetWriteBatch();
assert(wbwi);
WriteBatch* wb = wbwi->GetWriteBatch();
assert(wb);
// Write a key to the batch for non-existing cf.
ASSERT_OK(WriteBatchInternal::Put(wb, /*column_family_id=*/10, /*key=*/"",
/*value=*/""));
}
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->SetCommitTimestamp(21));
ASSERT_TRUE(txn1->Commit().IsInvalidArgument());
txn1.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, ReOpenWithTimestamp) {
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_opts;
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn0);
ASSERT_OK(txn0->Put(handles_[1], "foo", "value"));
ASSERT_OK(txn0->SetName("txn0"));
ASSERT_OK(txn0->Prepare());
ASSERT_TRUE(txn0->Commit().IsInvalidArgument());
txn0.reset();
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn1);
ASSERT_OK(txn1->Put(handles_[1], "foo", "value1"));
{
std::string buf;
PutFixed64(&buf, 23);
ASSERT_OK(txn1->Put("id", buf));
ASSERT_OK(txn1->Merge("id", buf));
}
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->SetCommitTimestamp(/*ts=*/23));
ASSERT_OK(txn1->Commit());
txn1.reset();
{
std::string value;
const Status s =
GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/23, &value);
ASSERT_OK(s);
ASSERT_EQ("value1", value);
}
{
std::string value;
const Status s = db->Get(ReadOptions(), handles_[0], "id", &value);
ASSERT_OK(s);
uint64_t ival = 0;
Slice value_slc = value;
bool result = GetFixed64(&value_slc, &ival);
assert(result);
ASSERT_EQ(46, ival);
}
}
TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_opts;
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn0);
ASSERT_OK(txn0->Put(handles_[1], "foo", "foo_value"));
ASSERT_OK(txn0->SetName("txn0"));
ASSERT_OK(txn0->Prepare());
WriteOptions write_opts;
write_opts.sync = true;
std::unique_ptr<Transaction> txn1(NewTxn(write_opts, TransactionOptions()));
assert(txn1);
ASSERT_OK(txn1->Put("bar", "bar_value_1"));
ASSERT_OK(txn1->Put(handles_[1], "bar", "bar_value_1"));
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->SetCommitTimestamp(/*ts=*/23));
ASSERT_OK(txn1->Commit());
txn1.reset();
std::unique_ptr<Transaction> txn2(NewTxn(write_opts, TransactionOptions()));
assert(txn2);
ASSERT_OK(txn2->Put("key1", "value_3"));
ASSERT_OK(txn2->Put(handles_[1], "key1", "value_3"));
ASSERT_OK(txn2->SetCommitTimestamp(/*ts=*/24));
ASSERT_OK(txn2->Commit());
txn2.reset();
txn0.reset();
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
{
std::string value;
Status s = GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/23, &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(ReadOptions(), handles_[0], "bar", &value);
ASSERT_OK(s);
ASSERT_EQ("bar_value_1", value);
value.clear();
s = GetFromDb(ReadOptions(), handles_[1], "bar", /*ts=*/23, &value);
ASSERT_OK(s);
ASSERT_EQ("bar_value_1", value);
s = GetFromDb(ReadOptions(), handles_[1], "key1", /*ts=*/23, &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(ReadOptions(), handles_[0], "key1", &value);
ASSERT_OK(s);
ASSERT_EQ("value_3", value);
s = GetFromDb(ReadOptions(), handles_[1], "key1", /*ts=*/24, &value);
ASSERT_OK(s);
ASSERT_EQ("value_3", value);
}
}
TEST_P(WriteCommittedTxnWithTsTest, TransactionDbLevelApi) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_options);
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::string key_str = "tes_key";
std::string ts_str;
std::string value_str = "test_value";
PutFixed64(&ts_str, 100);
Slice value = value_str;
ASSERT_TRUE(
db->Put(WriteOptions(), handles_[1], "foo", "bar").IsNotSupported());
ASSERT_TRUE(db->Delete(WriteOptions(), handles_[1], "foo").IsNotSupported());
ASSERT_TRUE(
db->SingleDelete(WriteOptions(), handles_[1], "foo").IsNotSupported());
ASSERT_TRUE(
db->Merge(WriteOptions(), handles_[1], "foo", "+1").IsNotSupported());
WriteBatch wb1(/*reserved_bytes=*/0, /*max_bytes=*/0,
/*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0);
ASSERT_OK(wb1.Put(handles_[1], key_str, ts_str, value));
ASSERT_TRUE(db->Write(WriteOptions(), &wb1).IsNotSupported());
ASSERT_TRUE(db->Write(WriteOptions(), TransactionDBWriteOptimizations(), &wb1)
.IsNotSupported());
auto* pessimistic_txn_db =
static_cast_with_check<PessimisticTransactionDB>(db);
assert(pessimistic_txn_db);
ASSERT_TRUE(
pessimistic_txn_db->WriteWithConcurrencyControl(WriteOptions(), &wb1)
.IsNotSupported());
ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
ASSERT_OK(db->Delete(WriteOptions(), "bar"));
ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
ASSERT_OK(db->Merge(WriteOptions(), "key", "_more"));
WriteBatch wb2(/*reserved_bytes=*/0, /*max_bytes=*/0,
/*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0);
ASSERT_OK(wb2.Put(key_str, value));
ASSERT_OK(db->Write(WriteOptions(), &wb2));
ASSERT_OK(db->Write(WriteOptions(), TransactionDBWriteOptimizations(), &wb2));
ASSERT_OK(
pessimistic_txn_db->WriteWithConcurrencyControl(WriteOptions(), &wb2));
std::unique_ptr<Transaction> txn(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn);
WriteBatch wb3(/*reserved_bytes=*/0, /*max_bytes=*/0,
/*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0);
ASSERT_OK(wb3.Put(handles_[1], "key", "value"));
auto* pessimistic_txn =
static_cast_with_check<PessimisticTransaction>(txn.get());
assert(pessimistic_txn);
ASSERT_TRUE(pessimistic_txn->CommitBatch(&wb3).IsNotSupported());
txn.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, Merge) {
options.merge_operator = MergeOperators::CreateStringAppendOperator();
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn);
ASSERT_OK(txn->Put(handles_[1], "foo", "bar"));
ASSERT_TRUE(txn->Merge(handles_[1], "foo", "1").IsInvalidArgument());
txn.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, GetForUpdate) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn1->Put(handles_[1], "key", "value1"));
ASSERT_OK(txn1->SetCommitTimestamp(24));
ASSERT_OK(txn1->Commit());
txn1.reset();
std::string value;
ASSERT_OK(txn0->SetReadTimestampForValidation(23));
ASSERT_TRUE(
txn0->GetForUpdate(ReadOptions(), handles_[1], "key", &value).IsBusy());
ASSERT_OK(txn0->Rollback());
txn0.reset();
std::unique_ptr<Transaction> txn2(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn2->SetReadTimestampForValidation(25));
ASSERT_OK(txn2->GetForUpdate(ReadOptions(), handles_[1], "key", &value));
ASSERT_OK(txn2->SetCommitTimestamp(26));
ASSERT_OK(txn2->Commit());
txn2.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, BlindWrite) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn0);
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn1);
{
std::string value;
ASSERT_OK(txn0->SetReadTimestampForValidation(100));
// Lock "key".
ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], "key", &value)
.IsNotFound());
}
ASSERT_OK(txn0->Put(handles_[1], "key", "value0"));
ASSERT_OK(txn0->SetCommitTimestamp(101));
ASSERT_OK(txn0->Commit());
ASSERT_OK(txn1->Put(handles_[1], "key", "value1"));
// In reality, caller needs to ensure commit_ts of txn1 is greater than the
// commit_ts of txn0, which is true for lock-based concurrency control.
ASSERT_OK(txn1->SetCommitTimestamp(102));
ASSERT_OK(txn1->Commit());
txn0.reset();
txn1.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, RefineReadTimestamp) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn0);
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn1);
{
ASSERT_OK(txn0->SetReadTimestampForValidation(100));
// Lock "key0", "key1", ..., "key4".
for (int i = 0; i < 5; ++i) {
std::string value;
ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1],
"key" + std::to_string(i), &value)
.IsNotFound());
}
}
ASSERT_OK(txn1->Put(handles_[1], "key5", "value5_0"));
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->SetCommitTimestamp(101));
ASSERT_OK(txn1->Commit());
txn1.reset();
{
std::string value;
ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], "key5", &value)
.IsBusy());
ASSERT_OK(txn0->SetReadTimestampForValidation(102));
ASSERT_OK(txn0->GetForUpdate(ReadOptions(), handles_[1], "key5", &value));
ASSERT_EQ("value5_0", value);
}
for (int i = 0; i < 6; ++i) {
ASSERT_OK(txn0->Put(handles_[1], "key" + std::to_string(i),
"value" + std::to_string(i)));
}
ASSERT_OK(txn0->SetName("txn0"));
ASSERT_OK(txn0->Prepare());
ASSERT_OK(txn0->SetCommitTimestamp(103));
ASSERT_OK(txn0->Commit());
txn0.reset();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <cstdio>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr, "SKIPPED as Transactions not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // ROCKSDB_LITE
Loading…
Cancel
Save