Transaction stats

Summary: Added funtions to fetch the number of locked keys in a transaction, the number of pending puts/merge/deletes, and the elapsed time

Test Plan: unit tests

Reviewers: yoshinorim, jkedgar, rven, sdong, yhchiang, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D45417
main
agiardullo 10 years ago
parent 25dbc579f5
commit aa6eed0c1e
  1. 16
      include/rocksdb/utilities/transaction.h
  2. 23
      utilities/transactions/optimistic_transaction_impl.cc
  3. 4
      utilities/transactions/optimistic_transaction_impl.h
  4. 5
      utilities/transactions/optimistic_transaction_test.cc
  5. 41
      utilities/transactions/transaction_base.cc
  6. 32
      utilities/transactions/transaction_base.h
  7. 22
      utilities/transactions/transaction_impl.cc
  8. 4
      utilities/transactions/transaction_impl.h
  9. 38
      utilities/transactions/transaction_test.cc

@ -250,6 +250,22 @@ class Transaction {
// Similar to WriteBatch::PutLogData
virtual void PutLogData(const Slice& blob) = 0;
// Returns the number of distinct Keys being tracked by this transaction.
// If this transaction was created by a TransactinDB, this is the number of
// keys that are currently locked by this transaction.
// If this transaction was created by an OptimisticTransactionDB, this is the
// number of keys that need to be checked for conflicts at commit time.
virtual uint64_t GetNumKeys() const = 0;
// Returns the number of Puts/Deletes/Merges that have been applied to this
// transaction so far.
virtual uint64_t GetNumPuts() const = 0;
virtual uint64_t GetNumDeletes() const = 0;
virtual uint64_t GetNumMerges() const = 0;
// Returns the elapsed time in milliseconds since this Transaction began.
virtual uint64_t GetElapsedTime() const = 0;
// Fetch the underlying write batch that contains all pending changes to be
// committed.
//

@ -36,10 +36,9 @@ OptimisticTransactionImpl::OptimisticTransactionImpl(
OptimisticTransactionImpl::~OptimisticTransactionImpl() {
}
void OptimisticTransactionImpl::Cleanup() {
void OptimisticTransactionImpl::Clear() {
TransactionBaseImpl::Clear();
tracked_keys_.clear();
save_points_.reset(nullptr);
write_batch_->Clear();
}
Status OptimisticTransactionImpl::Commit() {
@ -59,15 +58,13 @@ Status OptimisticTransactionImpl::Commit() {
write_options_, write_batch_->GetWriteBatch(), &callback);
if (s.ok()) {
Cleanup();
Clear();
}
return s;
}
void OptimisticTransactionImpl::Rollback() {
Cleanup();
}
void OptimisticTransactionImpl::Rollback() { Clear(); }
// Record this key so that we can check it for conflicts at commit time.
Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
@ -119,6 +116,18 @@ Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) {
return TransactionUtil::CheckKeysForConflicts(db_impl, &tracked_keys_);
}
uint64_t OptimisticTransactionImpl::GetNumKeys() const {
uint64_t count = 0;
// sum up locked keys in all column families
for (const auto& key_map_iter : tracked_keys_) {
const auto& keys = key_map_iter.second;
count += keys.size();
}
return count;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -38,6 +38,8 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
void Rollback() override;
uint64_t GetNumKeys() const override;
const TransactionKeyMap* GetTrackedKeys() const { return &tracked_keys_; }
protected:
@ -62,7 +64,7 @@ class OptimisticTransactionImpl : public TransactionBaseImpl {
// Should only be called on writer thread.
Status CheckTransactionForConflicts(DB* db);
void Cleanup();
void Clear() override;
// No copying allowed
OptimisticTransactionImpl(const OptimisticTransactionImpl&);

@ -89,6 +89,7 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest) {
s = db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
ASSERT_EQ(1, txn->GetNumKeys());
s = txn->Commit();
ASSERT_TRUE(s.IsBusy()); // Txn should not commit
@ -490,6 +491,8 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
Slice value_slices[2] = {Slice("bar"), Slice("bar")};
txn->Put(handles[2], SliceParts(&key_slice, 1), SliceParts(value_slices, 2));
ASSERT_EQ(3, txn->GetNumKeys());
// Txn should commit
s = txn->Commit();
ASSERT_OK(s);
@ -544,6 +547,8 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
txn->Delete(handles[2], "ZZZ");
txn->Put(handles[2], "AAAZZZ", "barbarbar");
ASSERT_EQ(5, txn->GetNumKeys());
// Txn should commit
s = txn->Commit();
ASSERT_OK(s);

@ -25,6 +25,14 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db,
TransactionBaseImpl::~TransactionBaseImpl() {}
void TransactionBaseImpl::Clear() {
save_points_.reset(nullptr);
write_batch_->Clear();
num_puts_ = 0;
num_deletes_ = 0;
num_merges_ = 0;
}
void TransactionBaseImpl::SetSnapshot() {
snapshot_.reset(new ManagedSnapshot(db_));
}
@ -48,16 +56,21 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family,
void TransactionBaseImpl::SetSavePoint() {
if (save_points_ == nullptr) {
save_points_.reset(new std::stack<std::shared_ptr<ManagedSnapshot>>());
save_points_.reset(new std::stack<TransactionBaseImpl::SavePoint>());
}
save_points_->push(snapshot_);
save_points_->emplace(snapshot_, num_puts_, num_deletes_, num_merges_);
write_batch_->SetSavePoint();
}
Status TransactionBaseImpl::RollbackToSavePoint() {
if (save_points_ != nullptr && save_points_->size() > 0) {
// Restore saved snapshot
snapshot_ = save_points_->top();
// Restore saved SavePoint
TransactionBaseImpl::SavePoint& save_point = save_points_->top();
snapshot_ = save_point.snapshot_;
num_puts_ = save_point.num_puts_;
num_deletes_ = save_point.num_deletes_;
num_merges_ = save_point.num_merges_;
save_points_->pop();
// Rollback batch
@ -153,6 +166,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Put(column_family, key, value);
num_puts_++;
}
return s;
@ -165,6 +179,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Put(column_family, key, value);
num_puts_++;
}
return s;
@ -176,6 +191,7 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Merge(column_family, key, value);
num_merges_++;
}
return s;
@ -187,6 +203,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Delete(column_family, key);
num_deletes_++;
}
return s;
@ -198,6 +215,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Delete(column_family, key);
num_deletes_++;
}
return s;
@ -210,6 +228,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Put(column_family, key, value);
num_puts_++;
}
return s;
@ -223,6 +242,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Put(column_family, key, value);
num_puts_++;
}
return s;
@ -236,6 +256,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Merge(column_family, key, value);
num_merges_++;
}
return s;
@ -248,6 +269,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Delete(column_family, key);
num_deletes_++;
}
return s;
@ -260,6 +282,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Delete(column_family, key);
num_deletes_++;
}
return s;
@ -273,6 +296,16 @@ WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
return write_batch_.get();
}
uint64_t TransactionBaseImpl::GetElapsedTime() const {
return (db_->GetEnv()->NowMicros() - start_time_) / 1000;
}
uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
uint64_t TransactionBaseImpl::GetNumDeletes() const { return num_deletes_; }
uint64_t TransactionBaseImpl::GetNumMerges() const { return num_merges_; }
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -28,6 +28,9 @@ class TransactionBaseImpl : public Transaction {
virtual ~TransactionBaseImpl();
// Remove pending operations queued in this transaction.
virtual void Clear();
// Called before executing Put, Merge, Delete, and GetForUpdate. If TryLock
// returns non-OK, the Put/Merge/Delete/GetForUpdate will be failed.
// untracked will be true if called from PutUntracked, DeleteUntracked, or
@ -155,6 +158,14 @@ class TransactionBaseImpl : public Transaction {
void SetSnapshot() override;
uint64_t GetElapsedTime() const override;
uint64_t GetNumPuts() const override;
uint64_t GetNumDeletes() const override;
uint64_t GetNumMerges() const override;
protected:
DB* const db_;
@ -172,9 +183,28 @@ class TransactionBaseImpl : public Transaction {
// no snapshot is currently set.
std::shared_ptr<ManagedSnapshot> snapshot_;
// Count of various operations pending in this transaction
uint64_t num_puts_ = 0;
uint64_t num_deletes_ = 0;
uint64_t num_merges_ = 0;
struct SavePoint {
std::shared_ptr<ManagedSnapshot> snapshot_;
uint64_t num_puts_;
uint64_t num_deletes_;
uint64_t num_merges_;
SavePoint(std::shared_ptr<ManagedSnapshot> snapshot, uint64_t num_puts,
uint64_t num_deletes, uint64_t num_merges)
: snapshot_(snapshot),
num_puts_(num_puts),
num_deletes_(num_deletes),
num_merges_(num_merges) {}
};
// Stack of the Snapshot saved at each save point. Saved snapshots may be
// nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<std::shared_ptr<ManagedSnapshot>>> save_points_;
std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint>> save_points_;
private:
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,

@ -61,11 +61,11 @@ TransactionImpl::~TransactionImpl() {
txn_db_impl_->UnLock(this, &tracked_keys_);
}
void TransactionImpl::Cleanup() {
write_batch_->Clear();
void TransactionImpl::Clear() {
TransactionBaseImpl::Clear();
txn_db_impl_->UnLock(this, &tracked_keys_);
tracked_keys_.clear();
save_points_.reset(nullptr);
}
bool TransactionImpl::IsExpired() const {
@ -96,7 +96,7 @@ Status TransactionImpl::CommitBatch(WriteBatch* batch) {
Status TransactionImpl::Commit() {
Status s = DoCommit(write_batch_->GetWriteBatch());
Cleanup();
Clear();
return s;
}
@ -124,7 +124,7 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) {
return s;
}
void TransactionImpl::Rollback() { Cleanup(); }
void TransactionImpl::Rollback() { Clear(); }
// Lock all keys in this batch.
// On success, caller should unlock keys_to_unlock
@ -298,6 +298,18 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family,
return result;
}
uint64_t TransactionImpl::GetNumKeys() const {
uint64_t count = 0;
// sum up locked keys in all column families
for (const auto& key_map_iter : tracked_keys_) {
const auto& keys = key_map_iter.second;
count += keys.size();
}
return count;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -44,6 +44,8 @@ class TransactionImpl : public TransactionBaseImpl {
void Rollback() override;
uint64_t GetNumKeys() const override;
// Generate a new unique transaction identifier
static TransactionID GenTxnID();
@ -90,7 +92,7 @@ class TransactionImpl : public TransactionBaseImpl {
// stored.
TransactionKeyMap tracked_keys_;
void Cleanup();
void Clear() override;
Status CheckKeySequence(ColumnFamilyHandle* column_family, const Slice& key);

@ -55,6 +55,8 @@ TEST_F(TransactionTest, SuccessTest) {
Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
ASSERT_TRUE(txn);
ASSERT_EQ(0, txn->GetNumPuts());
s = txn->GetForUpdate(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar");
@ -62,6 +64,8 @@ TEST_F(TransactionTest, SuccessTest) {
s = txn->Put(Slice("foo"), Slice("bar2"));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
s = txn->GetForUpdate(read_options, "foo", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "bar2");
@ -142,6 +146,8 @@ TEST_F(TransactionTest, WriteConflictTest2) {
s = db->Get(read_options, "foo", &value);
ASSERT_EQ(value, "barz");
ASSERT_EQ(2, txn->GetNumKeys());
s = txn->Commit();
ASSERT_OK(s); // Txn should commit, but only write foo2 and foo3
@ -535,6 +541,7 @@ TEST_F(TransactionTest, ColumnFamiliesTest) {
s = txn->Put(handles[2], SliceParts(&key_slice, 1),
SliceParts(value_slices, 2));
ASSERT_OK(s);
ASSERT_EQ(3, txn->GetNumKeys());
s = txn->Commit();
ASSERT_OK(s);
@ -596,6 +603,8 @@ TEST_F(TransactionTest, ColumnFamiliesTest) {
s = txn->Put(handles[2], "AAAZZZ", "barbarbar");
ASSERT_OK(s);
ASSERT_EQ(5, txn->GetNumKeys());
// Txn should commit
s = txn->Commit();
ASSERT_OK(s);
@ -1299,6 +1308,8 @@ TEST_F(TransactionTest, SavepointTest) {
Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
ASSERT_EQ(0, txn->GetNumPuts());
s = txn->RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
@ -1311,6 +1322,9 @@ TEST_F(TransactionTest, SavepointTest) {
s = txn->Put("B", "b");
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
ASSERT_EQ(0, txn->GetNumDeletes());
s = txn->Commit();
ASSERT_OK(s);
@ -1342,8 +1356,14 @@ TEST_F(TransactionTest, SavepointTest) {
s = txn->Put("D", "d");
ASSERT_OK(s);
ASSERT_EQ(5, txn->GetNumPuts());
ASSERT_EQ(1, txn->GetNumDeletes());
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
ASSERT_EQ(3, txn->GetNumPuts());
ASSERT_EQ(0, txn->GetNumDeletes());
s = txn->Get(read_options, "A", &value);
ASSERT_OK(s);
ASSERT_EQ("a", value);
@ -1365,11 +1385,17 @@ TEST_F(TransactionTest, SavepointTest) {
s = txn->Put("E", "e");
ASSERT_OK(s);
ASSERT_EQ(5, txn->GetNumPuts());
ASSERT_EQ(0, txn->GetNumDeletes());
// Rollback to beginning of txn
s = txn->RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
txn->Rollback();
ASSERT_EQ(0, txn->GetNumPuts());
ASSERT_EQ(0, txn->GetNumDeletes());
s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
@ -1392,6 +1418,9 @@ TEST_F(TransactionTest, SavepointTest) {
s = txn->Put("F", "f");
ASSERT_OK(s);
ASSERT_EQ(2, txn->GetNumPuts());
ASSERT_EQ(0, txn->GetNumDeletes());
txn->SetSavePoint(); // 3
txn->SetSavePoint(); // 4
@ -1414,8 +1443,14 @@ TEST_F(TransactionTest, SavepointTest) {
s = txn->Get(read_options, "B", &value);
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(3, txn->GetNumPuts());
ASSERT_EQ(2, txn->GetNumDeletes());
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
ASSERT_EQ(2, txn->GetNumPuts());
ASSERT_EQ(0, txn->GetNumDeletes());
s = txn->Get(read_options, "F", &value);
ASSERT_OK(s);
ASSERT_EQ("f", value);
@ -1487,6 +1522,9 @@ TEST_F(TransactionTest, TimeoutTest) {
s = db->Put(write_options, "aaa", "xxx");
ASSERT_OK(s);
ASSERT_GE(txn1->GetElapsedTime(),
static_cast<uint64_t>(txn_options0.expiration));
s = txn1->Commit();
ASSERT_TRUE(s.IsExpired()); // expired!

Loading…
Cancel
Save