Update TransactionUtil::CheckKeyForConflict to also use timestamps (#9162)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9162

Existing TransactionUtil::CheckKeyForConflict() performs only seq-based
conflict checking. If user-defined timestamp is enabled, it should perform
conflict checking based on timestamps too.

Update TransactionUtil::CheckKey-related methods to verify the timestamp of the
latest version of a key is smaller than the read timestamp. Note that
CheckKeysForConflict() is not updated since it's used only by optimistic
transaction, and we do not plan to update it in this upcoming batch of diffs.

Existing GetLatestSequenceForKey() returns the sequence of the latest
version of a specific user key. Since we support user-defined timestamp, we
need to update this method to also return the timestamp (if enabled) of the
latest version of the key. This will be needed for snapshot validation.

Reviewed By: ltamasi

Differential Revision: D31567960

fbshipit-source-id: 2e4a14aed267435a9aa91bc632d2411c01946d44
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 9bb13c56b3
commit 2035798834
  1. 1
      HISTORY.md
  2. 3
      db/column_family.cc
  3. 59
      db/db_impl/db_impl.cc
  4. 12
      db/db_impl/db_impl.h
  5. 63
      db/db_test2.cc
  6. 4
      utilities/transactions/pessimistic_transaction.cc
  7. 38
      utilities/transactions/transaction_util.cc
  8. 17
      utilities/transactions/transaction_util.h
  9. 7
      utilities/transactions/write_prepared_txn.cc
  10. 8
      utilities/transactions/write_unprepared_txn.cc

@ -18,6 +18,7 @@
### Behavior Changes
* `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files.
* `TransactionUtil::CheckKeyForConflicts` can also perform conflict-checking based on user-defined timestamps in addition to sequence numbers.
### Public Interface Change
* When options.ttl is used with leveled compaction with compactinon priority kMinOverlappingRatio, files exceeding half of TTL value will be prioritized more, so that by the time TTL is reached, fewer extra compactions will be scheduled to clear them up. At the same time, when compacting files with data older than half of TTL, output files may be cut off based on those files' boundaries, in order for the early TTL compaction to work properly.

@ -211,7 +211,8 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
size_t clamp_max = std::conditional<
sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
std::integral_constant<uint64_t, 64ull << 30>>::type::value;
ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
ClipToRange(&result.write_buffer_size, (static_cast<size_t>(64)) << 10,
clamp_max);
// if user sets arena_block_size, we trust user to use this value. Otherwise,
// calculate a proper value from writer_buffer_size;
if (result.arena_block_size <= 0) {

@ -4376,25 +4376,38 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
return earliest_seq;
}
Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
bool cache_only,
SequenceNumber lower_bound_seq,
SequenceNumber* seq,
bool* found_record_for_key,
bool* is_blob_index) {
Status DBImpl::GetLatestSequenceForKey(
SuperVersion* sv, const Slice& key, bool cache_only,
SequenceNumber lower_bound_seq, SequenceNumber* seq, std::string* timestamp,
bool* found_record_for_key, bool* is_blob_index) {
Status s;
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0;
ReadOptions read_options;
SequenceNumber current_seq = versions_->LastSequence();
LookupKey lkey(key, current_seq);
ColumnFamilyData* cfd = sv->cfd;
assert(cfd);
const Comparator* const ucmp = cfd->user_comparator();
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
std::string ts_buf;
if (ts_sz > 0) {
assert(timestamp);
ts_buf.assign(ts_sz, '\xff');
} else {
assert(!timestamp);
}
Slice ts(ts_buf);
LookupKey lkey(key, current_seq, ts_sz == 0 ? nullptr : &ts);
*seq = kMaxSequenceNumber;
*found_record_for_key = false;
// Check if there is a record for this key in the latest memtable
sv->mem->Get(lkey, nullptr, nullptr, &s, &merge_context,
sv->mem->Get(lkey, /*value=*/nullptr, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, seq, read_options,
nullptr /*read_callback*/, is_blob_index);
@ -4406,6 +4419,10 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
return s;
}
assert(!ts_sz ||
(*seq != kMaxSequenceNumber &&
*timestamp != std::string(ts_sz, '\xff')) ||
(*seq == kMaxSequenceNumber && timestamp->empty()));
if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check immutable memtables
@ -4421,7 +4438,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
}
// Check if there is a record for this key in the immutable memtables
sv->imm->Get(lkey, nullptr, nullptr, &s, &merge_context,
sv->imm->Get(lkey, /*value=*/nullptr, timestamp, &s, &merge_context,
&max_covering_tombstone_seq, seq, read_options,
nullptr /*read_callback*/, is_blob_index);
@ -4434,6 +4451,11 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
return s;
}
assert(!ts_sz ||
(*seq != kMaxSequenceNumber &&
*timestamp != std::string(ts_sz, '\xff')) ||
(*seq == kMaxSequenceNumber && timestamp->empty()));
if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check memtable history
*found_record_for_key = true;
@ -4448,9 +4470,9 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
}
// Check if there is a record for this key in the immutable memtables
sv->imm->GetFromHistory(lkey, nullptr, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, seq, read_options,
is_blob_index);
sv->imm->GetFromHistory(lkey, /*value=*/nullptr, timestamp, &s,
&merge_context, &max_covering_tombstone_seq, seq,
read_options, is_blob_index);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
@ -4462,8 +4484,13 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
return s;
}
assert(!ts_sz ||
(*seq != kMaxSequenceNumber &&
*timestamp != std::string(ts_sz, '\xff')) ||
(*seq == kMaxSequenceNumber && timestamp->empty()));
if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check SST files
assert(0 == ts_sz || *timestamp != std::string(ts_sz, '\xff'));
*found_record_for_key = true;
return Status::OK();
}
@ -4476,10 +4503,10 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
// SST files if cache_only=true?
if (!cache_only) {
// Check tables
sv->current->Get(read_options, lkey, nullptr, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, nullptr /* value_found */,
found_record_for_key, seq, nullptr /*read_callback*/,
is_blob_index);
sv->current->Get(read_options, lkey, /*value=*/nullptr, timestamp, &s,
&merge_context, &max_covering_tombstone_seq,
nullptr /* value_found */, found_record_for_key, seq,
nullptr /*read_callback*/, is_blob_index);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading SST files

@ -582,9 +582,15 @@ class DBImpl : public DB {
// in the memtables, including memtable history. If cache_only is false,
// SST files will also be checked.
//
// `key` should NOT have user-defined timestamp appended to user key even if
// timestamp is enabled.
//
// If a key is found, *found_record_for_key will be set to true and
// *seq will be set to the stored sequence number for the latest
// operation on this key or kMaxSequenceNumber if unknown.
// operation on this key or kMaxSequenceNumber if unknown. If user-defined
// timestamp is enabled for this column family and timestamp is not nullptr,
// then *timestamp will be set to the stored timestamp for the latest
// operation on this key.
// If no key is found, *found_record_for_key will be set to false.
//
// Note: If cache_only=false, it is possible for *seq to be set to 0 if
@ -608,9 +614,9 @@ class DBImpl : public DB {
Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
bool cache_only,
SequenceNumber lower_bound_seq,
SequenceNumber* seq,
SequenceNumber* seq, std::string* timestamp,
bool* found_record_for_key,
bool* is_blob_index = nullptr);
bool* is_blob_index);
Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key,
const Slice& lower_bound, const Slice upper_bound);

@ -6763,6 +6763,69 @@ TEST_F(DBTest2, RenameDirectory) {
Destroy(options);
dbname_ = old_dbname;
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest2, GetLatestSeqAndTsForKey) {
Destroy(last_options_);
Options options = CurrentOptions();
options.max_write_buffer_size_to_maintain = 64 << 10;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.comparator = test::ComparatorWithU64Ts();
options.statistics = CreateDBStatistics();
Reopen(options);
constexpr uint64_t kTsU64Value = 12;
for (uint64_t key = 0; key < 100; ++key) {
std::string ts_str;
PutFixed64(&ts_str, kTsU64Value);
Slice ts = ts_str;
WriteOptions write_opts;
write_opts.timestamp = &ts;
std::string key_str;
PutFixed64(&key_str, key);
std::reverse(key_str.begin(), key_str.end());
ASSERT_OK(Put(key_str, "value", write_opts));
}
ASSERT_OK(Flush());
constexpr bool cache_only = true;
constexpr SequenceNumber lower_bound_seq = 0;
auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(
dbfull()->DefaultColumnFamily());
assert(cfhi);
assert(cfhi->cfd());
SuperVersion* sv = cfhi->cfd()->GetSuperVersion();
for (uint64_t key = 0; key < 100; ++key) {
std::string key_str;
PutFixed64(&key_str, key);
std::reverse(key_str.begin(), key_str.end());
std::string ts;
SequenceNumber seq = kMaxSequenceNumber;
bool found_record_for_key = false;
bool is_blob_index = false;
const Status s = dbfull()->GetLatestSequenceForKey(
sv, key_str, cache_only, lower_bound_seq, &seq, &ts,
&found_record_for_key, &is_blob_index);
ASSERT_OK(s);
std::string expected_ts;
PutFixed64(&expected_ts, kTsU64Value);
ASSERT_EQ(expected_ts, ts);
ASSERT_TRUE(found_record_for_key);
ASSERT_FALSE(is_blob_index);
}
// Verify that no read to SST files.
ASSERT_EQ(0, options.statistics->getTickerCount(GET_HIT_L0));
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -174,7 +174,6 @@ Status PessimisticTransaction::CommitBatch(WriteBatch* batch) {
}
Status PessimisticTransaction::Prepare() {
if (name_.empty()) {
return Status::InvalidArgument(
"Cannot prepare a transaction that has not been named.");
@ -712,8 +711,9 @@ Status PessimisticTransaction::ValidateSnapshot(
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
// TODO (yanqin): support conflict checking based on timestamp.
return TransactionUtil::CheckKeyForConflicts(
db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */);
db_impl_, cfh, key.ToString(), snap_seq, nullptr, false /* cache_only */);
}
bool PessimisticTransaction::TryStealingLocks() {

@ -21,8 +21,8 @@ namespace ROCKSDB_NAMESPACE {
Status TransactionUtil::CheckKeyForConflicts(
DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key,
SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker,
SequenceNumber min_uncommitted) {
SequenceNumber snap_seq, const std::string* const read_ts, bool cache_only,
ReadCallback* snap_checker, SequenceNumber min_uncommitted) {
Status result;
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
@ -38,8 +38,8 @@ Status TransactionUtil::CheckKeyForConflicts(
SequenceNumber earliest_seq =
db_impl->GetEarliestMemTableSequenceNumber(sv, true);
result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, cache_only,
snap_checker, min_uncommitted);
result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, read_ts,
cache_only, snap_checker, min_uncommitted);
db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
}
@ -50,8 +50,9 @@ Status TransactionUtil::CheckKeyForConflicts(
Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
SequenceNumber earliest_seq,
SequenceNumber snap_seq,
const std::string& key, bool cache_only,
ReadCallback* snap_checker,
const std::string& key,
const std::string* const read_ts,
bool cache_only, ReadCallback* snap_checker,
SequenceNumber min_uncommitted) {
// When `min_uncommitted` is provided, keys are not always committed
// in sequence number order, and `snap_checker` is used to check whether
@ -105,6 +106,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
if (result.ok()) {
SequenceNumber seq = kMaxSequenceNumber;
std::string timestamp;
bool found_record_for_key = false;
// When min_uncommitted == kMaxSequenceNumber, writes are committed in
@ -117,9 +119,10 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
// keys lower than min_uncommitted can be skipped.
SequenceNumber lower_bound_seq =
(min_uncommitted == kMaxSequenceNumber) ? snap_seq : min_uncommitted;
Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst,
lower_bound_seq, &seq,
&found_record_for_key);
Status s = db_impl->GetLatestSequenceForKey(
sv, key, !need_to_read_sst, lower_bound_seq, &seq,
!read_ts ? nullptr : &timestamp, &found_record_for_key,
/*is_blob_index=*/nullptr);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
result = s;
@ -127,6 +130,17 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
bool write_conflict = snap_checker == nullptr
? snap_seq < seq
: !snap_checker->IsVisible(seq);
// Perform conflict checking based on timestamp if applicable.
if (!write_conflict && read_ts != nullptr) {
ColumnFamilyData* cfd = sv->cfd;
assert(cfd);
const Comparator* const ucmp = cfd->user_comparator();
assert(ucmp);
assert(read_ts->size() == ucmp->timestamp_size());
assert(read_ts->size() == timestamp.size());
// Write conflict if *ts < timestamp.
write_conflict = ucmp->CompareTimestamp(*read_ts, timestamp) < 0;
}
if (write_conflict) {
result = Status::Busy();
}
@ -167,7 +181,11 @@ Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
PointLockStatus status = tracker.GetPointLockStatus(cf, key);
const SequenceNumber key_seq = status.seq;
result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);
// TODO: support timestamp-based conflict checking.
// CheckKeysForConflicts() is currently used only by optimistic
// transactions.
result = CheckKey(db_impl, sv, earliest_seq, key_seq, key,
/*read_ts=*/nullptr, cache_only);
if (!result.ok()) {
break;
}

@ -27,20 +27,22 @@ class WriteBatchWithIndex;
class TransactionUtil {
public:
// Verifies there have been no commits to this key in the db since this
// sequence number.
// sequence number. If user-defined timestamp is enabled, then also check
// no commits to this key in the db since the given ts.
//
// If cache_only is true, then this function will not attempt to read any
// SST files. This will make it more likely this function will
// return an error if it is unable to determine if there are any conflicts.
//
// See comment of CheckKey() for explanation of `snap_seq`, `snap_checker`
// and `min_uncommitted`.
// See comment of CheckKey() for explanation of `snap_seq`, `ts`,
// `snap_checker` and `min_uncommitted`.
//
// Returns OK on success, BUSY if there is a conflicting write, or other error
// status for any unexpected errors.
static Status CheckKeyForConflicts(
DBImpl* db_impl, ColumnFamilyHandle* column_family,
const std::string& key, SequenceNumber snap_seq, bool cache_only,
const std::string& key, SequenceNumber snap_seq,
const std::string* const ts, bool cache_only,
ReadCallback* snap_checker = nullptr,
SequenceNumber min_uncommitted = kMaxSequenceNumber);
@ -68,10 +70,13 @@ class TransactionUtil {
// seq < `min_uncommitted`: no conflict
// seq > `snap_seq`: applicable to conflict
// `min_uncommitted` <= seq <= `snap_seq`: call `snap_checker` to determine.
//
// If user-defined timestamp is enabled, a write conflict is detected if an
// operation for `key` with timestamp greater than `ts` exists.
static Status CheckKey(DBImpl* db_impl, SuperVersion* sv,
SequenceNumber earliest_seq, SequenceNumber snap_seq,
const std::string& key, bool cache_only,
ReadCallback* snap_checker = nullptr,
const std::string& key, const std::string* const ts,
bool cache_only, ReadCallback* snap_checker = nullptr,
SequenceNumber min_uncommitted = kMaxSequenceNumber);
};

@ -453,9 +453,10 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
kBackedByDBSnapshot);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */,
&snap_checker, min_uncommitted);
// TODO(yanqin): support user-defined timestamp
return TransactionUtil::CheckKeyForConflicts(
db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
false /* cache_only */, &snap_checker, min_uncommitted);
}
void WritePreparedTxn::SetSnapshot() {

@ -6,6 +6,7 @@
#ifndef ROCKSDB_LITE
#include "utilities/transactions/write_unprepared_txn.h"
#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
@ -1025,9 +1026,10 @@ Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
WriteUnpreparedTxnReadCallback snap_checker(
wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */,
&snap_checker, min_uncommitted);
// TODO(yanqin): Support user-defined timestamp.
return TransactionUtil::CheckKeyForConflicts(
db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
false /* cache_only */, &snap_checker, min_uncommitted);
}
const std::map<SequenceNumber, size_t>&

Loading…
Cancel
Save