Add commit_timestamp and read_timestamp to Pessimistic transaction (#9537)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9537

Add `Transaction::SetReadTimestampForValidation()` and
`Transaction::SetCommitTimestamp()` APIs with default implementation
returning `Status::NotSupported()`. Currently, calling these two APIs do not
have any effect.

Also add checks to `PessimisticTransactionDB`
to enforce that column families in the same db either
- disable user-defined timestamp
- enable 64-bit timestamp

Just to clarify, a `PessimisticTransactionDB` can have some column families without
timestamps as well as column families that enable timestamp.

Each `PessimisticTransaction` can have two optional timestamps, `read_timestamp_`
used for additional validation and `commit_timestamp_` which denotes when the transaction commits.
For now, we are going to support `WriteCommittedTxn` (in a series of subsequent PRs)

Once set, we do not allow decreasing `read_timestamp_`. The `commit_timestamp_` must be
 greater than `read_timestamp_` for each transaction and must be set before commit, unless
the transaction does not involve any column family that enables user-defined timestamp.

TransactionDB builds on top of RocksDB core `DB` layer. Though `DB` layer assumes
that user-defined timestamps are byte arrays, `TransactionDB` uses uint64_t to store
timestamps. When they are passed down, they are still interpreted as
byte-arrays by `DB`.

Reviewed By: ltamasi

Differential Revision: D31567959

fbshipit-source-id: b0b6b69acab5d8e340cf174f33e8b09f1c3d3502
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 81ada95bd7
commit d6e1e6f37a
  1. 1
      HISTORY.md
  2. 14
      include/rocksdb/utilities/transaction.h
  3. 2
      include/rocksdb/utilities/transaction_db.h
  4. 34
      utilities/transactions/pessimistic_transaction.cc
  5. 11
      utilities/transactions/pessimistic_transaction.h
  6. 33
      utilities/transactions/pessimistic_transaction_db.cc
  7. 76
      utilities/transactions/transaction_test.cc
  8. 5
      utilities/transactions/transaction_test.h

@ -61,6 +61,7 @@
* Return Status::InvalidArgument from ObjectRegistry::NewObject if a factory exists but the object ould not be created (returns NotFound if the factory is missing). * Return Status::InvalidArgument from ObjectRegistry::NewObject if a factory exists but the object ould not be created (returns NotFound if the factory is missing).
* Remove deprecated overloads of API DB::GetApproximateSizes. * Remove deprecated overloads of API DB::GetApproximateSizes.
* Remove deprecated option DBOptions::new_table_reader_for_compaction_inputs. * Remove deprecated option DBOptions::new_table_reader_for_compaction_inputs.
* Add Transaction::SetReadTimestampForValidation() and Transaction::SetCommitTimestamp(). Default impl returns NotSupported().
### Behavior Changes ### Behavior Changes
* Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO. * Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO.

@ -7,6 +7,7 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include <limits>
#include <string> #include <string>
#include <vector> #include <vector>
@ -24,6 +25,11 @@ using TransactionName = std::string;
using TransactionID = uint64_t; using TransactionID = uint64_t;
using TxnTimestamp = uint64_t;
constexpr TxnTimestamp kMaxTxnTimestamp =
std::numeric_limits<TxnTimestamp>::max();
/* /*
class Endpoint allows to define prefix ranges. class Endpoint allows to define prefix ranges.
@ -594,6 +600,14 @@ class Transaction {
// to remain the same across restarts. // to remain the same across restarts.
uint64_t GetId() { return id_; } uint64_t GetId() { return id_; }
virtual Status SetReadTimestampForValidation(TxnTimestamp /*ts*/) {
return Status::NotSupported("timestamp not supported");
}
virtual Status SetCommitTimestamp(TxnTimestamp /*ts*/) {
return Status::NotSupported("timestamp not supported");
}
protected: protected:
explicit Transaction(const TransactionDB* /*db*/) {} explicit Transaction(const TransactionDB* /*db*/) {}
Transaction() : log_number_(0), txn_state_(STARTED) {} Transaction() : log_number_(0), txn_state_(STARTED) {}

@ -29,7 +29,7 @@ enum TxnDBWritePolicy {
WRITE_UNPREPARED // write data before the prepare phase of 2pc WRITE_UNPREPARED // write data before the prepare phase of 2pc
}; };
const uint32_t kInitialMaxDeadlocks = 5; constexpr uint32_t kInitialMaxDeadlocks = 5;
class LockManager; class LockManager;
struct RangeLockInfo; struct RangeLockInfo;

@ -133,7 +133,25 @@ bool PessimisticTransaction::IsExpired() const {
WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db,
const WriteOptions& write_options, const WriteOptions& write_options,
const TransactionOptions& txn_options) const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options){}; : PessimisticTransaction(txn_db, write_options, txn_options) {}
Status WriteCommittedTxn::SetReadTimestampForValidation(TxnTimestamp ts) {
if (read_timestamp_ < kMaxTxnTimestamp && ts < read_timestamp_) {
return Status::InvalidArgument(
"Cannot decrease read timestamp for validation");
}
read_timestamp_ = ts;
return Status::OK();
}
Status WriteCommittedTxn::SetCommitTimestamp(TxnTimestamp ts) {
if (read_timestamp_ < kMaxTxnTimestamp && ts <= read_timestamp_) {
return Status::InvalidArgument(
"Cannot commit at timestamp smaller than or equal to read timestamp");
}
commit_timestamp_ = ts;
return Status::OK();
}
Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create()); std::unique_ptr<LockTracker> keys_to_unlock(lock_tracker_factory_.Create());
@ -711,9 +729,19 @@ Status PessimisticTransaction::ValidateSnapshot(
ColumnFamilyHandle* cfh = ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily(); column_family ? column_family : db_impl_->DefaultColumnFamily();
// TODO (yanqin): support conflict checking based on timestamp. assert(cfh);
const Comparator* const ucmp = cfh->GetComparator();
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
std::string ts_buf;
if (ts_sz > 0 && read_timestamp_ < kMaxTxnTimestamp) {
assert(ts_sz == sizeof(read_timestamp_));
PutFixed64(&ts_buf, read_timestamp_);
}
return TransactionUtil::CheckKeyForConflicts( return TransactionUtil::CheckKeyForConflicts(
db_impl_, cfh, key.ToString(), snap_seq, nullptr, false /* cache_only */); db_impl_, cfh, key.ToString(), snap_seq, ts_sz == 0 ? nullptr : &ts_buf,
false /* cache_only */);
} }
bool PessimisticTransaction::TryStealingLocks() { bool PessimisticTransaction::TryStealingLocks() {

@ -158,6 +158,14 @@ class PessimisticTransaction : public TransactionBaseImpl {
// microseconds according to Env->NowMicros()) // microseconds according to Env->NowMicros())
uint64_t expiration_time_; uint64_t expiration_time_;
// Timestamp used by the transaction to perform all GetForUpdate.
// Use this timestamp for conflict checking.
// read_timestamp_ == kMaxTxnTimestamp means this transaction has not
// performed any GetForUpdate. It is possible that the transaction has
// performed blind writes or Get, though.
TxnTimestamp read_timestamp_{kMaxTxnTimestamp};
TxnTimestamp commit_timestamp_{kMaxTxnTimestamp};
private: private:
friend class TransactionTest_ValidateSnapshotTest_Test; friend class TransactionTest_ValidateSnapshotTest_Test;
// Used to create unique ids for transactions. // Used to create unique ids for transactions.
@ -215,6 +223,9 @@ class WriteCommittedTxn : public PessimisticTransaction {
~WriteCommittedTxn() override {} ~WriteCommittedTxn() override {}
Status SetReadTimestampForValidation(TxnTimestamp ts) override;
Status SetCommitTimestamp(TxnTimestamp ts) override;
private: private:
Status PrepareInternal() override; Status PrepareInternal() override;

@ -8,6 +8,7 @@
#include "utilities/transactions/pessimistic_transaction_db.h" #include "utilities/transactions/pessimistic_transaction_db.h"
#include <cinttypes> #include <cinttypes>
#include <sstream>
#include <string> #include <string>
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
@ -70,8 +71,23 @@ PessimisticTransactionDB::~PessimisticTransactionDB() {
} }
} }
Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) { Status PessimisticTransactionDB::VerifyCFOptions(
const ColumnFamilyOptions& cf_options) {
const Comparator* const ucmp = cf_options.comparator;
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
if (0 == ts_sz) {
return Status::OK(); return Status::OK();
}
if (ts_sz != sizeof(TxnTimestamp)) {
std::ostringstream oss;
oss << "Timestamp of transaction must have " << sizeof(TxnTimestamp)
<< " bytes. CF comparator " << std::string(ucmp->Name())
<< " timestamp size is " << ts_sz << " bytes";
return Status::InvalidArgument(oss.str());
}
// TODO: Update this check once timestamp is supported.
return Status::NotSupported("Transaction DB does not support timestamp");
} }
Status PessimisticTransactionDB::Initialize( Status PessimisticTransactionDB::Initialize(
@ -243,13 +259,11 @@ Status TransactionDB::Open(
ROCKS_LOG_WARN(db->GetDBOptions().info_log, ROCKS_LOG_WARN(db->GetDBOptions().info_log,
"Transaction write_policy is %" PRId32, "Transaction write_policy is %" PRId32,
static_cast<int>(txn_db_options.write_policy)); static_cast<int>(txn_db_options.write_policy));
// if WrapDB return non-ok, db will be deleted in WrapDB() via
// ~StackableDB().
s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
dbptr); dbptr);
} }
if (!s.ok()) {
// just in case it was not deleted (and not set to nullptr).
delete db;
}
return s; return s;
} }
@ -287,6 +301,7 @@ Status WrapAnotherDBInternal(
assert(dbptr != nullptr); assert(dbptr != nullptr);
*dbptr = nullptr; *dbptr = nullptr;
std::unique_ptr<PessimisticTransactionDB> txn_db; std::unique_ptr<PessimisticTransactionDB> txn_db;
// txn_db owns object pointed to by the raw db pointer.
switch (txn_db_options.write_policy) { switch (txn_db_options.write_policy) {
case WRITE_UNPREPARED: case WRITE_UNPREPARED:
txn_db.reset(new WriteUnpreparedTxnDB( txn_db.reset(new WriteUnpreparedTxnDB(
@ -307,6 +322,14 @@ Status WrapAnotherDBInternal(
// and set to nullptr. // and set to nullptr.
if (s.ok()) { if (s.ok()) {
*dbptr = txn_db.release(); *dbptr = txn_db.release();
} else {
for (auto* h : handles) {
delete h;
}
// txn_db still owns db, and ~StackableDB() will be called when txn_db goes
// out of scope, deleting the input db pointer.
ROCKS_LOG_FATAL(db->GetDBOptions().info_log,
"Failed to initialize txn_db: %s", s.ToString().c_str());
} }
return s; return s;
} }

@ -6339,6 +6339,82 @@ TEST_P(TransactionTest, CommitWithoutPrepare) {
} }
} }
TEST_P(TransactionTest, OpenAndEnableU64Timestamp) {
ASSERT_OK(ReOpenNoDelete());
assert(db);
const std::string test_cf_name = "test_cf";
ColumnFamilyOptions cf_opts;
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
{
ColumnFamilyHandle* cfh = nullptr;
ASSERT_TRUE(
db->CreateColumnFamily(cf_opts, test_cf_name, &cfh).IsNotSupported());
}
// Bypass transaction db layer.
{
DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
assert(db_impl);
ColumnFamilyHandle* cfh = nullptr;
ASSERT_OK(db_impl->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
}
{
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
std::vector<ColumnFamilyHandle*> handles;
ASSERT_TRUE(ReOpenNoDelete(cf_descs, &handles).IsNotSupported());
}
}
TEST_P(TransactionTest, OpenAndEnableU32Timestamp) {
class DummyComparatorWithU32Ts : public Comparator {
public:
DummyComparatorWithU32Ts() : Comparator(sizeof(uint32_t)) {}
const char* Name() const override { return "DummyComparatorWithU32Ts"; }
void FindShortSuccessor(std::string*) const override {}
void FindShortestSeparator(std::string*, const Slice&) const override {}
int Compare(const Slice&, const Slice&) const override { return 0; }
};
std::unique_ptr<Comparator> dummy_ucmp(new DummyComparatorWithU32Ts());
ASSERT_OK(ReOpenNoDelete());
assert(db);
const std::string test_cf_name = "test_cf";
ColumnFamilyOptions cf_opts;
cf_opts.comparator = dummy_ucmp.get();
{
ColumnFamilyHandle* cfh = nullptr;
ASSERT_TRUE(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh)
.IsInvalidArgument());
}
// Bypass transaction db layer.
{
ColumnFamilyHandle* cfh = nullptr;
DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
assert(db_impl);
ASSERT_OK(db_impl->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
}
{
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
std::vector<ColumnFamilyHandle*> handles;
ASSERT_TRUE(ReOpenNoDelete(cf_descs, &handles).IsInvalidArgument());
}
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -173,13 +173,12 @@ class TransactionTestBase : public ::testing::Test {
StackableDB* stackable_db = new StackableDB(root_db); StackableDB* stackable_db = new StackableDB(root_db);
if (s.ok()) { if (s.ok()) {
assert(root_db != nullptr); assert(root_db != nullptr);
// If WrapStackableDB() returns non-ok, then stackable_db is already
// deleted within WrapStackableDB().
s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options, s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options,
compaction_enabled_cf_indices, compaction_enabled_cf_indices,
*handles, &db); *handles, &db);
} }
if (!s.ok()) {
delete stackable_db;
}
return s; return s;
} }

Loading…
Cancel
Save