diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d1408349..b3b7a9c5b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -219,6 +219,7 @@ set(SOURCES utilities/redis/redis_lists.cc utilities/spatialdb/spatial_db.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc + utilities/transactions/transaction_base.cc utilities/transactions/optimistic_transaction_impl.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/transaction_impl.cc diff --git a/src.mk b/src.mk index 8a6c4dc7f..6bb99f287 100644 --- a/src.mk +++ b/src.mk @@ -118,6 +118,7 @@ LIB_SOURCES = \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/transactions/optimistic_transaction_impl.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \ + utilities/transactions/transaction_base.cc \ utilities/transactions/transaction_db_impl.cc \ utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_impl.cc \ diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index f3f184d3e..a75732290 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -27,11 +27,7 @@ struct WriteOptions; OptimisticTransactionImpl::OptimisticTransactionImpl( OptimisticTransactionDB* txn_db, const WriteOptions& write_options, const OptimisticTransactionOptions& txn_options) - : txn_db_(txn_db), - db_(txn_db->GetBaseDB()), - write_options_(write_options), - cmp_(txn_options.cmp), - write_batch_(new WriteBatchWithIndex(txn_options.cmp, 0, true)) { + : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) { if (txn_options.set_snapshot) { SetSnapshot(); } @@ -46,10 +42,6 @@ void OptimisticTransactionImpl::Cleanup() { write_batch_->Clear(); } -void OptimisticTransactionImpl::SetSnapshot() { - snapshot_.reset(new ManagedSnapshot(db_)); -} - Status 
OptimisticTransactionImpl::Commit() { // Set up callback which will call CheckTransactionForConflicts() to // check whether this transaction is safe to be committed. @@ -77,34 +69,12 @@ void OptimisticTransactionImpl::Rollback() { Cleanup(); } -void OptimisticTransactionImpl::SetSavePoint() { - if (save_points_ == nullptr) { - save_points_.reset(new std::stack>()); - } - save_points_->push(snapshot_); - write_batch_->SetSavePoint(); -} - -Status OptimisticTransactionImpl::RollbackToSavePoint() { - if (save_points_ != nullptr && save_points_->size() > 0) { - // Restore saved snapshot - snapshot_ = save_points_->top(); - save_points_->pop(); - - // Rollback batch - Status s = write_batch_->RollbackToSavePoint(); - assert(s.ok()); - - return s; - } else { - assert(write_batch_->RollbackToSavePoint().IsNotFound()); - return Status::NotFound(); - } -} - // Record this key so that we can check it for conflicts at commit time. -void OptimisticTransactionImpl::RecordOperation( - ColumnFamilyHandle* column_family, const Slice& key) { +Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, + const Slice& key, bool untracked) { + if (untracked) { + return Status::OK(); + } uint32_t cfh_id = GetColumnFamilyID(column_family); SequenceNumber seq; @@ -128,190 +98,11 @@ void OptimisticTransactionImpl::RecordOperation( tracked_keys_[cfh_id][key_str] = seq; } } -} - -void OptimisticTransactionImpl::RecordOperation( - ColumnFamilyHandle* column_family, const SliceParts& key) { - size_t key_size = 0; - for (int i = 0; i < key.num_parts; ++i) { - key_size += key.parts[i].size(); - } - - std::string str; - str.reserve(key_size); - - for (int i = 0; i < key.num_parts; ++i) { - str.append(key.parts[i].data(), key.parts[i].size()); - } - - RecordOperation(column_family, str); -} - -Status OptimisticTransactionImpl::Get(const ReadOptions& read_options, - ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) { - return 
write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key, - value); -} - -Status OptimisticTransactionImpl::GetForUpdate( - const ReadOptions& read_options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) { - // Regardless of whether the Get succeeded, track this key. - RecordOperation(column_family, key); - - if (value == nullptr) { - return Status::OK(); - } else { - return Get(read_options, column_family, key, value); - } -} - -std::vector OptimisticTransactionImpl::MultiGet( - const ReadOptions& read_options, - const std::vector& column_family, - const std::vector& keys, std::vector* values) { - // Regardless of whether the MultiGet succeeded, track these keys. - size_t num_keys = keys.size(); - values->resize(num_keys); - - // TODO(agiardullo): optimize multiget? - std::vector stat_list(num_keys); - for (size_t i = 0; i < num_keys; ++i) { - std::string* value = values ? &(*values)[i] : nullptr; - stat_list[i] = Get(read_options, column_family[i], keys[i], value); - } - - return stat_list; -} - -std::vector OptimisticTransactionImpl::MultiGetForUpdate( - const ReadOptions& read_options, - const std::vector& column_family, - const std::vector& keys, std::vector* values) { - // Regardless of whether the MultiGet succeeded, track these keys. - size_t num_keys = keys.size(); - values->resize(num_keys); - - // TODO(agiardullo): optimize multiget? - std::vector stat_list(num_keys); - for (size_t i = 0; i < num_keys; ++i) { - // Regardless of whether the Get succeeded, track this key. - RecordOperation(column_family[i], keys[i]); - - std::string* value = values ? 
&(*values)[i] : nullptr; - stat_list[i] = Get(read_options, column_family[i], keys[i], value); - } - - return stat_list; -} - -Iterator* OptimisticTransactionImpl::GetIterator( - const ReadOptions& read_options) { - Iterator* db_iter = db_->NewIterator(read_options); - assert(db_iter); - - return write_batch_->NewIteratorWithBase(db_iter); -} - -Iterator* OptimisticTransactionImpl::GetIterator( - const ReadOptions& read_options, ColumnFamilyHandle* column_family) { - Iterator* db_iter = db_->NewIterator(read_options, column_family); - assert(db_iter); - - return write_batch_->NewIteratorWithBase(column_family, db_iter); -} - -Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - RecordOperation(column_family, key); - - write_batch_->Put(column_family, key, value); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family, - const SliceParts& key, - const SliceParts& value) { - RecordOperation(column_family, key); - - write_batch_->Put(column_family, key, value); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::Merge(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - RecordOperation(column_family, key); - - write_batch_->Merge(column_family, key, value); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family, - const Slice& key) { - RecordOperation(column_family, key); - - write_batch_->Delete(column_family, key); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) { - RecordOperation(column_family, key); - - write_batch_->Delete(column_family, key); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::PutUntracked( - ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - write_batch_->Put(column_family, key, value); - - return Status::OK(); -} - -Status 
OptimisticTransactionImpl::PutUntracked( - ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) { - write_batch_->Put(column_family, key, value); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::MergeUntracked( - ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - write_batch_->Merge(column_family, key, value); + // Always return OK. Conflict checking will happen at commit time. return Status::OK(); } -Status OptimisticTransactionImpl::DeleteUntracked( - ColumnFamilyHandle* column_family, const Slice& key) { - write_batch_->Delete(column_family, key); - - return Status::OK(); -} - -Status OptimisticTransactionImpl::DeleteUntracked( - ColumnFamilyHandle* column_family, const SliceParts& key) { - write_batch_->Delete(column_family, key); - - return Status::OK(); -} - -void OptimisticTransactionImpl::PutLogData(const Slice& blob) { - write_batch_->PutLogData(blob); -} - -WriteBatchWithIndex* OptimisticTransactionImpl::GetWriteBatch() { - return write_batch_.get(); -} - // Returns OK if it is safe to commit this transaction. Returns Status::Busy // if there are read or write conflicts that would prevent us from committing OR // if we can not determine whether there would be any such conflicts. 
diff --git a/utilities/transactions/optimistic_transaction_impl.h b/utilities/transactions/optimistic_transaction_impl.h index c8f84c387..abbeb1ab9 100644 --- a/utilities/transactions/optimistic_transaction_impl.h +++ b/utilities/transactions/optimistic_transaction_impl.h @@ -21,11 +21,12 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/write_batch_with_index.h" +#include "utilities/transactions/transaction_base.h" #include "utilities/transactions/transaction_util.h" namespace rocksdb { -class OptimisticTransactionImpl : public Transaction { +class OptimisticTransactionImpl : public TransactionBaseImpl { public: OptimisticTransactionImpl(OptimisticTransactionDB* db, const WriteOptions& write_options, @@ -37,144 +38,21 @@ class OptimisticTransactionImpl : public Transaction { void Rollback() override; - void SetSavePoint() override; - - Status RollbackToSavePoint() override; - - Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) override; - - Status Get(const ReadOptions& options, const Slice& key, - std::string* value) override { - return Get(options, db_->DefaultColumnFamily(), key, value); - } - - Status GetForUpdate(const ReadOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; - - Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value) override { - return GetForUpdate(options, db_->DefaultColumnFamily(), key, value); - } - - std::vector MultiGet( - const ReadOptions& options, - const std::vector& column_family, - const std::vector& keys, - std::vector* values) override; - - std::vector MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values) override { - return MultiGet(options, std::vector( - keys.size(), db_->DefaultColumnFamily()), - keys, values); - } - - std::vector MultiGetForUpdate( - const 
ReadOptions& options, - const std::vector& column_family, - const std::vector& keys, - std::vector* values) override; - - std::vector MultiGetForUpdate( - const ReadOptions& options, const std::vector& keys, - std::vector* values) override { - return MultiGetForUpdate(options, - std::vector( - keys.size(), db_->DefaultColumnFamily()), - keys, values); - } - - Iterator* GetIterator(const ReadOptions& read_options) override; - Iterator* GetIterator(const ReadOptions& read_options, - ColumnFamilyHandle* column_family) override; - - Status Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status Put(const Slice& key, const Slice& value) override { - return Put(nullptr, key, value); - } - - Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - Status Put(const SliceParts& key, const SliceParts& value) override { - return Put(nullptr, key, value); - } - - Status Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status Merge(const Slice& key, const Slice& value) override { - return Merge(nullptr, key, value); - } - - Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; - Status Delete(const Slice& key) override { return Delete(nullptr, key); } - Status Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } - - Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status PutUntracked(const Slice& key, const Slice& value) override { - return PutUntracked(nullptr, key, value); - } - - Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - Status PutUntracked(const SliceParts& key, const SliceParts& value) override { - return PutUntracked(nullptr, key, value); - } - - Status 
MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status MergeUntracked(const Slice& key, const Slice& value) override { - return MergeUntracked(nullptr, key, value); - } - - Status DeleteUntracked(ColumnFamilyHandle* column_family, - const Slice& key) override; - Status DeleteUntracked(const Slice& key) override { - return DeleteUntracked(nullptr, key); - } - Status DeleteUntracked(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - Status DeleteUntracked(const SliceParts& key) override { - return DeleteUntracked(nullptr, key); - } - - void PutLogData(const Slice& blob) override; - const TransactionKeyMap* GetTrackedKeys() const { return &tracked_keys_; } - const Snapshot* GetSnapshot() const override { - return snapshot_ ? snapshot_->snapshot() : nullptr; - } - - void SetSnapshot() override; - - WriteBatchWithIndex* GetWriteBatch() override; - protected: - OptimisticTransactionDB* const txn_db_; - DB* db_; - const WriteOptions write_options_; - std::shared_ptr snapshot_; - const Comparator* cmp_; - std::unique_ptr write_batch_; + Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, + bool untracked = false) override; private: + OptimisticTransactionDB* const txn_db_; + // Map of Column Family IDs to keys and corresponding sequence numbers. // The sequence number stored for a key will be used during commit to make // sure this key has // not changed since this sequence number. TransactionKeyMap tracked_keys_; - // Stack of the Snapshot saved at each save point. Saved snapshots may be - // nullptr if there was no snapshot at the time SetSavePoint() was called. - std::unique_ptr>> save_points_; - friend class OptimisticTransactionCallback; // Returns OK if it is safe to commit this transaction. Returns Status::Busy @@ -184,10 +62,6 @@ class OptimisticTransactionImpl : public Transaction { // Should only be called on writer thread. 
Status CheckTransactionForConflicts(DB* db); - void RecordOperation(ColumnFamilyHandle* column_family, const Slice& key); - void RecordOperation(ColumnFamilyHandle* column_family, - const SliceParts& key); - void Cleanup(); // No copying allowed diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc new file mode 100644 index 000000000..88b8c3fd4 --- /dev/null +++ b/utilities/transactions/transaction_base.cc @@ -0,0 +1,278 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include "utilities/transactions/transaction_base.h" + +#include "db/column_family.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/status.h" +#include "util/string_util.h" + +namespace rocksdb { + +TransactionBaseImpl::TransactionBaseImpl(DB* db, + const WriteOptions& write_options) + : db_(db), + write_options_(write_options), + cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), + write_batch_(new WriteBatchWithIndex(cmp_, 0, true)), + start_time_(db_->GetEnv()->NowMicros()) {} + +TransactionBaseImpl::~TransactionBaseImpl() {} + +void TransactionBaseImpl::SetSnapshot() { + snapshot_.reset(new ManagedSnapshot(db_)); +} + +Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, + const SliceParts& key, bool untracked) { + size_t key_size = 0; + for (int i = 0; i < key.num_parts; ++i) { + key_size += key.parts[i].size(); + } + + std::string str; + str.reserve(key_size); + + for (int i = 0; i < key.num_parts; ++i) { + str.append(key.parts[i].data(), key.parts[i].size()); + } + + return TryLock(column_family, str, untracked); +} + +void TransactionBaseImpl::SetSavePoint() { + if (save_points_ == nullptr) { + save_points_.reset(new 
std::stack>()); + } + save_points_->push(snapshot_); + write_batch_->SetSavePoint(); +} + +Status TransactionBaseImpl::RollbackToSavePoint() { + if (save_points_ != nullptr && save_points_->size() > 0) { + // Restore saved snapshot + snapshot_ = save_points_->top(); + save_points_->pop(); + + // Rollback batch + Status s = write_batch_->RollbackToSavePoint(); + assert(s.ok()); + + return s; + } else { + assert(write_batch_->RollbackToSavePoint().IsNotFound()); + return Status::NotFound(); + } +} + +Status TransactionBaseImpl::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, std::string* value) { + return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key, + value); +} + +Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, std::string* value) { + Status s = TryLock(column_family, key); + + if (s.ok() && value != nullptr) { + s = Get(read_options, column_family, key, value); + } + return s; +} + +std::vector TransactionBaseImpl::MultiGet( + const ReadOptions& read_options, + const std::vector& column_family, + const std::vector& keys, std::vector* values) { + size_t num_keys = keys.size(); + values->resize(num_keys); + + std::vector stat_list(num_keys); + for (size_t i = 0; i < num_keys; ++i) { + std::string* value = values ? &(*values)[i] : nullptr; + stat_list[i] = Get(read_options, column_family[i], keys[i], value); + } + + return stat_list; +} + +std::vector TransactionBaseImpl::MultiGetForUpdate( + const ReadOptions& read_options, + const std::vector& column_family, + const std::vector& keys, std::vector* values) { + // Lock all keys first; if any lock fails, the whole MultiGet fails. 
+ size_t num_keys = keys.size(); + values->resize(num_keys); + + // Lock all keys + for (size_t i = 0; i < num_keys; ++i) { + Status s = TryLock(column_family[i], keys[i]); + if (!s.ok()) { + // Fail entire multiget if we cannot lock all keys + return std::vector(num_keys, s); + } + } + + // TODO(agiardullo): optimize multiget? + std::vector stat_list(num_keys); + for (size_t i = 0; i < num_keys; ++i) { + std::string* value = values ? &(*values)[i] : nullptr; + stat_list[i] = Get(read_options, column_family[i], keys[i], value); + } + + return stat_list; +} + +Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) { + Iterator* db_iter = db_->NewIterator(read_options); + assert(db_iter); + + return write_batch_->NewIteratorWithBase(db_iter); +} + +Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) { + Iterator* db_iter = db_->NewIterator(read_options, column_family); + assert(db_iter); + + return write_batch_->NewIteratorWithBase(column_family, db_iter); +} + +Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Put(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, + const SliceParts& key, + const SliceParts& value) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Put(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Merge(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, + const Slice& key) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Delete(column_family, key); + } + + 
return s; +} + +Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + Status s = TryLock(column_family, key); + + if (s.ok()) { + write_batch_->Delete(column_family, key); + } + + return s; +} + +Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Put(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key, + const SliceParts& value) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Put(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, + const Slice& key, + const Slice& value) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Merge(column_family, key, value); + } + + return s; +} + +Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Delete(column_family, key); + } + + return s; +} + +Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key) { + bool untracked = true; + Status s = TryLock(column_family, key, untracked); + + if (s.ok()) { + write_batch_->Delete(column_family, key); + } + + return s; +} + +void TransactionBaseImpl::PutLogData(const Slice& blob) { + write_batch_->PutLogData(blob); +} + +WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() { + return write_batch_.get(); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h new 
file mode 100644 index 000000000..9cd324e57 --- /dev/null +++ b/utilities/transactions/transaction_base.h @@ -0,0 +1,183 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "rocksdb/snapshot.h" +#include "rocksdb/status.h" +#include "rocksdb/types.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/write_batch_with_index.h" + +namespace rocksdb { + +class TransactionBaseImpl : public Transaction { + public: + TransactionBaseImpl(DB* db, const WriteOptions& write_options); + + virtual ~TransactionBaseImpl(); + + // Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock + // returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed. + // untracked will be true if called from PutUntracked, DeleteUntracked, or + // MergeUntracked. 
+ virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, + bool untracked = false) = 0; + + void SetSavePoint() override; + + Status RollbackToSavePoint() override; + + Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, std::string* value) override; + + Status Get(const ReadOptions& options, const Slice& key, + std::string* value) override { + return Get(options, db_->DefaultColumnFamily(), key, value); + } + + Status GetForUpdate(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) override; + + Status GetForUpdate(const ReadOptions& options, const Slice& key, + std::string* value) override { + return GetForUpdate(options, db_->DefaultColumnFamily(), key, value); + } + + std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, + std::vector* values) override; + + std::vector MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values) override { + return MultiGet(options, std::vector( + keys.size(), db_->DefaultColumnFamily()), + keys, values); + } + + std::vector MultiGetForUpdate( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, + std::vector* values) override; + + std::vector MultiGetForUpdate( + const ReadOptions& options, const std::vector& keys, + std::vector* values) override { + return MultiGetForUpdate(options, + std::vector( + keys.size(), db_->DefaultColumnFamily()), + keys, values); + } + + Iterator* GetIterator(const ReadOptions& read_options) override; + Iterator* GetIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) override; + + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status Put(const Slice& key, const Slice& value) override { + return Put(nullptr, key, value); + } + + Status Put(ColumnFamilyHandle* column_family, const 
SliceParts& key, + const SliceParts& value) override; + Status Put(const SliceParts& key, const SliceParts& value) override { + return Put(nullptr, key, value); + } + + Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status Merge(const Slice& key, const Slice& value) override { + return Merge(nullptr, key, value); + } + + Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; + Status Delete(const Slice& key) override { return Delete(nullptr, key); } + Status Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } + + Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status PutUntracked(const Slice& key, const Slice& value) override { + return PutUntracked(nullptr, key, value); + } + + Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override; + Status PutUntracked(const SliceParts& key, const SliceParts& value) override { + return PutUntracked(nullptr, key, value); + } + + Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status MergeUntracked(const Slice& key, const Slice& value) override { + return MergeUntracked(nullptr, key, value); + } + + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const Slice& key) override; + Status DeleteUntracked(const Slice& key) override { + return DeleteUntracked(nullptr, key); + } + Status DeleteUntracked(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + Status DeleteUntracked(const SliceParts& key) override { + return DeleteUntracked(nullptr, key); + } + + void PutLogData(const Slice& blob) override; + + WriteBatchWithIndex* GetWriteBatch() override; + + const Snapshot* GetSnapshot() const override { + return snapshot_ ? 
snapshot_->snapshot() : nullptr; + } + + void SetSnapshot() override; + + protected: + DB* const db_; + + const WriteOptions write_options_; + + const Comparator* cmp_; + + // Records writes pending in this transaction + std::unique_ptr write_batch_; + + // Stores the time the txn was constructed, in microseconds. + const uint64_t start_time_; + + // Stores the current snapshot that was set by SetSnapshot or null if + // no snapshot is currently set. + std::shared_ptr snapshot_; + + // Stack of the Snapshot saved at each save point. Saved snapshots may be + // nullptr if there was no snapshot at the time SetSavePoint() was called. + std::unique_ptr>> save_points_; + + private: + Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, + bool untracked = false); +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index d4a197e2a..cfaf0b2ac 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -36,18 +36,13 @@ TransactionID TransactionImpl::GenTxnID() { TransactionImpl::TransactionImpl(TransactionDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) - : db_(txn_db), + : TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_impl_(nullptr), txn_id_(GenTxnID()), - write_options_(write_options), - cmp_(GetColumnFamilyUserComparator(txn_db->DefaultColumnFamily())), - write_batch_(new WriteBatchWithIndex(cmp_, 0, true)), - start_time_( - txn_options.expiration >= 0 ? db_->GetEnv()->NowMicros() / 1000 : 0), expiration_time_(txn_options.expiration >= 0 - ? start_time_ + txn_options.expiration - : 0), - lock_timeout_(txn_options.lock_timeout) { + ? 
start_time_ / 1000 + txn_options.expiration + : 0), + lock_timeout_(txn_options.lock_timeout) { txn_db_impl_ = dynamic_cast(txn_db); assert(txn_db_impl_); @@ -65,10 +60,6 @@ TransactionImpl::~TransactionImpl() { txn_db_impl_->UnLock(this, &tracked_keys_); } -void TransactionImpl::SetSnapshot() { - snapshot_.reset(new ManagedSnapshot(db_)); -} - void TransactionImpl::Cleanup() { write_batch_->Clear(); txn_db_impl_->UnLock(this, &tracked_keys_); @@ -112,10 +103,6 @@ Status TransactionImpl::Commit() { Status TransactionImpl::DoCommit(WriteBatch* batch) { Status s; - // Do write directly on base db as TransctionDB::Write() would attempt to - // do conflict checking that we've already done. - DB* db = db_->GetBaseDB(); - if (expiration_time_ > 0) { // We cannot commit a transaction that is expired as its locks might have // been released. @@ -123,11 +110,14 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) { // expiration time once we're on the writer thread. TransactionCallback callback(this); - assert(dynamic_cast(db) != nullptr); - auto db_impl = reinterpret_cast(db); + // Do write directly on base db as TransactionDB::Write() would attempt to + // do conflict checking that we've already done. 
+ assert(dynamic_cast(db_) != nullptr); + auto db_impl = reinterpret_cast(db_); + s = db_impl->WriteWithCallback(write_options_, batch, &callback); } else { - s = db->Write(write_options_, batch); + s = db_->Write(write_options_, batch); } return s; @@ -135,31 +125,6 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) { void TransactionImpl::Rollback() { Cleanup(); } -void TransactionImpl::SetSavePoint() { - if (save_points_ == nullptr) { - save_points_.reset(new std::stack>()); - } - save_points_->push(snapshot_); - write_batch_->SetSavePoint(); -} - -Status TransactionImpl::RollbackToSavePoint() { - if (save_points_ != nullptr && save_points_->size() > 0) { - // Restore saved snapshot - snapshot_ = save_points_->top(); - save_points_->pop(); - - // Rollback batch - Status s = write_batch_->RollbackToSavePoint(); - assert(s.ok()); - - return s; - } else { - assert(write_batch_->RollbackToSavePoint().IsNotFound()); - return Status::NotFound(); - } -} - // Lock all keys in this batch. // On success, caller should unlock keys_to_unlock Status TransactionImpl::LockBatch(WriteBatch* batch, @@ -234,35 +199,26 @@ Status TransactionImpl::LockBatch(WriteBatch* batch, return s; } -Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, - const SliceParts& key, bool check_snapshot) { - size_t key_size = 0; - for (int i = 0; i < key.num_parts; ++i) { - key_size += key.parts[i].size(); - } - - std::string str; - str.reserve(key_size); - - for (int i = 0; i < key.num_parts; ++i) { - str.append(key.parts[i].data(), key.parts[i].size()); - } - - return TryLock(column_family, str, check_snapshot); -} - // Attempt to lock this key. // Returns OK if the key has been successfully locked. Non-ok, otherwise. // If check_shapshot is true and this transaction has a snapshot set, // this key will only be locked if there have been no writes to this key since // the snapshot time. 
Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family, - const Slice& key, bool check_snapshot) { + const Slice& key, bool untracked) { uint32_t cfh_id = GetColumnFamilyID(column_family); std::string key_str = key.ToString(); bool previously_locked; Status s; + // Even though we do not care about doing conflict checking for this write, + // we still need to take a lock to make sure we do not cause a conflict with + // some other write. However, we do not need to check if there have been + // any writes since this transaction's snapshot. + // TODO(agiardullo): could optimize by supporting shared txn locks in the + // future + bool check_snapshot = !untracked; + // lock this key if this transactions hasn't already locked it auto iter = tracked_keys_[cfh_id].find(key_str); if (iter == tracked_keys_[cfh_id].end()) { @@ -327,8 +283,8 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key) { Status result; if (snapshot_ != nullptr) { - assert(dynamic_cast(db_->GetBaseDB()) != nullptr); - auto db_impl = reinterpret_cast(db_->GetBaseDB()); + assert(dynamic_cast(db_) != nullptr); + auto db_impl = reinterpret_cast(db_); ColumnFamilyHandle* cfh = column_family ? 
column_family : db_impl->DefaultColumnFamily(); @@ -341,213 +297,6 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family, return result; } -Status TransactionImpl::Get(const ReadOptions& read_options, - ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { - return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key, - value); -} - -Status TransactionImpl::GetForUpdate(const ReadOptions& read_options, - ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) { - Status s = TryLock(column_family, key); - - if (s.ok() && value != nullptr) { - s = Get(read_options, column_family, key, value); - } - return s; -} - -std::vector TransactionImpl::MultiGet( - const ReadOptions& read_options, - const std::vector& column_family, - const std::vector& keys, std::vector* values) { - size_t num_keys = keys.size(); - values->resize(num_keys); - - std::vector stat_list(num_keys); - for (size_t i = 0; i < num_keys; ++i) { - std::string* value = values ? &(*values)[i] : nullptr; - stat_list[i] = Get(read_options, column_family[i], keys[i], value); - } - - return stat_list; -} - -std::vector TransactionImpl::MultiGetForUpdate( - const ReadOptions& read_options, - const std::vector& column_family, - const std::vector& keys, std::vector* values) { - // Regardless of whether the MultiGet succeeded, track these keys. - size_t num_keys = keys.size(); - values->resize(num_keys); - - // Lock all keys - for (size_t i = 0; i < num_keys; ++i) { - Status s = TryLock(column_family[i], keys[i]); - if (!s.ok()) { - // Fail entire multiget if we cannot lock all keys - return std::vector(num_keys, s); - } - } - - // TODO(agiardullo): optimize multiget? - std::vector stat_list(num_keys); - for (size_t i = 0; i < num_keys; ++i) { - std::string* value = values ? 
&(*values)[i] : nullptr; - stat_list[i] = Get(read_options, column_family[i], keys[i], value); - } - - return stat_list; -} - -Iterator* TransactionImpl::GetIterator(const ReadOptions& read_options) { - Iterator* db_iter = db_->NewIterator(read_options); - assert(db_iter); - - return write_batch_->NewIteratorWithBase(db_iter); -} - -Iterator* TransactionImpl::GetIterator(const ReadOptions& read_options, - ColumnFamilyHandle* column_family) { - Iterator* db_iter = db_->NewIterator(read_options, column_family); - assert(db_iter); - - return write_batch_->NewIteratorWithBase(column_family, db_iter); -} - -Status TransactionImpl::Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Put(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::Put(ColumnFamilyHandle* column_family, - const SliceParts& key, const SliceParts& value) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Put(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::Merge(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Merge(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::Delete(ColumnFamilyHandle* column_family, - const Slice& key) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Delete(column_family, key); - } - - return s; -} - -Status TransactionImpl::Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) { - Status s = TryLock(column_family, key); - - if (s.ok()) { - write_batch_->Delete(column_family, key); - } - - return s; -} - -Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - // Even though we do not care about doing conflict checking for this write, - // we still need to take a lock 
to make sure we do not cause a conflict with - // some other write. However, we do not need to check if there have been - // any writes since this transaction's snapshot. - bool check_snapshot = false; - - // TODO(agiardullo): could optimize by supporting shared txn locks in the - // future - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Put(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family, - const SliceParts& key, - const SliceParts& value) { - bool check_snapshot = false; - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Put(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::MergeUntracked(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { - bool check_snapshot = false; - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Merge(column_family, key, value); - } - - return s; -} - -Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family, - const Slice& key) { - bool check_snapshot = false; - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Delete(column_family, key); - } - - return s; -} - -Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family, - const SliceParts& key) { - bool check_snapshot = false; - Status s = TryLock(column_family, key, check_snapshot); - - if (s.ok()) { - write_batch_->Delete(column_family, key); - } - - return s; -} - -void TransactionImpl::PutLogData(const Slice& blob) { - write_batch_->PutLogData(blob); -} - -WriteBatchWithIndex* TransactionImpl::GetWriteBatch() { - return write_batch_.get(); -} - } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_impl.h b/utilities/transactions/transaction_impl.h index 06d5903e2..2d11ac0c0 100644 --- 
a/utilities/transactions/transaction_impl.h +++ b/utilities/transactions/transaction_impl.h @@ -22,6 +22,7 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/write_batch_with_index.h" +#include "utilities/transactions/transaction_base.h" #include "utilities/transactions/transaction_util.h" namespace rocksdb { @@ -30,7 +31,7 @@ using TransactionID = uint64_t; class TransactionDBImpl; -class TransactionImpl : public Transaction { +class TransactionImpl : public TransactionBaseImpl { public: TransactionImpl(TransactionDB* db, const WriteOptions& write_options, const TransactionOptions& txn_options); @@ -43,123 +44,6 @@ class TransactionImpl : public Transaction { void Rollback() override; - void SetSavePoint() override; - - Status RollbackToSavePoint() override; - - Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, std::string* value) override; - - Status Get(const ReadOptions& options, const Slice& key, - std::string* value) override { - return Get(options, db_->DefaultColumnFamily(), key, value); - } - - Status GetForUpdate(const ReadOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) override; - - Status GetForUpdate(const ReadOptions& options, const Slice& key, - std::string* value) override { - return GetForUpdate(options, db_->DefaultColumnFamily(), key, value); - } - - std::vector MultiGet( - const ReadOptions& options, - const std::vector& column_family, - const std::vector& keys, - std::vector* values) override; - - std::vector MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values) override { - return MultiGet(options, std::vector( - keys.size(), db_->DefaultColumnFamily()), - keys, values); - } - - std::vector MultiGetForUpdate( - const ReadOptions& options, - const std::vector& column_family, - const std::vector& keys, - std::vector* values) override; - - std::vector 
MultiGetForUpdate( - const ReadOptions& options, const std::vector& keys, - std::vector* values) override { - return MultiGetForUpdate(options, - std::vector( - keys.size(), db_->DefaultColumnFamily()), - keys, values); - } - - Iterator* GetIterator(const ReadOptions& read_options) override; - Iterator* GetIterator(const ReadOptions& read_options, - ColumnFamilyHandle* column_family) override; - - Status Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status Put(const Slice& key, const Slice& value) override { - return Put(nullptr, key, value); - } - - Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - Status Put(const SliceParts& key, const SliceParts& value) override { - return Put(nullptr, key, value); - } - - Status Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status Merge(const Slice& key, const Slice& value) override { - return Merge(nullptr, key, value); - } - - Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; - Status Delete(const Slice& key) override { return Delete(nullptr, key); } - Status Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } - - Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status PutUntracked(const Slice& key, const Slice& value) override { - return PutUntracked(nullptr, key, value); - } - - Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - Status PutUntracked(const SliceParts& key, const SliceParts& value) override { - return PutUntracked(nullptr, key, value); - } - - Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - Status MergeUntracked(const Slice& key, const Slice& value) 
override { - return MergeUntracked(nullptr, key, value); - } - - Status DeleteUntracked(ColumnFamilyHandle* column_family, - const Slice& key) override; - Status DeleteUntracked(const Slice& key) override { - return DeleteUntracked(nullptr, key); - } - Status DeleteUntracked(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - Status DeleteUntracked(const SliceParts& key) override { - return DeleteUntracked(nullptr, key); - } - - void PutLogData(const Slice& blob) override; - - const Snapshot* GetSnapshot() const override { - return snapshot_ ? snapshot_->snapshot() : nullptr; - } - - void SetSnapshot() override; - - WriteBatchWithIndex* GetWriteBatch() override; - // Generate a new unique transaction identifier static TransactionID GenTxnID(); @@ -178,9 +62,11 @@ class TransactionImpl : public Transaction { int64_t GetLockTimeout() const { return lock_timeout_; } void SetLockTimeout(int64_t timeout) { lock_timeout_ = timeout; } - private: - TransactionDB* const db_; + protected: + Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, + bool untracked = false) override; + private: TransactionDBImpl* txn_db_impl_; // Used to create unique ids for transactions. @@ -189,21 +75,6 @@ class TransactionImpl : public Transaction { // Unique ID for this transaction const TransactionID txn_id_; - const WriteOptions write_options_; - - // If snapshot_ is set, all keys that locked must also have not been written - // since this snapshot - std::shared_ptr snapshot_; - - const Comparator* cmp_; - - std::unique_ptr write_batch_; - - // If expiration_ is non-zero, start_time_ stores that time the txn was - // constructed, - // in milliseconds. - const uint64_t start_time_; - // If non-zero, this transaction should not be committed after this time (in // milliseconds) const uint64_t expiration_time_; @@ -217,14 +88,6 @@ class TransactionImpl : public Transaction { // stored. 
TransactionKeyMap tracked_keys_; - // Stack of the Snapshot saved at each save point. Saved snapshots may be - // nullptr if there was no snapshot at the time SetSavePoint() was called. - std::unique_ptr>> save_points_; - - Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, - bool check_snapshot = true); - Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, - bool check_snapshot = true); void Cleanup(); Status CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key);