diff --git a/HISTORY.md b/HISTORY.md index 4bea93fb9..f2a73bec4 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -61,6 +61,7 @@ * Return Status::InvalidArgument from ObjectRegistry::NewObject if a factory exists but the object ould not be created (returns NotFound if the factory is missing). * Remove deprecated overloads of API DB::GetApproximateSizes. * Remove deprecated option DBOptions::new_table_reader_for_compaction_inputs. +* Add Transaction::SetReadTimestampForValidation() and Transaction::SetCommitTimestamp(). Default impl returns NotSupported(). ### Behavior Changes * Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO. diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index dd7dd998a..2ada5e05d 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -7,6 +7,7 @@ #ifndef ROCKSDB_LITE +#include #include #include @@ -24,6 +25,11 @@ using TransactionName = std::string; using TransactionID = uint64_t; +using TxnTimestamp = uint64_t; + +constexpr TxnTimestamp kMaxTxnTimestamp = + std::numeric_limits::max(); + /* class Endpoint allows to define prefix ranges. @@ -594,6 +600,14 @@ class Transaction { // to remain the same across restarts. uint64_t GetId() { return id_; } + virtual Status SetReadTimestampForValidation(TxnTimestamp /*ts*/) { + return Status::NotSupported("timestamp not supported"); + } + + virtual Status SetCommitTimestamp(TxnTimestamp /*ts*/) { + return Status::NotSupported("timestamp not supported"); + } + protected: explicit Transaction(const TransactionDB* /*db*/) {} Transaction() : log_number_(0), txn_state_(STARTED) {} diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 627f266c5..276302be8 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -29,7 +29,7 @@ enum TxnDBWritePolicy { WRITE_UNPREPARED // write data before the prepare phase of 2pc }; -const uint32_t kInitialMaxDeadlocks = 5; +constexpr uint32_t kInitialMaxDeadlocks = 5; class LockManager; struct RangeLockInfo; diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index f0f630a48..087a84a82 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -133,7 +133,25 @@ bool PessimisticTransaction::IsExpired() const { WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) - : PessimisticTransaction(txn_db, write_options, txn_options){}; + : PessimisticTransaction(txn_db, write_options, txn_options) {} + +Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) { + if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) { + return Status::InvalidArgument( + "Cannot decrease read timestamp for validation"); + } + read_timestamp_ = ts; + return Status::OK(); +} + +Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) { + if (read_timestamp_ < kMaxTxnTimestamp && ts <= read_timestamp_) { + return Status::InvalidArgument( + "Cannot commit at timestamp smaller than or equal to read timestamp"); + } + commit_timestamp_ = ts; + return Status::OK(); +} Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { std::unique_ptr keys_to_unlock(lock_tracker_factory_.Create()); @@ -711,9 +729,19 @@ Status PessimisticTransaction::ValidateSnapshot( ColumnFamilyHandle* cfh = column_family ? column_family : db_impl_->DefaultColumnFamily(); - // TODO (yanqin): support conflict checking based on timestamp. + assert(cfh); + const Comparator* const ucmp = cfh->GetComparator(); + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + std::string ts_buf; + if (ts_sz > 0 && read_timestamp_ < kMaxTxnTimestamp) { + assert(ts_sz == sizeof(read_timestamp_)); + PutFixed64(&ts_buf, read_timestamp_); + } + return TransactionUtil::CheckKeyForConflicts( - db_impl_, cfh, key.ToString(), snap_seq, nullptr, false /* cache_only */); + db_impl_, cfh, key.ToString(), snap_seq, ts_sz == 0 ? nullptr : &ts_buf, + false /* cache_only */); } bool PessimisticTransaction::TryStealingLocks() { diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index d17899cd9..6908aa7a0 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -158,6 +158,14 @@ class PessimisticTransaction : public TransactionBaseImpl { // microseconds according to Env->NowMicros()) uint64_t expiration_time_; + // Timestamp used by the transaction to perform all GetForUpdate. + // Use this timestamp for conflict checking. + // read_timestamp_ == kMaxTxnTimestamp means this transaction has not + // performed any GetForUpdate. It is possible that the transaction has + // performed blind writes or Get, though. + TxnTimestamp read_timestamp_{kMaxTxnTimestamp}; + TxnTimestamp commit_timestamp_{kMaxTxnTimestamp}; + private: friend class TransactionTest_ValidateSnapshotTest_Test; // Used to create unique ids for transactions. @@ -215,6 +223,9 @@ class WriteCommittedTxn : public PessimisticTransaction { ~WriteCommittedTxn() override {} + Status SetReadTimestampForValidation(TxnTimestamp ts) override; + Status SetCommitTimestamp(TxnTimestamp ts) override; + private: Status PrepareInternal() override; diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 864bb5f5f..c5eeb5398 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -8,6 +8,7 @@ #include "utilities/transactions/pessimistic_transaction_db.h" #include +#include #include #include #include @@ -70,8 +71,23 @@ PessimisticTransactionDB::~PessimisticTransactionDB() { } } -Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) { - return Status::OK(); +Status PessimisticTransactionDB::VerifyCFOptions( + const ColumnFamilyOptions& cf_options) { + const Comparator* const ucmp = cf_options.comparator; + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + if (0 == ts_sz) { + return Status::OK(); + } + if (ts_sz != sizeof(TxnTimestamp)) { + std::ostringstream oss; + oss << "Timestamp of transaction must have " << sizeof(TxnTimestamp) + << " bytes. CF comparator " << std::string(ucmp->Name()) + << " timestamp size is " << ts_sz << " bytes"; + return Status::InvalidArgument(oss.str()); + } + // TODO: Update this check once timestamp is supported. + return Status::NotSupported("Transaction DB does not support timestamp"); } Status PessimisticTransactionDB::Initialize( @@ -243,13 +259,11 @@ Status TransactionDB::Open( ROCKS_LOG_WARN(db->GetDBOptions().info_log, "Transaction write_policy is %" PRId32, static_cast(txn_db_options.write_policy)); + // if WrapDB return non-ok, db will be deleted in WrapDB() via + // ~StackableDB(). s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, dbptr); } - if (!s.ok()) { - // just in case it was not deleted (and not set to nullptr). - delete db; - } return s; } @@ -287,6 +301,7 @@ Status WrapAnotherDBInternal( assert(dbptr != nullptr); *dbptr = nullptr; std::unique_ptr txn_db; + // txn_db owns object pointed to by the raw db pointer. switch (txn_db_options.write_policy) { case WRITE_UNPREPARED: txn_db.reset(new WriteUnpreparedTxnDB( @@ -307,6 +322,14 @@ Status WrapAnotherDBInternal( // and set to nullptr. if (s.ok()) { *dbptr = txn_db.release(); + } else { + for (auto* h : handles) { + delete h; + } + // txn_db still owns db, and ~StackableDB() will be called when txn_db goes + // out of scope, deleting the input db pointer. + ROCKS_LOG_FATAL(db->GetDBOptions().info_log, + "Failed to initialize txn_db: %s", s.ToString().c_str()); } return s; } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index d8d12c92a..286c05163 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6339,6 +6339,82 @@ TEST_P(TransactionTest, CommitWithoutPrepare) { } } +TEST_P(TransactionTest, OpenAndEnableU64Timestamp) { + ASSERT_OK(ReOpenNoDelete()); + + assert(db); + + const std::string test_cf_name = "test_cf"; + ColumnFamilyOptions cf_opts; + cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper(); + { + ColumnFamilyHandle* cfh = nullptr; + ASSERT_TRUE( + db->CreateColumnFamily(cf_opts, test_cf_name, &cfh).IsNotSupported()); + } + + // Bypass transaction db layer. + { + DBImpl* db_impl = static_cast_with_check(db->GetRootDB()); + assert(db_impl); + ColumnFamilyHandle* cfh = nullptr; + ASSERT_OK(db_impl->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + } + + { + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + std::vector handles; + ASSERT_TRUE(ReOpenNoDelete(cf_descs, &handles).IsNotSupported()); + } +} + +TEST_P(TransactionTest, OpenAndEnableU32Timestamp) { + class DummyComparatorWithU32Ts : public Comparator { + public: + DummyComparatorWithU32Ts() : Comparator(sizeof(uint32_t)) {} + const char* Name() const override { return "DummyComparatorWithU32Ts"; } + void FindShortSuccessor(std::string*) const override {} + void FindShortestSeparator(std::string*, const Slice&) const override {} + int Compare(const Slice&, const Slice&) const override { return 0; } + }; + + std::unique_ptr dummy_ucmp(new DummyComparatorWithU32Ts()); + + ASSERT_OK(ReOpenNoDelete()); + + assert(db); + + const std::string test_cf_name = "test_cf"; + + ColumnFamilyOptions cf_opts; + cf_opts.comparator = dummy_ucmp.get(); + { + ColumnFamilyHandle* cfh = nullptr; + ASSERT_TRUE(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh) + .IsInvalidArgument()); + } + + // Bypass transaction db layer. + { + ColumnFamilyHandle* cfh = nullptr; + DBImpl* db_impl = static_cast_with_check(db->GetRootDB()); + assert(db_impl); + ASSERT_OK(db_impl->CreateColumnFamily(cf_opts, test_cf_name, &cfh)); + delete cfh; + } + + { + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, options); + cf_descs.emplace_back(test_cf_name, cf_opts); + std::vector handles; + ASSERT_TRUE(ReOpenNoDelete(cf_descs, &handles).IsInvalidArgument()); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index ecc8ea1f9..2aecb312a 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -173,13 +173,12 @@ class TransactionTestBase : public ::testing::Test { StackableDB* stackable_db = new StackableDB(root_db); if (s.ok()) { assert(root_db != nullptr); + // If WrapStackableDB() returns non-ok, then stackable_db is already + // deleted within WrapStackableDB(). s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options, compaction_enabled_cf_indices, *handles, &db); } - if (!s.ok()) { - delete stackable_db; - } return s; }