From 07249fea8f999de65bb8cf9844366c1461362f44 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 23 Sep 2022 17:29:05 -0700 Subject: [PATCH] Fix DBImpl::GetLatestSequenceForKey() for Merge (#10724) Summary: Currently, without this fix, DBImpl::GetLatestSequenceForKey() may not return the latest sequence number for merge operands of the key. This can cause conflict checking during optimistic transaction commit phase to fail. Fix it by always returning the latest sequence number of the key, also considering range tombstones. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10724 Test Plan: make check Reviewed By: cbi42 Differential Revision: D39756847 Pulled By: riversand963 fbshipit-source-id: 0764c3dd4cb24960b37e18adccc6e7feed0e6876 --- db/memtable.cc | 27 ++++++--- table/get_context.cc | 29 +++++---- .../optimistic_transaction_test.cc | 59 +++++++++++++++++++ 3 files changed, 94 insertions(+), 21 deletions(-) diff --git a/db/memtable.cc b/db/memtable.cc index 9f12f4130..029490d2b 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -958,7 +958,24 @@ static bool SaveValue(void* arg, const char* entry) { return true; // to continue to the next seq } - s->seq = seq; + if (s->seq == kMaxSequenceNumber) { + s->seq = seq; + } + + s->seq = std::max(s->seq, max_covering_tombstone_seq); + + if (ts_sz > 0 && s->timestamp != nullptr) { + if (!s->timestamp->empty()) { + assert(ts_sz == s->timestamp->size()); + } + // TODO optimize for smaller size ts + const std::string kMaxTs(ts_sz, '\xff'); + if (s->timestamp->empty() || + user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) { + Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); + s->timestamp->assign(ts.data(), ts_sz); + } + } if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || type == kTypeWideColumnEntity) && @@ -1039,10 +1056,6 @@ static bool SaveValue(void* arg, const char* entry) { *(s->is_blob_index) = (type == kTypeBlobIndex); } - if (ts_sz > 0 && s->timestamp != nullptr) { - Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); - s->timestamp->assign(ts.data(), ts.size()); - } return false; } case kTypeDeletion: @@ -1058,10 +1071,6 @@ static bool SaveValue(void* arg, const char* entry) { } } else { *(s->status) = Status::NotFound(); - if (ts_sz > 0 && s->timestamp != nullptr) { - Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); - s->timestamp->assign(ts.data(), ts.size()); - } } *(s->found_final_value) = true; return false; diff --git a/table/get_context.cc b/table/get_context.cc index 8e54e6d52..c86edbd7e 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -241,6 +241,23 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, if (*seq_ == kMaxSequenceNumber) { *seq_ = parsed_key.sequence; } + if (max_covering_tombstone_seq_) { + *seq_ = std::max(*seq_, *max_covering_tombstone_seq_); + } + } + + size_t ts_sz = ucmp_->timestamp_size(); + if (ts_sz > 0 && timestamp_ != nullptr) { + if (!timestamp_->empty()) { + assert(ts_sz == timestamp_->size()); + } + // TODO optimize for small size ts + const std::string kMaxTs(ts_sz, '\xff'); + if (timestamp_->empty() || + ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) { + Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz); + timestamp_->assign(ts.data(), ts.size()); + } } auto type = parsed_key.type; @@ -359,13 +376,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } } } - if (state_ == kFound) { - size_t ts_sz = ucmp_->timestamp_size(); - if (ts_sz > 0 && timestamp_ != nullptr) { - Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz); - timestamp_->assign(ts.data(), ts.size()); - } - } return false; case kTypeDeletion: @@ -377,11 +387,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(state_ == kNotFound || state_ == kMerge); if (kNotFound == state_) { state_ = kDeleted; - size_t ts_sz = ucmp_->timestamp_size(); - if (ts_sz > 0 && timestamp_ != nullptr) { - Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz); - timestamp_->assign(ts.data(), ts.size()); - } } else if (kMerge == state_) { state_ = kFound; Merge(nullptr); diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 47acfbe56..7161aa4b7 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -166,6 +166,65 @@ TEST_P(OptimisticTransactionTest, WriteConflictTest2) { delete txn; } +TEST_P(OptimisticTransactionTest, WriteConflictTest3) { + ASSERT_OK(txn_db->Put(WriteOptions(), "foo", "bar")); + + Transaction* txn = txn_db->BeginTransaction(WriteOptions()); + ASSERT_NE(txn, nullptr); + + std::string value; + ASSERT_OK(txn->GetForUpdate(ReadOptions(), "foo", &value)); + ASSERT_EQ(value, "bar"); + ASSERT_OK(txn->Merge("foo", "bar3")); + + // Merge outside of a transaction should conflict with the previous merge + ASSERT_OK(txn_db->Merge(WriteOptions(), "foo", "bar2")); + ASSERT_OK(txn_db->Get(ReadOptions(), "foo", &value)); + ASSERT_EQ(value, "bar2"); + + ASSERT_EQ(1, txn->GetNumKeys()); + + Status s = txn->Commit(); + EXPECT_TRUE(s.IsBusy()); // Txn should not commit + + // Verify that transaction did not write anything + ASSERT_OK(txn_db->Get(ReadOptions(), "foo", &value)); + ASSERT_EQ(value, "bar2"); + + delete txn; +} + +TEST_P(OptimisticTransactionTest, WriteConflict4) { + ASSERT_OK(txn_db->Put(WriteOptions(), "foo", "bar")); + + Transaction* txn = txn_db->BeginTransaction(WriteOptions()); + ASSERT_NE(txn, nullptr); + + std::string value; + ASSERT_OK(txn->GetForUpdate(ReadOptions(), "foo", &value)); + ASSERT_EQ(value, "bar"); + ASSERT_OK(txn->Merge("foo", "bar3")); + + // Range delete outside of a transaction should conflict with the previous + // merge inside txn + auto* dbimpl = static_cast_with_check(txn_db->GetRootDB()); + ColumnFamilyHandle* default_cf = dbimpl->DefaultColumnFamily(); + ASSERT_OK(dbimpl->DeleteRange(WriteOptions(), default_cf, "foo", "foo1")); + Status s = txn_db->Get(ReadOptions(), "foo", &value); + ASSERT_TRUE(s.IsNotFound()); + + ASSERT_EQ(1, txn->GetNumKeys()); + + s = txn->Commit(); + EXPECT_TRUE(s.IsBusy()); // Txn should not commit + + // Verify that transaction did not write anything + s = txn_db->Get(ReadOptions(), "foo", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn; +} + TEST_P(OptimisticTransactionTest, ReadConflictTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options;