Fix DBImpl::GetLatestSequenceForKey() for Merge (#10724)

Summary:
Currently, without this fix, DBImpl::GetLatestSequenceForKey() may not return the latest sequence number when the key's most recent updates are merge operands. This can cause the conflict checking performed during the optimistic transaction commit phase to miss a conflict. Fix it by always returning the latest sequence number of the key, taking range tombstones into account as well.
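
For context, here is a minimal, self-contained sketch (not the actual RocksDB implementation; every name in it is illustrative) of why the commit-time conflict check depends on the key's latest sequence number, with Merge operands and range tombstones counted like any other write:

```cpp
// Toy model, NOT RocksDB code: an optimistic transaction records the sequence
// number at which it tracked a key, and at commit time checks whether anything
// newer has touched that key.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

using SequenceNumber = uint64_t;

// Stand-in for the DB: remembers, per key, the sequence number of the newest
// operation that touched it (Put, Merge, Delete, or a covering range tombstone).
struct ToyDB {
  SequenceNumber last_seq = 0;
  std::map<std::string, SequenceNumber> newest;

  void Touch(const std::string& key) { newest[key] = ++last_seq; }

  SequenceNumber LatestSequenceForKey(const std::string& key) const {
    auto it = newest.find(key);
    return it == newest.end() ? 0 : it->second;
  }
};

// Commit check: the transaction's write is allowed only if nothing touched the
// key after the sequence the transaction recorded. If LatestSequenceForKey()
// under-reports (e.g. skips a Merge operand), a real conflict slips through.
bool CommitAllowed(const ToyDB& db, const std::string& key,
                   SequenceNumber tracked_at_seq) {
  return db.LatestSequenceForKey(key) <= tracked_at_seq;
}

int main() {
  ToyDB db;
  db.Touch("foo");                       // Put("foo"), seq 1
  SequenceNumber tracked = db.last_seq;  // txn tracks "foo" at seq 1
  db.Touch("foo");                       // Merge("foo") outside the txn, seq 2
  std::cout << std::boolalpha << CommitAllowed(db, "foo", tracked) << "\n";
  // Prints "false": the commit must be rejected as Busy.
  return 0;
}
```

If the latest sequence is under-reported for a key whose newest updates are merge operands or covering range tombstones, the comparison wrongly passes and the conflicting commit succeeds; the new tests below exercise exactly that scenario.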

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10724

Test Plan: make check

Reviewed By: cbi42

Differential Revision: D39756847

Pulled By: riversand963

fbshipit-source-id: 0764c3dd4cb24960b37e18adccc6e7feed0e6876
Branch: main
Author: Yanqin Jin (committed by Facebook GitHub Bot)
Parent: c76a90ceb9
Commit: 07249fea8f

Changed files:
  1. db/memtable.cc (25 changed lines)
  2. table/get_context.cc (29 changed lines)
  3. utilities/transactions/optimistic_transaction_test.cc (59 changed lines)

db/memtable.cc

@@ -958,7 +958,24 @@ static bool SaveValue(void* arg, const char* entry) {
      return true;  // to continue to the next seq
    }
    if (s->seq == kMaxSequenceNumber) {
      s->seq = seq;
    }
    s->seq = std::max(s->seq, max_covering_tombstone_seq);
    if (ts_sz > 0 && s->timestamp != nullptr) {
      if (!s->timestamp->empty()) {
        assert(ts_sz == s->timestamp->size());
      }
      // TODO optimize for smaller size ts
      const std::string kMaxTs(ts_sz, '\xff');
      if (s->timestamp->empty() ||
          user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) {
        Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
        s->timestamp->assign(ts.data(), ts_sz);
      }
    }
    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
         type == kTypeWideColumnEntity) &&
@@ -1039,10 +1056,6 @@ static bool SaveValue(void* arg, const char* entry) {
          *(s->is_blob_index) = (type == kTypeBlobIndex);
        }
        if (ts_sz > 0 && s->timestamp != nullptr) {
          Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
          s->timestamp->assign(ts.data(), ts.size());
        }
        return false;
      }
      case kTypeDeletion:
@@ -1058,10 +1071,6 @@ static bool SaveValue(void* arg, const char* entry) {
          }
        } else {
          *(s->status) = Status::NotFound();
          if (ts_sz > 0 && s->timestamp != nullptr) {
            Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
            s->timestamp->assign(ts.data(), ts.size());
          }
        }
        *(s->found_final_value) = true;
        return false;

table/get_context.cc

@@ -241,6 +241,23 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
      if (*seq_ == kMaxSequenceNumber) {
        *seq_ = parsed_key.sequence;
      }
      if (max_covering_tombstone_seq_) {
        *seq_ = std::max(*seq_, *max_covering_tombstone_seq_);
      }
    }
    size_t ts_sz = ucmp_->timestamp_size();
    if (ts_sz > 0 && timestamp_ != nullptr) {
      if (!timestamp_->empty()) {
        assert(ts_sz == timestamp_->size());
      }
      // TODO optimize for small size ts
      const std::string kMaxTs(ts_sz, '\xff');
      if (timestamp_->empty() ||
          ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) {
        Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
        timestamp_->assign(ts.data(), ts.size());
      }
    }

    auto type = parsed_key.type;
@@ -359,13 +376,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
            }
          }
        }
        if (state_ == kFound) {
          size_t ts_sz = ucmp_->timestamp_size();
          if (ts_sz > 0 && timestamp_ != nullptr) {
            Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
            timestamp_->assign(ts.data(), ts.size());
          }
        }
        return false;

      case kTypeDeletion:
@@ -377,11 +387,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
        assert(state_ == kNotFound || state_ == kMerge);
        if (kNotFound == state_) {
          state_ = kDeleted;
          size_t ts_sz = ucmp_->timestamp_size();
          if (ts_sz > 0 && timestamp_ != nullptr) {
            Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
            timestamp_->assign(ts.data(), ts.size());
          }
        } else if (kMerge == state_) {
          state_ = kFound;
          Merge(nullptr);

utilities/transactions/optimistic_transaction_test.cc

@@ -166,6 +166,65 @@ TEST_P(OptimisticTransactionTest, WriteConflictTest2) {
  delete txn;
}

TEST_P(OptimisticTransactionTest, WriteConflictTest3) {
  ASSERT_OK(txn_db->Put(WriteOptions(), "foo", "bar"));

  Transaction* txn = txn_db->BeginTransaction(WriteOptions());
  ASSERT_NE(txn, nullptr);

  std::string value;
  ASSERT_OK(txn->GetForUpdate(ReadOptions(), "foo", &value));
  ASSERT_EQ(value, "bar");
  ASSERT_OK(txn->Merge("foo", "bar3"));

  // Merge outside of a transaction should conflict with the previous merge
  ASSERT_OK(txn_db->Merge(WriteOptions(), "foo", "bar2"));
  ASSERT_OK(txn_db->Get(ReadOptions(), "foo", &value));
  ASSERT_EQ(value, "bar2");

  ASSERT_EQ(1, txn->GetNumKeys());

  Status s = txn->Commit();
  EXPECT_TRUE(s.IsBusy());  // Txn should not commit

  // Verify that transaction did not write anything
  ASSERT_OK(txn_db->Get(ReadOptions(), "foo", &value));
  ASSERT_EQ(value, "bar2");

  delete txn;
}

TEST_P(OptimisticTransactionTest, WriteConflict4) {
  ASSERT_OK(txn_db->Put(WriteOptions(), "foo", "bar"));

  Transaction* txn = txn_db->BeginTransaction(WriteOptions());
  ASSERT_NE(txn, nullptr);

  std::string value;
  ASSERT_OK(txn->GetForUpdate(ReadOptions(), "foo", &value));
  ASSERT_EQ(value, "bar");
  ASSERT_OK(txn->Merge("foo", "bar3"));

  // Range delete outside of a transaction should conflict with the previous
  // merge inside txn
  auto* dbimpl = static_cast_with_check<DBImpl>(txn_db->GetRootDB());
  ColumnFamilyHandle* default_cf = dbimpl->DefaultColumnFamily();
  ASSERT_OK(dbimpl->DeleteRange(WriteOptions(), default_cf, "foo", "foo1"));
  Status s = txn_db->Get(ReadOptions(), "foo", &value);
  ASSERT_TRUE(s.IsNotFound());

  ASSERT_EQ(1, txn->GetNumKeys());

  s = txn->Commit();
  EXPECT_TRUE(s.IsBusy());  // Txn should not commit

  // Verify that transaction did not write anything
  s = txn_db->Get(ReadOptions(), "foo", &value);
  ASSERT_TRUE(s.IsNotFound());

  delete txn;
}

TEST_P(OptimisticTransactionTest, ReadConflictTest) {
  WriteOptions write_options;
  ReadOptions read_options, snapshot_read_options;
