diff --git a/CMakeLists.txt b/CMakeLists.txt index 5de221b0f..9b4f73050 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1334,6 +1334,7 @@ if(WITH_TESTS) utilities/transactions/optimistic_transaction_test.cc utilities/transactions/transaction_test.cc utilities/transactions/lock/point/point_lock_manager_test.cc + utilities/transactions/write_committed_transaction_ts_test.cc utilities/transactions/write_prepared_transaction_test.cc utilities/transactions/write_unprepared_transaction_test.cc utilities/transactions/lock/range/range_locking_test.cc diff --git a/HISTORY.md b/HISTORY.md index 03720b69f..ecc8bf1cb 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### New Features * Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp. +* Add support for user-defined timestamps to write-committed transaction without API change. The `TransactionDB` layer APIs do not allow timestamps because we require that all user-defined-timestamps-aware operations go through the `Transaction` APIs. * Added BlobDB options to `ldb` * `BlockBasedTableOptions::detect_filter_construct_corruption` can now be dynamically configured using `DB::SetOptions`. diff --git a/Makefile b/Makefile index 7cd92595f..afc141bd2 100644 --- a/Makefile +++ b/Makefile @@ -1866,6 +1866,9 @@ point_lock_manager_test: utilities/transactions/lock/point/point_lock_manager_te transaction_test: $(OBJ_DIR)/utilities/transactions/transaction_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +write_committed_transaction_ts_test: $(OBJ_DIR)/utilities/transactions/write_committed_transaction_ts_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + write_prepared_transaction_test: $(OBJ_DIR)/utilities/transactions/write_prepared_transaction_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 126d3701a..fdc71a1c4 100644 --- a/TARGETS +++ b/TARGETS @@ -6120,6 +6120,12 @@ cpp_unittest_wrapper(name="write_callback_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="write_committed_transaction_ts_test", + srcs=["utilities/transactions/write_committed_transaction_ts_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="write_controller_test", srcs=["db/write_controller_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 61910ede4..bb9eae90f 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -130,7 +130,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, assert(!seq_per_batch_ || batch_cnt != 0); if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); - } else if (WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { + } else if (!disable_memtable && + WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { + // If writing to memtable, then we require the caller to set/update the + // timestamps for the keys in the write batch. + // Otherwise, it means we are just writing to the WAL, and we allow + // timestamps unset for the keys in the write batch. This can happen if we + // use TransactionDB with write-committed policy, and we currently do not + // support user-defined timestamp with other policies. + // In the prepare phase, a transaction can write the batch to the WAL + // without inserting to memtable. The keys in the batch do not have to be + // assigned timestamps because they will be used only during recovery if + // there is a commit marker which includes their commit timestamp. return Status::InvalidArgument("write batch must have timestamp(s) set"); } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && write_options.rate_limiter_priority != Env::IO_USER) { diff --git a/src.mk b/src.mk index f8c1f0307..705c6980c 100644 --- a/src.mk +++ b/src.mk @@ -584,6 +584,7 @@ TEST_MAIN_SOURCES = \ utilities/transactions/lock/point/point_lock_manager_test.cc \ utilities/transactions/write_prepared_transaction_test.cc \ utilities/transactions/write_unprepared_transaction_test.cc \ + utilities/transactions/write_committed_transaction_ts_test.cc \ utilities/ttl/ttl_test.cc \ utilities/util_merge_operators_test.cc \ utilities/write_batch_with_index/write_batch_with_index_test.cc \ diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index d59a62fc1..9e45b71e9 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -25,6 +25,7 @@ #include "util/string_util.h" #include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/transaction_util.h" +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" namespace ROCKSDB_NAMESPACE { @@ -135,6 +136,274 @@ WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, const TransactionOptions& txn_options) : PessimisticTransaction(txn_db, write_options, txn_options) {} +Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, std::string* value, + bool exclusive, const bool do_validate) { + return GetForUpdateImpl(read_options, column_family, key, value, exclusive, + do_validate); +} + +Status WriteCommittedTxn::GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, + PinnableSlice* pinnable_val, + bool exclusive, const bool do_validate) { + return GetForUpdateImpl(read_options, column_family, key, pinnable_val, + exclusive, do_validate); +} + +template +inline Status WriteCommittedTxn::GetForUpdateImpl( + const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, TValue* value, bool exclusive, const bool do_validate) { + column_family = + column_family ? column_family : db_impl_->DefaultColumnFamily(); + assert(column_family); + if (!read_options.timestamp) { + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + if (0 == ts_sz) { + return TransactionBaseImpl::GetForUpdate(read_options, column_family, key, + value, exclusive, do_validate); + } + } else { + Status s = db_impl_->FailIfTsSizesMismatch(column_family, + *(read_options.timestamp)); + if (!s.ok()) { + return s; + } + } + + if (!do_validate) { + return Status::InvalidArgument( + "If do_validate is false then GetForUpdate with read_timestamp is not " + "defined."); + } else if (kMaxTxnTimestamp == read_timestamp_) { + return Status::InvalidArgument("read_timestamp must be set for validation"); + } + + if (!read_options.timestamp) { + ReadOptions read_opts_copy = read_options; + char ts_buf[sizeof(kMaxTxnTimestamp)]; + EncodeFixed64(ts_buf, read_timestamp_); + Slice ts(ts_buf, sizeof(ts_buf)); + read_opts_copy.timestamp = &ts; + return TransactionBaseImpl::GetForUpdate(read_opts_copy, column_family, key, + value, exclusive, do_validate); + } + assert(read_options.timestamp); + const char* const ts_buf = read_options.timestamp->data(); + assert(read_options.timestamp->size() == sizeof(kMaxTxnTimestamp)); + TxnTimestamp ts = DecodeFixed64(ts_buf); + if (ts != read_timestamp_) { + return Status::InvalidArgument("Must read from the same read_timestamp"); + } + return TransactionBaseImpl::GetForUpdate(read_options, column_family, key, + value, exclusive, do_validate); +} + +Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, &value, this]() { + Status s = + GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + ++num_puts_; + } + return s; + }); +} + +Status WriteCommittedTxn::Put(ColumnFamilyHandle* column_family, + const SliceParts& key, const SliceParts& value, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, &value, this]() { + Status s = + GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + ++num_puts_; + } + return s; + }); +} + +Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + return Operate( + column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, &value, this]() { + Status s = GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + ++num_puts_; + } + return s; + }); +} + +Status WriteCommittedTxn::PutUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key, + const SliceParts& value) { + return Operate( + column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, &value, this]() { + Status s = GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + ++num_puts_; + } + return s; + }); +} + +Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family, + const Slice& key, const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, this]() { + Status s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, this]() { + Status s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) { + return Operate(column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, this]() { + Status s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::DeleteUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key) { + return Operate(column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, this]() { + Status s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, this]() { + Status s = + GetBatchForWrite()->SingleDelete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, this]() { + Status s = + GetBatchForWrite()->SingleDelete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::SingleDeleteUntracked( + ColumnFamilyHandle* column_family, const Slice& key) { + return Operate(column_family, key, /*do_validate=*/false, + /*assume_tracked=*/false, [column_family, &key, this]() { + Status s = + GetBatchForWrite()->SingleDelete(column_family, key); + if (s.ok()) { + ++num_deletes_; + } + return s; + }); +} + +Status WriteCommittedTxn::Merge(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value, + const bool assume_tracked) { + const bool do_validate = !assume_tracked; + return Operate(column_family, key, do_validate, assume_tracked, + [column_family, &key, &value, this]() { + Status s = + GetBatchForWrite()->Merge(column_family, key, value); + if (s.ok()) { + ++num_merges_; + } + return s; + }); +} + +template +Status WriteCommittedTxn::Operate(ColumnFamilyHandle* column_family, + const TKey& key, const bool do_validate, + const bool assume_tracked, + TOperation&& operation) { + Status s; + if constexpr (std::is_same_v) { + s = TryLock(column_family, key, /*read_only=*/false, /*exclusive=*/true, + do_validate, assume_tracked); + } else if constexpr (std::is_same_v) { + std::string key_buf; + Slice contiguous_key(key, &key_buf); + s = TryLock(column_family, contiguous_key, /*read_only=*/false, + /*exclusive=*/true, do_validate, assume_tracked); + } + if (!s.ok()) { + return s; + } + column_family = + column_family ? column_family : db_impl_->DefaultColumnFamily(); + assert(column_family); + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + if (ts_sz > 0) { + assert(ts_sz == sizeof(TxnTimestamp)); + if (!IndexingEnabled()) { + cfs_with_ts_tracked_when_indexing_disabled_.insert( + column_family->GetID()); + } + } + return operation(); +} + Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) { if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) { return Status::InvalidArgument( @@ -154,6 +423,17 @@ Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) { } Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { + if (batch && WriteBatchInternal::HasKeyWithTimestamp(*batch)) { + // CommitBatch() needs to lock the keys in the batch. + // However, the application also needs to specify the timestamp for the + // keys in batch before calling this API. + // This means timestamp order may violate the order of locking, thus + // violate the sequence number order for the same user key. + // Therefore, we disallow this operation for now. + return Status::NotSupported( + "Batch to commit includes timestamp assigned before locking"); + } + std::unique_ptr keys_to_unlock(lock_tracker_factory_.Create()); Status s = LockBatch(batch, keys_to_unlock.get()); @@ -369,9 +649,41 @@ Status PessimisticTransaction::Commit() { } Status WriteCommittedTxn::CommitWithoutPrepareInternal() { + WriteBatchWithIndex* wbwi = GetWriteBatch(); + assert(wbwi); + WriteBatch* wb = wbwi->GetWriteBatch(); + assert(wb); + + const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb); + if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) { + return Status::InvalidArgument("Must assign a commit timestamp"); + } + + if (needs_ts) { + assert(commit_timestamp_ != kMaxTxnTimestamp); + char commit_ts_buf[sizeof(kMaxTxnTimestamp)]; + EncodeFixed64(commit_ts_buf, commit_timestamp_); + Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf)); + + Status s = + wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t { + auto cf_iter = cfs_with_ts_tracked_when_indexing_disabled_.find(cf); + if (cf_iter != cfs_with_ts_tracked_when_indexing_disabled_.end()) { + return sizeof(kMaxTxnTimestamp); + } + const Comparator* ucmp = + WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf); + return ucmp ? ucmp->timestamp_size() + : std::numeric_limits::max(); + }); + if (!s.ok()) { + return s; + } + } + uint64_t seq_used = kMaxSequenceNumber; auto s = - db_impl_->WriteImpl(write_options_, GetWriteBatch()->GetWriteBatch(), + db_impl_->WriteImpl(write_options_, wb, /*callback*/ nullptr, /*log_used*/ nullptr, /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used); assert(!s.ok() || seq_used != kMaxSequenceNumber); @@ -394,11 +706,46 @@ Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) { } Status WriteCommittedTxn::CommitInternal() { + WriteBatchWithIndex* wbwi = GetWriteBatch(); + assert(wbwi); + WriteBatch* wb = wbwi->GetWriteBatch(); + assert(wb); + + const bool needs_ts = WriteBatchInternal::HasKeyWithTimestamp(*wb); + if (needs_ts && commit_timestamp_ == kMaxTxnTimestamp) { + return Status::InvalidArgument("Must assign a commit timestamp"); + } // We take the commit-time batch and append the Commit marker. // The Memtable will ignore the Commit marker in non-recovery mode WriteBatch* working_batch = GetCommitTimeWriteBatch(); - auto s = WriteBatchInternal::MarkCommit(working_batch, name_); - assert(s.ok()); + + Status s; + if (!needs_ts) { + s = WriteBatchInternal::MarkCommit(working_batch, name_); + } else { + assert(commit_timestamp_ != kMaxTxnTimestamp); + char commit_ts_buf[sizeof(kMaxTxnTimestamp)]; + EncodeFixed64(commit_ts_buf, commit_timestamp_); + Slice commit_ts(commit_ts_buf, sizeof(commit_ts_buf)); + s = WriteBatchInternal::MarkCommitWithTimestamp(working_batch, name_, + commit_ts); + if (s.ok()) { + s = wb->UpdateTimestamps(commit_ts, [wbwi, this](uint32_t cf) -> size_t { + if (cfs_with_ts_tracked_when_indexing_disabled_.find(cf) != + cfs_with_ts_tracked_when_indexing_disabled_.end()) { + return sizeof(kMaxTxnTimestamp); + } + const Comparator* ucmp = + WriteBatchWithIndexInternal::GetUserComparator(*wbwi, cf); + return ucmp ? ucmp->timestamp_size() + : std::numeric_limits::max(); + }); + } + } + + if (!s.ok()) { + return s; + } // any operations appended to this working_batch will be ignored from WAL working_batch->MarkWalTerminationPoint(); @@ -406,8 +753,7 @@ Status WriteCommittedTxn::CommitInternal() { // insert prepared batch into Memtable only skipping WAL. // Memtable will ignore BeginPrepare/EndPrepare markers // in non recovery mode and simply insert the values - s = WriteBatchInternal::Append(working_batch, - GetWriteBatch()->GetWriteBatch()); + s = WriteBatchInternal::Append(working_batch, wb); assert(s.ok()); uint64_t seq_used = kMaxSequenceNumber; @@ -489,6 +835,10 @@ Status PessimisticTransaction::RollbackToSavePoint() { // On success, caller should unlock keys_to_unlock Status PessimisticTransaction::LockBatch(WriteBatch* batch, LockTracker* keys_to_unlock) { + if (!batch) { + return Status::InvalidArgument("batch is nullptr"); + } + class Handler : public WriteBatch::Handler { public: // Sorted map of column_family_id to sorted set of keys. diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 6908aa7a0..609bcd600 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -223,10 +223,73 @@ class WriteCommittedTxn : public PessimisticTransaction { ~WriteCommittedTxn() override {} + using TransactionBaseImpl::GetForUpdate; + Status GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool exclusive, + const bool do_validate) override; + Status GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* pinnable_val, bool exclusive, + const bool do_validate) override; + + using TransactionBaseImpl::Put; + // `key` does NOT include timestamp even when it's enabled. + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, const bool assume_tracked = false) override; + Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value, + const bool assume_tracked = false) override; + + using TransactionBaseImpl::PutUntracked; + Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override; + + using TransactionBaseImpl::Delete; + // `key` does NOT include timestamp even when it's enabled. + Status Delete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) override; + Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key, + const bool assume_tracked = false) override; + + using TransactionBaseImpl::DeleteUntracked; + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) override; + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + + using TransactionBaseImpl::SingleDelete; + // `key` does NOT include timestamp even when it's enabled. + Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key, + const bool assume_tracked = false) override; + Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key, + const bool assume_tracked = false) override; + + using TransactionBaseImpl::SingleDeleteUntracked; + Status SingleDeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) override; + + using TransactionBaseImpl::Merge; + Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, const bool assume_tracked = false) override; + Status SetReadTimestampForValidation(TxnTimestamp ts) override; Status SetCommitTimestamp(TxnTimestamp ts) override; private: + template + Status GetForUpdateImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + TValue* value, bool exclusive, + const bool do_validate); + + template + Status Operate(ColumnFamilyHandle* column_family, const TKey& key, + const bool do_validate, const bool assume_tracked, + TOperation&& operation); + Status PrepareInternal() override; Status CommitWithoutPrepareInternal() override; @@ -236,6 +299,12 @@ class WriteCommittedTxn : public PessimisticTransaction { Status CommitInternal() override; Status RollbackInternal() override; + + // Column families that enable timestamps and whose data are written when + // indexing_enabled_ is false. If a key is written when indexing_enabled_ is + // true, then the corresponding column family is not added to cfs_with_ts + // even if it enables timestamp. + std::unordered_set cfs_with_ts_tracked_when_indexing_disabled_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index c5eeb5398..c1e3a2ab2 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -86,8 +86,10 @@ Status PessimisticTransactionDB::VerifyCFOptions( << " timestamp size is " << ts_sz << " bytes"; return Status::InvalidArgument(oss.str()); } - // TODO: Update this check once timestamp is supported. - return Status::NotSupported("Transaction DB does not support timestamp"); + if (txn_db_options_.write_policy != WRITE_COMMITTED) { + return Status::NotSupported("Only WriteCommittedTxn supports timestamp"); + } + return Status::OK(); } Status PessimisticTransactionDB::Initialize( @@ -442,7 +444,10 @@ Transaction* PessimisticTransactionDB::BeginInternalTransaction( Status PessimisticTransactionDB::Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { - Status s; + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } Transaction* txn = BeginInternalTransaction(options); txn->DisableIndexing(); @@ -463,7 +468,10 @@ Status PessimisticTransactionDB::Put(const WriteOptions& options, Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, const Slice& key) { - Status s; + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } Transaction* txn = BeginInternalTransaction(wopts); txn->DisableIndexing(); @@ -485,7 +493,10 @@ Status PessimisticTransactionDB::Delete(const WriteOptions& wopts, Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, const Slice& key) { - Status s; + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } Transaction* txn = BeginInternalTransaction(wopts); txn->DisableIndexing(); @@ -507,7 +518,10 @@ Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts, Status PessimisticTransactionDB::Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - Status s; + Status s = FailIfCfEnablesTs(this, column_family); + if (!s.ok()) { + return s; + } Transaction* txn = BeginInternalTransaction(options); txn->DisableIndexing(); @@ -533,6 +547,10 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts, Status WriteCommittedTxnDB::Write(const WriteOptions& opts, WriteBatch* updates) { + Status s = FailIfBatchHasTs(updates); + if (!s.ok()) { + return s; + } if (txn_db_options_.skip_concurrency_control) { return db_impl_->Write(opts, updates); } else { @@ -543,6 +561,10 @@ Status WriteCommittedTxnDB::Write(const WriteOptions& opts, Status WriteCommittedTxnDB::Write( const WriteOptions& opts, const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) { + Status s = FailIfBatchHasTs(updates); + if (!s.ok()) { + return s; + } if (optimizations.skip_concurrency_control) { return db_impl_->Write(opts, updates); } else { diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index eb0dd2f05..c0a4b9736 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -155,6 +155,11 @@ class PessimisticTransactionDB : public TransactionDB { std::shared_ptr info_log_; const TransactionDBOptions txn_db_options_; + static Status FailIfBatchHasTs(const WriteBatch* wb); + + static Status FailIfCfEnablesTs(const DB* db, + const ColumnFamilyHandle* column_family); + void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options = TransactionOptions()); @@ -175,11 +180,12 @@ class PessimisticTransactionDB : public TransactionDB { friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; + Transaction* BeginInternalTransaction(const WriteOptions& options); + std::shared_ptr lock_manager_; // Must be held when adding/dropping column families. InstrumentedMutex column_family_mutex_; - Transaction* BeginInternalTransaction(const WriteOptions& options); // Used to ensure that no locks are stolen from an expirable transaction // that has started a commit. Only transactions with an expiration time @@ -224,5 +230,30 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB { virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; }; +inline Status PessimisticTransactionDB::FailIfBatchHasTs( + const WriteBatch* batch) { + if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) { + return Status::NotSupported( + "Writes with timestamp must go through transaction API instead of " + "TransactionDB."); + } + return Status::OK(); +} + +inline Status PessimisticTransactionDB::FailIfCfEnablesTs( + const DB* db, const ColumnFamilyHandle* column_family) { + assert(db); + column_family = column_family ? column_family : db->DefaultColumnFamily(); + assert(column_family); + const Comparator* const ucmp = column_family->GetComparator(); + assert(ucmp); + if (ucmp->timestamp_size() > 0) { + return Status::NotSupported( + "Write operation with user timestamp must go through the transaction " + "API instead of TransactionDB."); + } + return Status::OK(); +} + } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 7febb10ba..e4da8d44b 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -220,6 +220,8 @@ class TransactionBaseImpl : public Transaction { void EnableIndexing() override { indexing_enabled_ = true; } + bool IndexingEnabled() const { return indexing_enabled_; } + uint64_t GetElapsedTime() const override; uint64_t GetNumPuts() const override; @@ -278,6 +280,8 @@ class TransactionBaseImpl : public Transaction { assert(s.ok()); } + WriteBatchBase* GetBatchForWrite(); + DB* db_; DBImpl* dbimpl_; @@ -365,7 +369,6 @@ class TransactionBaseImpl : public Transaction { bool read_only, bool exclusive, const bool do_validate = true, const bool assume_tracked = false); - WriteBatchBase* GetBatchForWrite(); void SetSnapshotInternal(const Snapshot* snapshot); }; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 1970cf6b2..53f9606fe 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6421,12 +6421,18 @@ TEST_P(TransactionTest, OpenAndEnableU64Timestamp) { cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); { ColumnFamilyHandle* cfh = nullptr; - ASSERT_TRUE( - db->CreateColumnFamily(cf_opts, test_cf_name, &cfh).IsNotSupported()); + const Status s = db->CreateColumnFamily(cf_opts, test_cf_name, &cfh); + if (txn_db_options.write_policy == WRITE_COMMITTED) { + ASSERT_OK(s); + delete cfh; + } else { + ASSERT_TRUE(s.IsNotSupported()); + assert(!cfh); + } } // Bypass transaction db layer. - { + if (txn_db_options.write_policy != WRITE_COMMITTED) { DBImpl* db_impl = static_cast_with_check(db->GetRootDB()); assert(db_impl); ColumnFamilyHandle* cfh = nullptr; @@ -6439,7 +6445,15 @@ TEST_P(TransactionTest, OpenAndEnableU64Timestamp) { cf_descs.emplace_back(kDefaultColumnFamilyName, options); cf_descs.emplace_back(test_cf_name, cf_opts); std::vector handles; - ASSERT_TRUE(ReOpenNoDelete(cf_descs, &handles).IsNotSupported()); + const Status s = ReOpenNoDelete(cf_descs, &handles); + if (txn_db_options.write_policy == WRITE_COMMITTED) { + ASSERT_OK(s); + for (auto* h : handles) { + delete h; + } + } else { + ASSERT_TRUE(s.IsNotSupported()); + } } } diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 2aecb312a..2780cf24d 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -170,12 +170,12 @@ class TransactionTestBase : public ::testing::Test { txn_db_options.write_policy == WRITE_PREPARED; Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db, use_seq_per_batch, use_batch_per_txn); - StackableDB* stackable_db = new StackableDB(root_db); + auto stackable_db = std::make_unique(root_db); if (s.ok()) { assert(root_db != nullptr); // If WrapStackableDB() returns non-ok, then stackable_db is already // deleted within WrapStackableDB(). - s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options, + s = TransactionDB::WrapStackableDB(stackable_db.release(), txn_db_options, compaction_enabled_cf_indices, *handles, &db); } @@ -522,4 +522,44 @@ class MySQLStyleTransactionTest const bool with_slow_threads_; }; +class WriteCommittedTxnWithTsTest + : public TransactionTestBase, + public ::testing::WithParamInterface> { + public: + WriteCommittedTxnWithTsTest() + : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()), + WRITE_COMMITTED, kOrderedWrite) {} + ~WriteCommittedTxnWithTsTest() override { + for (auto* h : handles_) { + delete h; + } + } + + Status GetFromDb(ReadOptions read_opts, ColumnFamilyHandle* column_family, + const Slice& key, TxnTimestamp ts, std::string* value) { + std::string ts_buf; + PutFixed64(&ts_buf, ts); + Slice ts_slc = ts_buf; + read_opts.timestamp = &ts_slc; + assert(db); + return db->Get(read_opts, column_family, key, value); + } + + Transaction* NewTxn(WriteOptions write_opts, TransactionOptions txn_opts) { + assert(db); + auto* txn = db->BeginTransaction(write_opts, txn_opts); + assert(txn); + const bool enable_indexing = std::get<2>(GetParam()); + if (enable_indexing) { + txn->EnableIndexing(); + } else { + txn->DisableIndexing(); + } + return txn; + } + + protected: + std::vector handles_{}; +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/write_committed_transaction_ts_test.cc b/utilities/transactions/write_committed_transaction_ts_test.cc new file mode 100644 index 000000000..57ea6b303 --- /dev/null +++ b/utilities/transactions/write_committed_transaction_ts_test.cc @@ -0,0 +1,515 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/utilities/transaction_db.h" +#include "utilities/merge_operators.h" +#ifndef ROCKSDB_LITE + +#include "test_util/testutil.h" +#include "utilities/transactions/transaction_test.h" + +namespace ROCKSDB_NAMESPACE { + +INSTANTIATE_TEST_CASE_P( + DBAsBaseDB, WriteCommittedTxnWithTsTest, + ::testing::Values(std::make_tuple(false, /*two_write_queue=*/false, + /*enable_indexing=*/false), + std::make_tuple(false, /*two_write_queue=*/true, + /*enable_indexing=*/false), + std::make_tuple(false, /*two_write_queue=*/false, + /*enable_indexing=*/true), + std::make_tuple(false, /*two_write_queue=*/true, + /*enable_indexing=*/true))); + +INSTANTIATE_TEST_CASE_P( + DBAsStackableDB, WriteCommittedTxnWithTsTest, + ::testing::Values(std::make_tuple(true, /*two_write_queue=*/false, + /*enable_indexing=*/false), + std::make_tuple(true, /*two_write_queue=*/true, + /*enable_indexing=*/false), + std::make_tuple(true, /*two_write_queue=*/false, + /*enable_indexing=*/true), + std::make_tuple(true, /*two_write_queue=*/true, + /*enable_indexing=*/true))); + +TEST_P(WriteCommittedTxnWithTsTest, SanityChecks) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->Put(handles_[1], "foo", "value")); + ASSERT_TRUE(txn->Commit().IsInvalidArgument()); + + auto* pessimistic_txn = + static_cast_with_check(txn.get()); + ASSERT_TRUE( + pessimistic_txn->CommitBatch(/*batch=*/nullptr).IsInvalidArgument()); + + { + WriteBatchWithIndex* wbwi = txn->GetWriteBatch(); + assert(wbwi); + WriteBatch* wb = wbwi->GetWriteBatch(); + assert(wb); + // Write a key to the batch for nonexisting cf. + ASSERT_OK(WriteBatchInternal::Put(wb, /*column_family_id=*/10, /*key=*/"", + /*value=*/"")); + } + + ASSERT_OK(txn->SetCommitTimestamp(20)); + + ASSERT_TRUE(txn->Commit().IsInvalidArgument()); + txn.reset(); + + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn1); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Put(handles_[1], "foo", "value")); + { + WriteBatchWithIndex* wbwi = txn1->GetWriteBatch(); + assert(wbwi); + WriteBatch* wb = wbwi->GetWriteBatch(); + assert(wb); + // Write a key to the batch for non-existing cf. + ASSERT_OK(WriteBatchInternal::Put(wb, /*column_family_id=*/10, /*key=*/"", + /*value=*/"")); + } + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->SetCommitTimestamp(21)); + ASSERT_TRUE(txn1->Commit().IsInvalidArgument()); + txn1.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, ReOpenWithTimestamp) { + options.merge_operator = MergeOperators::CreateUInt64AddOperator(); + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn0); + ASSERT_OK(txn0->Put(handles_[1], "foo", "value")); + ASSERT_OK(txn0->SetName("txn0")); + ASSERT_OK(txn0->Prepare()); + ASSERT_TRUE(txn0->Commit().IsInvalidArgument()); + txn0.reset(); + + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn1); + ASSERT_OK(txn1->Put(handles_[1], "foo", "value1")); + { + std::string buf; + PutFixed64(&buf, 23); + ASSERT_OK(txn1->Put("id", buf)); + ASSERT_OK(txn1->Merge("id", buf)); + } + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->SetCommitTimestamp(/*ts=*/23)); + ASSERT_OK(txn1->Commit()); + txn1.reset(); + + { + std::string value; + const Status s = + GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("value1", value); + } + + { + std::string value; + const Status s = db->Get(ReadOptions(), handles_[0], "id", &value); + ASSERT_OK(s); + uint64_t ival = 0; + Slice value_slc = value; + bool result = GetFixed64(&value_slc, &ival); + assert(result); + ASSERT_EQ(46, ival); + } +} + +TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn0); + ASSERT_OK(txn0->Put(handles_[1], "foo", "foo_value")); + ASSERT_OK(txn0->SetName("txn0")); + ASSERT_OK(txn0->Prepare()); + + WriteOptions write_opts; + write_opts.sync = true; + std::unique_ptr txn1(NewTxn(write_opts, TransactionOptions())); + assert(txn1); + ASSERT_OK(txn1->Put("bar", "bar_value_1")); + ASSERT_OK(txn1->Put(handles_[1], "bar", "bar_value_1")); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->SetCommitTimestamp(/*ts=*/23)); + ASSERT_OK(txn1->Commit()); + txn1.reset(); + + std::unique_ptr txn2(NewTxn(write_opts, TransactionOptions())); + assert(txn2); + ASSERT_OK(txn2->Put("key1", "value_3")); + ASSERT_OK(txn2->Put(handles_[1], "key1", "value_3")); + ASSERT_OK(txn2->SetCommitTimestamp(/*ts=*/24)); + ASSERT_OK(txn2->Commit()); + txn2.reset(); + + txn0.reset(); + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + { + std::string value; + Status s = GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/23, &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Get(ReadOptions(), handles_[0], "bar", &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_1", value); + + value.clear(); + s = GetFromDb(ReadOptions(), handles_[1], "bar", /*ts=*/23, &value); + ASSERT_OK(s); + ASSERT_EQ("bar_value_1", value); + + s = GetFromDb(ReadOptions(), handles_[1], "key1", /*ts=*/23, &value); + ASSERT_TRUE(s.IsNotFound()); + + s = db->Get(ReadOptions(), handles_[0], "key1", &value); + ASSERT_OK(s); + ASSERT_EQ("value_3", value); + + s = GetFromDb(ReadOptions(), handles_[1], "key1", /*ts=*/24, &value); + ASSERT_OK(s); + ASSERT_EQ("value_3", value); + } +} + +TEST_P(WriteCommittedTxnWithTsTest, TransactionDbLevelApi) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.merge_operator = MergeOperators::CreateStringAppendOperator(); + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_options); + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::string key_str = "tes_key"; + std::string ts_str; + std::string value_str = "test_value"; + PutFixed64(&ts_str, 100); + Slice value = value_str; + + ASSERT_TRUE( + db->Put(WriteOptions(), handles_[1], "foo", "bar").IsNotSupported()); + ASSERT_TRUE(db->Delete(WriteOptions(), handles_[1], "foo").IsNotSupported()); + ASSERT_TRUE( + db->SingleDelete(WriteOptions(), handles_[1], "foo").IsNotSupported()); + ASSERT_TRUE( + db->Merge(WriteOptions(), handles_[1], "foo", "+1").IsNotSupported()); + WriteBatch wb1(/*reserved_bytes=*/0, /*max_bytes=*/0, + /*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0); + ASSERT_OK(wb1.Put(handles_[1], key_str, ts_str, value)); + ASSERT_TRUE(db->Write(WriteOptions(), &wb1).IsNotSupported()); + ASSERT_TRUE(db->Write(WriteOptions(), TransactionDBWriteOptimizations(), &wb1) + .IsNotSupported()); + auto* pessimistic_txn_db = + static_cast_with_check(db); + assert(pessimistic_txn_db); + ASSERT_TRUE( + pessimistic_txn_db->WriteWithConcurrencyControl(WriteOptions(), &wb1) + .IsNotSupported()); + + ASSERT_OK(db->Put(WriteOptions(), "foo", "value")); + ASSERT_OK(db->Put(WriteOptions(), "bar", "value")); + ASSERT_OK(db->Delete(WriteOptions(), "bar")); + ASSERT_OK(db->SingleDelete(WriteOptions(), "foo")); + ASSERT_OK(db->Put(WriteOptions(), "key", "value")); + ASSERT_OK(db->Merge(WriteOptions(), "key", "_more")); + WriteBatch wb2(/*reserved_bytes=*/0, /*max_bytes=*/0, + /*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0); + ASSERT_OK(wb2.Put(key_str, value)); + ASSERT_OK(db->Write(WriteOptions(), &wb2)); + ASSERT_OK(db->Write(WriteOptions(), TransactionDBWriteOptimizations(), &wb2)); + ASSERT_OK( + pessimistic_txn_db->WriteWithConcurrencyControl(WriteOptions(), &wb2)); + + std::unique_ptr txn( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn); + + WriteBatch wb3(/*reserved_bytes=*/0, /*max_bytes=*/0, + /*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0); + + ASSERT_OK(wb3.Put(handles_[1], "key", "value")); + auto* pessimistic_txn = + static_cast_with_check(txn.get()); + assert(pessimistic_txn); + ASSERT_TRUE(pessimistic_txn->CommitBatch(&wb3).IsNotSupported()); + + txn.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, Merge) { + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options)); + options.avoid_flush_during_shutdown = true; + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn); + ASSERT_OK(txn->Put(handles_[1], "foo", "bar")); + ASSERT_TRUE(txn->Merge(handles_[1], "foo", "1").IsInvalidArgument()); + txn.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, GetForUpdate) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options)); + options.avoid_flush_during_shutdown = true; + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn1->Put(handles_[1], "key", "value1")); + ASSERT_OK(txn1->SetCommitTimestamp(24)); + ASSERT_OK(txn1->Commit()); + txn1.reset(); + + std::string value; + ASSERT_OK(txn0->SetReadTimestampForValidation(23)); + ASSERT_TRUE( + txn0->GetForUpdate(ReadOptions(), handles_[1], "key", &value).IsBusy()); + ASSERT_OK(txn0->Rollback()); + txn0.reset(); + + std::unique_ptr txn2( + NewTxn(WriteOptions(), TransactionOptions())); + ASSERT_OK(txn2->SetReadTimestampForValidation(25)); + ASSERT_OK(txn2->GetForUpdate(ReadOptions(), handles_[1], "key", &value)); + ASSERT_OK(txn2->SetCommitTimestamp(26)); + ASSERT_OK(txn2->Commit()); + txn2.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, BlindWrite) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options)); + options.avoid_flush_during_shutdown = true; + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn0); + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn1); + + { + std::string value; + ASSERT_OK(txn0->SetReadTimestampForValidation(100)); + // Lock "key". + ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], "key", &value) + .IsNotFound()); + } + + ASSERT_OK(txn0->Put(handles_[1], "key", "value0")); + ASSERT_OK(txn0->SetCommitTimestamp(101)); + ASSERT_OK(txn0->Commit()); + + ASSERT_OK(txn1->Put(handles_[1], "key", "value1")); + // In reality, caller needs to ensure commit_ts of txn1 is greater than the + // commit_ts of txn0, which is true for lock-based concurrency control. + ASSERT_OK(txn1->SetCommitTimestamp(102)); + ASSERT_OK(txn1->Commit()); + + txn0.reset(); + txn1.reset(); +} + +TEST_P(WriteCommittedTxnWithTsTest, RefineReadTimestamp) { + ASSERT_OK(ReOpenNoDelete()); + + ColumnFamilyOptions cf_options; + cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + const std::string test_cf_name = "test_cf"; + ColumnFamilyHandle* cfh = nullptr; + assert(db); + ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh)); + delete cfh; + cfh = nullptr; + + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options)); + options.avoid_flush_during_shutdown = true; + + ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_)); + + std::unique_ptr txn0( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn0); + + std::unique_ptr txn1( + NewTxn(WriteOptions(), TransactionOptions())); + assert(txn1); + + { + ASSERT_OK(txn0->SetReadTimestampForValidation(100)); + // Lock "key0", "key1", ..., "key4". + for (int i = 0; i < 5; ++i) { + std::string value; + ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], + "key" + std::to_string(i), &value) + .IsNotFound()); + } + } + ASSERT_OK(txn1->Put(handles_[1], "key5", "value5_0")); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->SetCommitTimestamp(101)); + ASSERT_OK(txn1->Commit()); + txn1.reset(); + + { + std::string value; + ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], "key5", &value) + .IsBusy()); + ASSERT_OK(txn0->SetReadTimestampForValidation(102)); + ASSERT_OK(txn0->GetForUpdate(ReadOptions(), handles_[1], "key5", &value)); + ASSERT_EQ("value5_0", value); + } + + for (int i = 0; i < 6; ++i) { + ASSERT_OK(txn0->Put(handles_[1], "key" + std::to_string(i), + "value" + std::to_string(i))); + } + ASSERT_OK(txn0->SetName("txn0")); + ASSERT_OK(txn0->Prepare()); + ASSERT_OK(txn0->SetCommitTimestamp(103)); + ASSERT_OK(txn0->Commit()); + txn0.reset(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, "SKIPPED as Transactions not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // ROCKSDB_LITE