Common base class for transactions

Summary:
As I keep adding new features to transactions, I keep creating more duplicate code.  This diff cleans this up by creating a base implementation class for Transaction and OptimisticTransaction to inherit from.

The code in TransactionBase.h/.cc is all just copied from elsewhere.  The only entertaining part of this class worth looking at is the virtual TryLock method which allows OptimisticTransactions and Transactions to share the same common code for Put/Get/etc.

The rest of this diff is mostly red and easy on the eyes.

Test Plan: No functionality change.  existing tests pass.

Reviewers: sdong, jkedgar, rven, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D45135
main
agiardullo 10 years ago
parent 2050832974
commit 20d1e547d1
  1. 1
      CMakeLists.txt
  2. 1
      src.mk
  3. 223
      utilities/transactions/optimistic_transaction_impl.cc
  4. 138
      utilities/transactions/optimistic_transaction_impl.h
  5. 278
      utilities/transactions/transaction_base.cc
  6. 183
      utilities/transactions/transaction_base.h
  7. 293
      utilities/transactions/transaction_impl.cc
  8. 149
      utilities/transactions/transaction_impl.h

@ -219,6 +219,7 @@ set(SOURCES
utilities/redis/redis_lists.cc
utilities/spatialdb/spatial_db.cc
utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/transactions/optimistic_transaction_base.cc
utilities/transactions/optimistic_transaction_impl.cc
utilities/transactions/optimistic_transaction_db_impl.cc
utilities/transactions/transaction_impl.cc

@ -118,6 +118,7 @@ LIB_SOURCES = \
utilities/table_properties_collectors/compact_on_deletion_collector.cc \
utilities/transactions/optimistic_transaction_impl.cc \
utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/transactions/transaction_base.cc \
utilities/transactions/transaction_db_impl.cc \
utilities/transactions/transaction_lock_mgr.cc \
utilities/transactions/transaction_impl.cc \

@ -27,11 +27,7 @@ struct WriteOptions;
OptimisticTransactionImpl::OptimisticTransactionImpl(
OptimisticTransactionDB* txn_db, const WriteOptions& write_options,
const OptimisticTransactionOptions& txn_options)
: txn_db_(txn_db),
db_(txn_db->GetBaseDB()),
write_options_(write_options),
cmp_(txn_options.cmp),
write_batch_(new WriteBatchWithIndex(txn_options.cmp, 0, true)) {
: TransactionBaseImpl(txn_db->GetBaseDB(), write_options), txn_db_(txn_db) {
if (txn_options.set_snapshot) {
SetSnapshot();
}
@ -46,10 +42,6 @@ void OptimisticTransactionImpl::Cleanup() {
write_batch_->Clear();
}
void OptimisticTransactionImpl::SetSnapshot() {
snapshot_.reset(new ManagedSnapshot(db_));
}
Status OptimisticTransactionImpl::Commit() {
// Set up callback which will call CheckTransactionForConflicts() to
// check whether this transaction is safe to be committed.
@ -77,34 +69,12 @@ void OptimisticTransactionImpl::Rollback() {
Cleanup();
}
void OptimisticTransactionImpl::SetSavePoint() {
if (save_points_ == nullptr) {
save_points_.reset(new std::stack<std::shared_ptr<ManagedSnapshot>>());
}
save_points_->push(snapshot_);
write_batch_->SetSavePoint();
}
Status OptimisticTransactionImpl::RollbackToSavePoint() {
if (save_points_ != nullptr && save_points_->size() > 0) {
// Restore saved snapshot
snapshot_ = save_points_->top();
save_points_->pop();
// Rollback batch
Status s = write_batch_->RollbackToSavePoint();
assert(s.ok());
return s;
} else {
assert(write_batch_->RollbackToSavePoint().IsNotFound());
return Status::NotFound();
}
}
// Record this key so that we can check it for conflicts at commit time.
void OptimisticTransactionImpl::RecordOperation(
ColumnFamilyHandle* column_family, const Slice& key) {
Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool untracked) {
if (untracked) {
return Status::OK();
}
uint32_t cfh_id = GetColumnFamilyID(column_family);
SequenceNumber seq;
@ -128,190 +98,11 @@ void OptimisticTransactionImpl::RecordOperation(
tracked_keys_[cfh_id][key_str] = seq;
}
}
}
void OptimisticTransactionImpl::RecordOperation(
ColumnFamilyHandle* column_family, const SliceParts& key) {
size_t key_size = 0;
for (int i = 0; i < key.num_parts; ++i) {
key_size += key.parts[i].size();
}
std::string str;
str.reserve(key_size);
for (int i = 0; i < key.num_parts; ++i) {
str.append(key.parts[i].data(), key.parts[i].size());
}
RecordOperation(column_family, str);
}
Status OptimisticTransactionImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key,
value);
}
Status OptimisticTransactionImpl::GetForUpdate(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
// Regardless of whether the Get succeeded, track this key.
RecordOperation(column_family, key);
if (value == nullptr) {
return Status::OK();
} else {
return Get(read_options, column_family, key, value);
}
}
std::vector<Status> OptimisticTransactionImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
// Regardless of whether the MultiGet succeeded, track these keys.
size_t num_keys = keys.size();
values->resize(num_keys);
// TODO(agiardullo): optimize multiget?
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
std::string* value = values ? &(*values)[i] : nullptr;
stat_list[i] = Get(read_options, column_family[i], keys[i], value);
}
return stat_list;
}
std::vector<Status> OptimisticTransactionImpl::MultiGetForUpdate(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
// Regardless of whether the MultiGet succeeded, track these keys.
size_t num_keys = keys.size();
values->resize(num_keys);
// TODO(agiardullo): optimize multiget?
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
// Regardless of whether the Get succeeded, track this key.
RecordOperation(column_family[i], keys[i]);
std::string* value = values ? &(*values)[i] : nullptr;
stat_list[i] = Get(read_options, column_family[i], keys[i], value);
}
return stat_list;
}
Iterator* OptimisticTransactionImpl::GetIterator(
const ReadOptions& read_options) {
Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter);
return write_batch_->NewIteratorWithBase(db_iter);
}
Iterator* OptimisticTransactionImpl::GetIterator(
const ReadOptions& read_options, ColumnFamilyHandle* column_family) {
Iterator* db_iter = db_->NewIterator(read_options, column_family);
assert(db_iter);
return write_batch_->NewIteratorWithBase(column_family, db_iter);
}
Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
RecordOperation(column_family, key);
write_batch_->Put(column_family, key, value);
return Status::OK();
}
Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
RecordOperation(column_family, key);
write_batch_->Put(column_family, key, value);
return Status::OK();
}
Status OptimisticTransactionImpl::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
RecordOperation(column_family, key);
write_batch_->Merge(column_family, key, value);
return Status::OK();
}
Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
RecordOperation(column_family, key);
write_batch_->Delete(column_family, key);
return Status::OK();
}
Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
RecordOperation(column_family, key);
write_batch_->Delete(column_family, key);
return Status::OK();
}
Status OptimisticTransactionImpl::PutUntracked(
ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) {
write_batch_->Put(column_family, key, value);
return Status::OK();
}
Status OptimisticTransactionImpl::PutUntracked(
ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) {
write_batch_->Put(column_family, key, value);
return Status::OK();
}
Status OptimisticTransactionImpl::MergeUntracked(
ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) {
write_batch_->Merge(column_family, key, value);
// Always return OK. Confilct checking will happen at commit time.
return Status::OK();
}
Status OptimisticTransactionImpl::DeleteUntracked(
ColumnFamilyHandle* column_family, const Slice& key) {
write_batch_->Delete(column_family, key);
return Status::OK();
}
Status OptimisticTransactionImpl::DeleteUntracked(
ColumnFamilyHandle* column_family, const SliceParts& key) {
write_batch_->Delete(column_family, key);
return Status::OK();
}
void OptimisticTransactionImpl::PutLogData(const Slice& blob) {
write_batch_->PutLogData(blob);
}
WriteBatchWithIndex* OptimisticTransactionImpl::GetWriteBatch() {
return write_batch_.get();
}
// Returns OK if it is safe to commit this transaction. Returns Status::Busy
// if there are read or write conflicts that would prevent us from committing OR
// if we can not determine whether there would be any such conflicts.

@ -21,11 +21,12 @@
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "utilities/transactions/transaction_base.h"
#include "utilities/transactions/transaction_util.h"
namespace rocksdb {
class OptimisticTransactionImpl : public Transaction {
class OptimisticTransactionImpl : public TransactionBaseImpl {
public:
OptimisticTransactionImpl(OptimisticTransactionDB* db,
const WriteOptions& write_options,
@ -37,144 +38,21 @@ class OptimisticTransactionImpl : public Transaction {
void Rollback() override;
void SetSavePoint() override;
Status RollbackToSavePoint() override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override {
return Get(options, db_->DefaultColumnFamily(), key, value);
}
Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override;
Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value) override {
return GetForUpdate(options, db_->DefaultColumnFamily(), key, value);
}
std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override {
return MultiGet(options, std::vector<ColumnFamilyHandle*>(
keys.size(), db_->DefaultColumnFamily()),
keys, values);
}
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options, const std::vector<Slice>& keys,
std::vector<std::string>* values) override {
return MultiGetForUpdate(options,
std::vector<ColumnFamilyHandle*>(
keys.size(), db_->DefaultColumnFamily()),
keys, values);
}
Iterator* GetIterator(const ReadOptions& read_options) override;
Iterator* GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) override;
Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status Put(const Slice& key, const Slice& value) override {
return Put(nullptr, key, value);
}
Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
Status Put(const SliceParts& key, const SliceParts& value) override {
return Put(nullptr, key, value);
}
Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status Merge(const Slice& key, const Slice& value) override {
return Merge(nullptr, key, value);
}
Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
Status Delete(const Slice& key) override { return Delete(nullptr, key); }
Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status PutUntracked(const Slice& key, const Slice& value) override {
return PutUntracked(nullptr, key, value);
}
Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
Status PutUntracked(const SliceParts& key, const SliceParts& value) override {
return PutUntracked(nullptr, key, value);
}
Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status MergeUntracked(const Slice& key, const Slice& value) override {
return MergeUntracked(nullptr, key, value);
}
Status DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) override;
Status DeleteUntracked(const Slice& key) override {
return DeleteUntracked(nullptr, key);
}
Status DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
Status DeleteUntracked(const SliceParts& key) override {
return DeleteUntracked(nullptr, key);
}
void PutLogData(const Slice& blob) override;
const TransactionKeyMap* GetTrackedKeys() const { return &tracked_keys_; }
const Snapshot* GetSnapshot() const override {
return snapshot_ ? snapshot_->snapshot() : nullptr;
}
void SetSnapshot() override;
WriteBatchWithIndex* GetWriteBatch() override;
protected:
OptimisticTransactionDB* const txn_db_;
DB* db_;
const WriteOptions write_options_;
std::shared_ptr<ManagedSnapshot> snapshot_;
const Comparator* cmp_;
std::unique_ptr<WriteBatchWithIndex> write_batch_;
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool untracked = false) override;
private:
OptimisticTransactionDB* const txn_db_;
// Map of Column Family IDs to keys and corresponding sequence numbers.
// The sequence number stored for a key will be used during commit to make
// sure this key has
// not changed since this sequence number.
TransactionKeyMap tracked_keys_;
// Stack of the Snapshot saved at each save point. Saved snapshots may be
// nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<std::shared_ptr<ManagedSnapshot>>> save_points_;
friend class OptimisticTransactionCallback;
// Returns OK if it is safe to commit this transaction. Returns Status::Busy
@ -184,10 +62,6 @@ class OptimisticTransactionImpl : public Transaction {
// Should only be called on writer thread.
Status CheckTransactionForConflicts(DB* db);
void RecordOperation(ColumnFamilyHandle* column_family, const Slice& key);
void RecordOperation(ColumnFamilyHandle* column_family,
const SliceParts& key);
void Cleanup();
// No copying allowed

@ -0,0 +1,278 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "utilities/transactions/transaction_base.h"
#include "db/column_family.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "util/string_util.h"
namespace rocksdb {
TransactionBaseImpl::TransactionBaseImpl(DB* db,
const WriteOptions& write_options)
: db_(db),
write_options_(write_options),
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
write_batch_(new WriteBatchWithIndex(cmp_, 0, true)),
start_time_(db_->GetEnv()->NowMicros()) {}
TransactionBaseImpl::~TransactionBaseImpl() {}
void TransactionBaseImpl::SetSnapshot() {
snapshot_.reset(new ManagedSnapshot(db_));
}
Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
const SliceParts& key, bool untracked) {
size_t key_size = 0;
for (int i = 0; i < key.num_parts; ++i) {
key_size += key.parts[i].size();
}
std::string str;
str.reserve(key_size);
for (int i = 0; i < key.num_parts; ++i) {
str.append(key.parts[i].data(), key.parts[i].size());
}
return TryLock(column_family, str, untracked);
}
void TransactionBaseImpl::SetSavePoint() {
if (save_points_ == nullptr) {
save_points_.reset(new std::stack<std::shared_ptr<ManagedSnapshot>>());
}
save_points_->push(snapshot_);
write_batch_->SetSavePoint();
}
Status TransactionBaseImpl::RollbackToSavePoint() {
if (save_points_ != nullptr && save_points_->size() > 0) {
// Restore saved snapshot
snapshot_ = save_points_->top();
save_points_->pop();
// Rollback batch
Status s = write_batch_->RollbackToSavePoint();
assert(s.ok());
return s;
} else {
assert(write_batch_->RollbackToSavePoint().IsNotFound());
return Status::NotFound();
}
}
Status TransactionBaseImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key,
value);
}
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
Status s = TryLock(column_family, key);
if (s.ok() && value != nullptr) {
s = Get(read_options, column_family, key, value);
}
return s;
}
std::vector<Status> TransactionBaseImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
size_t num_keys = keys.size();
values->resize(num_keys);
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
std::string* value = values ? &(*values)[i] : nullptr;
stat_list[i] = Get(read_options, column_family[i], keys[i], value);
}
return stat_list;
}
std::vector<Status> TransactionBaseImpl::MultiGetForUpdate(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
// Regardless of whether the MultiGet succeeded, track these keys.
size_t num_keys = keys.size();
values->resize(num_keys);
// Lock all keys
for (size_t i = 0; i < num_keys; ++i) {
Status s = TryLock(column_family[i], keys[i]);
if (!s.ok()) {
// Fail entire multiget if we cannot lock all keys
return std::vector<Status>(num_keys, s);
}
}
// TODO(agiardullo): optimize multiget?
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
std::string* value = values ? &(*values)[i] : nullptr;
stat_list[i] = Get(read_options, column_family[i], keys[i], value);
}
return stat_list;
}
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter);
return write_batch_->NewIteratorWithBase(db_iter);
}
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
Iterator* db_iter = db_->NewIterator(read_options, column_family);
assert(db_iter);
return write_batch_->NewIteratorWithBase(column_family, db_iter);
}
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Put(column_family, key, value);
}
return s;
}
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Put(column_family, key, value);
}
return s;
}
Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Merge(column_family, key, value);
}
return s;
}
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Delete(column_family, key);
}
return s;
}
Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Delete(column_family, key);
}
return s;
}
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
if (s.ok()) {
write_batch_->Put(column_family, key, value);
}
return s;
}
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
if (s.ok()) {
write_batch_->Put(column_family, key, value);
}
return s;
}
Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
const Slice& key,
const Slice& value) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
if (s.ok()) {
write_batch_->Merge(column_family, key, value);
}
return s;
}
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
if (s.ok()) {
write_batch_->Delete(column_family, key);
}
return s;
}
Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) {
bool untracked = true;
Status s = TryLock(column_family, key, untracked);
if (s.ok()) {
write_batch_->Delete(column_family, key);
}
return s;
}
void TransactionBaseImpl::PutLogData(const Slice& blob) {
write_batch_->PutLogData(blob);
}
WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
return write_batch_.get();
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,183 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <stack>
#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
namespace rocksdb {
class TransactionBaseImpl : public Transaction {
public:
TransactionBaseImpl(DB* db, const WriteOptions& write_options);
virtual ~TransactionBaseImpl();
// Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock
// returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed.
// untracked will be true if called from PutUntracked, DeleteUntracked, or
// MergeUntracked.
virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool untracked = false) = 0;
void SetSavePoint() override;
Status RollbackToSavePoint() override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override {
return Get(options, db_->DefaultColumnFamily(), key, value);
}
Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override;
Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value) override {
return GetForUpdate(options, db_->DefaultColumnFamily(), key, value);
}
std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override {
return MultiGet(options, std::vector<ColumnFamilyHandle*>(
keys.size(), db_->DefaultColumnFamily()),
keys, values);
}
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options, const std::vector<Slice>& keys,
std::vector<std::string>* values) override {
return MultiGetForUpdate(options,
std::vector<ColumnFamilyHandle*>(
keys.size(), db_->DefaultColumnFamily()),
keys, values);
}
Iterator* GetIterator(const ReadOptions& read_options) override;
Iterator* GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) override;
Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status Put(const Slice& key, const Slice& value) override {
return Put(nullptr, key, value);
}
Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
Status Put(const SliceParts& key, const SliceParts& value) override {
return Put(nullptr, key, value);
}
Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status Merge(const Slice& key, const Slice& value) override {
return Merge(nullptr, key, value);
}
Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
Status Delete(const Slice& key) override { return Delete(nullptr, key); }
Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status PutUntracked(const Slice& key, const Slice& value) override {
return PutUntracked(nullptr, key, value);
}
Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
Status PutUntracked(const SliceParts& key, const SliceParts& value) override {
return PutUntracked(nullptr, key, value);
}
Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status MergeUntracked(const Slice& key, const Slice& value) override {
return MergeUntracked(nullptr, key, value);
}
Status DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) override;
Status DeleteUntracked(const Slice& key) override {
return DeleteUntracked(nullptr, key);
}
Status DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
Status DeleteUntracked(const SliceParts& key) override {
return DeleteUntracked(nullptr, key);
}
void PutLogData(const Slice& blob) override;
WriteBatchWithIndex* GetWriteBatch() override;
const Snapshot* GetSnapshot() const override {
return snapshot_ ? snapshot_->snapshot() : nullptr;
}
void SetSnapshot() override;
protected:
DB* const db_;
const WriteOptions write_options_;
const Comparator* cmp_;
// Records writes pending in this transaction
std::unique_ptr<WriteBatchWithIndex> write_batch_;
// Stores that time the txn was constructed, in microseconds.
const uint64_t start_time_;
// Stores the current snapshot that was was set by SetSnapshot or null if
// no snapshot is currently set.
std::shared_ptr<ManagedSnapshot> snapshot_;
// Stack of the Snapshot saved at each save point. Saved snapshots may be
// nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<std::shared_ptr<ManagedSnapshot>>> save_points_;
private:
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
bool untracked = false);
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -36,18 +36,13 @@ TransactionID TransactionImpl::GenTxnID() {
TransactionImpl::TransactionImpl(TransactionDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options)
: db_(txn_db),
: TransactionBaseImpl(txn_db->GetBaseDB(), write_options),
txn_db_impl_(nullptr),
txn_id_(GenTxnID()),
write_options_(write_options),
cmp_(GetColumnFamilyUserComparator(txn_db->DefaultColumnFamily())),
write_batch_(new WriteBatchWithIndex(cmp_, 0, true)),
start_time_(
txn_options.expiration >= 0 ? db_->GetEnv()->NowMicros() / 1000 : 0),
expiration_time_(txn_options.expiration >= 0
? start_time_ + txn_options.expiration
: 0),
lock_timeout_(txn_options.lock_timeout) {
? start_time_ / 1000 + txn_options.expiration
: 0),
lock_timeout_(txn_options.lock_timeout) {
txn_db_impl_ = dynamic_cast<TransactionDBImpl*>(txn_db);
assert(txn_db_impl_);
@ -65,10 +60,6 @@ TransactionImpl::~TransactionImpl() {
txn_db_impl_->UnLock(this, &tracked_keys_);
}
void TransactionImpl::SetSnapshot() {
snapshot_.reset(new ManagedSnapshot(db_));
}
void TransactionImpl::Cleanup() {
write_batch_->Clear();
txn_db_impl_->UnLock(this, &tracked_keys_);
@ -112,10 +103,6 @@ Status TransactionImpl::Commit() {
Status TransactionImpl::DoCommit(WriteBatch* batch) {
Status s;
// Do write directly on base db as TransctionDB::Write() would attempt to
// do conflict checking that we've already done.
DB* db = db_->GetBaseDB();
if (expiration_time_ > 0) {
// We cannot commit a transaction that is expired as its locks might have
// been released.
@ -123,11 +110,14 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) {
// expiration time once we're on the writer thread.
TransactionCallback callback(this);
assert(dynamic_cast<DBImpl*>(db) != nullptr);
auto db_impl = reinterpret_cast<DBImpl*>(db);
// Do write directly on base db as TransctionDB::Write() would attempt to
// do conflict checking that we've already done.
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
auto db_impl = reinterpret_cast<DBImpl*>(db_);
s = db_impl->WriteWithCallback(write_options_, batch, &callback);
} else {
s = db->Write(write_options_, batch);
s = db_->Write(write_options_, batch);
}
return s;
@ -135,31 +125,6 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) {
void TransactionImpl::Rollback() { Cleanup(); }
void TransactionImpl::SetSavePoint() {
if (save_points_ == nullptr) {
save_points_.reset(new std::stack<std::shared_ptr<ManagedSnapshot>>());
}
save_points_->push(snapshot_);
write_batch_->SetSavePoint();
}
Status TransactionImpl::RollbackToSavePoint() {
if (save_points_ != nullptr && save_points_->size() > 0) {
// Restore saved snapshot
snapshot_ = save_points_->top();
save_points_->pop();
// Rollback batch
Status s = write_batch_->RollbackToSavePoint();
assert(s.ok());
return s;
} else {
assert(write_batch_->RollbackToSavePoint().IsNotFound());
return Status::NotFound();
}
}
// Lock all keys in this batch.
// On success, caller should unlock keys_to_unlock
Status TransactionImpl::LockBatch(WriteBatch* batch,
@ -234,35 +199,26 @@ Status TransactionImpl::LockBatch(WriteBatch* batch,
return s;
}
Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
const SliceParts& key, bool check_snapshot) {
size_t key_size = 0;
for (int i = 0; i < key.num_parts; ++i) {
key_size += key.parts[i].size();
}
std::string str;
str.reserve(key_size);
for (int i = 0; i < key.num_parts; ++i) {
str.append(key.parts[i].data(), key.parts[i].size());
}
return TryLock(column_family, str, check_snapshot);
}
// Attempt to lock this key.
// Returns OK if the key has been successfully locked. Non-ok, otherwise.
// If check_shapshot is true and this transaction has a snapshot set,
// this key will only be locked if there have been no writes to this key since
// the snapshot time.
Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
const Slice& key, bool check_snapshot) {
const Slice& key, bool untracked) {
uint32_t cfh_id = GetColumnFamilyID(column_family);
std::string key_str = key.ToString();
bool previously_locked;
Status s;
// Even though we do not care about doing conflict checking for this write,
// we still need to take a lock to make sure we do not cause a conflict with
// some other write. However, we do not need to check if there have been
// any writes since this transaction's snapshot.
// TODO(agiardullo): could optimize by supporting shared txn locks in the
// future
bool check_snapshot = !untracked;
// lock this key if this transactions hasn't already locked it
auto iter = tracked_keys_[cfh_id].find(key_str);
if (iter == tracked_keys_[cfh_id].end()) {
@ -327,8 +283,8 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family,
const Slice& key) {
Status result;
if (snapshot_ != nullptr) {
assert(dynamic_cast<DBImpl*>(db_->GetBaseDB()) != nullptr);
auto db_impl = reinterpret_cast<DBImpl*>(db_->GetBaseDB());
assert(dynamic_cast<DBImpl*>(db_) != nullptr);
auto db_impl = reinterpret_cast<DBImpl*>(db_);
ColumnFamilyHandle* cfh = column_family ? column_family :
db_impl->DefaultColumnFamily();
@ -341,213 +297,6 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family,
return result;
}
Status TransactionImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key,
value);
}
Status TransactionImpl::GetForUpdate(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) {
Status s = TryLock(column_family, key);
if (s.ok() && value != nullptr) {
s = Get(read_options, column_family, key, value);
}
return s;
}
std::vector<Status> TransactionImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
size_t num_keys = keys.size();
values->resize(num_keys);
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
std::string* value = values ? &(*values)[i] : nullptr;
stat_list[i] = Get(read_options, column_family[i], keys[i], value);
}
return stat_list;
}
std::vector<Status> TransactionImpl::MultiGetForUpdate(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
// Regardless of whether the MultiGet succeeded, track these keys.
size_t num_keys = keys.size();
values->resize(num_keys);
// Lock all keys
for (size_t i = 0; i < num_keys; ++i) {
Status s = TryLock(column_family[i], keys[i]);
if (!s.ok()) {
// Fail entire multiget if we cannot lock all keys
return std::vector<Status>(num_keys, s);
}
}
// TODO(agiardullo): optimize multiget?
std::vector<Status> stat_list(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
std::string* value = values ? &(*values)[i] : nullptr;
stat_list[i] = Get(read_options, column_family[i], keys[i], value);
}
return stat_list;
}
Iterator* TransactionImpl::GetIterator(const ReadOptions& read_options) {
Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter);
return write_batch_->NewIteratorWithBase(db_iter);
}
Iterator* TransactionImpl::GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
Iterator* db_iter = db_->NewIterator(read_options, column_family);
assert(db_iter);
return write_batch_->NewIteratorWithBase(column_family, db_iter);
}
Status TransactionImpl::Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Put(column_family, key, value);
}
return s;
}
Status TransactionImpl::Put(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Put(column_family, key, value);
}
return s;
}
Status TransactionImpl::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Merge(column_family, key, value);
}
return s;
}
Status TransactionImpl::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Delete(column_family, key);
}
return s;
}
Status TransactionImpl::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->Delete(column_family, key);
}
return s;
}
Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
// Even though we do not care about doing conflict checking for this write,
// we still need to take a lock to make sure we do not cause a conflict with
// some other write. However, we do not need to check if there have been
// any writes since this transaction's snapshot.
bool check_snapshot = false;
// TODO(agiardullo): could optimize by supporting shared txn locks in the
// future
Status s = TryLock(column_family, key, check_snapshot);
if (s.ok()) {
write_batch_->Put(column_family, key, value);
}
return s;
}
Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key,
const SliceParts& value) {
bool check_snapshot = false;
Status s = TryLock(column_family, key, check_snapshot);
if (s.ok()) {
write_batch_->Put(column_family, key, value);
}
return s;
}
Status TransactionImpl::MergeUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
bool check_snapshot = false;
Status s = TryLock(column_family, key, check_snapshot);
if (s.ok()) {
write_batch_->Merge(column_family, key, value);
}
return s;
}
Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) {
bool check_snapshot = false;
Status s = TryLock(column_family, key, check_snapshot);
if (s.ok()) {
write_batch_->Delete(column_family, key);
}
return s;
}
Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) {
bool check_snapshot = false;
Status s = TryLock(column_family, key, check_snapshot);
if (s.ok()) {
write_batch_->Delete(column_family, key);
}
return s;
}
void TransactionImpl::PutLogData(const Slice& blob) {
write_batch_->PutLogData(blob);
}
WriteBatchWithIndex* TransactionImpl::GetWriteBatch() {
return write_batch_.get();
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -22,6 +22,7 @@
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "utilities/transactions/transaction_base.h"
#include "utilities/transactions/transaction_util.h"
namespace rocksdb {
@ -30,7 +31,7 @@ using TransactionID = uint64_t;
class TransactionDBImpl;
class TransactionImpl : public Transaction {
class TransactionImpl : public TransactionBaseImpl {
public:
TransactionImpl(TransactionDB* db, const WriteOptions& write_options,
const TransactionOptions& txn_options);
@ -43,123 +44,6 @@ class TransactionImpl : public Transaction {
void Rollback() override;
void SetSavePoint() override;
Status RollbackToSavePoint() override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override {
return Get(options, db_->DefaultColumnFamily(), key, value);
}
Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) override;
Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value) override {
return GetForUpdate(options, db_->DefaultColumnFamily(), key, value);
}
std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override {
return MultiGet(options, std::vector<ColumnFamilyHandle*>(
keys.size(), db_->DefaultColumnFamily()),
keys, values);
}
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
std::vector<Status> MultiGetForUpdate(
const ReadOptions& options, const std::vector<Slice>& keys,
std::vector<std::string>* values) override {
return MultiGetForUpdate(options,
std::vector<ColumnFamilyHandle*>(
keys.size(), db_->DefaultColumnFamily()),
keys, values);
}
Iterator* GetIterator(const ReadOptions& read_options) override;
Iterator* GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) override;
Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status Put(const Slice& key, const Slice& value) override {
return Put(nullptr, key, value);
}
Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
Status Put(const SliceParts& key, const SliceParts& value) override {
return Put(nullptr, key, value);
}
Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status Merge(const Slice& key, const Slice& value) override {
return Merge(nullptr, key, value);
}
Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
Status Delete(const Slice& key) override { return Delete(nullptr, key); }
Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status PutUntracked(const Slice& key, const Slice& value) override {
return PutUntracked(nullptr, key, value);
}
Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override;
Status PutUntracked(const SliceParts& key, const SliceParts& value) override {
return PutUntracked(nullptr, key, value);
}
Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status MergeUntracked(const Slice& key, const Slice& value) override {
return MergeUntracked(nullptr, key, value);
}
Status DeleteUntracked(ColumnFamilyHandle* column_family,
const Slice& key) override;
Status DeleteUntracked(const Slice& key) override {
return DeleteUntracked(nullptr, key);
}
Status DeleteUntracked(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
Status DeleteUntracked(const SliceParts& key) override {
return DeleteUntracked(nullptr, key);
}
void PutLogData(const Slice& blob) override;
const Snapshot* GetSnapshot() const override {
return snapshot_ ? snapshot_->snapshot() : nullptr;
}
void SetSnapshot() override;
WriteBatchWithIndex* GetWriteBatch() override;
// Generate a new unique transaction identifier
static TransactionID GenTxnID();
@ -178,9 +62,11 @@ class TransactionImpl : public Transaction {
int64_t GetLockTimeout() const { return lock_timeout_; }
void SetLockTimeout(int64_t timeout) { lock_timeout_ = timeout; }
private:
TransactionDB* const db_;
protected:
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool untracked = false) override;
private:
TransactionDBImpl* txn_db_impl_;
// Used to create unique ids for transactions.
@ -189,21 +75,6 @@ class TransactionImpl : public Transaction {
// Unique ID for this transaction
const TransactionID txn_id_;
const WriteOptions write_options_;
// If snapshot_ is set, all keys that locked must also have not been written
// since this snapshot
std::shared_ptr<ManagedSnapshot> snapshot_;
const Comparator* cmp_;
std::unique_ptr<WriteBatchWithIndex> write_batch_;
// If expiration_ is non-zero, start_time_ stores that time the txn was
// constructed,
// in milliseconds.
const uint64_t start_time_;
// If non-zero, this transaction should not be committed after this time (in
// milliseconds)
const uint64_t expiration_time_;
@ -217,14 +88,6 @@ class TransactionImpl : public Transaction {
// stored.
TransactionKeyMap tracked_keys_;
// Stack of the Snapshot saved at each save point. Saved snapshots may be
// nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<std::shared_ptr<ManagedSnapshot>>> save_points_;
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool check_snapshot = true);
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
bool check_snapshot = true);
void Cleanup();
Status CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key);

Loading…
Cancel
Save