fork of https://github.com/rust-rocksdb/rust-rocksdb for nextgraph
				
			
			
		
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							780 lines
						
					
					
						
							26 KiB
						
					
					
				
			
		
		
	
	
							780 lines
						
					
					
						
							26 KiB
						
					
					
				| //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
 | |
| //  This source code is licensed under both the GPLv2 (found in the
 | |
| //  COPYING file in the root directory) and Apache 2.0 License
 | |
| //  (found in the LICENSE.Apache file in the root directory).
 | |
| 
 | |
| 
 | |
| #include "utilities/transactions/pessimistic_transaction_db.h"
 | |
| 
 | |
| #include <cinttypes>
 | |
| #include <sstream>
 | |
| #include <string>
 | |
| #include <unordered_set>
 | |
| #include <vector>
 | |
| 
 | |
| #include "db/db_impl/db_impl.h"
 | |
| #include "logging/logging.h"
 | |
| #include "rocksdb/db.h"
 | |
| #include "rocksdb/options.h"
 | |
| #include "rocksdb/utilities/transaction_db.h"
 | |
| #include "test_util/sync_point.h"
 | |
| #include "util/cast_util.h"
 | |
| #include "util/mutexlock.h"
 | |
| #include "utilities/transactions/pessimistic_transaction.h"
 | |
| #include "utilities/transactions/transaction_db_mutex_impl.h"
 | |
| #include "utilities/transactions/write_prepared_txn_db.h"
 | |
| #include "utilities/transactions/write_unprepared_txn_db.h"
 | |
| 
 | |
| namespace ROCKSDB_NAMESPACE {
 | |
| 
 | |
| PessimisticTransactionDB::PessimisticTransactionDB(
 | |
|     DB* db, const TransactionDBOptions& txn_db_options)
 | |
|     : TransactionDB(db),
 | |
|       db_impl_(static_cast_with_check<DBImpl>(db)),
 | |
|       txn_db_options_(txn_db_options),
 | |
|       lock_manager_(NewLockManager(this, txn_db_options)) {
 | |
|   assert(db_impl_ != nullptr);
 | |
|   info_log_ = db_impl_->GetDBOptions().info_log;
 | |
| }
 | |
| 
 | |
| // Support initiliazing PessimisticTransactionDB from a stackable db
 | |
| //
 | |
| //    PessimisticTransactionDB
 | |
| //     ^        ^
 | |
| //     |        |
 | |
| //     |        +
 | |
| //     |   StackableDB
 | |
| //     |   ^
 | |
| //     |   |
 | |
| //     +   +
 | |
| //     DBImpl
 | |
| //       ^
 | |
| //       |(inherit)
 | |
| //       +
 | |
| //       DB
 | |
| //
 | |
| PessimisticTransactionDB::PessimisticTransactionDB(
 | |
|     StackableDB* db, const TransactionDBOptions& txn_db_options)
 | |
|     : TransactionDB(db),
 | |
|       db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
 | |
|       txn_db_options_(txn_db_options),
 | |
|       lock_manager_(NewLockManager(this, txn_db_options)) {
 | |
|   assert(db_impl_ != nullptr);
 | |
| }
 | |
| 
 | |
| PessimisticTransactionDB::~PessimisticTransactionDB() {
 | |
|   while (!transactions_.empty()) {
 | |
|     delete transactions_.begin()->second;
 | |
|     // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
 | |
|     // clear whether delete would also remove the entry from transactions_.
 | |
|   }
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::VerifyCFOptions(
 | |
|     const ColumnFamilyOptions& cf_options) {
 | |
|   const Comparator* const ucmp = cf_options.comparator;
 | |
|   assert(ucmp);
 | |
|   size_t ts_sz = ucmp->timestamp_size();
 | |
|   if (0 == ts_sz) {
 | |
|     return Status::OK();
 | |
|   }
 | |
|   if (ts_sz != sizeof(TxnTimestamp)) {
 | |
|     std::ostringstream oss;
 | |
|     oss << "Timestamp of transaction must have " << sizeof(TxnTimestamp)
 | |
|         << " bytes. CF comparator " << std::string(ucmp->Name())
 | |
|         << " timestamp size is " << ts_sz << " bytes";
 | |
|     return Status::InvalidArgument(oss.str());
 | |
|   }
 | |
|   if (txn_db_options_.write_policy != WRITE_COMMITTED) {
 | |
|     return Status::NotSupported("Only WriteCommittedTxn supports timestamp");
 | |
|   }
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::Initialize(
 | |
|     const std::vector<size_t>& compaction_enabled_cf_indices,
 | |
|     const std::vector<ColumnFamilyHandle*>& handles) {
 | |
|   for (auto cf_ptr : handles) {
 | |
|     AddColumnFamily(cf_ptr);
 | |
|   }
 | |
|   // Verify cf options
 | |
|   for (auto handle : handles) {
 | |
|     ColumnFamilyDescriptor cfd;
 | |
|     Status s = handle->GetDescriptor(&cfd);
 | |
|     if (!s.ok()) {
 | |
|       return s;
 | |
|     }
 | |
|     s = VerifyCFOptions(cfd.options);
 | |
|     if (!s.ok()) {
 | |
|       return s;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Re-enable compaction for the column families that initially had
 | |
|   // compaction enabled.
 | |
|   std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
 | |
|   compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
 | |
|   for (auto index : compaction_enabled_cf_indices) {
 | |
|     compaction_enabled_cf_handles.push_back(handles[index]);
 | |
|   }
 | |
| 
 | |
|   Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
 | |
| 
 | |
|   // create 'real' transactions from recovered shell transactions
 | |
|   auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
 | |
|   assert(dbimpl != nullptr);
 | |
|   auto rtrxs = dbimpl->recovered_transactions();
 | |
| 
 | |
|   for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
 | |
|     auto recovered_trx = it->second;
 | |
|     assert(recovered_trx);
 | |
|     assert(recovered_trx->batches_.size() == 1);
 | |
|     const auto& seq = recovered_trx->batches_.begin()->first;
 | |
|     const auto& batch_info = recovered_trx->batches_.begin()->second;
 | |
|     assert(batch_info.log_number_);
 | |
|     assert(recovered_trx->name_.length());
 | |
| 
 | |
|     WriteOptions w_options;
 | |
|     w_options.sync = true;
 | |
|     TransactionOptions t_options;
 | |
|     // This would help avoiding deadlock for keys that although exist in the WAL
 | |
|     // did not go through concurrency control. This includes the merge that
 | |
|     // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
 | |
|     // there is a conflict between the keys of two transactions that must be
 | |
|     // avoided, it is already avoided by the application, MyRocks, before the
 | |
|     // restart (ii) application, MyRocks, guarntees to rollback/commit the
 | |
|     // recovered transactions before new transactions start.
 | |
|     t_options.skip_concurrency_control = true;
 | |
| 
 | |
|     Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
 | |
|     assert(real_trx);
 | |
|     real_trx->SetLogNumber(batch_info.log_number_);
 | |
|     assert(seq != kMaxSequenceNumber);
 | |
|     if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
 | |
|       real_trx->SetId(seq);
 | |
|     }
 | |
| 
 | |
|     s = real_trx->SetName(recovered_trx->name_);
 | |
|     if (!s.ok()) {
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
 | |
|     // WriteCommitted set this to to disable this check that is specific to
 | |
|     // WritePrepared txns
 | |
|     assert(batch_info.batch_cnt_ == 0 ||
 | |
|            real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
 | |
|     real_trx->SetState(Transaction::PREPARED);
 | |
|     if (!s.ok()) {
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
|   if (s.ok()) {
 | |
|     dbimpl->DeleteAllRecoveredTransactions();
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Transaction* WriteCommittedTxnDB::BeginTransaction(
 | |
|     const WriteOptions& write_options, const TransactionOptions& txn_options,
 | |
|     Transaction* old_txn) {
 | |
|   if (old_txn != nullptr) {
 | |
|     ReinitializeTransaction(old_txn, write_options, txn_options);
 | |
|     return old_txn;
 | |
|   } else {
 | |
|     return new WriteCommittedTxn(this, write_options, txn_options);
 | |
|   }
 | |
| }
 | |
| 
 | |
| TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
 | |
|     const TransactionDBOptions& txn_db_options) {
 | |
|   TransactionDBOptions validated = txn_db_options;
 | |
| 
 | |
|   if (txn_db_options.num_stripes == 0) {
 | |
|     validated.num_stripes = 1;
 | |
|   }
 | |
| 
 | |
|   return validated;
 | |
| }
 | |
| 
 | |
| Status TransactionDB::Open(const Options& options,
 | |
|                            const TransactionDBOptions& txn_db_options,
 | |
|                            const std::string& dbname, TransactionDB** dbptr) {
 | |
|   DBOptions db_options(options);
 | |
|   ColumnFamilyOptions cf_options(options);
 | |
|   std::vector<ColumnFamilyDescriptor> column_families;
 | |
|   column_families.push_back(
 | |
|       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
 | |
|   std::vector<ColumnFamilyHandle*> handles;
 | |
|   Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
 | |
|                                  column_families, &handles, dbptr);
 | |
|   if (s.ok()) {
 | |
|     assert(handles.size() == 1);
 | |
|     // i can delete the handle since DBImpl is always holding a reference to
 | |
|     // default column family
 | |
|     delete handles[0];
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status TransactionDB::Open(
 | |
|     const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
 | |
|     const std::string& dbname,
 | |
|     const std::vector<ColumnFamilyDescriptor>& column_families,
 | |
|     std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
 | |
|   Status s;
 | |
|   DB* db = nullptr;
 | |
|   if (txn_db_options.write_policy == WRITE_COMMITTED &&
 | |
|       db_options.unordered_write) {
 | |
|     return Status::NotSupported(
 | |
|         "WRITE_COMMITTED is incompatible with unordered_writes");
 | |
|   }
 | |
|   if (txn_db_options.write_policy == WRITE_UNPREPARED &&
 | |
|       db_options.unordered_write) {
 | |
|     // TODO(lth): support it
 | |
|     return Status::NotSupported(
 | |
|         "WRITE_UNPREPARED is currently incompatible with unordered_writes");
 | |
|   }
 | |
|   if (txn_db_options.write_policy == WRITE_PREPARED &&
 | |
|       db_options.unordered_write && !db_options.two_write_queues) {
 | |
|     return Status::NotSupported(
 | |
|         "WRITE_PREPARED is incompatible with unordered_writes if "
 | |
|         "two_write_queues is not enabled.");
 | |
|   }
 | |
| 
 | |
|   std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
 | |
|   std::vector<size_t> compaction_enabled_cf_indices;
 | |
|   DBOptions db_options_2pc = db_options;
 | |
|   PrepareWrap(&db_options_2pc, &column_families_copy,
 | |
|               &compaction_enabled_cf_indices);
 | |
|   const bool use_seq_per_batch =
 | |
|       txn_db_options.write_policy == WRITE_PREPARED ||
 | |
|       txn_db_options.write_policy == WRITE_UNPREPARED;
 | |
|   const bool use_batch_per_txn =
 | |
|       txn_db_options.write_policy == WRITE_COMMITTED ||
 | |
|       txn_db_options.write_policy == WRITE_PREPARED;
 | |
|   s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
 | |
|                    use_seq_per_batch, use_batch_per_txn);
 | |
|   if (s.ok()) {
 | |
|     ROCKS_LOG_WARN(db->GetDBOptions().info_log,
 | |
|                    "Transaction write_policy is %" PRId32,
 | |
|                    static_cast<int>(txn_db_options.write_policy));
 | |
|     // if WrapDB return non-ok, db will be deleted in WrapDB() via
 | |
|     // ~StackableDB().
 | |
|     s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
 | |
|                dbptr);
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| void TransactionDB::PrepareWrap(
 | |
|     DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
 | |
|     std::vector<size_t>* compaction_enabled_cf_indices) {
 | |
|   compaction_enabled_cf_indices->clear();
 | |
| 
 | |
|   // Enable MemTable History if not already enabled
 | |
|   for (size_t i = 0; i < column_families->size(); i++) {
 | |
|     ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
 | |
| 
 | |
|     if (cf_options->max_write_buffer_size_to_maintain == 0 &&
 | |
|         cf_options->max_write_buffer_number_to_maintain == 0) {
 | |
|       // Setting to -1 will set the History size to
 | |
|       // max_write_buffer_number * write_buffer_size.
 | |
|       cf_options->max_write_buffer_size_to_maintain = -1;
 | |
|     }
 | |
|     if (!cf_options->disable_auto_compactions) {
 | |
|       // Disable compactions momentarily to prevent race with DB::Open
 | |
|       cf_options->disable_auto_compactions = true;
 | |
|       compaction_enabled_cf_indices->push_back(i);
 | |
|     }
 | |
|   }
 | |
|   db_options->allow_2pc = true;
 | |
| }
 | |
| 
 | |
| namespace {
 | |
| template <typename DBType>
 | |
| Status WrapAnotherDBInternal(
 | |
|     DBType* db, const TransactionDBOptions& txn_db_options,
 | |
|     const std::vector<size_t>& compaction_enabled_cf_indices,
 | |
|     const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
 | |
|   assert(db != nullptr);
 | |
|   assert(dbptr != nullptr);
 | |
|   *dbptr = nullptr;
 | |
|   std::unique_ptr<PessimisticTransactionDB> txn_db;
 | |
|   // txn_db owns object pointed to by the raw db pointer.
 | |
|   switch (txn_db_options.write_policy) {
 | |
|     case WRITE_UNPREPARED:
 | |
|       txn_db.reset(new WriteUnpreparedTxnDB(
 | |
|           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
 | |
|       break;
 | |
|     case WRITE_PREPARED:
 | |
|       txn_db.reset(new WritePreparedTxnDB(
 | |
|           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
 | |
|       break;
 | |
|     case WRITE_COMMITTED:
 | |
|     default:
 | |
|       txn_db.reset(new WriteCommittedTxnDB(
 | |
|           db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
 | |
|   }
 | |
|   txn_db->UpdateCFComparatorMap(handles);
 | |
|   Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
 | |
|   // In case of a failure at this point, db is deleted via the txn_db destructor
 | |
|   // and set to nullptr.
 | |
|   if (s.ok()) {
 | |
|     *dbptr = txn_db.release();
 | |
|   } else {
 | |
|     for (auto* h : handles) {
 | |
|       delete h;
 | |
|     }
 | |
|     // txn_db still owns db, and ~StackableDB() will be called when txn_db goes
 | |
|     // out of scope, deleting the input db pointer.
 | |
|     ROCKS_LOG_FATAL(db->GetDBOptions().info_log,
 | |
|                     "Failed to initialize txn_db: %s", s.ToString().c_str());
 | |
|   }
 | |
|   return s;
 | |
| }
 | |
| }  // namespace
 | |
| 
 | |
| Status TransactionDB::WrapDB(
 | |
|     // make sure this db is already opened with memtable history enabled,
 | |
|     // auto compaction distabled and 2 phase commit enabled
 | |
|     DB* db, const TransactionDBOptions& txn_db_options,
 | |
|     const std::vector<size_t>& compaction_enabled_cf_indices,
 | |
|     const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
 | |
|   return WrapAnotherDBInternal(db, txn_db_options,
 | |
|                                compaction_enabled_cf_indices, handles, dbptr);
 | |
| }
 | |
| 
 | |
| Status TransactionDB::WrapStackableDB(
 | |
|     // make sure this stackable_db is already opened with memtable history
 | |
|     // enabled, auto compaction distabled and 2 phase commit enabled
 | |
|     StackableDB* db, const TransactionDBOptions& txn_db_options,
 | |
|     const std::vector<size_t>& compaction_enabled_cf_indices,
 | |
|     const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
 | |
|   return WrapAnotherDBInternal(db, txn_db_options,
 | |
|                                compaction_enabled_cf_indices, handles, dbptr);
 | |
| }
 | |
| 
 | |
| // Let LockManager know that this column family exists so it can
 | |
| // allocate a LockMap for it.
 | |
| void PessimisticTransactionDB::AddColumnFamily(
 | |
|     const ColumnFamilyHandle* handle) {
 | |
|   lock_manager_->AddColumnFamily(handle);
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::CreateColumnFamily(
 | |
|     const ColumnFamilyOptions& options, const std::string& column_family_name,
 | |
|     ColumnFamilyHandle** handle) {
 | |
|   InstrumentedMutexLock l(&column_family_mutex_);
 | |
|   Status s = VerifyCFOptions(options);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   s = db_->CreateColumnFamily(options, column_family_name, handle);
 | |
|   if (s.ok()) {
 | |
|     lock_manager_->AddColumnFamily(*handle);
 | |
|     UpdateCFComparatorMap(*handle);
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::CreateColumnFamilies(
 | |
|     const ColumnFamilyOptions& options,
 | |
|     const std::vector<std::string>& column_family_names,
 | |
|     std::vector<ColumnFamilyHandle*>* handles) {
 | |
|   InstrumentedMutexLock l(&column_family_mutex_);
 | |
| 
 | |
|   Status s = VerifyCFOptions(options);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   s = db_->CreateColumnFamilies(options, column_family_names, handles);
 | |
|   if (s.ok()) {
 | |
|     for (auto* handle : *handles) {
 | |
|       lock_manager_->AddColumnFamily(handle);
 | |
|       UpdateCFComparatorMap(handle);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::CreateColumnFamilies(
 | |
|     const std::vector<ColumnFamilyDescriptor>& column_families,
 | |
|     std::vector<ColumnFamilyHandle*>* handles) {
 | |
|   InstrumentedMutexLock l(&column_family_mutex_);
 | |
| 
 | |
|   for (auto& cf_desc : column_families) {
 | |
|     Status s = VerifyCFOptions(cf_desc.options);
 | |
|     if (!s.ok()) {
 | |
|       return s;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   Status s = db_->CreateColumnFamilies(column_families, handles);
 | |
|   if (s.ok()) {
 | |
|     for (auto* handle : *handles) {
 | |
|       lock_manager_->AddColumnFamily(handle);
 | |
|       UpdateCFComparatorMap(handle);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| // Let LockManager know that it can deallocate the LockMap for this
 | |
| // column family.
 | |
| Status PessimisticTransactionDB::DropColumnFamily(
 | |
|     ColumnFamilyHandle* column_family) {
 | |
|   InstrumentedMutexLock l(&column_family_mutex_);
 | |
| 
 | |
|   Status s = db_->DropColumnFamily(column_family);
 | |
|   if (s.ok()) {
 | |
|     lock_manager_->RemoveColumnFamily(column_family);
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::DropColumnFamilies(
 | |
|     const std::vector<ColumnFamilyHandle*>& column_families) {
 | |
|   InstrumentedMutexLock l(&column_family_mutex_);
 | |
| 
 | |
|   Status s = db_->DropColumnFamilies(column_families);
 | |
|   if (s.ok()) {
 | |
|     for (auto* handle : column_families) {
 | |
|       lock_manager_->RemoveColumnFamily(handle);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
 | |
|                                          uint32_t cfh_id,
 | |
|                                          const std::string& key,
 | |
|                                          bool exclusive) {
 | |
|   return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn,
 | |
|                                               uint32_t cfh_id,
 | |
|                                               const Endpoint& start_endp,
 | |
|                                               const Endpoint& end_endp) {
 | |
|   return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(),
 | |
|                                 /*exclusive=*/true);
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
 | |
|                                       const LockTracker& keys) {
 | |
|   lock_manager_->UnLock(txn, keys, GetEnv());
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
 | |
|                                       uint32_t cfh_id, const std::string& key) {
 | |
|   lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
 | |
| }
 | |
| 
 | |
| // Used when wrapping DB write operations in a transaction
 | |
| Transaction* PessimisticTransactionDB::BeginInternalTransaction(
 | |
|     const WriteOptions& options) {
 | |
|   TransactionOptions txn_options;
 | |
|   Transaction* txn = BeginTransaction(options, txn_options, nullptr);
 | |
| 
 | |
|   // Use default timeout for non-transactional writes
 | |
|   txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
 | |
|   return txn;
 | |
| }
 | |
| 
 | |
| // All user Put, Merge, Delete, and Write requests must be intercepted to make
 | |
| // sure that they lock all keys that they are writing to avoid causing conflicts
 | |
| // with any concurrent transactions. The easiest way to do this is to wrap all
 | |
| // write operations in a transaction.
 | |
| //
 | |
| // Put(), Merge(), and Delete() only lock a single key per call.  Write() will
 | |
| // sort its keys before locking them.  This guarantees that TransactionDB write
 | |
| // methods cannot deadlock with each other (but still could deadlock with a
 | |
| // Transaction).
 | |
| Status PessimisticTransactionDB::Put(const WriteOptions& options,
 | |
|                                      ColumnFamilyHandle* column_family,
 | |
|                                      const Slice& key, const Slice& val) {
 | |
|   Status s = FailIfCfEnablesTs(this, column_family);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   Transaction* txn = BeginInternalTransaction(options);
 | |
|   txn->DisableIndexing();
 | |
| 
 | |
|   // Since the client didn't create a transaction, they don't care about
 | |
|   // conflict checking for this write.  So we just need to do PutUntracked().
 | |
|   s = txn->PutUntracked(column_family, key, val);
 | |
| 
 | |
|   if (s.ok()) {
 | |
|     s = txn->Commit();
 | |
|   }
 | |
| 
 | |
|   delete txn;
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
 | |
|                                         ColumnFamilyHandle* column_family,
 | |
|                                         const Slice& key) {
 | |
|   Status s = FailIfCfEnablesTs(this, column_family);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   Transaction* txn = BeginInternalTransaction(wopts);
 | |
|   txn->DisableIndexing();
 | |
| 
 | |
|   // Since the client didn't create a transaction, they don't care about
 | |
|   // conflict checking for this write.  So we just need to do
 | |
|   // DeleteUntracked().
 | |
|   s = txn->DeleteUntracked(column_family, key);
 | |
| 
 | |
|   if (s.ok()) {
 | |
|     s = txn->Commit();
 | |
|   }
 | |
| 
 | |
|   delete txn;
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
 | |
|                                               ColumnFamilyHandle* column_family,
 | |
|                                               const Slice& key) {
 | |
|   Status s = FailIfCfEnablesTs(this, column_family);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   Transaction* txn = BeginInternalTransaction(wopts);
 | |
|   txn->DisableIndexing();
 | |
| 
 | |
|   // Since the client didn't create a transaction, they don't care about
 | |
|   // conflict checking for this write.  So we just need to do
 | |
|   // SingleDeleteUntracked().
 | |
|   s = txn->SingleDeleteUntracked(column_family, key);
 | |
| 
 | |
|   if (s.ok()) {
 | |
|     s = txn->Commit();
 | |
|   }
 | |
| 
 | |
|   delete txn;
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::Merge(const WriteOptions& options,
 | |
|                                        ColumnFamilyHandle* column_family,
 | |
|                                        const Slice& key, const Slice& value) {
 | |
|   Status s = FailIfCfEnablesTs(this, column_family);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
| 
 | |
|   Transaction* txn = BeginInternalTransaction(options);
 | |
|   txn->DisableIndexing();
 | |
| 
 | |
|   // Since the client didn't create a transaction, they don't care about
 | |
|   // conflict checking for this write.  So we just need to do
 | |
|   // MergeUntracked().
 | |
|   s = txn->MergeUntracked(column_family, key, value);
 | |
| 
 | |
|   if (s.ok()) {
 | |
|     s = txn->Commit();
 | |
|   }
 | |
| 
 | |
|   delete txn;
 | |
| 
 | |
|   return s;
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::Write(const WriteOptions& opts,
 | |
|                                        WriteBatch* updates) {
 | |
|   return WriteWithConcurrencyControl(opts, updates);
 | |
| }
 | |
| 
 | |
| Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
 | |
|                                   WriteBatch* updates) {
 | |
|   Status s = FailIfBatchHasTs(updates);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
|   if (txn_db_options_.skip_concurrency_control) {
 | |
|     return db_impl_->Write(opts, updates);
 | |
|   } else {
 | |
|     return WriteWithConcurrencyControl(opts, updates);
 | |
|   }
 | |
| }
 | |
| 
 | |
| Status WriteCommittedTxnDB::Write(
 | |
|     const WriteOptions& opts,
 | |
|     const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
 | |
|   Status s = FailIfBatchHasTs(updates);
 | |
|   if (!s.ok()) {
 | |
|     return s;
 | |
|   }
 | |
|   if (optimizations.skip_concurrency_control) {
 | |
|     return db_impl_->Write(opts, updates);
 | |
|   } else {
 | |
|     return WriteWithConcurrencyControl(opts, updates);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::InsertExpirableTransaction(
 | |
|     TransactionID tx_id, PessimisticTransaction* tx) {
 | |
|   assert(tx->GetExpirationTime() > 0);
 | |
|   std::lock_guard<std::mutex> lock(map_mutex_);
 | |
|   expirable_transactions_map_.insert({tx_id, tx});
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
 | |
|   std::lock_guard<std::mutex> lock(map_mutex_);
 | |
|   expirable_transactions_map_.erase(tx_id);
 | |
| }
 | |
| 
 | |
| bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
 | |
|     TransactionID tx_id) {
 | |
|   std::lock_guard<std::mutex> lock(map_mutex_);
 | |
| 
 | |
|   auto tx_it = expirable_transactions_map_.find(tx_id);
 | |
|   if (tx_it == expirable_transactions_map_.end()) {
 | |
|     return true;
 | |
|   }
 | |
|   PessimisticTransaction& tx = *(tx_it->second);
 | |
|   return tx.TryStealingLocks();
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::ReinitializeTransaction(
 | |
|     Transaction* txn, const WriteOptions& write_options,
 | |
|     const TransactionOptions& txn_options) {
 | |
|   auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
 | |
| 
 | |
|   txn_impl->Reinitialize(this, write_options, txn_options);
 | |
| }
 | |
| 
 | |
| Transaction* PessimisticTransactionDB::GetTransactionByName(
 | |
|     const TransactionName& name) {
 | |
|   std::lock_guard<std::mutex> lock(name_map_mutex_);
 | |
|   auto it = transactions_.find(name);
 | |
|   if (it == transactions_.end()) {
 | |
|     return nullptr;
 | |
|   } else {
 | |
|     return it->second;
 | |
|   }
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::GetAllPreparedTransactions(
 | |
|     std::vector<Transaction*>* transv) {
 | |
|   assert(transv);
 | |
|   transv->clear();
 | |
|   std::lock_guard<std::mutex> lock(name_map_mutex_);
 | |
|   for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
 | |
|     if (it->second->GetState() == Transaction::PREPARED) {
 | |
|       transv->push_back(it->second);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
 | |
|   return lock_manager_->GetPointLockStatus();
 | |
| }
 | |
| 
 | |
| std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
 | |
|   return lock_manager_->GetDeadlockInfoBuffer();
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
 | |
|   lock_manager_->Resize(target_size);
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
 | |
|   assert(txn);
 | |
|   assert(txn->GetName().length() > 0);
 | |
|   assert(GetTransactionByName(txn->GetName()) == nullptr);
 | |
|   assert(txn->GetState() == Transaction::STARTED);
 | |
|   std::lock_guard<std::mutex> lock(name_map_mutex_);
 | |
|   transactions_[txn->GetName()] = txn;
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
 | |
|   assert(txn);
 | |
|   std::lock_guard<std::mutex> lock(name_map_mutex_);
 | |
|   auto it = transactions_.find(txn->GetName());
 | |
|   assert(it != transactions_.end());
 | |
|   transactions_.erase(it);
 | |
| }
 | |
| 
 | |
| std::pair<Status, std::shared_ptr<const Snapshot>>
 | |
| PessimisticTransactionDB::CreateTimestampedSnapshot(TxnTimestamp ts) {
 | |
|   if (kMaxTxnTimestamp == ts) {
 | |
|     return std::make_pair(Status::InvalidArgument("invalid ts"), nullptr);
 | |
|   }
 | |
|   assert(db_impl_);
 | |
|   return db_impl_->CreateTimestampedSnapshot(kMaxSequenceNumber, ts);
 | |
| }
 | |
| 
 | |
| std::shared_ptr<const Snapshot>
 | |
| PessimisticTransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const {
 | |
|   assert(db_impl_);
 | |
|   return db_impl_->GetTimestampedSnapshot(ts);
 | |
| }
 | |
| 
 | |
| void PessimisticTransactionDB::ReleaseTimestampedSnapshotsOlderThan(
 | |
|     TxnTimestamp ts) {
 | |
|   assert(db_impl_);
 | |
|   db_impl_->ReleaseTimestampedSnapshotsOlderThan(ts);
 | |
| }
 | |
| 
 | |
| Status PessimisticTransactionDB::GetTimestampedSnapshots(
 | |
|     TxnTimestamp ts_lb, TxnTimestamp ts_ub,
 | |
|     std::vector<std::shared_ptr<const Snapshot>>& timestamped_snapshots) const {
 | |
|   assert(db_impl_);
 | |
|   return db_impl_->GetTimestampedSnapshots(ts_lb, ts_ub, timestamped_snapshots);
 | |
| }
 | |
| 
 | |
| Status SnapshotCreationCallback::operator()(SequenceNumber seq,
 | |
|                                             bool disable_memtable) {
 | |
|   assert(db_impl_);
 | |
|   assert(commit_ts_ != kMaxTxnTimestamp);
 | |
| 
 | |
|   const bool two_write_queues =
 | |
|       db_impl_->immutable_db_options().two_write_queues;
 | |
|   assert(!two_write_queues || !disable_memtable);
 | |
| #ifdef NDEBUG
 | |
|   (void)two_write_queues;
 | |
|   (void)disable_memtable;
 | |
| #endif
 | |
| 
 | |
|   const bool seq_per_batch = db_impl_->seq_per_batch();
 | |
|   if (!seq_per_batch) {
 | |
|     assert(db_impl_->GetLastPublishedSequence() <= seq);
 | |
|   } else {
 | |
|     assert(db_impl_->GetLastPublishedSequence() < seq);
 | |
|   }
 | |
| 
 | |
|   // Create a snapshot which can also be used for write conflict checking.
 | |
|   auto ret = db_impl_->CreateTimestampedSnapshot(seq, commit_ts_);
 | |
|   snapshot_creation_status_ = ret.first;
 | |
|   snapshot_ = ret.second;
 | |
|   if (snapshot_creation_status_.ok()) {
 | |
|     assert(snapshot_);
 | |
|   } else {
 | |
|     assert(!snapshot_);
 | |
|   }
 | |
|   if (snapshot_ && snapshot_notifier_) {
 | |
|     snapshot_notifier_->SnapshotCreated(snapshot_.get());
 | |
|   }
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| }  // namespace ROCKSDB_NAMESPACE
 | |
| 
 |