From 3bfd3d39a30ad9f650362f5302ed62cd58e228c6 Mon Sep 17 00:00:00 2001
From: agiardullo
Date: Thu, 15 Oct 2015 16:37:15 -0700
Subject: [PATCH] Use SST files for Transaction conflict detection

Summary:
Currently, transactions can fail even if there is no actual write conflict.
This is due to relying only on the memtables to check for write-conflicts.
Users have to tune memtable settings to try to avoid this, but it's hard to
figure out exactly how to tune these settings.

With this diff, TransactionDB will use both memtables and SST files to
determine if there are any write conflicts.  This relies on the fact that
BlockBasedTable stores sequence numbers for all writes that happen after any
open snapshot.  Also, D50295 is needed to prevent SingleDelete from making
writes disappear (the TODOs in this test code will be fixed once the other
diff is approved and merged).

Note that Optimistic transactions will still rely on tuning memtable settings,
as we do not want to read from SST files while on the write thread.  Also,
memtable settings can still be used to reduce how often TransactionDB needs
to read SST files.  (Illustrative sketches of the new conflict-checking flow
follow the diff below.)

Test Plan: unit tests, db bench

Reviewers: rven, yhchiang, kradhakrishnan, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb, yoshinorim

Differential Revision: https://reviews.facebook.net/D50475
---
 db/builder.cc                               |   3 +-
 db/builder.h                                |   1 +
 db/compaction_iterator.cc                   |  10 +
 db/db_impl.cc                               |  63 ++++-
 db/db_impl.h                                |  31 ++-
 db/flush_job.cc                             |   6 +-
 db/flush_job.h                              |   2 +
 db/flush_job_test.cc                        |  12 +-
 db/repair.cc                                |   3 +-
 db/snapshot_impl.h                          |   2 +-
 db/table_cache.cc                           |   4 +-
 db/version_set.cc                           |  20 +-
 db/version_set.h                            |  17 +-
 include/rocksdb/compaction_filter.h         |   6 +
 include/rocksdb/options.h                   |   8 +
 table/cuckoo_table_reader.cc                |   9 +-
 table/get_context.cc                        |  26 +-
 table/get_context.h                         |  22 +-
 .../optimistic_transaction_impl.cc          |  10 +-
 utilities/transactions/transaction_impl.cc  |   3 +-
 utilities/transactions/transaction_test.cc  | 225 ++++++++++++++----
 utilities/transactions/transaction_util.cc  |  77 +++---
 utilities/transactions/transaction_util.h   |  11 +-
 23 files changed, 441 insertions(+), 130 deletions(-)

diff --git a/db/builder.cc b/db/builder.cc
index 68b9fe804..52605b27d 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -59,6 +59,7 @@ Status BuildTable(
     const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
         int_tbl_prop_collector_factories,
     uint32_t column_family_id, std::vector<SequenceNumber> snapshots,
+    SequenceNumber earliest_write_conflict_snapshot,
     const CompressionType compression,
     const CompressionOptions& compression_opts, bool paranoid_file_checks,
     InternalStats* internal_stats, const Env::IOPriority io_priority,
@@ -97,7 +98,7 @@ Status BuildTable(
 
     CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
                               &merge, kMaxSequenceNumber, &snapshots,
-                              kMaxSequenceNumber, env,
+                              earliest_write_conflict_snapshot, env,
                               true /* internal key corruption is not ok */);
     c_iter.SeekToFirst();
     for (; c_iter.Valid(); c_iter.Next()) {
diff --git a/db/builder.h b/db/builder.h
index cdafa4ab3..9a4d3b60b 100644
--- a/db/builder.h
+++ b/db/builder.h
@@ -56,6 +56,7 @@ extern Status BuildTable(
     const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
         int_tbl_prop_collector_factories,
     uint32_t column_family_id, std::vector<SequenceNumber> snapshots,
+    SequenceNumber earliest_write_conflict_snapshot,
     const CompressionType compression,
     const CompressionOptions& compression_opts, bool paranoid_file_checks,
     InternalStats* internal_stats,
diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc
index 2ae5750a8..73e612a42 100644
--- a/db/compaction_iterator.cc
+++ b/db/compaction_iterator.cc
@@ -333,6 +333,10 @@ void CompactionIterator::NextFromInput() {
       // same key, then this kv is not visible in any snapshot.
       // Hidden by an newer entry for same user key
       // TODO: why not > ?
+      //
+      // Note: Dropping this key will not affect TransactionDB write-conflict
+      // checking since there has already been a record returned for this key
+      // in this snapshot.
       assert(last_sequence >= current_user_key_sequence_);
       ++iter_stats_.num_record_drop_hidden;  // (A)
       input_->Next();
@@ -351,6 +355,9 @@ void CompactionIterator::NextFromInput() {
       // smaller sequence numbers will be dropped in the next
       // few iterations of this loop (by rule (A) above).
       // Therefore this deletion marker is obsolete and can be dropped.
+      //
+      // Note: Dropping this Delete will not affect TransactionDB
+      // write-conflict checking since it is earlier than any snapshot.
       ++iter_stats_.num_record_drop_obsolete;
       input_->Next();
     } else if (ikey_.type == kTypeMerge) {
@@ -400,6 +407,9 @@ void CompactionIterator::PrepareOutput() {
   // If this is the bottommost level (no files in lower levels)
   // and the earliest snapshot is larger than this seqno
   // then we can squash the seqno to zero.
+
+  // This is safe for TransactionDB write-conflict checking since transactions
+  // only care about sequence numbers larger than any active snapshot.
   if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ &&
       ikey_.type != kTypeMerge) {
     assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 6e776f550..144210412 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1394,11 +1394,17 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
   {
     mutex_.Unlock();
     TableFileCreationInfo info;
+
+    SequenceNumber earliest_write_conflict_snapshot;
+    std::vector<SequenceNumber> snapshot_seqs =
+        snapshots_.GetAll(&earliest_write_conflict_snapshot);
+
     s = BuildTable(
         dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
         iter.get(), &meta, cfd->internal_comparator(),
-        cfd->int_tbl_prop_collector_factories(), cfd->GetID(),
-        snapshots_.GetAll(), GetCompressionFlush(*cfd->ioptions()),
+        cfd->int_tbl_prop_collector_factories(), cfd->GetID(), snapshot_seqs,
+        earliest_write_conflict_snapshot,
+        GetCompressionFlush(*cfd->ioptions()),
         cfd->ioptions()->compression_opts, paranoid_file_checks,
         cfd->internal_stats(), Env::IO_HIGH, &info.table_properties);
     LogFlush(db_options_.info_log);
@@ -1453,12 +1459,16 @@ Status DBImpl::FlushMemTableToOutputFile(
   assert(cfd->imm()->NumNotFlushed() != 0);
   assert(cfd->imm()->IsFlushPending());
 
-  FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
-                     env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     snapshots_.GetAll(), job_context, log_buffer,
-                     directories_.GetDbDir(), directories_.GetDataDir(0U),
-                     GetCompressionFlush(*cfd->ioptions()), stats_,
-                     &event_logger_);
+  SequenceNumber earliest_write_conflict_snapshot;
+  std::vector<SequenceNumber> snapshot_seqs =
+      snapshots_.GetAll(&earliest_write_conflict_snapshot);
+
+  FlushJob flush_job(
+      dbname_, cfd, db_options_, mutable_cf_options, env_options_,
+      versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
+      earliest_write_conflict_snapshot, job_context, log_buffer,
+      directories_.GetDbDir(), directories_.GetDataDir(0U),
+      GetCompressionFlush(*cfd->ioptions()), stats_, &event_logger_);
 
   FileMetaData file_meta;
 
@@ -5368,9 +5378,9 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
 #endif  // ROCKSDB_LITE
 
 #ifndef ROCKSDB_LITE
-Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
-                                                   const Slice& key,
-                                                   SequenceNumber* seq) {
+Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
+                                       bool cache_only, SequenceNumber* seq,
+                                       bool* found_record_for_key) {
   Status s;
   std::string value;
   MergeContext merge_context;
@@ -5379,6 +5389,10 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
   LookupKey lkey(key, current_seq);
 
   *seq = kMaxSequenceNumber;
+  *found_record_for_key = false;
+
+  // TODO(agiardullo): Should optimize all the Get() functions below to not
+  // return a value since we do not use it.
 
   // Check if there is a record for this key in the latest memtable
   sv->mem->Get(lkey, &value, &s, &merge_context, seq);
@@ -5394,6 +5408,7 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
 
   if (*seq != kMaxSequenceNumber) {
     // Found a sequence number, no need to check immutable memtables
+    *found_record_for_key = true;
     return Status::OK();
   }
 
@@ -5411,6 +5426,7 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
 
   if (*seq != kMaxSequenceNumber) {
     // Found a sequence number, no need to check memtable history
+    *found_record_for_key = true;
     return Status::OK();
   }
 
@@ -5426,6 +5442,31 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv,
     return s;
   }
 
+  if (*seq != kMaxSequenceNumber) {
+    // Found a sequence number, no need to check SST files
+    *found_record_for_key = true;
+    return Status::OK();
+  }
+
+  // TODO(agiardullo): possible optimization: consider checking cached
+  // SST files if cache_only=true?
+  if (!cache_only) {
+    // Check tables
+    ReadOptions read_options;
+
+    sv->current->Get(read_options, lkey, &value, &s, &merge_context,
+                     nullptr /* value_found */, found_record_for_key, seq);
+
+    if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
+      // unexpected error reading SST files
+      Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
+          "Unexpected status returned from Version::Get: %s\n",
+          s.ToString().c_str());
+
+      return s;
+    }
+  }
+
   return Status::OK();
 }
 #endif  // ROCKSDB_LITE
diff --git a/db/db_impl.h b/db/db_impl.h
index a02f47bfb..0f44b378a 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -226,13 +226,30 @@ class DBImpl : public DB {
                                                   bool include_history);
 
   // For a given key, check to see if there are any records for this key
-  // in the memtables, including memtable history.
-
-  // On success, *seq will contain the sequence number for the
-  // latest such change or kMaxSequenceNumber if no records were present.
-  // Returns OK on success, other status on error reading memtables.
-  Status GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, const Slice& key,
-                                             SequenceNumber* seq);
+  // in the memtables, including memtable history.  If cache_only is false,
+  // SST files will also be checked.
+  //
+  // If a key is found, *found_record_for_key will be set to true and
+  // *seq will be set to the stored sequence number for the latest
+  // operation on this key or kMaxSequenceNumber if unknown.
+  // If no key is found, *found_record_for_key will be set to false.
+  //
+  // Note: If cache_only=false, it is possible for *seq to be set to 0 if
+  // the sequence number has been cleared from the record.  If the caller is
+  // holding an active db snapshot, we know the missing sequence must be less
+  // than the snapshot's sequence number (sequence numbers are only cleared
+  // when there are no earlier active snapshots).
+  //
+  // If NotFound is returned and found_record_for_key is set to false, then no
+  // record for this key was found.  If the caller is holding an active db
+  // snapshot, we know that no key could have existed after this snapshot
+  // (since we do not compact keys that have an earlier snapshot).
+  //
+  // Returns OK or NotFound on success,
+  // other status on unexpected error.
+  Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
+                                 bool cache_only, SequenceNumber* seq,
+                                 bool* found_record_for_key);
 
   using DB::AddFile;
   virtual Status AddFile(ColumnFamilyHandle* column_family,
diff --git a/db/flush_job.cc b/db/flush_job.cc
index a20a0ba98..9da7d9546 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -62,6 +62,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                    InstrumentedMutex* db_mutex,
                    std::atomic<bool>* shutting_down,
                    std::vector<SequenceNumber> existing_snapshots,
+                   SequenceNumber earliest_write_conflict_snapshot,
                    JobContext* job_context, LogBuffer* log_buffer,
                    Directory* db_directory, Directory* output_file_directory,
                    CompressionType output_compression, Statistics* stats,
@@ -75,6 +76,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       db_mutex_(db_mutex),
       shutting_down_(shutting_down),
       existing_snapshots_(std::move(existing_snapshots)),
+      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
       job_context_(job_context),
       log_buffer_(log_buffer),
       db_directory_(db_directory),
@@ -235,8 +237,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
         cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(),
         cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
-        existing_snapshots_, output_compression_,
-        cfd_->ioptions()->compression_opts,
+        existing_snapshots_, earliest_write_conflict_snapshot_,
+        output_compression_, cfd_->ioptions()->compression_opts,
         mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
         Env::IO_HIGH, &table_properties_);
     info.table_properties = table_properties_;
diff --git a/db/flush_job.h b/db/flush_job.h
index dbc4113e1..d12da141e 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -58,6 +58,7 @@ class FlushJob {
            const EnvOptions& env_options, VersionSet* versions,
            InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
            std::vector<SequenceNumber> existing_snapshots,
+           SequenceNumber earliest_write_conflict_snapshot,
            JobContext* job_context, LogBuffer* log_buffer,
            Directory* db_directory, Directory* output_file_directory,
            CompressionType output_compression, Statistics* stats,
@@ -83,6 +84,7 @@ class FlushJob {
   InstrumentedMutex* db_mutex_;
   std::atomic<bool>* shutting_down_;
   std::vector<SequenceNumber> existing_snapshots_;
+  SequenceNumber earliest_write_conflict_snapshot_;
   JobContext* job_context_;
   LogBuffer* log_buffer_;
   Directory* db_directory_;
diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
index d20cf5e90..f7071c1ee 100644
--- a/db/flush_job_test.cc
+++ b/db/flush_job_test.cc
@@ -92,8 +92,8 @@ TEST_F(FlushJobTest, Empty) {
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_, *cfd->GetLatestMutableCFOptions(),
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     {}, &job_context, nullptr, nullptr, nullptr,
-                     kNoCompression, nullptr, &event_logger);
+                     {}, kMaxSequenceNumber, &job_context, nullptr, nullptr,
+                     nullptr, kNoCompression, nullptr, &event_logger);
   ASSERT_OK(flush_job.Run());
   job_context.Clean();
 }
@@ -131,8 +131,8 @@ TEST_F(FlushJobTest, NonEmpty) {
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_,
*cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_, - {}, &job_context, nullptr, nullptr, nullptr, - kNoCompression, nullptr, &event_logger); + {}, kMaxSequenceNumber, &job_context, nullptr, nullptr, + nullptr, kNoCompression, nullptr, &event_logger); FileMetaData fd; mutex_.Lock(); ASSERT_OK(flush_job.Run(&fd)); @@ -195,8 +195,8 @@ TEST_F(FlushJobTest, Snapshots) { FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_, - snapshots, &job_context, nullptr, nullptr, nullptr, - kNoCompression, nullptr, &event_logger); + snapshots, kMaxSequenceNumber, &job_context, nullptr, + nullptr, nullptr, kNoCompression, nullptr, &event_logger); mutex_.Lock(); ASSERT_OK(flush_job.Run()); mutex_.Unlock(); diff --git a/db/repair.cc b/db/repair.cc index c27db6e49..a03549099 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -294,7 +294,8 @@ class Repairer { dbname_, env_, ioptions_, env_options_, table_cache_, iter.get(), &meta, icmp_, &int_tbl_prop_collector_factories_, TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, {}, - kNoCompression, CompressionOptions(), false, nullptr); + kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, + nullptr); } delete mem->Unref(); delete cf_mems_default; diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 4fa0bb9d5..277cf3a20 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -91,7 +91,7 @@ class SnapshotList { ret.push_back(s->next_->number_); if (oldest_write_conflict_snapshot != nullptr && - *oldest_write_conflict_snapshot != kMaxSequenceNumber && + *oldest_write_conflict_snapshot == kMaxSequenceNumber && s->next_->is_write_conflict_boundary_) { // If this is the first write-conflict boundary snapshot in the list, // it is the oldest diff --git a/db/table_cache.cc b/db/table_cache.cc index 82b52ddb5..48b8e1b07 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -219,7 +219,9 @@ Status TableCache::Get(const ReadOptions& options, IterKey row_cache_key; std::string row_cache_entry_buffer; - if (ioptions_.row_cache) { + // Check row cache if enabled. Since row cache does not currently store + // sequence numbers, we cannot use it if we need to fetch the sequence. + if (ioptions_.row_cache && !get_context->NeedToReadSequence()) { uint64_t fd_number = fd.GetNumber(); auto user_key = ExtractUserKey(k); // We use the user key as cache key instead of the internal key, diff --git a/db/version_set.cc b/db/version_set.cc index a9422a84b..66762bebe 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -868,21 +868,24 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, refs_(0), version_number_(version_number) {} -void Version::Get(const ReadOptions& read_options, - const LookupKey& k, - std::string* value, - Status* status, - MergeContext* merge_context, - bool* value_found) { +void Version::Get(const ReadOptions& read_options, const LookupKey& k, + std::string* value, Status* status, + MergeContext* merge_context, bool* value_found, + bool* key_exists, SequenceNumber* seq) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); assert(status->ok() || status->IsMergeInProgress()); + if (key_exists != nullptr) { + // will falsify below if not found + *key_exists = true; + } + GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? 
GetContext::kNotFound : GetContext::kMerge, user_key, - value, value_found, merge_context, this->env_); + value, value_found, merge_context, this->env_, seq); FilePicker fp( storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_, @@ -942,6 +945,9 @@ void Version::Get(const ReadOptions& read_options, user_key); } } else { + if (key_exists != nullptr) { + *key_exists = false; + } *status = Status::NotFound(); // Use an empty error message for speed } } diff --git a/db/version_set.h b/db/version_set.h index b7482499f..997cd2a80 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -424,11 +424,24 @@ class Version { // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. - // Uses *operands to store merge_operator operations to apply later + // Uses *operands to store merge_operator operations to apply later. + // + // If the ReadOptions.read_tier is set to do a read-only fetch, then + // *value_found will be set to false if it cannot be determined whether + // this value exists without doing IO. + // + // If the key is Deleted, *status will be set to NotFound and + // *key_exists will be set to true. + // If no key was found, *status will be set to NotFound and + // *key_exists will be set to false. + // If seq is non-null, *seq will be set to the sequence number found + // for the key if a key was found. + // // REQUIRES: lock is not held void Get(const ReadOptions&, const LookupKey& key, std::string* val, Status* status, MergeContext* merge_context, - bool* value_found = nullptr); + bool* value_found = nullptr, bool* key_exists = nullptr, + SequenceNumber* seq = nullptr); // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 3bb250e9c..1286840fe 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -93,6 +93,12 @@ class CompactionFilter { // The compaction process invokes this method on every merge operand. If this // method returns true, the merge operand will be ignored and not written out // in the compaction output + // + // Note: If you are using a TransactionDB, it is not recommended to implement + // FilterMergeOperand(). If a Merge operation is filtered out, TransactionDB + // may not realize there is a write conflict and may allow a Transaction to + // Commit that should have failed. Instead, it is better to implement any + // Merge filtering inside the MergeOperator. virtual bool FilterMergeOperand(int level, const Slice& key, const Slice& operand) const { return false; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 5ec1da71d..04f9d7d8f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -276,9 +276,17 @@ struct ColumnFamilyOptions { // max_write_buffer_number, this parameter does not affect flushing. // This controls the minimum amount of write history that will be available // in memory for conflict checking when Transactions are used. + // + // When using an OptimisticTransactionDB: // If this value is too low, some transactions may fail at commit time due // to not being able to determine whether there were any write conflicts. // + // When using a TransactionDB: + // If Transaction::SetSnapshot is used, TransactionDB will read either + // in-memory write buffers or SST files to do write-conflict checking. 
+  // Increasing this value can reduce the number of reads to SST files
+  // done for conflict detection.
+  //
   // Setting this value to 0 will cause write buffers to be freed immediately
   // after they are flushed.
   // If this value is set to -1, 'max_write_buffer_number' will be used.
diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc
index 2d413f043..d806e77ef 100644
--- a/table/cuckoo_table_reader.cc
+++ b/table/cuckoo_table_reader.cc
@@ -143,11 +143,16 @@ Status CuckooTableReader::Get(const ReadOptions& readOptions, const Slice& key,
       return Status::OK();
     }
     // Here, we compare only the user key part as we support only one entry
-    // per user key and we don't support sanpshot.
+    // per user key and we don't support snapshot.
     if (ucomp_->Equal(user_key, Slice(bucket, user_key.size()))) {
       Slice value(bucket + key_length_, value_length_);
       if (is_last_level_) {
-        get_context->SaveValue(value);
+        // Sequence number is not stored at the last level, so we will use
+        // kMaxSequenceNumber since it is unknown.  This could cause some
+        // transactions to fail to lock a key due to an unknown sequence
+        // number.  However, it is not expected that anyone will use a
+        // CuckooTable in a TransactionDB.
+        get_context->SaveValue(value, kMaxSequenceNumber);
       } else {
         Slice full_key(bucket, key_length_);
         ParsedInternalKey found_ikey;
diff --git a/table/get_context.cc b/table/get_context.cc
index 609ca3083..ef4f6aff9 100644
--- a/table/get_context.cc
+++ b/table/get_context.cc
@@ -34,7 +34,8 @@ GetContext::GetContext(const Comparator* ucmp,
                        const MergeOperator* merge_operator, Logger* logger,
                        Statistics* statistics, GetState init_state,
                        const Slice& user_key, std::string* ret_value,
-                       bool* value_found, MergeContext* merge_context, Env* env)
+                       bool* value_found, MergeContext* merge_context, Env* env,
+                       SequenceNumber* seq)
     : ucmp_(ucmp),
       merge_operator_(merge_operator),
       logger_(logger),
@@ -45,7 +46,12 @@ GetContext::GetContext(const Comparator* ucmp,
       value_found_(value_found),
       merge_context_(merge_context),
       env_(env),
-      replay_log_(nullptr) {}
+      seq_(seq),
+      replay_log_(nullptr) {
+  if (seq_) {
+    *seq_ = kMaxSequenceNumber;
+  }
+}
 
 // Called from TableCache::Get and Table::Get when file/block in which
 // key may exist are not there in TableCache/BlockCache respectively. In this
@@ -59,7 +65,7 @@ void GetContext::MarkKeyMayExist() {
   }
 }
 
-void GetContext::SaveValue(const Slice& value) {
+void GetContext::SaveValue(const Slice& value, SequenceNumber seq) {
   assert(state_ == kNotFound);
   appendToReplayLog(replay_log_, kTypeValue, value);
 
@@ -74,6 +80,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
   if (ucmp_->Equal(parsed_key.user_key, user_key_)) {
     appendToReplayLog(replay_log_, parsed_key.type, value);
 
+    if (seq_ != nullptr) {
+      // Set the sequence number if it is uninitialized
+      if (*seq_ == kMaxSequenceNumber) {
+        *seq_ = parsed_key.sequence;
+      }
+    }
+
     // Key matches. Process it
     switch (parsed_key.type) {
       case kTypeValue:
@@ -154,8 +167,11 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
     bool ret = GetLengthPrefixedSlice(&s, &value);
     assert(ret);
     (void)ret;
-    // Sequence number is ignored in SaveValue, so we just pass 0.
-    get_context->SaveValue(ParsedInternalKey(user_key, 0, type), value);
+
+    // Since SequenceNumber is not stored and unknown, we will use
+    // kMaxSequenceNumber.
+    get_context->SaveValue(
+        ParsedInternalKey(user_key, kMaxSequenceNumber, type), value);
   }
 #else   // ROCKSDB_LITE
   assert(false);
diff --git a/table/get_context.h b/table/get_context.h
index 2c2dd8e1d..c06c3c8d4 100644
--- a/table/get_context.h
+++ b/table/get_context.h
@@ -7,6 +7,7 @@
 #include <string>
 #include "db/merge_context.h"
 #include "rocksdb/env.h"
+#include "rocksdb/types.h"
 
 namespace rocksdb {
 class MergeContext;
@@ -24,11 +25,22 @@ class GetContext {
   GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
              Logger* logger, Statistics* statistics, GetState init_state,
              const Slice& user_key, std::string* ret_value, bool* value_found,
-             MergeContext* merge_context, Env* env_);
+             MergeContext* merge_context, Env* env,
+             SequenceNumber* seq = nullptr);
 
   void MarkKeyMayExist();
-  void SaveValue(const Slice& value);
+
+  // Records this key, value, and any meta-data (such as sequence number and
+  // state) into this GetContext.
+  //
+  // Returns true if more keys need to be read (due to merges) or
+  // false if the complete value has been found.
   bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value);
+
+  // Simplified version of the previous function.  Should only be used when we
+  // know that the operation is a Put.
+  void SaveValue(const Slice& value, SequenceNumber seq);
+
   GetState State() const { return state_; }
 
   // If a non-null string is passed, all the SaveValue calls will be
   // logged into the string. The operations can then be replayed on
   // another GetContext with replayGetContextLog.
   void SetReplayLog(std::string* replay_log) { replay_log_ = replay_log; }
 
+  // Do we need to fetch the SequenceNumber for this key?
+  bool NeedToReadSequence() const { return (seq_ != nullptr); }
+
  private:
   const Comparator* ucmp_;
   const MergeOperator* merge_operator_;
@@ -49,6 +64,9 @@ class GetContext {
   bool* value_found_;  // Is value set correctly? Used by KeyMayExist
   MergeContext* merge_context_;
   Env* env_;
+  // If a key is found, seq_ will be set to the SequenceNumber of the most
+  // recent write to the key or kMaxSequenceNumber if unknown
+  SequenceNumber* seq_;
   std::string* replay_log_;
 };
 
diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc
index 028bed4c9..120f18ed8 100644
--- a/utilities/transactions/optimistic_transaction_impl.cc
+++ b/utilities/transactions/optimistic_transaction_impl.cc
@@ -95,15 +95,19 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
 // if we can not determine whether there would be any such conflicts.
 //
 // Should only be called on writer thread in order to avoid any race conditions
-// in detecting
-// write conflicts.
+// in detecting write conflicts.
 Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) {
   Status result;
 
   assert(dynamic_cast<DBImpl*>(db) != nullptr);
   auto db_impl = reinterpret_cast<DBImpl*>(db);
 
-  return TransactionUtil::CheckKeysForConflicts(db_impl, GetTrackedKeys());
+  // Since we are on the write thread and do not want to block other writers,
+  // we will do a cache-only conflict check.  This can result in TryAgain
+  // getting returned if there is not sufficient memtable history to check
+  // for conflicts.
+  return TransactionUtil::CheckKeysForConflicts(db_impl, GetTrackedKeys(),
+                                                true /* cache_only */);
 }
 
 }  // namespace rocksdb
diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc
index 33eb6509b..7480ce6dd 100644
--- a/utilities/transactions/transaction_impl.cc
+++ b/utilities/transactions/transaction_impl.cc
@@ -312,7 +312,8 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
       column_family ? column_family : db_impl->DefaultColumnFamily();
 
   return TransactionUtil::CheckKeyForConflicts(
-      db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber());
+      db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber(),
+      false /* cache_only */);
 }
 
 }  // namespace rocksdb
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index 73f227ffb..dc69f101c 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -12,8 +12,10 @@
 #include "rocksdb/options.h"
 #include "rocksdb/utilities/transaction.h"
 #include "rocksdb/utilities/transaction_db.h"
+#include "table/mock_table.h"
 #include "util/logging.h"
 #include "util/testharness.h"
+#include "util/testutil.h"
 #include "utilities/merge_operators.h"
 #include "utilities/merge_operators/string_append/stringappend.h"
 
@@ -32,6 +34,8 @@ class TransactionTest : public testing::Test {
   TransactionTest() {
     options.create_if_missing = true;
     options.max_write_buffer_number = 2;
+    options.write_buffer_size = 4 * 1024;
+    options.level0_file_num_compaction_trigger = 2;
     options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
     dbname = test::TmpDir() + "/transaction_testdb";
 
@@ -46,6 +50,15 @@ class TransactionTest : public testing::Test {
     delete db;
     DestroyDB(dbname, options);
   }
+
+  Status ReOpen() {
+    delete db;
+    DestroyDB(dbname, options);
+
+    Status s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+
+    return s;
+  }
 };
 
 TEST_F(TransactionTest, SuccessTest) {
@@ -85,6 +98,43 @@ TEST_F(TransactionTest, SuccessTest) {
   delete txn;
 }
 
+TEST_F(TransactionTest, FirstWriteTest) {
+  WriteOptions write_options;
+
+  // Test conflict checking against the very first write to a db.
+  // The transaction's snapshot will have seq 1 and the following write
+  // will also have seq 1.
+  Status s = db->Put(write_options, "A", "a");
+
+  Transaction* txn = db->BeginTransaction(write_options);
+  txn->SetSnapshot();
+
+  ASSERT_OK(s);
+
+  s = txn->Put("A", "b");
+  ASSERT_OK(s);
+
+  delete txn;
+}
+
+TEST_F(TransactionTest, FirstWriteTest2) {
+  WriteOptions write_options;
+
+  Transaction* txn = db->BeginTransaction(write_options);
+  txn->SetSnapshot();
+
+  // Test conflict checking against the very first write to a db.
+  // The transaction's snapshot has seq 0 while the following write
+  // will have seq 1.
+  Status s = db->Put(write_options, "A", "a");
+  ASSERT_OK(s);
+
+  s = txn->Put("A", "b");
+  ASSERT_TRUE(s.IsBusy());
+
+  delete txn;
+}
+
 TEST_F(TransactionTest, WriteConflictTest) {
   WriteOptions write_options;
   ReadOptions read_options;
@@ -268,68 +318,155 @@ TEST_F(TransactionTest, FlushTest) {
 }
 
 TEST_F(TransactionTest, FlushTest2) {
-  WriteOptions write_options;
-  ReadOptions read_options, snapshot_read_options;
-  TransactionOptions txn_options;
-  string value;
-  Status s;
+  const size_t num_tests = 3;
+
+  for (size_t n = 0; n < num_tests; n++) {
+    // Test different table factories
+    switch (n) {
+      case 0:
+        break;
+      case 1:
+        options.table_factory.reset(new mock::MockTableFactory());
+        break;
+      case 2: {
+        PlainTableOptions pt_opts;
+        pt_opts.hash_table_ratio = 0;
+        options.table_factory.reset(NewPlainTableFactory(pt_opts));
+        break;
+      }
+    }
 
-  db->Put(write_options, Slice("foo"), Slice("bar"));
-  db->Put(write_options, Slice("foo2"), Slice("bar"));
+    Status s = ReOpen();
+    ASSERT_OK(s);
 
-  txn_options.set_snapshot = true;
-  Transaction* txn = db->BeginTransaction(write_options, txn_options);
-  ASSERT_TRUE(txn);
+    WriteOptions write_options;
+    ReadOptions read_options, snapshot_read_options;
+    TransactionOptions txn_options;
+    string value;
 
-  snapshot_read_options.snapshot = txn->GetSnapshot();
+    DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetBaseDB());
 
-  txn->GetForUpdate(snapshot_read_options, "foo", &value);
-  ASSERT_EQ(value, "bar");
+    db->Put(write_options, Slice("foo"), Slice("bar"));
+    db->Put(write_options, Slice("foo2"), Slice("bar2"));
+    db->Put(write_options, Slice("foo3"), Slice("bar3"));
 
-  s = txn->Put(Slice("foo"), Slice("bar2"));
-  ASSERT_OK(s);
+    txn_options.set_snapshot = true;
+    Transaction* txn = db->BeginTransaction(write_options, txn_options);
+    ASSERT_TRUE(txn);
 
-  txn->GetForUpdate(snapshot_read_options, "foo", &value);
-  ASSERT_EQ(value, "bar2");
+    snapshot_read_options.snapshot = txn->GetSnapshot();
 
-  // Put a random key so we have a MemTable to flush
-  s = db->Put(write_options, "dummy", "dummy");
-  ASSERT_OK(s);
+    txn->GetForUpdate(snapshot_read_options, "foo", &value);
+    ASSERT_EQ(value, "bar");
 
-  // force a memtable flush
-  FlushOptions flush_ops;
-  db->Flush(flush_ops);
+    s = txn->Put(Slice("foo"), Slice("bar2"));
+    ASSERT_OK(s);
 
-  // Put a random key so we have a MemTable to flush
-  s = db->Put(write_options, "dummy", "dummy2");
-  ASSERT_OK(s);
+    txn->GetForUpdate(snapshot_read_options, "foo", &value);
+    ASSERT_EQ(value, "bar2");
+    // verify foo is locked by txn
+    s = db->Delete(write_options, "foo");
+    ASSERT_TRUE(s.IsTimedOut());
 
-  // force a memtable flush
-  db->Flush(flush_ops);
+    s = db->Put(write_options, "Z", "z");
+    ASSERT_OK(s);
+    s = db->Put(write_options, "dummy", "dummy");
+    ASSERT_OK(s);
 
-  s = db->Put(write_options, "dummy", "dummy3");
-  ASSERT_OK(s);
+    s = db->Put(write_options, "S", "s");
+    ASSERT_OK(s);
+    s = db->SingleDelete(write_options, "S");
+    ASSERT_OK(s);
 
-  // force a memtable flush
-  // Since our test db has max_write_buffer_number=2, this flush will cause
-  // the first memtable to get purged from the MemtableList history.
-  db->Flush(flush_ops);
+    s = txn->Delete("S");
+    // Should fail after encountering a write to S in memtable
+    ASSERT_TRUE(s.IsBusy());
 
-  s = txn->Put("X", "Y");
-  // Put should fail since MemTableList History is not older than the snapshot.
- ASSERT_TRUE(s.IsTryAgain()); + // force a memtable flush + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); - s = txn->Commit(); - ASSERT_OK(s); + // Put a random key so we have a MemTable to flush + s = db->Put(write_options, "dummy", "dummy2"); + ASSERT_OK(s); - // Transaction should only write the keys that succeeded. - s = db->Get(read_options, "foo", &value); - ASSERT_EQ(value, "bar2"); + // force a memtable flush + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); - s = db->Get(read_options, "X", &value); - ASSERT_TRUE(s.IsNotFound()); + s = db->Put(write_options, "dummy", "dummy3"); + ASSERT_OK(s); + + // force a memtable flush + // Since our test db has max_write_buffer_number=2, this flush will cause + // the first memtable to get purged from the MemtableList history. + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); + + s = txn->Put("X", "Y"); + // Should succeed after verifying there is no write to X in SST file + ASSERT_OK(s); + + s = txn->Put("Z", "zz"); + // Should fail after encountering a write to Z in SST file + ASSERT_TRUE(s.IsBusy()); + + s = txn->GetForUpdate(read_options, "foo2", &value); + // should succeed since key was written before txn started + ASSERT_OK(s); + // verify foo2 is locked by txn + s = db->Delete(write_options, "foo2"); + ASSERT_TRUE(s.IsTimedOut()); + + s = txn->Delete("S"); + // Should fail after encountering a write to S in SST file + fprintf(stderr, "%lu %s\n", n, s.ToString().c_str()); + ASSERT_TRUE(s.IsBusy()); + + // Write a bunch of keys to db to force a compaction + Random rnd(47); + for (int i = 0; i < 1000; i++) { + s = db->Put(write_options, std::to_string(i), + test::CompressibleString(&rnd, 0.8, 100, &value)); + ASSERT_OK(s); + } + + s = txn->Put("X", "yy"); + // Should succeed after verifying there is no write to X in SST file + ASSERT_OK(s); + + s = txn->Put("Z", "zzz"); + // Should fail after encountering a write to Z in SST file + ASSERT_TRUE(s.IsBusy()); + + s = txn->Delete("S"); + // Should fail after encountering a write to S in SST file + ASSERT_TRUE(s.IsBusy()); + + s = txn->GetForUpdate(read_options, "foo3", &value); + // should succeed since key was written before txn started + ASSERT_OK(s); + // verify foo3 is locked by txn + s = db->Delete(write_options, "foo3"); + ASSERT_TRUE(s.IsTimedOut()); + + db_impl->TEST_WaitForCompact(); + + s = txn->Commit(); + ASSERT_OK(s); + + // Transaction should only write the keys that succeeded. 
+    s = db->Get(read_options, "foo", &value);
+    ASSERT_EQ(value, "bar2");
+
+    s = db->Get(read_options, "X", &value);
+    ASSERT_OK(s);
+    ASSERT_EQ("yy", value);
+
+    s = db->Get(read_options, "Z", &value);
+    ASSERT_OK(s);
+    ASSERT_EQ("z", value);
 
     delete txn;
+  }
 }
 
 TEST_F(TransactionTest, NoSnapshotTest) {
diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc
index 413cfbbe8..0cf4c7329 100644
--- a/utilities/transactions/transaction_util.cc
+++ b/utilities/transactions/transaction_util.cc
@@ -25,7 +25,8 @@ namespace rocksdb {
 Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl,
                                              ColumnFamilyHandle* column_family,
                                              const std::string& key,
-                                             SequenceNumber key_seq) {
+                                             SequenceNumber key_seq,
+                                             bool cache_only) {
   Status result;
 
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
@@ -41,7 +42,7 @@ Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl,
     SequenceNumber earliest_seq =
         db_impl->GetEarliestMemTableSequenceNumber(sv, true);
 
-    result = CheckKey(db_impl, sv, earliest_seq, key_seq, key);
+    result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only);
 
     db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
   }
@@ -51,9 +52,10 @@ Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl,
 
 Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
                                  SequenceNumber earliest_seq,
-                                 SequenceNumber key_seq,
-                                 const std::string& key) {
+                                 SequenceNumber key_seq, const std::string& key,
+                                 bool cache_only) {
   Status result;
+  bool need_to_read_sst = false;
 
   // Since it would be too slow to check the SST files, we will only use
   // the memtables to check whether there have been any recent writes
@@ -63,35 +65,47 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
   if (earliest_seq == kMaxSequenceNumber) {
     // The age of this memtable is unknown.  Cannot rely on it to check
     // for recent writes.  This error shouldn't happen often in practice as
-    // the
-    // Memtable should have a valid earliest sequence number except in some
+    // the Memtable should have a valid earliest sequence number except in some
     // corner cases (such as error cases during recovery).
-    result = Status::TryAgain(
-        "Transaction ould not check for conflicts as the MemTable does not "
-        "countain a long enough history to check write at SequenceNumber: ",
-        ToString(key_seq));
+    need_to_read_sst = true;
+    if (cache_only) {
+      result = Status::TryAgain(
+          "Transaction could not check for conflicts as the MemTable does not "
+          "contain a long enough history to check write at SequenceNumber: ",
+          ToString(key_seq));
+    }
   } else if (key_seq < earliest_seq) {
-    // The age of this memtable is too new to use to check for recent
-    // writes.
-    char msg[255];
-    snprintf(msg, sizeof(msg),
-             "Transaction could not check for conflicts for opearation at "
-             "SequenceNumber %" PRIu64
-             " as the MemTable only contains changes newer than SequenceNumber "
-             "%" PRIu64
-             ".  Increasing the value of the "
-             "max_write_buffer_number_to_maintain option could reduce the "
-             "frequency "
-             "of this error.",
-             key_seq, earliest_seq);
-    result = Status::TryAgain(msg);
-  } else {
+    need_to_read_sst = true;
+
+    if (cache_only) {
+      // The age of this memtable is too new to use to check for recent
+      // writes.
+      char msg[255];
+      snprintf(msg, sizeof(msg),
+               "Transaction could not check for conflicts for operation at "
+               "SequenceNumber %" PRIu64
+               " as the MemTable only contains changes newer than "
+               "SequenceNumber %" PRIu64
+               ".
Increasing the value of the " + "max_write_buffer_number_to_maintain option could reduce the " + "frequency " + "of this error.", + key_seq, earliest_seq); + result = Status::TryAgain(msg); + } + } + + if (result.ok()) { SequenceNumber seq = kMaxSequenceNumber; - Status s = db_impl->GetLatestSequenceForKeyFromMemtable(sv, key, &seq); - if (!s.ok()) { + bool found_record_for_key = false; + + Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst, + &seq, &found_record_for_key); + + if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { result = s; - } else if (seq != kMaxSequenceNumber && seq > key_seq) { + } else if (found_record_for_key && (seq > key_seq)) { // Write Conflict result = Status::Busy(); } @@ -100,8 +114,9 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, return result; } -Status TransactionUtil::CheckKeysForConflicts( - DBImpl* db_impl, const TransactionKeyMap& key_map) { +Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl, + const TransactionKeyMap& key_map, + bool cache_only) { Status result; for (auto& key_map_iter : key_map) { @@ -124,7 +139,7 @@ Status TransactionUtil::CheckKeysForConflicts( const auto& key = key_iter.first; const SequenceNumber key_seq = key_iter.second; - result = CheckKey(db_impl, sv, earliest_seq, key_seq, key); + result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only); if (!result.ok()) { break; diff --git a/utilities/transactions/transaction_util.h b/utilities/transactions/transaction_util.h index c843b0ec1..b2ce7da19 100644 --- a/utilities/transactions/transaction_util.h +++ b/utilities/transactions/transaction_util.h @@ -30,12 +30,16 @@ class TransactionUtil { // Verifies there have been no writes to this key in the db since this // sequence number. // + // If cache_only is true, then this function will not attempt to read any + // SST files. This will make it more likely this function will + // return an error if it is unable to determine if there are any conflicts. + // // Returns OK on success, BUSY if there is a conflicting write, or other error // status for any unexpected errors. static Status CheckKeyForConflicts(DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key, - SequenceNumber key_seq); + SequenceNumber key_seq, bool cache_only); // For each key,SequenceNumber pair in the TransactionKeyMap, this function // will verify there have been no writes to the key in the db since that @@ -47,12 +51,13 @@ class TransactionUtil { // REQUIRED: this function should only be called on the write thread or if the // mutex is held. static Status CheckKeysForConflicts(DBImpl* db_impl, - const TransactionKeyMap& keys); + const TransactionKeyMap& keys, + bool cache_only); private: static Status CheckKey(DBImpl* db_impl, SuperVersion* sv, SequenceNumber earliest_seq, SequenceNumber key_seq, - const std::string& key); + const std::string& key, bool cache_only); }; } // namespace rocksdb
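To make the new behavior concrete, here is a minimal client-side sketch of the scenario the rewritten FlushTest2 exercises (a standalone illustration, not part of the diff; the db path is made up):

    #include <cassert>
    #include "rocksdb/utilities/transaction.h"
    #include "rocksdb/utilities/transaction_db.h"

    using namespace rocksdb;

    int main() {
      Options options;
      options.create_if_missing = true;
      TransactionDB* db;
      Status s = TransactionDB::Open(options, TransactionDBOptions(),
                                     "/tmp/txn_conflict_example", &db);
      assert(s.ok());

      TransactionOptions txn_options;
      txn_options.set_snapshot = true;  // snapshot taken at BeginTransaction
      Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options);

      // A non-transactional write to "Z" lands after the snapshot...
      s = db->Put(WriteOptions(), "Z", "z");
      assert(s.ok());

      // ...and is then flushed, so it now lives only in an SST file.
      s = db->Flush(FlushOptions());
      assert(s.ok());

      // Previously the conflict check consulted only the memtables and could
      // return TryAgain once the write aged out of the memtable history.
      // With this diff, SST files are also read (cache_only=false) and the
      // conflict is reliably reported as Busy.
      s = txn->Put("Z", "zz");
      assert(s.IsBusy());

      delete txn;
      delete db;
      return 0;
    }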
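The decision logic that TransactionUtil::CheckKey implements after this change can be condensed as follows (a simplified restatement with plain stand-in types, not a drop-in replacement for the code above):

    #include <cstdint>

    using SequenceNumber = uint64_t;
    static const SequenceNumber kMaxSequenceNumber = UINT64_MAX;

    enum class CheckResult { kOk, kBusy, kTryAgain };

    // earliest_seq: earliest sequence number still covered by the memtables
    //               (kMaxSequenceNumber if unknown).
    // key_seq:      sequence number the transaction last saw for this key.
    // cache_only:   true on the write thread (optimistic transactions), where
    //               SST files must not be read.
    // found_record_for_key / latest_seq: outputs of
    //               DBImpl::GetLatestSequenceForKey(sv, key, !need_to_read_sst, ...).
    CheckResult CheckKeySketch(SequenceNumber earliest_seq,
                               SequenceNumber key_seq, bool cache_only,
                               bool found_record_for_key,
                               SequenceNumber latest_seq) {
      const bool need_to_read_sst =
          earliest_seq == kMaxSequenceNumber ||  // memtable age unknown
          key_seq < earliest_seq;                // memtable history too shallow

      if (need_to_read_sst && cache_only) {
        // Cannot consult SST files from the write thread; give up rather
        // than risk missing a conflict.
        return CheckResult::kTryAgain;
      }
      if (found_record_for_key && latest_seq > key_seq) {
        return CheckResult::kBusy;  // the key was written after the snapshot
      }
      return CheckResult::kOk;
    }

Note that a record whose sequence number was compacted away to 0 can never be reported as a conflict here, which is safe: sequence numbers are only zeroed out when no earlier snapshot exists (see the PrepareOutput comment above).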
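The new seq output of GetContext relies on search order: memtables and files are consulted newest-first, so the first entry matching the user key carries the latest sequence number. When the true number cannot be recovered (the replayGetContextLog path and the last level of a cuckoo table above), kMaxSequenceNumber is reported instead, which errs on the side of declaring a conflict: a transaction may fail spuriously, but a real conflict is never missed. A sketch of the capture rule (illustrative struct, not the actual class):

    #include <cstdint>

    using SequenceNumber = uint64_t;
    static const SequenceNumber kMaxSequenceNumber = UINT64_MAX;

    struct SeqCapture {
      SequenceNumber* seq;  // may be nullptr when the caller does not care

      // Invoked once per entry whose user key matches, newest entry first.
      void OnMatch(SequenceNumber entry_seq) {
        if (seq != nullptr && *seq == kMaxSequenceNumber) {
          // Only the first (newest) match is recorded; later matches are
          // older versions of the same key.  If entry_seq is itself
          // kMaxSequenceNumber, *seq simply stays "unknown".
          *seq = entry_seq;
        }
      }
    };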
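The one-character fix in snapshot_impl.h is easy to miss: GetAll() walks the snapshot list oldest-to-newest, and *oldest_write_conflict_snapshot starts out as kMaxSequenceNumber, meaning "not found yet", so the guard must test == kMaxSequenceNumber (still unset), not !=; otherwise the oldest write-conflict-boundary snapshot would never be recorded. A self-contained sketch of the corrected bookkeeping (simplified types):

    #include <cstdint>
    #include <vector>

    using SequenceNumber = uint64_t;
    static const SequenceNumber kMaxSequenceNumber = UINT64_MAX;

    struct Snap {
      SequenceNumber number;
      bool is_write_conflict_boundary;
    };

    // snapshots must be ordered oldest to newest, as in SnapshotList.
    std::vector<SequenceNumber> GetAllSketch(
        const std::vector<Snap>& snapshots,
        SequenceNumber* oldest_write_conflict_snapshot) {
      std::vector<SequenceNumber> ret;
      *oldest_write_conflict_snapshot = kMaxSequenceNumber;  // not found yet
      for (const Snap& s : snapshots) {
        ret.push_back(s.number);
        if (*oldest_write_conflict_snapshot == kMaxSequenceNumber &&
            s.is_write_conflict_boundary) {
          // First boundary snapshot seen in oldest-to-newest order is the
          // oldest one; record it exactly once.
          *oldest_write_conflict_snapshot = s.number;
        }
      }
      return ret;
    }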
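Finally, the options.h comment above describes how memtable history interacts with SST reads during conflict checking. A configuration sketch (the values are illustrative only, not recommendations):

    #include "rocksdb/options.h"

    rocksdb::Options MakeTransactionFriendlyOptions() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.max_write_buffer_number = 4;
      // Keep the history of up to 4 write buffers (live and flushed) in
      // memory so most conflict checks can be answered without touching SST
      // files.  0 frees buffers immediately after flush; -1 falls back to
      // max_write_buffer_number.
      options.max_write_buffer_number_to_maintain = 4;
      return options;
    }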