Have Transactions use WriteBatch::RollbackToSavePoint

Summary:
Clean up transactions to use the new RollbackToSavePoint api in WriteBatchWithIndex.

Note, this diff depends on Pessimistic Transactions diff and ManagedSnapshot diff (D40869 and D43293).

Test Plan: unit tests

Reviewers: rven, yhchiang, kradhakrishnan, spetrunia, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D43371
main
agiardullo 10 years ago
parent 0db807ec28
commit c3466eab07
  1. 8
      include/rocksdb/utilities/transaction.h
  2. 84
      utilities/transactions/optimistic_transaction_impl.cc
  3. 22
      utilities/transactions/optimistic_transaction_impl.h
  4. 17
      utilities/transactions/optimistic_transaction_test.cc
  5. 83
      utilities/transactions/transaction_impl.cc
  6. 18
      utilities/transactions/transaction_impl.h
  7. 17
      utilities/transactions/transaction_test.cc
  8. 119
      utilities/transactions/transaction_util.cc
  9. 6
      utilities/transactions/transaction_util.h

@ -93,12 +93,10 @@ class Transaction {
virtual void SetSavePoint() = 0;
// Undo all operations in this transaction (Put, Merge, Delete, PutLogData)
// since the
// most recent call to SetSavePoint() and removes the most recent
// since the most recent call to SetSavePoint() and removes the most recent
// SetSavePoint().
// If there is no previous call to SetSavePoint(), behaves the same as
// Rollback()
virtual void RollbackToSavePoint() = 0;
// If there is no previous call to SetSavePoint(), returns Status::NotFound()
virtual Status RollbackToSavePoint() = 0;
// This function is similar to DB::Get() except it will also read pending
// changes in this transaction.

@ -30,30 +30,24 @@ OptimisticTransactionImpl::OptimisticTransactionImpl(
: txn_db_(txn_db),
db_(txn_db->GetBaseDB()),
write_options_(write_options),
snapshot_(nullptr),
cmp_(txn_options.cmp),
write_batch_(new WriteBatchWithIndex(txn_options.cmp, 0, true)) {
if (txn_options.set_snapshot) {
SetSnapshot();
} else {
start_sequence_number_ = db_->GetLatestSequenceNumber();
}
}
OptimisticTransactionImpl::~OptimisticTransactionImpl() {
}
void OptimisticTransactionImpl::Cleanup() {
tracked_keys_.clear();
if (snapshot_ != nullptr) {
db_->ReleaseSnapshot(snapshot_);
}
save_points_.reset(nullptr);
write_batch_->Clear();
}
void OptimisticTransactionImpl::SetSnapshot() {
if (snapshot_ != nullptr) {
db_->ReleaseSnapshot(snapshot_);
}
snapshot_ = db_->GetSnapshot();
start_sequence_number_ = snapshot_->GetSequenceNumber();
snapshot_.reset(new ManagedSnapshot(db_));
}
Status OptimisticTransactionImpl::Commit() {
@ -73,66 +67,38 @@ Status OptimisticTransactionImpl::Commit() {
write_options_, write_batch_->GetWriteBatch(), &callback);
if (s.ok()) {
tracked_keys_.clear();
write_batch_->Clear();
num_entries_ = 0;
Cleanup();
}
return s;
}
void OptimisticTransactionImpl::Rollback() {
tracked_keys_.clear();
write_batch_->Clear();
num_entries_ = 0;
Cleanup();
}
void OptimisticTransactionImpl::SetSavePoint() {
if (num_entries_ > 0) {
// If transaction is empty, no need to record anything.
if (save_points_ == nullptr) {
save_points_.reset(new std::stack<size_t>());
}
save_points_->push(num_entries_);
save_points_.reset(new std::stack<std::shared_ptr<ManagedSnapshot>>());
}
save_points_->push(snapshot_);
write_batch_->SetSavePoint();
}
void OptimisticTransactionImpl::RollbackToSavePoint() {
size_t savepoint_entries = 0;
Status OptimisticTransactionImpl::RollbackToSavePoint() {
if (save_points_ != nullptr && save_points_->size() > 0) {
savepoint_entries = save_points_->top();
// Restore saved snapshot
snapshot_ = save_points_->top();
save_points_->pop();
}
assert(savepoint_entries <= num_entries_);
// Rollback batch
Status s = write_batch_->RollbackToSavePoint();
assert(s.ok());
if (savepoint_entries == num_entries_) {
// No changes to rollback
} else if (savepoint_entries == 0) {
// Rollback everything
Rollback();
} else {
DBImpl* db_impl = dynamic_cast<DBImpl*>(db_->GetRootDB());
assert(db_impl);
WriteBatchWithIndex* new_batch = new WriteBatchWithIndex(cmp_, 0, true);
Status s = TransactionUtil::CopyFirstN(
savepoint_entries, write_batch_.get(), new_batch, db_impl);
if (!s.ok()) {
// TODO: Should we change this function to return a Status or should we
// somehow make it
// so RollbackToSavePoint() can never fail??
// Consider moving this functionality into WriteBatchWithIndex
fprintf(stderr, "STATUS: %s \n", s.ToString().c_str());
delete new_batch;
return s;
} else {
write_batch_.reset(new_batch);
}
num_entries_ = savepoint_entries;
assert(write_batch_->RollbackToSavePoint().IsNotFound());
return Status::NotFound();
}
}
@ -143,7 +109,7 @@ void OptimisticTransactionImpl::RecordOperation(
SequenceNumber seq;
if (snapshot_) {
seq = start_sequence_number_;
seq = snapshot_->snapshot()->GetSequenceNumber();
} else {
seq = db_->GetLatestSequenceNumber();
}
@ -261,7 +227,6 @@ Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family,
RecordOperation(column_family, key);
write_batch_->Put(column_family, key, value);
num_entries_++;
return Status::OK();
}
@ -272,7 +237,6 @@ Status OptimisticTransactionImpl::Put(ColumnFamilyHandle* column_family,
RecordOperation(column_family, key);
write_batch_->Put(column_family, key, value);
num_entries_++;
return Status::OK();
}
@ -307,7 +271,6 @@ Status OptimisticTransactionImpl::Delete(ColumnFamilyHandle* column_family,
Status OptimisticTransactionImpl::PutUntracked(
ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) {
write_batch_->Put(column_family, key, value);
num_entries_++;
return Status::OK();
}
@ -316,7 +279,6 @@ Status OptimisticTransactionImpl::PutUntracked(
ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) {
write_batch_->Put(column_family, key, value);
num_entries_++;
return Status::OK();
}
@ -324,7 +286,6 @@ Status OptimisticTransactionImpl::PutUntracked(
Status OptimisticTransactionImpl::MergeUntracked(
ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) {
write_batch_->Merge(column_family, key, value);
num_entries_++;
return Status::OK();
}
@ -332,7 +293,6 @@ Status OptimisticTransactionImpl::MergeUntracked(
Status OptimisticTransactionImpl::DeleteUntracked(
ColumnFamilyHandle* column_family, const Slice& key) {
write_batch_->Delete(column_family, key);
num_entries_++;
return Status::OK();
}
@ -340,14 +300,12 @@ Status OptimisticTransactionImpl::DeleteUntracked(
Status OptimisticTransactionImpl::DeleteUntracked(
ColumnFamilyHandle* column_family, const SliceParts& key) {
write_batch_->Delete(column_family, key);
num_entries_++;
return Status::OK();
}
void OptimisticTransactionImpl::PutLogData(const Slice& blob) {
write_batch_->PutLogData(blob);
num_entries_++;
}
WriteBatchWithIndex* OptimisticTransactionImpl::GetWriteBatch() {

@ -15,6 +15,7 @@
#include "db/write_callback.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/transaction.h"
@ -38,7 +39,7 @@ class OptimisticTransactionImpl : public Transaction {
void SetSavePoint() override;
void RollbackToSavePoint() override;
Status RollbackToSavePoint() override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;
@ -147,7 +148,9 @@ class OptimisticTransactionImpl : public Transaction {
const TransactionKeyMap* GetTrackedKeys() const { return &tracked_keys_; }
const Snapshot* GetSnapshot() const override { return snapshot_; }
const Snapshot* GetSnapshot() const override {
return snapshot_ ? snapshot_->snapshot() : nullptr;
}
void SetSnapshot() override;
@ -157,8 +160,7 @@ class OptimisticTransactionImpl : public Transaction {
OptimisticTransactionDB* const txn_db_;
DB* db_;
const WriteOptions write_options_;
const Snapshot* snapshot_;
SequenceNumber start_sequence_number_;
std::shared_ptr<ManagedSnapshot> snapshot_;
const Comparator* cmp_;
std::unique_ptr<WriteBatchWithIndex> write_batch_;
@ -169,13 +171,9 @@ class OptimisticTransactionImpl : public Transaction {
// not changed since this sequence number.
TransactionKeyMap tracked_keys_;
// Records the number of entries currently in the WriteBatch including calls
// to
// Put, Merge, Delete, and PutLogData()
size_t num_entries_ = 0;
// Stack of number of entries in write_batch at each save point
std::unique_ptr<std::stack<size_t>> save_points_;
// Stack of the Snapshot saved at each save point. Saved snapshots may be
// nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<std::shared_ptr<ManagedSnapshot>>> save_points_;
friend class OptimisticTransactionCallback;
@ -190,6 +188,8 @@ class OptimisticTransactionImpl : public Transaction {
void RecordOperation(ColumnFamilyHandle* column_family,
const SliceParts& key);
void Cleanup();
// No copying allowed
OptimisticTransactionImpl(const OptimisticTransactionImpl&);
void operator=(const OptimisticTransactionImpl&);

@ -955,12 +955,14 @@ TEST_F(OptimisticTransactionTest, SavepointTest) {
Transaction* txn = txn_db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
txn->RollbackToSavePoint();
s = txn->RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
txn->SetSavePoint(); // 1
txn->RollbackToSavePoint(); // Rollback to beginning of txn
txn->RollbackToSavePoint();
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn
s = txn->RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
s = txn->Put("B", "b");
ASSERT_OK(s);
@ -996,7 +998,7 @@ TEST_F(OptimisticTransactionTest, SavepointTest) {
s = txn->Put("D", "d");
ASSERT_OK(s);
txn->RollbackToSavePoint(); // Rollback to 2
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
s = txn->Get(read_options, "A", &value);
ASSERT_OK(s);
@ -1019,7 +1021,10 @@ TEST_F(OptimisticTransactionTest, SavepointTest) {
s = txn->Put("E", "e");
ASSERT_OK(s);
txn->RollbackToSavePoint(); // Rollback to beginning of txn
// Rollback to beginning of txn
s = txn->RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
txn->Rollback();
s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
@ -1065,7 +1070,7 @@ TEST_F(OptimisticTransactionTest, SavepointTest) {
s = txn->Get(read_options, "B", &value);
ASSERT_TRUE(s.IsNotFound());
txn->RollbackToSavePoint(); // Rollback to 3
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
s = txn->Get(read_options, "F", &value);
ASSERT_OK(s);

@ -16,6 +16,7 @@
#include "db/db_impl.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/string_util.h"
@ -39,7 +40,6 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
txn_db_impl_(nullptr),
txn_id_(GenTxnID()),
write_options_(write_options),
snapshot_(nullptr),
cmp_(GetColumnFamilyUserComparator(txn_db->DefaultColumnFamily())),
write_batch_(new WriteBatchWithIndex(cmp_, 0, true)),
start_time_(
@ -62,24 +62,15 @@ TransactionImpl::TransactionImpl(TransactionDB* txn_db,
}
TransactionImpl::~TransactionImpl() {
Cleanup();
if (snapshot_ != nullptr) {
db_->ReleaseSnapshot(snapshot_);
}
txn_db_impl_->UnLock(this, &tracked_keys_);
}
void TransactionImpl::SetSnapshot() {
if (snapshot_ != nullptr) {
db_->ReleaseSnapshot(snapshot_);
}
snapshot_ = db_->GetSnapshot();
snapshot_.reset(new ManagedSnapshot(db_));
}
void TransactionImpl::Cleanup() {
write_batch_->Clear();
num_entries_ = 0;
txn_db_impl_->UnLock(this, &tracked_keys_);
tracked_keys_.clear();
save_points_.reset(nullptr);
@ -145,53 +136,27 @@ Status TransactionImpl::DoCommit(WriteBatch* batch) {
void TransactionImpl::Rollback() { Cleanup(); }
void TransactionImpl::SetSavePoint() {
if (num_entries_ > 0) {
// If transaction is empty, no need to record anything.
if (save_points_ == nullptr) {
save_points_.reset(new std::stack<size_t>());
}
save_points_->push(num_entries_);
save_points_.reset(new std::stack<std::shared_ptr<ManagedSnapshot>>());
}
save_points_->push(snapshot_);
write_batch_->SetSavePoint();
}
void TransactionImpl::RollbackToSavePoint() {
size_t savepoint_entries = 0;
Status TransactionImpl::RollbackToSavePoint() {
if (save_points_ != nullptr && save_points_->size() > 0) {
savepoint_entries = save_points_->top();
// Restore saved snapshot
snapshot_ = save_points_->top();
save_points_->pop();
}
assert(savepoint_entries <= num_entries_);
if (savepoint_entries == num_entries_) {
// No changes to rollback
} else if (savepoint_entries == 0) {
// Rollback everything
Rollback();
} else {
assert(dynamic_cast<DBImpl*>(db_->GetBaseDB()) != nullptr);
auto db_impl = reinterpret_cast<DBImpl*>(db_->GetBaseDB());
// Rollback batch
Status s = write_batch_->RollbackToSavePoint();
assert(s.ok());
WriteBatchWithIndex* new_batch = new WriteBatchWithIndex(cmp_, 0, true);
Status s = TransactionUtil::CopyFirstN(
savepoint_entries, write_batch_.get(), new_batch, db_impl);
if (!s.ok()) {
// TODO: Should we change this function to return a Status or should we
// somehow make it so RollbackToSavePoint() can never fail?? Not easy to
// handle the case where a client accesses a column family that's been
// dropped.
// After chatting with Siying, I'm going to send a diff that adds
// savepoint support in WriteBatchWithIndex and let reviewers decide which
// approach is cleaner.
fprintf(stderr, "STATUS: %s \n", s.ToString().c_str());
delete new_batch;
return s;
} else {
write_batch_.reset(new_batch);
}
num_entries_ = savepoint_entries;
assert(write_batch_->RollbackToSavePoint().IsNotFound());
return Status::NotFound();
}
}
@ -331,7 +296,8 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
// If the key has been previous validated at a sequence number earlier
// than the curent snapshot's sequence number, we already know it has not
// been modified.
bool already_validated = iter->second <= snapshot_->GetSequenceNumber();
SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber();
bool already_validated = iter->second <= seq;
if (!already_validated) {
s = CheckKeySequence(column_family, key);
@ -339,7 +305,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
if (s.ok()) {
// Record that there have been no writes to this key after this
// sequence.
iter->second = snapshot_->GetSequenceNumber();
iter->second = seq;
} else {
// Failed to validate key
if (!previously_locked) {
@ -369,7 +335,7 @@ Status TransactionImpl::CheckKeySequence(ColumnFamilyHandle* column_family,
result = TransactionUtil::CheckKeyForConflicts(
db_impl, cfh, key.ToString(),
snapshot_->GetSequenceNumber());
snapshot_->snapshot()->GetSequenceNumber());
}
return result;
@ -457,7 +423,6 @@ Status TransactionImpl::Put(ColumnFamilyHandle* column_family, const Slice& key,
if (s.ok()) {
write_batch_->Put(column_family, key, value);
num_entries_++;
}
return s;
@ -469,7 +434,6 @@ Status TransactionImpl::Put(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Put(column_family, key, value);
num_entries_++;
}
return s;
@ -481,7 +445,6 @@ Status TransactionImpl::Merge(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Merge(column_family, key, value);
num_entries_++;
}
return s;
@ -493,7 +456,6 @@ Status TransactionImpl::Delete(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Delete(column_family, key);
num_entries_++;
}
return s;
@ -505,7 +467,6 @@ Status TransactionImpl::Delete(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Delete(column_family, key);
num_entries_++;
}
return s;
@ -525,7 +486,6 @@ Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Put(column_family, key, value);
num_entries_++;
}
return s;
@ -539,7 +499,6 @@ Status TransactionImpl::PutUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Put(column_family, key, value);
num_entries_++;
}
return s;
@ -552,7 +511,6 @@ Status TransactionImpl::MergeUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Merge(column_family, key, value);
num_entries_++;
}
return s;
@ -565,7 +523,6 @@ Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Delete(column_family, key);
num_entries_++;
}
return s;
@ -578,7 +535,6 @@ Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
if (s.ok()) {
write_batch_->Delete(column_family, key);
num_entries_++;
}
return s;
@ -586,7 +542,6 @@ Status TransactionImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
void TransactionImpl::PutLogData(const Slice& blob) {
write_batch_->PutLogData(blob);
num_entries_++;
}
WriteBatchWithIndex* TransactionImpl::GetWriteBatch() {

@ -16,6 +16,7 @@
#include "db/write_callback.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/transaction.h"
@ -44,7 +45,7 @@ class TransactionImpl : public Transaction {
void SetSavePoint() override;
void RollbackToSavePoint() override;
Status RollbackToSavePoint() override;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value) override;
@ -151,7 +152,9 @@ class TransactionImpl : public Transaction {
void PutLogData(const Slice& blob) override;
const Snapshot* GetSnapshot() const override { return snapshot_; }
const Snapshot* GetSnapshot() const override {
return snapshot_ ? snapshot_->snapshot() : nullptr;
}
void SetSnapshot() override;
@ -190,7 +193,7 @@ class TransactionImpl : public Transaction {
// If snapshot_ is set, all keys that locked must also have not been written
// since this snapshot
const Snapshot* snapshot_;
std::shared_ptr<ManagedSnapshot> snapshot_;
const Comparator* cmp_;
@ -214,12 +217,9 @@ class TransactionImpl : public Transaction {
// stored.
TransactionKeyMap tracked_keys_;
// Records the number of entries currently in the WriteBatch include calls to
// PutLogData()
size_t num_entries_ = 0;
// Stack of number of entries in write_batch at each save point
std::unique_ptr<std::stack<size_t>> save_points_;
// Stack of the Snapshot saved at each save point. Saved snapshots may be
// nullptr if there was no snapshot at the time SetSavePoint() was called.
std::unique_ptr<std::stack<std::shared_ptr<ManagedSnapshot>>> save_points_;
Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
bool check_snapshot = true);

@ -1299,12 +1299,14 @@ TEST_F(TransactionTest, SavepointTest) {
Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
txn->RollbackToSavePoint();
s = txn->RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
txn->SetSavePoint(); // 1
txn->RollbackToSavePoint(); // Rollback to beginning of txn
txn->RollbackToSavePoint();
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to beginning of txn
s = txn->RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
s = txn->Put("B", "b");
ASSERT_OK(s);
@ -1340,7 +1342,7 @@ TEST_F(TransactionTest, SavepointTest) {
s = txn->Put("D", "d");
ASSERT_OK(s);
txn->RollbackToSavePoint(); // Rollback to 2
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 2
s = txn->Get(read_options, "A", &value);
ASSERT_OK(s);
@ -1363,7 +1365,10 @@ TEST_F(TransactionTest, SavepointTest) {
s = txn->Put("E", "e");
ASSERT_OK(s);
txn->RollbackToSavePoint(); // Rollback to beginning of txn
// Rollback to beginning of txn
s = txn->RollbackToSavePoint();
ASSERT_TRUE(s.IsNotFound());
txn->Rollback();
s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
@ -1409,7 +1414,7 @@ TEST_F(TransactionTest, SavepointTest) {
s = txn->Get(read_options, "B", &value);
ASSERT_TRUE(s.IsNotFound());
txn->RollbackToSavePoint(); // Rollback to 3
ASSERT_OK(txn->RollbackToSavePoint()); // Rollback to 3
s = txn->Get(read_options, "F", &value);
ASSERT_OK(s);

@ -141,125 +141,6 @@ Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
return result;
}
Status TransactionUtil::CopyFirstN(size_t num, WriteBatchWithIndex* batch,
WriteBatchWithIndex* new_batch,
DBImpl* db_impl) {
// Handler for iterating through batch and copying entries to new_batch
class Handler : public WriteBatch::Handler {
public:
WriteBatchWithIndex* batch;
const size_t limit;
DBImpl* db_impl;
size_t seen = 0;
std::unordered_map<uint32_t, SuperVersion*> super_versions;
std::unordered_map<uint32_t, ColumnFamilyHandle*> handles;
Handler(WriteBatchWithIndex* dest, size_t new_limit, DBImpl* db)
: batch(dest), limit(new_limit), db_impl(db) {}
~Handler() {
for (auto& iter : super_versions) {
db_impl->ReturnAndCleanupSuperVersionUnlocked(iter.first, iter.second);
}
}
Status GetColumnFamily(uint32_t column_family_id,
ColumnFamilyHandle** cfh) {
// Need to look up ColumnFamilyHandle for this column family id. Since
// doing this requires grabbing a mutex, lets only do it once per column
// family and cache it.
// In order to ensure that the ColumnFamilyHandle is still valid, we need
// to hold the superversion.
const auto& iter = handles.find(column_family_id);
if (iter == handles.end()) {
// Don't have ColumnFamilyHandle cached, look it up from the db.
SuperVersion* sv =
db_impl->GetAndRefSuperVersionUnlocked(column_family_id);
if (sv == nullptr) {
return Status::InvalidArgument(
"Could not find column family for ID " +
ToString(column_family_id));
}
super_versions.insert({column_family_id, sv});
*cfh = db_impl->GetColumnFamilyHandleUnlocked(column_family_id);
if (*cfh == nullptr) {
return Status::InvalidArgument(
"Could not find column family handle for ID " +
ToString(column_family_id));
}
handles.insert({column_family_id, *cfh});
} else {
*cfh = iter->second;
}
return Status::OK();
}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
if (seen >= limit) {
// Found the first N entries, return Aborted to stop the Iteration.
return Status::Aborted();
}
ColumnFamilyHandle* cfh = nullptr;
Status s = GetColumnFamily(column_family_id, &cfh);
if (s.ok()) {
batch->Put(cfh, key, value);
}
seen++;
return s;
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
if (seen >= limit) {
// Found the first N entries, return Aborted to stop the Iteration.
return Status::Aborted();
}
ColumnFamilyHandle* cfh = nullptr;
Status s = GetColumnFamily(column_family_id, &cfh);
if (s.ok()) {
batch->Merge(cfh, key, value);
}
seen++;
return s;
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
if (seen >= limit) {
// Found the first N entries, return Aborted to stop the Iteration.
return Status::Aborted();
}
ColumnFamilyHandle* cfh = nullptr;
Status s = GetColumnFamily(column_family_id, &cfh);
if (s.ok()) {
batch->Delete(cfh, key);
}
seen++;
return s;
}
virtual void LogData(const Slice& blob) override {
if (seen < limit) {
batch->PutLogData(blob);
}
seen++;
}
};
// Iterating on this handler will add all keys in this batch into a new batch
// up to
// the limit.
Handler handler(new_batch, num, db_impl);
Status s = batch->GetWriteBatch()->Iterate(&handler);
if (s.IsAborted()) {
// Handler returns Aborted when it is done copying to stop the iteration.
s = Status::OK();
}
return s;
}
} // namespace rocksdb

@ -48,12 +48,6 @@ class TransactionUtil {
// mutex is held.
static Status CheckKeysForConflicts(DBImpl* db_impl, TransactionKeyMap* keys);
// Copies the first num entries from batch into new_batch (including Put,
// Merge, Delete, and PutLogData).
// Returns non-OK on error.
static Status CopyFirstN(size_t num, WriteBatchWithIndex* batch,
WriteBatchWithIndex* new_batch, DBImpl* db_impl);
private:
static Status CheckKey(DBImpl* db_impl, SuperVersion* sv,
SequenceNumber earliest_seq, SequenceNumber key_seq,

Loading…
Cancel
Save