SingleDelete support for Transactions

Summary: Transactional SingleDelete is needed for MyRocks.  Note: This diff requires D47529.

Test Plan: Added some new tests in this diff as well as more tests added in D47529

Reviewers: rven, sdong, igor, yhchiang

Reviewed By: yhchiang

Subscribers: yoshinorim, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D47535
main
agiardullo 9 years ago
parent a263002a36
commit afe0dc539b
  1. 9
      include/rocksdb/utilities/transaction.h
  2. 24
      utilities/transactions/transaction_base.cc
  3. 11
      utilities/transactions/transaction_base.h
  4. 106
      utilities/transactions/transaction_test.cc

@ -185,7 +185,7 @@ class Transaction {
virtual Iterator* GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) = 0;
// Put, Merge, and Delete behave similarly to their corresponding
// Put, Merge, Delete, and SingleDelete behave similarly to the corresponding
// functions in WriteBatch, but will also do conflict checking on the
// keys being written.
//
@ -218,6 +218,13 @@ class Transaction {
const SliceParts& key) = 0;
virtual Status Delete(const SliceParts& key) = 0;
virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) = 0;
virtual Status SingleDelete(const Slice& key) = 0;
virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) = 0;
virtual Status SingleDelete(const SliceParts& key) = 0;
// PutUntracked() will write a Put to the batch of operations to be committed
// in this transaction. This write will only happen if this transaction
// gets committed successfully. But unlike Transaction::Put(),

@ -235,6 +235,30 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
return s;
}
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->SingleDelete(column_family, key);
num_deletes_++;
}
return s;
}
Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
Status s = TryLock(column_family, key);
if (s.ok()) {
write_batch_->SingleDelete(column_family, key);
num_deletes_++;
}
return s;
}
Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
bool untracked = true;

@ -117,6 +117,17 @@ class TransactionBaseImpl : public Transaction {
const SliceParts& key) override;
Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) override;
Status SingleDelete(const Slice& key) override {
return SingleDelete(nullptr, key);
}
Status SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
Status SingleDelete(const SliceParts& key) override {
return SingleDelete(nullptr, key);
}
Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status PutUntracked(const Slice& key, const Slice& value) override {

@ -592,7 +592,7 @@ TEST_F(TransactionTest, ColumnFamiliesTest) {
ASSERT_EQ(values[1], "barbar");
ASSERT_EQ(values[2], "foo");
s = txn->Delete(handles[2], "ZZZ");
s = txn->SingleDelete(handles[2], "ZZZ");
ASSERT_OK(s);
s = txn->Put(handles[2], "ZZZ", "YYY");
ASSERT_OK(s);
@ -1427,7 +1427,7 @@ TEST_F(TransactionTest, SavepointTest) {
s = txn->Put("G", "g");
ASSERT_OK(s);
s = txn->Delete("F");
s = txn->SingleDelete("F");
ASSERT_OK(s);
s = txn->Delete("B");
@ -1438,6 +1438,10 @@ TEST_F(TransactionTest, SavepointTest) {
ASSERT_EQ("aa", value);
s = txn->Get(read_options, "F", &value);
// According to db.h, doing a SingleDelete on a key that has been
// overwritten will have undefinied behavior. So it is unclear what the
// result of fetching "F" should be. The current implementation will
// return NotFound in this case.
ASSERT_TRUE(s.IsNotFound());
s = txn->Get(read_options, "B", &value);
@ -1722,6 +1726,104 @@ TEST_F(TransactionTest, TimeoutTest) {
delete txn2;
}
TEST_F(TransactionTest, SingleDeleteTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
Status s;
Transaction* txn = db->BeginTransaction(write_options);
ASSERT_TRUE(txn);
s = txn->SingleDelete("A");
ASSERT_OK(s);
s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn->Commit();
ASSERT_OK(s);
delete txn;
txn = db->BeginTransaction(write_options);
s = txn->SingleDelete("A");
ASSERT_OK(s);
s = txn->Put("A", "a");
ASSERT_OK(s);
s = txn->Get(read_options, "A", &value);
ASSERT_OK(s);
ASSERT_EQ("a", value);
s = txn->Commit();
ASSERT_OK(s);
delete txn;
s = db->Get(read_options, "A", &value);
ASSERT_OK(s);
ASSERT_EQ("a", value);
txn = db->BeginTransaction(write_options);
s = txn->SingleDelete("A");
ASSERT_OK(s);
s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn->Commit();
ASSERT_OK(s);
delete txn;
s = db->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
txn = db->BeginTransaction(write_options);
Transaction* txn2 = db->BeginTransaction(write_options);
txn2->SetSnapshot();
s = txn->Put("A", "a");
ASSERT_OK(s);
s = txn->Put("A", "a2");
ASSERT_OK(s);
s = txn->SingleDelete("A");
ASSERT_OK(s);
s = txn->SingleDelete("B");
ASSERT_OK(s);
// According to db.h, doing a SingleDelete on a key that has been
// overwritten will have undefinied behavior. So it is unclear what the
// result of fetching "A" should be. The current implementation will
// return NotFound in this case.
s = txn->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
s = txn2->Put("B", "b");
ASSERT_TRUE(s.IsTimedOut());
s = txn2->Commit();
ASSERT_OK(s);
delete txn2;
s = txn->Commit();
ASSERT_OK(s);
delete txn;
// According to db.h, doing a SingleDelete on a key that has been
// overwritten will have undefinied behavior. So it is unclear what the
// result of fetching "A" should be. The current implementation will
// return NotFound in this case.
s = db->Get(read_options, "A", &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(read_options, "B", &value);
ASSERT_TRUE(s.IsNotFound());
}
} // namespace rocksdb
int main(int argc, char** argv) {

Loading…
Cancel
Save