Use SST files for Transaction conflict detection

Summary:
Currently, transactions can fail even if there is no actual write conflict.  This is due to relying on only the memtables to check for write-conflicts.  Users have to tune memtable settings to try to avoid this, but it's hard to figure out exactly how to tune these settings.

With this diff, TransactionDB will use both memtables and SST files to determine if there are any write conflicts.  This relies on the fact that BlockBasedTable stores sequence numbers for all writes that happen after any open snapshot.  Also, D50295 is needed to prevent SingleDelete from making earlier writes disappear (the TODOs in the test code will be fixed once the other diff is approved and merged).

Note that Optimistic transactions will still rely on tuning memtable settings as we do not want to read from SST while on the write thread.  Also, memtable settings can still be used to reduce how often TransactionDB needs to read SST files.

Test Plan: unit tests, db bench

Reviewers: rven, yhchiang, kradhakrishnan, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb, yoshinorim

Differential Revision: https://reviews.facebook.net/D50475
main
agiardullo 9 years ago
parent 362d819a14
commit 3bfd3d39a3
  1. 3
      db/builder.cc
  2. 1
      db/builder.h
  3. 10
      db/compaction_iterator.cc
  4. 61
      db/db_impl.cc
  5. 31
      db/db_impl.h
  6. 6
      db/flush_job.cc
  7. 2
      db/flush_job.h
  8. 12
      db/flush_job_test.cc
  9. 3
      db/repair.cc
  10. 2
      db/snapshot_impl.h
  11. 4
      db/table_cache.cc
  12. 20
      db/version_set.cc
  13. 17
      db/version_set.h
  14. 6
      include/rocksdb/compaction_filter.h
  15. 8
      include/rocksdb/options.h
  16. 9
      table/cuckoo_table_reader.cc
  17. 26
      table/get_context.cc
  18. 22
      table/get_context.h
  19. 10
      utilities/transactions/optimistic_transaction_impl.cc
  20. 3
      utilities/transactions/transaction_impl.cc
  21. 157
      utilities/transactions/transaction_test.cc
  22. 49
      utilities/transactions/transaction_util.cc
  23. 11
      utilities/transactions/transaction_util.h

@ -59,6 +59,7 @@ Status BuildTable(
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories, int_tbl_prop_collector_factories,
uint32_t column_family_id, std::vector<SequenceNumber> snapshots, uint32_t column_family_id, std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const CompressionType compression, const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks, const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, const Env::IOPriority io_priority, InternalStats* internal_stats, const Env::IOPriority io_priority,
@ -97,7 +98,7 @@ Status BuildTable(
CompactionIterator c_iter(iter, internal_comparator.user_comparator(), CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
&merge, kMaxSequenceNumber, &snapshots, &merge, kMaxSequenceNumber, &snapshots,
kMaxSequenceNumber, env, earliest_write_conflict_snapshot, env,
true /* internal key corruption is not ok */); true /* internal key corruption is not ok */);
c_iter.SeekToFirst(); c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) { for (; c_iter.Valid(); c_iter.Next()) {

@ -56,6 +56,7 @@ extern Status BuildTable(
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories, int_tbl_prop_collector_factories,
uint32_t column_family_id, std::vector<SequenceNumber> snapshots, uint32_t column_family_id, std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const CompressionType compression, const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks, const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, InternalStats* internal_stats,

@ -333,6 +333,10 @@ void CompactionIterator::NextFromInput() {
// same key, then this kv is not visible in any snapshot. // same key, then this kv is not visible in any snapshot.
// Hidden by a newer entry for same user key // Hidden by a newer entry for same user key
// TODO: why not > ? // TODO: why not > ?
//
// Note: Dropping this key will not affect TransactionDB write-conflict
// checking since there has already been a record returned for this key
// in this snapshot.
assert(last_sequence >= current_user_key_sequence_); assert(last_sequence >= current_user_key_sequence_);
++iter_stats_.num_record_drop_hidden; // (A) ++iter_stats_.num_record_drop_hidden; // (A)
input_->Next(); input_->Next();
@ -351,6 +355,9 @@ void CompactionIterator::NextFromInput() {
// smaller sequence numbers will be dropped in the next // smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above). // few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped. // Therefore this deletion marker is obsolete and can be dropped.
//
// Note: Dropping this Delete will not affect TransactionDB
// write-conflict checking since it is earlier than any snapshot.
++iter_stats_.num_record_drop_obsolete; ++iter_stats_.num_record_drop_obsolete;
input_->Next(); input_->Next();
} else if (ikey_.type == kTypeMerge) { } else if (ikey_.type == kTypeMerge) {
@ -400,6 +407,9 @@ void CompactionIterator::PrepareOutput() {
// If this is the bottommost level (no files in lower levels) // If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno // and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero. // then we can squash the seqno to zero.
// This is safe for TransactionDB write-conflict checking since transactions
// only care about sequence number larger than any active snapshots.
if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ && if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ &&
ikey_.type != kTypeMerge) { ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion); assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);

@ -1394,11 +1394,17 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
{ {
mutex_.Unlock(); mutex_.Unlock();
TableFileCreationInfo info; TableFileCreationInfo info;
SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);
s = BuildTable( s = BuildTable(
dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(), dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
iter.get(), &meta, cfd->internal_comparator(), iter.get(), &meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), snapshot_seqs,
snapshots_.GetAll(), GetCompressionFlush(*cfd->ioptions()), earliest_write_conflict_snapshot,
GetCompressionFlush(*cfd->ioptions()),
cfd->ioptions()->compression_opts, paranoid_file_checks, cfd->ioptions()->compression_opts, paranoid_file_checks,
cfd->internal_stats(), Env::IO_HIGH, &info.table_properties); cfd->internal_stats(), Env::IO_HIGH, &info.table_properties);
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
@ -1453,12 +1459,16 @@ Status DBImpl::FlushMemTableToOutputFile(
assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending()); assert(cfd->imm()->IsFlushPending());
FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options, SequenceNumber earliest_write_conflict_snapshot;
env_options_, versions_.get(), &mutex_, &shutting_down_, std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(), job_context, log_buffer, snapshots_.GetAll(&earliest_write_conflict_snapshot);
FlushJob flush_job(
dbname_, cfd, db_options_, mutable_cf_options, env_options_,
versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
earliest_write_conflict_snapshot, job_context, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(0U), directories_.GetDbDir(), directories_.GetDataDir(0U),
GetCompressionFlush(*cfd->ioptions()), stats_, GetCompressionFlush(*cfd->ioptions()), stats_, &event_logger_);
&event_logger_);
FileMetaData file_meta; FileMetaData file_meta;
@ -5368,9 +5378,9 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
const Slice& key, bool cache_only, SequenceNumber* seq,
SequenceNumber* seq) { bool* found_record_for_key) {
Status s; Status s;
std::string value; std::string value;
MergeContext merge_context; MergeContext merge_context;
@ -5379,6 +5389,10 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
LookupKey lkey(key, current_seq); LookupKey lkey(key, current_seq);
*seq = kMaxSequenceNumber; *seq = kMaxSequenceNumber;
*found_record_for_key = false;
// TODO(agiardullo): Should optimize all the Get() functions below to not
// return a value since we do not use it.
// Check if there is a record for this key in the latest memtable // Check if there is a record for this key in the latest memtable
sv->mem->Get(lkey, &value, &s, &merge_context, seq); sv->mem->Get(lkey, &value, &s, &merge_context, seq);
@ -5394,6 +5408,7 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
if (*seq != kMaxSequenceNumber) { if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check immutable memtables // Found a sequence number, no need to check immutable memtables
*found_record_for_key = true;
return Status::OK(); return Status::OK();
} }
@ -5411,6 +5426,7 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
if (*seq != kMaxSequenceNumber) { if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check memtable history // Found a sequence number, no need to check memtable history
*found_record_for_key = true;
return Status::OK(); return Status::OK();
} }
@ -5426,6 +5442,31 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
return s; return s;
} }
if (*seq != kMaxSequenceNumber) {
// Found a sequence number, no need to check SST files
*found_record_for_key = true;
return Status::OK();
}
// TODO(agiardullo): possible optimization: consider checking cached
// SST files if cache_only=true?
if (!cache_only) {
// Check tables
ReadOptions read_options;
sv->current->Get(read_options, lkey, &value, &s, &merge_context,
nullptr /* value_found */, found_record_for_key, seq);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading SST files
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Unexpected status returned from Version::Get: %s\n",
s.ToString().c_str());
return s;
}
}
return Status::OK(); return Status::OK();
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -226,13 +226,30 @@ class DBImpl : public DB {
bool include_history); bool include_history);
// For a given key, check to see if there are any records for this key // For a given key, check to see if there are any records for this key
// in the memtables, including memtable history. // in the memtables, including memtable history. If cache_only is false,
// SST files will also be checked.
// On success, *seq will contain the sequence number for the //
// latest such change or kMaxSequenceNumber if no records were present. // If a key is found, *found_record_for_key will be set to true and
// Returns OK on success, other status on error reading memtables. // *seq will be set to the stored sequence number for the latest
Status GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, const Slice& key, // operation on this key or kMaxSequenceNumber if unknown.
SequenceNumber* seq); // If no key is found, *found_record_for_key will be set to false.
//
// Note: If cache_only=false, it is possible for *seq to be set to 0 if
// the sequence number has been cleared from the record. If the caller is
// holding an active db snapshot, we know the missing sequence must be less
// than the snapshot's sequence number (sequence numbers are only cleared
// when there are no earlier active snapshots).
//
// If NotFound is returned and found_record_for_key is set to false, then no
// record for this key was found. If the caller is holding an active db
// snapshot, we know that no key could have existed after this snapshot
// (since we do not compact keys that have an earlier snapshot).
//
// Returns OK or NotFound on success,
// other status on unexpected error.
Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
bool cache_only, SequenceNumber* seq,
bool* found_record_for_key);
using DB::AddFile; using DB::AddFile;
virtual Status AddFile(ColumnFamilyHandle* column_family, virtual Status AddFile(ColumnFamilyHandle* column_family,

@ -62,6 +62,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
InstrumentedMutex* db_mutex, InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
JobContext* job_context, LogBuffer* log_buffer, JobContext* job_context, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_file_directory, Directory* db_directory, Directory* output_file_directory,
CompressionType output_compression, Statistics* stats, CompressionType output_compression, Statistics* stats,
@ -75,6 +76,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
db_mutex_(db_mutex), db_mutex_(db_mutex),
shutting_down_(shutting_down), shutting_down_(shutting_down),
existing_snapshots_(std::move(existing_snapshots)), existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
job_context_(job_context), job_context_(job_context),
log_buffer_(log_buffer), log_buffer_(log_buffer),
db_directory_(db_directory), db_directory_(db_directory),
@ -235,8 +237,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
cfd_->table_cache(), iter.get(), meta, cfd_->table_cache(), iter.get(), meta,
cfd_->internal_comparator(), cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
existing_snapshots_, output_compression_, existing_snapshots_, earliest_write_conflict_snapshot_,
cfd_->ioptions()->compression_opts, output_compression_, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), Env::IO_HIGH, &table_properties_); cfd_->internal_stats(), Env::IO_HIGH, &table_properties_);
info.table_properties = table_properties_; info.table_properties = table_properties_;

@ -58,6 +58,7 @@ class FlushJob {
const EnvOptions& env_options, VersionSet* versions, const EnvOptions& env_options, VersionSet* versions,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down, InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots, std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
JobContext* job_context, LogBuffer* log_buffer, JobContext* job_context, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_file_directory, Directory* db_directory, Directory* output_file_directory,
CompressionType output_compression, Statistics* stats, CompressionType output_compression, Statistics* stats,
@ -83,6 +84,7 @@ class FlushJob {
InstrumentedMutex* db_mutex_; InstrumentedMutex* db_mutex_;
std::atomic<bool>* shutting_down_; std::atomic<bool>* shutting_down_;
std::vector<SequenceNumber> existing_snapshots_; std::vector<SequenceNumber> existing_snapshots_;
SequenceNumber earliest_write_conflict_snapshot_;
JobContext* job_context_; JobContext* job_context_;
LogBuffer* log_buffer_; LogBuffer* log_buffer_;
Directory* db_directory_; Directory* db_directory_;

@ -92,8 +92,8 @@ TEST_F(FlushJobTest, Empty) {
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_, &shutting_down_,
{}, &job_context, nullptr, nullptr, nullptr, {}, kMaxSequenceNumber, &job_context, nullptr, nullptr,
kNoCompression, nullptr, &event_logger); nullptr, kNoCompression, nullptr, &event_logger);
ASSERT_OK(flush_job.Run()); ASSERT_OK(flush_job.Run());
job_context.Clean(); job_context.Clean();
} }
@ -131,8 +131,8 @@ TEST_F(FlushJobTest, NonEmpty) {
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_, &shutting_down_,
{}, &job_context, nullptr, nullptr, nullptr, {}, kMaxSequenceNumber, &job_context, nullptr, nullptr,
kNoCompression, nullptr, &event_logger); nullptr, kNoCompression, nullptr, &event_logger);
FileMetaData fd; FileMetaData fd;
mutex_.Lock(); mutex_.Lock();
ASSERT_OK(flush_job.Run(&fd)); ASSERT_OK(flush_job.Run(&fd));
@ -195,8 +195,8 @@ TEST_F(FlushJobTest, Snapshots) {
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(), db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_, env_options_, versions_.get(), &mutex_, &shutting_down_,
snapshots, &job_context, nullptr, nullptr, nullptr, snapshots, kMaxSequenceNumber, &job_context, nullptr,
kNoCompression, nullptr, &event_logger); nullptr, nullptr, kNoCompression, nullptr, &event_logger);
mutex_.Lock(); mutex_.Lock();
ASSERT_OK(flush_job.Run()); ASSERT_OK(flush_job.Run());
mutex_.Unlock(); mutex_.Unlock();

@ -294,7 +294,8 @@ class Repairer {
dbname_, env_, ioptions_, env_options_, table_cache_, iter.get(), dbname_, env_, ioptions_, env_options_, table_cache_, iter.get(),
&meta, icmp_, &int_tbl_prop_collector_factories_, &meta, icmp_, &int_tbl_prop_collector_factories_,
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, {}, TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, {},
kNoCompression, CompressionOptions(), false, nullptr); kMaxSequenceNumber, kNoCompression, CompressionOptions(), false,
nullptr);
} }
delete mem->Unref(); delete mem->Unref();
delete cf_mems_default; delete cf_mems_default;

@ -91,7 +91,7 @@ class SnapshotList {
ret.push_back(s->next_->number_); ret.push_back(s->next_->number_);
if (oldest_write_conflict_snapshot != nullptr && if (oldest_write_conflict_snapshot != nullptr &&
*oldest_write_conflict_snapshot != kMaxSequenceNumber && *oldest_write_conflict_snapshot == kMaxSequenceNumber &&
s->next_->is_write_conflict_boundary_) { s->next_->is_write_conflict_boundary_) {
// If this is the first write-conflict boundary snapshot in the list, // If this is the first write-conflict boundary snapshot in the list,
// it is the oldest // it is the oldest

@ -219,7 +219,9 @@ Status TableCache::Get(const ReadOptions& options,
IterKey row_cache_key; IterKey row_cache_key;
std::string row_cache_entry_buffer; std::string row_cache_entry_buffer;
if (ioptions_.row_cache) { // Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
uint64_t fd_number = fd.GetNumber(); uint64_t fd_number = fd.GetNumber();
auto user_key = ExtractUserKey(k); auto user_key = ExtractUserKey(k);
// We use the user key as cache key instead of the internal key, // We use the user key as cache key instead of the internal key,

@ -868,21 +868,24 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
refs_(0), refs_(0),
version_number_(version_number) {} version_number_(version_number) {}
void Version::Get(const ReadOptions& read_options, void Version::Get(const ReadOptions& read_options, const LookupKey& k,
const LookupKey& k, std::string* value, Status* status,
std::string* value, MergeContext* merge_context, bool* value_found,
Status* status, bool* key_exists, SequenceNumber* seq) {
MergeContext* merge_context,
bool* value_found) {
Slice ikey = k.internal_key(); Slice ikey = k.internal_key();
Slice user_key = k.user_key(); Slice user_key = k.user_key();
assert(status->ok() || status->IsMergeInProgress()); assert(status->ok() || status->IsMergeInProgress());
if (key_exists != nullptr) {
// will falsify below if not found
*key_exists = true;
}
GetContext get_context( GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_, user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
value, value_found, merge_context, this->env_); value, value_found, merge_context, this->env_, seq);
FilePicker fp( FilePicker fp(
storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_, storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
@ -942,6 +945,9 @@ void Version::Get(const ReadOptions& read_options,
user_key); user_key);
} }
} else { } else {
if (key_exists != nullptr) {
*key_exists = false;
}
*status = Status::NotFound(); // Use an empty error message for speed *status = Status::NotFound(); // Use an empty error message for speed
} }
} }

@ -424,11 +424,24 @@ class Version {
// Lookup the value for key. If found, store it in *val and // Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. // return OK. Else return a non-OK status.
// Uses *operands to store merge_operator operations to apply later // Uses *operands to store merge_operator operations to apply later.
//
// If the ReadOptions.read_tier is set to do a read-only fetch, then
// *value_found will be set to false if it cannot be determined whether
// this value exists without doing IO.
//
// If the key is Deleted, *status will be set to NotFound and
// *key_exists will be set to true.
// If no key was found, *status will be set to NotFound and
// *key_exists will be set to false.
// If seq is non-null, *seq will be set to the sequence number found
// for the key if a key was found.
//
// REQUIRES: lock is not held // REQUIRES: lock is not held
void Get(const ReadOptions&, const LookupKey& key, std::string* val, void Get(const ReadOptions&, const LookupKey& key, std::string* val,
Status* status, MergeContext* merge_context, Status* status, MergeContext* merge_context,
bool* value_found = nullptr); bool* value_found = nullptr, bool* key_exists = nullptr,
SequenceNumber* seq = nullptr);
// Loads some stats information from files. Call without mutex held. It needs // Loads some stats information from files. Call without mutex held. It needs
// to be called before applying the version to the version set. // to be called before applying the version to the version set.

@ -93,6 +93,12 @@ class CompactionFilter {
// The compaction process invokes this method on every merge operand. If this // The compaction process invokes this method on every merge operand. If this
// method returns true, the merge operand will be ignored and not written out // method returns true, the merge operand will be ignored and not written out
// in the compaction output // in the compaction output
//
// Note: If you are using a TransactionDB, it is not recommended to implement
// FilterMergeOperand(). If a Merge operation is filtered out, TransactionDB
// may not realize there is a write conflict and may allow a Transaction to
// Commit that should have failed. Instead, it is better to implement any
// Merge filtering inside the MergeOperator.
virtual bool FilterMergeOperand(int level, const Slice& key, virtual bool FilterMergeOperand(int level, const Slice& key,
const Slice& operand) const { const Slice& operand) const {
return false; return false;

@ -276,9 +276,17 @@ struct ColumnFamilyOptions {
// max_write_buffer_number, this parameter does not affect flushing. // max_write_buffer_number, this parameter does not affect flushing.
// This controls the minimum amount of write history that will be available // This controls the minimum amount of write history that will be available
// in memory for conflict checking when Transactions are used. // in memory for conflict checking when Transactions are used.
//
// When using an OptimisticTransactionDB:
// If this value is too low, some transactions may fail at commit time due // If this value is too low, some transactions may fail at commit time due
// to not being able to determine whether there were any write conflicts. // to not being able to determine whether there were any write conflicts.
// //
// When using a TransactionDB:
// If Transaction::SetSnapshot is used, TransactionDB will read either
// in-memory write buffers or SST files to do write-conflict checking.
// Increasing this value can reduce the number of reads to SST files
// done for conflict detection.
//
// Setting this value to 0 will cause write buffers to be freed immediately // Setting this value to 0 will cause write buffers to be freed immediately
// after they are flushed. // after they are flushed.
// If this value is set to -1, 'max_write_buffer_number' will be used. // If this value is set to -1, 'max_write_buffer_number' will be used.

@ -143,11 +143,16 @@ Status CuckooTableReader::Get(const ReadOptions& readOptions, const Slice& key,
return Status::OK(); return Status::OK();
} }
// Here, we compare only the user key part as we support only one entry // Here, we compare only the user key part as we support only one entry
// per user key and we don't support sanpshot. // per user key and we don't support snapshot.
if (ucomp_->Equal(user_key, Slice(bucket, user_key.size()))) { if (ucomp_->Equal(user_key, Slice(bucket, user_key.size()))) {
Slice value(bucket + key_length_, value_length_); Slice value(bucket + key_length_, value_length_);
if (is_last_level_) { if (is_last_level_) {
get_context->SaveValue(value); // Sequence number is not stored at the last level, so we will use
// kMaxSequenceNumber since it is unknown. This could cause some
// transactions to fail to lock a key due to an unknown sequence number.
// However, it is not expected for anyone to use a CuckooTable in a
// TransactionDB.
get_context->SaveValue(value, kMaxSequenceNumber);
} else { } else {
Slice full_key(bucket, key_length_); Slice full_key(bucket, key_length_);
ParsedInternalKey found_ikey; ParsedInternalKey found_ikey;

@ -34,7 +34,8 @@ GetContext::GetContext(const Comparator* ucmp,
const MergeOperator* merge_operator, Logger* logger, const MergeOperator* merge_operator, Logger* logger,
Statistics* statistics, GetState init_state, Statistics* statistics, GetState init_state,
const Slice& user_key, std::string* ret_value, const Slice& user_key, std::string* ret_value,
bool* value_found, MergeContext* merge_context, Env* env) bool* value_found, MergeContext* merge_context, Env* env,
SequenceNumber* seq)
: ucmp_(ucmp), : ucmp_(ucmp),
merge_operator_(merge_operator), merge_operator_(merge_operator),
logger_(logger), logger_(logger),
@ -45,7 +46,12 @@ GetContext::GetContext(const Comparator* ucmp,
value_found_(value_found), value_found_(value_found),
merge_context_(merge_context), merge_context_(merge_context),
env_(env), env_(env),
replay_log_(nullptr) {} seq_(seq),
replay_log_(nullptr) {
if (seq_) {
*seq_ = kMaxSequenceNumber;
}
}
// Called from TableCache::Get and Table::Get when file/block in which // Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this // key may exist are not there in TableCache/BlockCache respectively. In this
@ -59,7 +65,7 @@ void GetContext::MarkKeyMayExist() {
} }
} }
void GetContext::SaveValue(const Slice& value) { void GetContext::SaveValue(const Slice& value, SequenceNumber seq) {
assert(state_ == kNotFound); assert(state_ == kNotFound);
appendToReplayLog(replay_log_, kTypeValue, value); appendToReplayLog(replay_log_, kTypeValue, value);
@ -74,6 +80,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
if (ucmp_->Equal(parsed_key.user_key, user_key_)) { if (ucmp_->Equal(parsed_key.user_key, user_key_)) {
appendToReplayLog(replay_log_, parsed_key.type, value); appendToReplayLog(replay_log_, parsed_key.type, value);
if (seq_ != nullptr) {
// Set the sequence number if it is uninitialized
if (*seq_ == kMaxSequenceNumber) {
*seq_ = parsed_key.sequence;
}
}
// Key matches. Process it // Key matches. Process it
switch (parsed_key.type) { switch (parsed_key.type) {
case kTypeValue: case kTypeValue:
@ -154,8 +167,11 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
bool ret = GetLengthPrefixedSlice(&s, &value); bool ret = GetLengthPrefixedSlice(&s, &value);
assert(ret); assert(ret);
(void)ret; (void)ret;
// Sequence number is ignored in SaveValue, so we just pass 0.
get_context->SaveValue(ParsedInternalKey(user_key, 0, type), value); // Since SequenceNumber is not stored and unknown, we will use
// kMaxSequenceNumber.
get_context->SaveValue(
ParsedInternalKey(user_key, kMaxSequenceNumber, type), value);
} }
#else // ROCKSDB_LITE #else // ROCKSDB_LITE
assert(false); assert(false);

@ -7,6 +7,7 @@
#include <string> #include <string>
#include "db/merge_context.h" #include "db/merge_context.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/types.h"
namespace rocksdb { namespace rocksdb {
class MergeContext; class MergeContext;
@ -24,11 +25,22 @@ class GetContext {
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics, GetState init_state, Logger* logger, Statistics* statistics, GetState init_state,
const Slice& user_key, std::string* ret_value, bool* value_found, const Slice& user_key, std::string* ret_value, bool* value_found,
MergeContext* merge_context, Env* env_); MergeContext* merge_context, Env* env,
SequenceNumber* seq = nullptr);
void MarkKeyMayExist(); void MarkKeyMayExist();
void SaveValue(const Slice& value);
// Records this key, value, and any meta-data (such as sequence number and
// state) into this GetContext.
//
// Returns True if more keys need to be read (due to merges) or
// False if the complete value has been found.
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value); bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value);
// Simplified version of the previous function. Should only be used when we
// know that the operation is a Put.
void SaveValue(const Slice& value, SequenceNumber seq);
GetState State() const { return state_; } GetState State() const { return state_; }
// If a non-null string is passed, all the SaveValue calls will be // If a non-null string is passed, all the SaveValue calls will be
@ -36,6 +48,9 @@ class GetContext {
// another GetContext with replayGetContextLog. // another GetContext with replayGetContextLog.
void SetReplayLog(std::string* replay_log) { replay_log_ = replay_log; } void SetReplayLog(std::string* replay_log) { replay_log_ = replay_log; }
// Reports whether the SequenceNumber for this key has to be fetched.
// True exactly when seq_ is non-null.
bool NeedToReadSequence() const { return seq_ != nullptr; }
private: private:
const Comparator* ucmp_; const Comparator* ucmp_;
const MergeOperator* merge_operator_; const MergeOperator* merge_operator_;
@ -49,6 +64,9 @@ class GetContext {
bool* value_found_; // Is value set correctly? Used by KeyMayExist bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_; MergeContext* merge_context_;
Env* env_; Env* env_;
// If a key is found, seq_ will be set to the SequenceNumber of the most
// recent write to the key, or to kMaxSequenceNumber if it is unknown.
SequenceNumber* seq_;
std::string* replay_log_; std::string* replay_log_;
}; };

@ -95,15 +95,19 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
// if we can not determine whether there would be any such conflicts. // if we can not determine whether there would be any such conflicts.
// //
// Should only be called on writer thread in order to avoid any race conditions // Should only be called on writer thread in order to avoid any race conditions
// in detecting // in detecting write conflicts.
// write conflicts.
Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) { Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) {
Status result; Status result;
assert(dynamic_cast<DBImpl*>(db) != nullptr); assert(dynamic_cast<DBImpl*>(db) != nullptr);
auto db_impl = reinterpret_cast<DBImpl*>(db); auto db_impl = reinterpret_cast<DBImpl*>(db);
return TransactionUtil::CheckKeysForConflicts(db_impl, GetTrackedKeys()); // Since we are on the write thread and do not want to block other writers,
// we will do a cache-only conflict check. This can result in TryAgain
// getting returned if there is not sufficient memtable history to check
// for conflicts.
return TransactionUtil::CheckKeysForConflicts(db_impl, GetTrackedKeys(),
true /* cache_only */);
} }
} // namespace rocksdb } // namespace rocksdb

@ -312,7 +312,8 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
column_family ? column_family : db_impl->DefaultColumnFamily(); column_family ? column_family : db_impl->DefaultColumnFamily();
return TransactionUtil::CheckKeyForConflicts( return TransactionUtil::CheckKeyForConflicts(
db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber()); db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber(),
false /* cache_only */);
} }
} // namespace rocksdb } // namespace rocksdb

@ -12,8 +12,10 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h" #include "utilities/merge_operators/string_append/stringappend.h"
@ -32,6 +34,8 @@ class TransactionTest : public testing::Test {
TransactionTest() { TransactionTest() {
options.create_if_missing = true; options.create_if_missing = true;
options.max_write_buffer_number = 2; options.max_write_buffer_number = 2;
options.write_buffer_size = 4 * 1024;
options.level0_file_num_compaction_trigger = 2;
options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
dbname = test::TmpDir() + "/transaction_testdb"; dbname = test::TmpDir() + "/transaction_testdb";
@ -46,6 +50,15 @@ class TransactionTest : public testing::Test {
delete db; delete db;
DestroyDB(dbname, options); DestroyDB(dbname, options);
} }
// Closes the current database, destroys its on-disk state, and opens a fresh
// TransactionDB at the same path using the fixture's current `options` and
// `txn_db_options`.
//
// Returns the Status produced by TransactionDB::Open.
Status ReOpen() {
  delete db;
  // Reset the pointer so a failed Open cannot leave `db` dangling for the
  // fixture's later `delete db` (deleting a null pointer is a no-op).
  // NOTE(review): assumes Open may leave `db` unmodified on failure -- confirm.
  db = nullptr;
  DestroyDB(dbname, options);
  return TransactionDB::Open(options, txn_db_options, dbname, &db);
}
}; };
TEST_F(TransactionTest, SuccessTest) { TEST_F(TransactionTest, SuccessTest) {
@ -85,6 +98,43 @@ TEST_F(TransactionTest, SuccessTest) {
delete txn; delete txn;
} }
// Conflict checking against the very first write made to a db: key "A" is
// written before the transaction takes its snapshot, so the transaction's own
// Put to "A" is expected to succeed.
TEST_F(TransactionTest, FirstWriteTest) {
  WriteOptions write_opts;

  // Per the original note on this test, the snapshot taken below and the
  // write issued here both have sequence number 1.
  Status initial_put = db->Put(write_opts, "A", "a");

  Transaction* transaction = db->BeginTransaction(write_opts);
  transaction->SetSnapshot();

  // The status of the initial write is only checked once the snapshot exists.
  ASSERT_OK(initial_put);
  ASSERT_OK(transaction->Put("A", "b"));

  delete transaction;
}
// Conflict checking against the very first write made to a db: the
// transaction takes its snapshot before key "A" is written, so the
// transaction's own Put to "A" is expected to report Busy.
TEST_F(TransactionTest, FirstWriteTest2) {
  WriteOptions write_opts;

  Transaction* transaction = db->BeginTransaction(write_opts);
  transaction->SetSnapshot();

  // Per the original note on this test, the snapshot is at sequence 0 while
  // the write below gets sequence 1.
  ASSERT_OK(db->Put(write_opts, "A", "a"));

  Status txn_put = transaction->Put("A", "b");
  ASSERT_TRUE(txn_put.IsBusy());

  delete transaction;
}
TEST_F(TransactionTest, WriteConflictTest) { TEST_F(TransactionTest, WriteConflictTest) {
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options; ReadOptions read_options;
@ -268,14 +318,37 @@ TEST_F(TransactionTest, FlushTest) {
} }
TEST_F(TransactionTest, FlushTest2) { TEST_F(TransactionTest, FlushTest2) {
const size_t num_tests = 3;
for (size_t n = 0; n < num_tests; n++) {
// Test different table factories
switch (n) {
case 0:
break;
case 1:
options.table_factory.reset(new mock::MockTableFactory());
break;
case 2: {
PlainTableOptions pt_opts;
pt_opts.hash_table_ratio = 0;
options.table_factory.reset(NewPlainTableFactory(pt_opts));
break;
}
}
Status s = ReOpen();
ASSERT_OK(s);
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options, snapshot_read_options; ReadOptions read_options, snapshot_read_options;
TransactionOptions txn_options; TransactionOptions txn_options;
string value; string value;
Status s;
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetBaseDB());
db->Put(write_options, Slice("foo"), Slice("bar")); db->Put(write_options, Slice("foo"), Slice("bar"));
db->Put(write_options, Slice("foo2"), Slice("bar")); db->Put(write_options, Slice("foo2"), Slice("bar2"));
db->Put(write_options, Slice("foo3"), Slice("bar3"));
txn_options.set_snapshot = true; txn_options.set_snapshot = true;
Transaction* txn = db->BeginTransaction(write_options, txn_options); Transaction* txn = db->BeginTransaction(write_options, txn_options);
@ -291,21 +364,34 @@ TEST_F(TransactionTest, FlushTest2) {
txn->GetForUpdate(snapshot_read_options, "foo", &value); txn->GetForUpdate(snapshot_read_options, "foo", &value);
ASSERT_EQ(value, "bar2"); ASSERT_EQ(value, "bar2");
// verify foo is locked by txn
s = db->Delete(write_options, "foo");
ASSERT_TRUE(s.IsTimedOut());
// Put a random key so we have a MemTable to flush s = db->Put(write_options, "Z", "z");
ASSERT_OK(s);
s = db->Put(write_options, "dummy", "dummy"); s = db->Put(write_options, "dummy", "dummy");
ASSERT_OK(s); ASSERT_OK(s);
s = db->Put(write_options, "S", "s");
ASSERT_OK(s);
s = db->SingleDelete(write_options, "S");
ASSERT_OK(s);
s = txn->Delete("S");
// Should fail after encountering a write to S in memtable
ASSERT_TRUE(s.IsBusy());
// force a memtable flush // force a memtable flush
FlushOptions flush_ops; s = db_impl->TEST_FlushMemTable(true);
db->Flush(flush_ops); ASSERT_OK(s);
// Put a random key so we have a MemTable to flush // Put a random key so we have a MemTable to flush
s = db->Put(write_options, "dummy", "dummy2"); s = db->Put(write_options, "dummy", "dummy2");
ASSERT_OK(s); ASSERT_OK(s);
// force a memtable flush // force a memtable flush
db->Flush(flush_ops); ASSERT_OK(db_impl->TEST_FlushMemTable(true));
s = db->Put(write_options, "dummy", "dummy3"); s = db->Put(write_options, "dummy", "dummy3");
ASSERT_OK(s); ASSERT_OK(s);
@ -313,11 +399,56 @@ TEST_F(TransactionTest, FlushTest2) {
// force a memtable flush // force a memtable flush
// Since our test db has max_write_buffer_number=2, this flush will cause // Since our test db has max_write_buffer_number=2, this flush will cause
// the first memtable to get purged from the MemtableList history. // the first memtable to get purged from the MemtableList history.
db->Flush(flush_ops); ASSERT_OK(db_impl->TEST_FlushMemTable(true));
s = txn->Put("X", "Y"); s = txn->Put("X", "Y");
// Put should fail since MemTableList History is not older than the snapshot. // Should succeed after verifying there is no write to X in SST file
ASSERT_TRUE(s.IsTryAgain()); ASSERT_OK(s);
s = txn->Put("Z", "zz");
// Should fail after encountering a write to Z in SST file
ASSERT_TRUE(s.IsBusy());
s = txn->GetForUpdate(read_options, "foo2", &value);
// should succeed since key was written before txn started
ASSERT_OK(s);
// verify foo2 is locked by txn
s = db->Delete(write_options, "foo2");
ASSERT_TRUE(s.IsTimedOut());
s = txn->Delete("S");
// Should fail after encountering a write to S in SST file
fprintf(stderr, "%lu %s\n", n, s.ToString().c_str());
ASSERT_TRUE(s.IsBusy());
// Write a bunch of keys to db to force a compaction
Random rnd(47);
for (int i = 0; i < 1000; i++) {
s = db->Put(write_options, std::to_string(i),
test::CompressibleString(&rnd, 0.8, 100, &value));
ASSERT_OK(s);
}
s = txn->Put("X", "yy");
// Should succeed after verifying there is no write to X in SST file
ASSERT_OK(s);
s = txn->Put("Z", "zzz");
// Should fail after encountering a write to Z in SST file
ASSERT_TRUE(s.IsBusy());
s = txn->Delete("S");
// Should fail after encountering a write to S in SST file
ASSERT_TRUE(s.IsBusy());
s = txn->GetForUpdate(read_options, "foo3", &value);
// should succeed since key was written before txn started
ASSERT_OK(s);
// verify foo3 is locked by txn
s = db->Delete(write_options, "foo3");
ASSERT_TRUE(s.IsTimedOut());
db_impl->TEST_WaitForCompact();
s = txn->Commit(); s = txn->Commit();
ASSERT_OK(s); ASSERT_OK(s);
@ -327,10 +458,16 @@ TEST_F(TransactionTest, FlushTest2) {
ASSERT_EQ(value, "bar2"); ASSERT_EQ(value, "bar2");
s = db->Get(read_options, "X", &value); s = db->Get(read_options, "X", &value);
ASSERT_TRUE(s.IsNotFound()); ASSERT_OK(s);
ASSERT_EQ("yy", value);
s = db->Get(read_options, "Z", &value);
ASSERT_OK(s);
ASSERT_EQ("z", value);
delete txn; delete txn;
} }
}
TEST_F(TransactionTest, NoSnapshotTest) { TEST_F(TransactionTest, NoSnapshotTest) {
WriteOptions write_options; WriteOptions write_options;

@ -25,7 +25,8 @@ namespace rocksdb {
Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl, Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const std::string& key, const std::string& key,
SequenceNumber key_seq) { SequenceNumber key_seq,
bool cache_only) {
Status result; Status result;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
@ -41,7 +42,7 @@ Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl,
SequenceNumber earliest_seq = SequenceNumber earliest_seq =
db_impl->GetEarliestMemTableSequenceNumber(sv, true); db_impl->GetEarliestMemTableSequenceNumber(sv, true);
result = CheckKey(db_impl, sv, earliest_seq, key_seq, key); result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);
db_impl->ReturnAndCleanupSuperVersion(cfd, sv); db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
} }
@ -51,9 +52,10 @@ Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl,
Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
SequenceNumber earliest_seq, SequenceNumber earliest_seq,
SequenceNumber key_seq, SequenceNumber key_seq, const std::string& key,
const std::string& key) { bool cache_only) {
Status result; Status result;
bool need_to_read_sst = false;
// Since it would be too slow to check the SST files, we will only use // Since it would be too slow to check the SST files, we will only use
// the memtables to check whether there have been any recent writes // the memtables to check whether there have been any recent writes
@ -63,35 +65,47 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
if (earliest_seq == kMaxSequenceNumber) { if (earliest_seq == kMaxSequenceNumber) {
// The age of this memtable is unknown. Cannot rely on it to check // The age of this memtable is unknown. Cannot rely on it to check
// for recent writes. This error shouldn't happen often in practice as // for recent writes. This error shouldn't happen often in practice as
// the // the Memtable should have a valid earliest sequence number except in some
// Memtable should have a valid earliest sequence number except in some
// corner cases (such as error cases during recovery). // corner cases (such as error cases during recovery).
need_to_read_sst = true;
if (cache_only) {
result = Status::TryAgain( result = Status::TryAgain(
"Transaction ould not check for conflicts as the MemTable does not " "Transaction ould not check for conflicts as the MemTable does not "
"countain a long enough history to check write at SequenceNumber: ", "countain a long enough history to check write at SequenceNumber: ",
ToString(key_seq)); ToString(key_seq));
}
} else if (key_seq < earliest_seq) { } else if (key_seq < earliest_seq) {
need_to_read_sst = true;
if (cache_only) {
// The age of this memtable is too new to use to check for recent // The age of this memtable is too new to use to check for recent
// writes. // writes.
char msg[255]; char msg[255];
snprintf(msg, sizeof(msg), snprintf(msg, sizeof(msg),
"Transaction could not check for conflicts for opearation at " "Transaction could not check for conflicts for operation at "
"SequenceNumber %" PRIu64
" as the MemTable only contains changes newer than "
"SequenceNumber %" PRIu64 "SequenceNumber %" PRIu64
" as the MemTable only contains changes newer than SequenceNumber "
"%" PRIu64
". Increasing the value of the " ". Increasing the value of the "
"max_write_buffer_number_to_maintain option could reduce the " "max_write_buffer_number_to_maintain option could reduce the "
"frequency " "frequency "
"of this error.", "of this error.",
key_seq, earliest_seq); key_seq, earliest_seq);
result = Status::TryAgain(msg); result = Status::TryAgain(msg);
} else { }
}
if (result.ok()) {
SequenceNumber seq = kMaxSequenceNumber; SequenceNumber seq = kMaxSequenceNumber;
Status s = db_impl->GetLatestSequenceForKeyFromMemtable(sv, key, &seq); bool found_record_for_key = false;
if (!s.ok()) {
Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst,
&seq, &found_record_for_key);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
result = s; result = s;
} else if (seq != kMaxSequenceNumber && seq > key_seq) { } else if (found_record_for_key && (seq > key_seq)) {
// Write Conflict // Write Conflict
result = Status::Busy(); result = Status::Busy();
} }
@ -100,8 +114,9 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
return result; return result;
} }
Status TransactionUtil::CheckKeysForConflicts( Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl,
DBImpl* db_impl, const TransactionKeyMap& key_map) { const TransactionKeyMap& key_map,
bool cache_only) {
Status result; Status result;
for (auto& key_map_iter : key_map) { for (auto& key_map_iter : key_map) {
@ -124,7 +139,7 @@ Status TransactionUtil::CheckKeysForConflicts(
const auto& key = key_iter.first; const auto& key = key_iter.first;
const SequenceNumber key_seq = key_iter.second; const SequenceNumber key_seq = key_iter.second;
result = CheckKey(db_impl, sv, earliest_seq, key_seq, key); result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);
if (!result.ok()) { if (!result.ok()) {
break; break;

@ -30,12 +30,16 @@ class TransactionUtil {
// Verifies there have been no writes to this key in the db since this // Verifies there have been no writes to this key in the db since this
// sequence number. // sequence number.
// //
// If cache_only is true, then this function will not attempt to read any
// SST files. This makes it more likely that the function returns an error,
// since it may be unable to determine whether there are any conflicts.
//
// Returns OK on success, BUSY if there is a conflicting write, or other error // Returns OK on success, BUSY if there is a conflicting write, or other error
// status for any unexpected errors. // status for any unexpected errors.
static Status CheckKeyForConflicts(DBImpl* db_impl, static Status CheckKeyForConflicts(DBImpl* db_impl,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const std::string& key, const std::string& key,
SequenceNumber key_seq); SequenceNumber key_seq, bool cache_only);
// For each key,SequenceNumber pair in the TransactionKeyMap, this function // For each key,SequenceNumber pair in the TransactionKeyMap, this function
// will verify there have been no writes to the key in the db since that // will verify there have been no writes to the key in the db since that
@ -47,12 +51,13 @@ class TransactionUtil {
// REQUIRED: this function should only be called on the write thread or if the // REQUIRED: this function should only be called on the write thread or if the
// mutex is held. // mutex is held.
static Status CheckKeysForConflicts(DBImpl* db_impl, static Status CheckKeysForConflicts(DBImpl* db_impl,
const TransactionKeyMap& keys); const TransactionKeyMap& keys,
bool cache_only);
private: private:
static Status CheckKey(DBImpl* db_impl, SuperVersion* sv, static Status CheckKey(DBImpl* db_impl, SuperVersion* sv,
SequenceNumber earliest_seq, SequenceNumber key_seq, SequenceNumber earliest_seq, SequenceNumber key_seq,
const std::string& key); const std::string& key, bool cache_only);
}; };
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save