// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. #ifndef ROCKSDB_LITE #include "utilities/transactions/transaction_db_impl.h" #include #include #include #include "db/db_impl.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" #include "utilities/transactions/transaction_db_mutex_impl.h" #include "utilities/transactions/transaction_impl.h" namespace rocksdb { TransactionDBImpl::TransactionDBImpl(DB* db, const TransactionDBOptions& txn_db_options) : TransactionDB(db), txn_db_options_(txn_db_options), lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks, txn_db_options_.custom_mutex_factory ? txn_db_options_.custom_mutex_factory : std::shared_ptr( new TransactionDBMutexFactoryImpl())) {} Transaction* TransactionDBImpl::BeginTransaction( const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) { if (old_txn != nullptr) { ReinitializeTransaction(old_txn, write_options, txn_options); return old_txn; } else { return new TransactionImpl(this, write_options, txn_options); } } TransactionDBOptions TransactionDBImpl::ValidateTxnDBOptions( const TransactionDBOptions& txn_db_options) { TransactionDBOptions validated = txn_db_options; if (txn_db_options.num_stripes == 0) { validated.num_stripes = 1; } return validated; } Status TransactionDB::Open(const Options& options, const TransactionDBOptions& txn_db_options, const std::string& dbname, TransactionDB** dbptr) { DBOptions db_options(options); ColumnFamilyOptions cf_options(options); std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); std::vector handles; Status s = TransactionDB::Open(db_options, txn_db_options, dbname, column_families, &handles, dbptr); if (s.ok()) { assert(handles.size() == 1); // i can delete the handle since DBImpl is always holding a reference to // default column family delete handles[0]; } return s; } Status TransactionDB::Open( const DBOptions& db_options, const TransactionDBOptions& txn_db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, TransactionDB** dbptr) { Status s; DB* db; std::vector column_families_copy = column_families; std::vector compaction_enabled_cf_indices; // Enable MemTable History if not already enabled for (size_t i = 0; i < column_families_copy.size(); i++) { ColumnFamilyOptions* options = &column_families_copy[i].options; if (options->max_write_buffer_number_to_maintain == 0) { // Setting to -1 will set the History size to max_write_buffer_number. options->max_write_buffer_number_to_maintain = -1; } if (!options->disable_auto_compactions) { // Disable compactions momentarily to prevent race with DB::Open options->disable_auto_compactions = true; compaction_enabled_cf_indices.push_back(i); } } s = DB::Open(db_options, dbname, column_families_copy, handles, &db); if (s.ok()) { TransactionDBImpl* txn_db = new TransactionDBImpl( db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options)); *dbptr = txn_db; for (auto cf_ptr : *handles) { txn_db->AddColumnFamily(cf_ptr); } // Re-enable compaction for the column families that initially had // compaction enabled. assert(column_families_copy.size() == (*handles).size()); std::vector compaction_enabled_cf_handles; compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size()); for (auto index : compaction_enabled_cf_indices) { compaction_enabled_cf_handles.push_back((*handles)[index]); } s = txn_db->EnableAutoCompaction(compaction_enabled_cf_handles); } return s; } // Let TransactionLockMgr know that this column family exists so it can // allocate a LockMap for it. void TransactionDBImpl::AddColumnFamily(const ColumnFamilyHandle* handle) { lock_mgr_.AddColumnFamily(handle->GetID()); } Status TransactionDBImpl::CreateColumnFamily( const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle) { InstrumentedMutexLock l(&column_family_mutex_); Status s = db_->CreateColumnFamily(options, column_family_name, handle); if (s.ok()) { lock_mgr_.AddColumnFamily((*handle)->GetID()); } return s; } // Let TransactionLockMgr know that it can deallocate the LockMap for this // column family. Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { InstrumentedMutexLock l(&column_family_mutex_); Status s = db_->DropColumnFamily(column_family); if (s.ok()) { lock_mgr_.RemoveColumnFamily(column_family->GetID()); } return s; } Status TransactionDBImpl::TryLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key) { return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv()); } void TransactionDBImpl::UnLock(TransactionImpl* txn, const TransactionKeyMap* keys) { lock_mgr_.UnLock(txn, keys, GetEnv()); } void TransactionDBImpl::UnLock(TransactionImpl* txn, uint32_t cfh_id, const std::string& key) { lock_mgr_.UnLock(txn, cfh_id, key, GetEnv()); } // Used when wrapping DB write operations in a transaction Transaction* TransactionDBImpl::BeginInternalTransaction( const WriteOptions& options) { TransactionOptions txn_options; Transaction* txn = BeginTransaction(options, txn_options, nullptr); assert(dynamic_cast(txn) != nullptr); auto txn_impl = reinterpret_cast(txn); // Use default timeout for non-transactional writes txn_impl->SetLockTimeout(txn_db_options_.default_lock_timeout); return txn; } // All user Put, Merge, Delete, and Write requests must be intercepted to make // sure that they lock all keys that they are writing to avoid causing conflicts // with any concurent transactions. The easiest way to do this is to wrap all // write operations in a transaction. // // Put(), Merge(), and Delete() only lock a single key per call. Write() will // sort its keys before locking them. This guarantees that TransactionDB write // methods cannot deadlock with eachother (but still could deadlock with a // Transaction). Status TransactionDBImpl::Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { Status s; Transaction* txn = BeginInternalTransaction(options); txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do PutUntracked(). s = txn->PutUntracked(column_family, key, val); if (s.ok()) { s = txn->Commit(); } delete txn; return s; } Status TransactionDBImpl::Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, const Slice& key) { Status s; Transaction* txn = BeginInternalTransaction(wopts); txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do // DeleteUntracked(). s = txn->DeleteUntracked(column_family, key); if (s.ok()) { s = txn->Commit(); } delete txn; return s; } Status TransactionDBImpl::Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { Status s; Transaction* txn = BeginInternalTransaction(options); txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do // MergeUntracked(). s = txn->MergeUntracked(column_family, key, value); if (s.ok()) { s = txn->Commit(); } delete txn; return s; } Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { // Need to lock all keys in this batch to prevent write conflicts with // concurrent transactions. Transaction* txn = BeginInternalTransaction(opts); txn->DisableIndexing(); assert(dynamic_cast(txn) != nullptr); auto txn_impl = reinterpret_cast(txn); // Since commitBatch sorts the keys before locking, concurrent Write() // operations will not cause a deadlock. // In order to avoid a deadlock with a concurrent Transaction, Transactions // should use a lock timeout. Status s = txn_impl->CommitBatch(updates); delete txn; return s; } void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id, TransactionImpl* tx) { assert(tx->GetExpirationTime() > 0); std::lock_guard lock(map_mutex_); expirable_transactions_map_.insert({tx_id, tx}); } void TransactionDBImpl::RemoveExpirableTransaction(TransactionID tx_id) { std::lock_guard lock(map_mutex_); expirable_transactions_map_.erase(tx_id); } bool TransactionDBImpl::TryStealingExpiredTransactionLocks( TransactionID tx_id) { std::lock_guard lock(map_mutex_); auto tx_it = expirable_transactions_map_.find(tx_id); if (tx_it == expirable_transactions_map_.end()) { return true; } TransactionImpl& tx = *(tx_it->second); return tx.TryStealingLocks(); } void TransactionDBImpl::ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options) { assert(dynamic_cast(txn) != nullptr); auto txn_impl = reinterpret_cast(txn); txn_impl->Reinitialize(write_options, txn_options); } } // namespace rocksdb #endif // ROCKSDB_LITE