diff --git a/db/memtable.cc b/db/memtable.cc index 9f12f4130..029490d2b 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -958,7 +958,24 @@ static bool SaveValue(void* arg, const char* entry) { return true; // to continue to the next seq } - s->seq = seq; + if (s->seq == kMaxSequenceNumber) { + s->seq = seq; + } + + s->seq = std::max(s->seq, max_covering_tombstone_seq); + + if (ts_sz > 0 && s->timestamp != nullptr) { + if (!s->timestamp->empty()) { + assert(ts_sz == s->timestamp->size()); + } + // TODO optimize for smaller size ts + const std::string kMaxTs(ts_sz, '\xff'); + if (s->timestamp->empty() || + user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) { + Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); + s->timestamp->assign(ts.data(), ts_sz); + } + } if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || type == kTypeWideColumnEntity) && @@ -1039,10 +1056,6 @@ static bool SaveValue(void* arg, const char* entry) { *(s->is_blob_index) = (type == kTypeBlobIndex); } - if (ts_sz > 0 && s->timestamp != nullptr) { - Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); - s->timestamp->assign(ts.data(), ts.size()); - } return false; } case kTypeDeletion: @@ -1058,10 +1071,6 @@ static bool SaveValue(void* arg, const char* entry) { } } else { *(s->status) = Status::NotFound(); - if (ts_sz > 0 && s->timestamp != nullptr) { - Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); - s->timestamp->assign(ts.data(), ts.size()); - } } *(s->found_final_value) = true; return false; diff --git a/table/get_context.cc b/table/get_context.cc index 8e54e6d52..c86edbd7e 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -241,6 +241,23 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, if (*seq_ == kMaxSequenceNumber) { *seq_ = parsed_key.sequence; } + if (max_covering_tombstone_seq_) { + *seq_ = std::max(*seq_, *max_covering_tombstone_seq_); + } + } + + size_t ts_sz = 
ucmp_->timestamp_size(); + if (ts_sz > 0 && timestamp_ != nullptr) { + if (!timestamp_->empty()) { + assert(ts_sz == timestamp_->size()); + } + // TODO optimize for small size ts + const std::string kMaxTs(ts_sz, '\xff'); + if (timestamp_->empty() || + ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) { + Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz); + timestamp_->assign(ts.data(), ts.size()); + } } auto type = parsed_key.type; @@ -359,13 +376,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, } } } - if (state_ == kFound) { - size_t ts_sz = ucmp_->timestamp_size(); - if (ts_sz > 0 && timestamp_ != nullptr) { - Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz); - timestamp_->assign(ts.data(), ts.size()); - } - } return false; case kTypeDeletion: @@ -377,11 +387,6 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, assert(state_ == kNotFound || state_ == kMerge); if (kNotFound == state_) { state_ = kDeleted; - size_t ts_sz = ucmp_->timestamp_size(); - if (ts_sz > 0 && timestamp_ != nullptr) { - Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz); - timestamp_->assign(ts.data(), ts.size()); - } } else if (kMerge == state_) { state_ = kFound; Merge(nullptr); diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 47acfbe56..7161aa4b7 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -166,6 +166,65 @@ TEST_P(OptimisticTransactionTest, WriteConflictTest2) { delete txn; } +TEST_P(OptimisticTransactionTest, WriteConflictTest3) { + ASSERT_OK(txn_db->Put(WriteOptions(), "foo", "bar")); + + Transaction* txn = txn_db->BeginTransaction(WriteOptions()); + ASSERT_NE(txn, nullptr); + + std::string value; + ASSERT_OK(txn->GetForUpdate(ReadOptions(), "foo", &value)); + ASSERT_EQ(value, "bar"); + ASSERT_OK(txn->Merge("foo", "bar3")); + + // Merge 
outside of a transaction should conflict with the previous merge + ASSERT_OK(txn_db->Merge(WriteOptions(), "foo", "bar2")); + ASSERT_OK(txn_db->Get(ReadOptions(), "foo", &value)); + ASSERT_EQ(value, "bar2"); + + ASSERT_EQ(1, txn->GetNumKeys()); + + Status s = txn->Commit(); + EXPECT_TRUE(s.IsBusy()); // Txn should not commit + + // Verify that transaction did not write anything + ASSERT_OK(txn_db->Get(ReadOptions(), "foo", &value)); + ASSERT_EQ(value, "bar2"); + + delete txn; +} + +TEST_P(OptimisticTransactionTest, WriteConflict4) { + ASSERT_OK(txn_db->Put(WriteOptions(), "foo", "bar")); + + Transaction* txn = txn_db->BeginTransaction(WriteOptions()); + ASSERT_NE(txn, nullptr); + + std::string value; + ASSERT_OK(txn->GetForUpdate(ReadOptions(), "foo", &value)); + ASSERT_EQ(value, "bar"); + ASSERT_OK(txn->Merge("foo", "bar3")); + + // Range delete outside of a transaction should conflict with the previous + // merge inside txn + auto* dbimpl = static_cast_with_check<DBImpl>(txn_db->GetRootDB()); + ColumnFamilyHandle* default_cf = dbimpl->DefaultColumnFamily(); + ASSERT_OK(dbimpl->DeleteRange(WriteOptions(), default_cf, "foo", "foo1")); + Status s = txn_db->Get(ReadOptions(), "foo", &value); + ASSERT_TRUE(s.IsNotFound()); + + ASSERT_EQ(1, txn->GetNumKeys()); + + s = txn->Commit(); + EXPECT_TRUE(s.IsBusy()); // Txn should not commit + + // Verify that transaction did not write anything + s = txn_db->Get(ReadOptions(), "foo", &value); + ASSERT_TRUE(s.IsNotFound()); + + delete txn; +} + TEST_P(OptimisticTransactionTest, ReadConflictTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options;