diff --git a/db/builder.cc b/db/builder.cc index 68b9fe804..52605b27d 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -59,6 +59,7 @@ Status BuildTable( const std::vector>* int_tbl_prop_collector_factories, uint32_t column_family_id, std::vector snapshots, + SequenceNumber earliest_write_conflict_snapshot, const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, const Env::IOPriority io_priority, @@ -97,7 +98,7 @@ Status BuildTable( CompactionIterator c_iter(iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, &snapshots, - kMaxSequenceNumber, env, + earliest_write_conflict_snapshot, env, true /* internal key corruption is not ok */); c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { diff --git a/db/builder.h b/db/builder.h index cdafa4ab3..9a4d3b60b 100644 --- a/db/builder.h +++ b/db/builder.h @@ -56,6 +56,7 @@ extern Status BuildTable( const std::vector>* int_tbl_prop_collector_factories, uint32_t column_family_id, std::vector snapshots, + SequenceNumber earliest_write_conflict_snapshot, const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 2ae5750a8..73e612a42 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -333,6 +333,10 @@ void CompactionIterator::NextFromInput() { // same key, then this kv is not visible in any snapshot. // Hidden by an newer entry for same user key // TODO: why not > ? + // + // Note: Dropping this key will not affect TransactionDB write-conflict + // checking since there has already been a record returned for this key + // in this snapshot. 
assert(last_sequence >= current_user_key_sequence_); ++iter_stats_.num_record_drop_hidden; // (A) input_->Next(); @@ -351,6 +355,9 @@ void CompactionIterator::NextFromInput() { // smaller sequence numbers will be dropped in the next // few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be dropped. + // + // Note: Dropping this Delete will not affect TransactionDB + // write-conflict checking since it is earlier than any snapshot. ++iter_stats_.num_record_drop_obsolete; input_->Next(); } else if (ikey_.type == kTypeMerge) { @@ -400,6 +407,9 @@ void CompactionIterator::PrepareOutput() { // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. + + // This is safe for TransactionDB write-conflict checking since transactions + // only care about sequence number larger than any active snapshots. if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ && ikey_.type != kTypeMerge) { assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion); diff --git a/db/db_impl.cc b/db/db_impl.cc index 6e776f550..144210412 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1394,11 +1394,17 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, { mutex_.Unlock(); TableFileCreationInfo info; + + SequenceNumber earliest_write_conflict_snapshot; + std::vector snapshot_seqs = + snapshots_.GetAll(&earliest_write_conflict_snapshot); + s = BuildTable( dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(), iter.get(), &meta, cfd->internal_comparator(), - cfd->int_tbl_prop_collector_factories(), cfd->GetID(), - snapshots_.GetAll(), GetCompressionFlush(*cfd->ioptions()), + cfd->int_tbl_prop_collector_factories(), cfd->GetID(), snapshot_seqs, + earliest_write_conflict_snapshot, + GetCompressionFlush(*cfd->ioptions()), cfd->ioptions()->compression_opts, paranoid_file_checks, 
cfd->internal_stats(), Env::IO_HIGH, &info.table_properties); LogFlush(db_options_.info_log); @@ -1453,12 +1459,16 @@ Status DBImpl::FlushMemTableToOutputFile( assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); - FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options, - env_options_, versions_.get(), &mutex_, &shutting_down_, - snapshots_.GetAll(), job_context, log_buffer, - directories_.GetDbDir(), directories_.GetDataDir(0U), - GetCompressionFlush(*cfd->ioptions()), stats_, - &event_logger_); + SequenceNumber earliest_write_conflict_snapshot; + std::vector snapshot_seqs = + snapshots_.GetAll(&earliest_write_conflict_snapshot); + + FlushJob flush_job( + dbname_, cfd, db_options_, mutable_cf_options, env_options_, + versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, + earliest_write_conflict_snapshot, job_context, log_buffer, + directories_.GetDbDir(), directories_.GetDataDir(0U), + GetCompressionFlush(*cfd->ioptions()), stats_, &event_logger_); FileMetaData file_meta; @@ -5368,9 +5378,9 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv, #endif // ROCKSDB_LITE #ifndef ROCKSDB_LITE -Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, - const Slice& key, - SequenceNumber* seq) { +Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, + bool cache_only, SequenceNumber* seq, + bool* found_record_for_key) { Status s; std::string value; MergeContext merge_context; @@ -5379,6 +5389,10 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, LookupKey lkey(key, current_seq); *seq = kMaxSequenceNumber; + *found_record_for_key = false; + + // TODO(agiardullo): Should optimize all the Get() functions below to not + // return a value since we do not use it. 
// Check if there is a record for this key in the latest memtable sv->mem->Get(lkey, &value, &s, &merge_context, seq); @@ -5394,6 +5408,7 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, if (*seq != kMaxSequenceNumber) { // Found a sequence number, no need to check immutable memtables + *found_record_for_key = true; return Status::OK(); } @@ -5411,6 +5426,7 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, if (*seq != kMaxSequenceNumber) { // Found a sequence number, no need to check memtable history + *found_record_for_key = true; return Status::OK(); } @@ -5426,6 +5442,31 @@ Status DBImpl::GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, return s; } + if (*seq != kMaxSequenceNumber) { + // Found a sequence number, no need to check SST files + *found_record_for_key = true; + return Status::OK(); + } + + // TODO(agiardullo): possible optimization: consider checking cached + // SST files if cache_only=true? + if (!cache_only) { + // Check tables + ReadOptions read_options; + + sv->current->Get(read_options, lkey, &value, &s, &merge_context, + nullptr /* value_found */, found_record_for_key, seq); + + if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { + // unexpected error reading SST files + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, + "Unexpected status returned from Version::Get: %s\n", + s.ToString().c_str()); + + return s; + } + } + return Status::OK(); } #endif // ROCKSDB_LITE diff --git a/db/db_impl.h b/db/db_impl.h index a02f47bfb..0f44b378a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -226,13 +226,30 @@ class DBImpl : public DB { bool include_history); // For a given key, check to see if there are any records for this key - // in the memtables, including memtable history. - - // On success, *seq will contain the sequence number for the - // latest such change or kMaxSequenceNumber if no records were present. - // Returns OK on success, other status on error reading memtables. 
- Status GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, const Slice& key, - SequenceNumber* seq); + // in the memtables, including memtable history. If cache_only is false, + // SST files will also be checked. + // + // If a key is found, *found_record_for_key will be set to true and + // *seq will be set to the stored sequence number for the latest + // operation on this key or kMaxSequenceNumber if unknown. + // If no key is found, *found_record_for_key will be set to false. + // + // Note: If cache_only=false, it is possible for *seq to be set to 0 if + // the sequence number has been cleared from the record. If the caller is + // holding an active db snapshot, we know the missing sequence must be less + // than the snapshot's sequence number (sequence numbers are only cleared + // when there are no earlier active snapshots). + // + // If NotFound is returned and found_record_for_key is set to false, then no + // record for this key was found. If the caller is holding an active db + // snapshot, we know that no key could have existed after this snapshot + // (since we do not compact keys that have an earlier snapshot). + // + // Returns OK or NotFound on success, + // other status on unexpected error.
+ Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, + bool cache_only, SequenceNumber* seq, + bool* found_record_for_key); using DB::AddFile; virtual Status AddFile(ColumnFamilyHandle* column_family, diff --git a/db/flush_job.cc b/db/flush_job.cc index a20a0ba98..9da7d9546 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -62,6 +62,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, InstrumentedMutex* db_mutex, std::atomic* shutting_down, std::vector existing_snapshots, + SequenceNumber earliest_write_conflict_snapshot, JobContext* job_context, LogBuffer* log_buffer, Directory* db_directory, Directory* output_file_directory, CompressionType output_compression, Statistics* stats, @@ -75,6 +76,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, db_mutex_(db_mutex), shutting_down_(shutting_down), existing_snapshots_(std::move(existing_snapshots)), + earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), job_context_(job_context), log_buffer_(log_buffer), db_directory_(db_directory), @@ -235,8 +237,8 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), - existing_snapshots_, output_compression_, - cfd_->ioptions()->compression_opts, + existing_snapshots_, earliest_write_conflict_snapshot_, + output_compression_, cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), Env::IO_HIGH, &table_properties_); info.table_properties = table_properties_; diff --git a/db/flush_job.h b/db/flush_job.h index dbc4113e1..d12da141e 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -58,6 +58,7 @@ class FlushJob { const EnvOptions& env_options, VersionSet* versions, InstrumentedMutex* db_mutex, std::atomic* shutting_down, std::vector existing_snapshots, + SequenceNumber earliest_write_conflict_snapshot, JobContext* job_context, 
LogBuffer* log_buffer, Directory* db_directory, Directory* output_file_directory, CompressionType output_compression, Statistics* stats, @@ -83,6 +84,7 @@ class FlushJob { InstrumentedMutex* db_mutex_; std::atomic* shutting_down_; std::vector existing_snapshots_; + SequenceNumber earliest_write_conflict_snapshot_; JobContext* job_context_; LogBuffer* log_buffer_; Directory* db_directory_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index d20cf5e90..f7071c1ee 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -92,8 +92,8 @@ TEST_F(FlushJobTest, Empty) { FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_, - {}, &job_context, nullptr, nullptr, nullptr, - kNoCompression, nullptr, &event_logger); + {}, kMaxSequenceNumber, &job_context, nullptr, nullptr, + nullptr, kNoCompression, nullptr, &event_logger); ASSERT_OK(flush_job.Run()); job_context.Clean(); } @@ -131,8 +131,8 @@ TEST_F(FlushJobTest, NonEmpty) { FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_, - {}, &job_context, nullptr, nullptr, nullptr, - kNoCompression, nullptr, &event_logger); + {}, kMaxSequenceNumber, &job_context, nullptr, nullptr, + nullptr, kNoCompression, nullptr, &event_logger); FileMetaData fd; mutex_.Lock(); ASSERT_OK(flush_job.Run(&fd)); @@ -195,8 +195,8 @@ TEST_F(FlushJobTest, Snapshots) { FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_, - snapshots, &job_context, nullptr, nullptr, nullptr, - kNoCompression, nullptr, &event_logger); + snapshots, kMaxSequenceNumber, &job_context, nullptr, + nullptr, nullptr, kNoCompression, nullptr, &event_logger); mutex_.Lock(); 
ASSERT_OK(flush_job.Run()); mutex_.Unlock(); diff --git a/db/repair.cc b/db/repair.cc index c27db6e49..a03549099 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -294,7 +294,8 @@ class Repairer { dbname_, env_, ioptions_, env_options_, table_cache_, iter.get(), &meta, icmp_, &int_tbl_prop_collector_factories_, TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, {}, - kNoCompression, CompressionOptions(), false, nullptr); + kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, + nullptr); } delete mem->Unref(); delete cf_mems_default; diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 4fa0bb9d5..277cf3a20 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -91,7 +91,7 @@ class SnapshotList { ret.push_back(s->next_->number_); if (oldest_write_conflict_snapshot != nullptr && - *oldest_write_conflict_snapshot != kMaxSequenceNumber && + *oldest_write_conflict_snapshot == kMaxSequenceNumber && s->next_->is_write_conflict_boundary_) { // If this is the first write-conflict boundary snapshot in the list, // it is the oldest diff --git a/db/table_cache.cc b/db/table_cache.cc index 82b52ddb5..48b8e1b07 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -219,7 +219,9 @@ Status TableCache::Get(const ReadOptions& options, IterKey row_cache_key; std::string row_cache_entry_buffer; - if (ioptions_.row_cache) { + // Check row cache if enabled. Since row cache does not currently store + // sequence numbers, we cannot use it if we need to fetch the sequence. 
+ if (ioptions_.row_cache && !get_context->NeedToReadSequence()) { uint64_t fd_number = fd.GetNumber(); auto user_key = ExtractUserKey(k); // We use the user key as cache key instead of the internal key, diff --git a/db/version_set.cc b/db/version_set.cc index a9422a84b..66762bebe 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -868,21 +868,24 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, refs_(0), version_number_(version_number) {} -void Version::Get(const ReadOptions& read_options, - const LookupKey& k, - std::string* value, - Status* status, - MergeContext* merge_context, - bool* value_found) { +void Version::Get(const ReadOptions& read_options, const LookupKey& k, + std::string* value, Status* status, + MergeContext* merge_context, bool* value_found, + bool* key_exists, SequenceNumber* seq) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); assert(status->ok() || status->IsMergeInProgress()); + if (key_exists != nullptr) { + // will falsify below if not found + *key_exists = true; + } + GetContext get_context( user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, - value, value_found, merge_context, this->env_); + value, value_found, merge_context, this->env_, seq); FilePicker fp( storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_, @@ -942,6 +945,9 @@ void Version::Get(const ReadOptions& read_options, user_key); } } else { + if (key_exists != nullptr) { + *key_exists = false; + } *status = Status::NotFound(); // Use an empty error message for speed } } diff --git a/db/version_set.h b/db/version_set.h index b7482499f..997cd2a80 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -424,11 +424,24 @@ class Version { // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. 
- // Uses *operands to store merge_operator operations to apply later + // Uses *operands to store merge_operator operations to apply later. + // + // If the ReadOptions.read_tier is set to do a read-only fetch, then + // *value_found will be set to false if it cannot be determined whether + // this value exists without doing IO. + // + // If the key is Deleted, *status will be set to NotFound and + // *key_exists will be set to true. + // If no key was found, *status will be set to NotFound and + // *key_exists will be set to false. + // If seq is non-null, *seq will be set to the sequence number found + // for the key if a key was found. + // // REQUIRES: lock is not held void Get(const ReadOptions&, const LookupKey& key, std::string* val, Status* status, MergeContext* merge_context, - bool* value_found = nullptr); + bool* value_found = nullptr, bool* key_exists = nullptr, + SequenceNumber* seq = nullptr); // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 3bb250e9c..1286840fe 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -93,6 +93,12 @@ class CompactionFilter { // The compaction process invokes this method on every merge operand. If this // method returns true, the merge operand will be ignored and not written out // in the compaction output + // + // Note: If you are using a TransactionDB, it is not recommended to implement + // FilterMergeOperand(). If a Merge operation is filtered out, TransactionDB + // may not realize there is a write conflict and may allow a Transaction to + // Commit that should have failed. Instead, it is better to implement any + // Merge filtering inside the MergeOperator. 
virtual bool FilterMergeOperand(int level, const Slice& key, const Slice& operand) const { return false; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 5ec1da71d..04f9d7d8f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -276,9 +276,17 @@ struct ColumnFamilyOptions { // max_write_buffer_number, this parameter does not affect flushing. // This controls the minimum amount of write history that will be available // in memory for conflict checking when Transactions are used. + // + // When using an OptimisticTransactionDB: // If this value is too low, some transactions may fail at commit time due // to not being able to determine whether there were any write conflicts. // + // When using a TransactionDB: + // If Transaction::SetSnapshot is used, TransactionDB will read either + // in-memory write buffers or SST files to do write-conflict checking. + // Increasing this value can reduce the number of reads to SST files + // done for conflict detection. + // // Setting this value to 0 will cause write buffers to be freed immediately // after they are flushed. // If this value is set to -1, 'max_write_buffer_number' will be used. diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc index 2d413f043..d806e77ef 100644 --- a/table/cuckoo_table_reader.cc +++ b/table/cuckoo_table_reader.cc @@ -143,11 +143,16 @@ Status CuckooTableReader::Get(const ReadOptions& readOptions, const Slice& key, return Status::OK(); } // Here, we compare only the user key part as we support only one entry - // per user key and we don't support sanpshot. + // per user key and we don't support snapshot. if (ucomp_->Equal(user_key, Slice(bucket, user_key.size()))) { Slice value(bucket + key_length_, value_length_); if (is_last_level_) { - get_context->SaveValue(value); + // Sequence number is not stored at the last level, so we will use + // kMaxSequenceNumber since it is unknown. 
This could cause some + // transactions to fail to lock a key due to unknown sequence number. + // However, it is not expected for anyone to use a CuckooTable in a + // TransactionDB. + get_context->SaveValue(value, kMaxSequenceNumber); } else { Slice full_key(bucket, key_length_); ParsedInternalKey found_ikey; diff --git a/table/get_context.cc b/table/get_context.cc index 609ca3083..ef4f6aff9 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -34,7 +34,8 @@ GetContext::GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, std::string* ret_value, - bool* value_found, MergeContext* merge_context, Env* env) + bool* value_found, MergeContext* merge_context, Env* env, + SequenceNumber* seq) : ucmp_(ucmp), merge_operator_(merge_operator), logger_(logger), @@ -45,7 +46,12 @@ GetContext::GetContext(const Comparator* ucmp, value_found_(value_found), merge_context_(merge_context), env_(env), - replay_log_(nullptr) {} + seq_(seq), + replay_log_(nullptr) { + if (seq_) { + *seq_ = kMaxSequenceNumber; + } +} // Called from TableCache::Get and Table::Get when file/block in which // key may exist are not there in TableCache/BlockCache respectively. In this @@ -59,7 +65,7 @@ void GetContext::MarkKeyMayExist() { } } -void GetContext::SaveValue(const Slice& value) { +void GetContext::SaveValue(const Slice& value, SequenceNumber seq) { assert(state_ == kNotFound); appendToReplayLog(replay_log_, kTypeValue, value); @@ -74,6 +80,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, if (ucmp_->Equal(parsed_key.user_key, user_key_)) { appendToReplayLog(replay_log_, parsed_key.type, value); + if (seq_ != nullptr) { + // Set the sequence number if it is uninitialized + if (*seq_ == kMaxSequenceNumber) { + *seq_ = parsed_key.sequence; + } + } + // Key matches.
Process it switch (parsed_key.type) { case kTypeValue: @@ -154,8 +167,11 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key, bool ret = GetLengthPrefixedSlice(&s, &value); assert(ret); (void)ret; - // Sequence number is ignored in SaveValue, so we just pass 0. - get_context->SaveValue(ParsedInternalKey(user_key, 0, type), value); + + // Since SequenceNumber is not stored and unknown, we will use + // kMaxSequenceNumber. + get_context->SaveValue( + ParsedInternalKey(user_key, kMaxSequenceNumber, type), value); } #else // ROCKSDB_LITE assert(false); diff --git a/table/get_context.h b/table/get_context.h index 2c2dd8e1d..c06c3c8d4 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -7,6 +7,7 @@ #include #include "db/merge_context.h" #include "rocksdb/env.h" +#include "rocksdb/types.h" namespace rocksdb { class MergeContext; @@ -24,11 +25,22 @@ class GetContext { GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger, Statistics* statistics, GetState init_state, const Slice& user_key, std::string* ret_value, bool* value_found, - MergeContext* merge_context, Env* env_); + MergeContext* merge_context, Env* env, + SequenceNumber* seq = nullptr); void MarkKeyMayExist(); - void SaveValue(const Slice& value); + + // Records this key, value, and any meta-data (such as sequence number and + // state) into this GetContext. + // + // Returns True if more keys need to be read (due to merges) or + // False if the complete value has been found. bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value); + + // Simplified version of the previous function. Should only be used when we + // know that the operation is a Put. + void SaveValue(const Slice& value, SequenceNumber seq); + GetState State() const { return state_; } // If a non-null string is passed, all the SaveValue calls will be @@ -36,6 +48,9 @@ class GetContext { // another GetContext with replayGetContextLog. 
void SetReplayLog(std::string* replay_log) { replay_log_ = replay_log; } + // Do we need to fetch the SequenceNumber for this key? + bool NeedToReadSequence() const { return (seq_ != nullptr); } + private: const Comparator* ucmp_; const MergeOperator* merge_operator_; @@ -49,6 +64,9 @@ class GetContext { bool* value_found_; // Is value set correctly? Used by KeyMayExist MergeContext* merge_context_; Env* env_; + // If a key is found, seq_ will be set to the SequenceNumber of most recent + // write to the key or kMaxSequenceNumber if unknown + SequenceNumber* seq_; std::string* replay_log_; }; diff --git a/utilities/transactions/optimistic_transaction_impl.cc b/utilities/transactions/optimistic_transaction_impl.cc index 028bed4c9..120f18ed8 100644 --- a/utilities/transactions/optimistic_transaction_impl.cc +++ b/utilities/transactions/optimistic_transaction_impl.cc @@ -95,15 +95,19 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family, // if we can not determine whether there would be any such conflicts. // // Should only be called on writer thread in order to avoid any race conditions -// in detecting -// write conflicts. +// in detecting write conflicts. Status OptimisticTransactionImpl::CheckTransactionForConflicts(DB* db) { Status result; assert(dynamic_cast(db) != nullptr); auto db_impl = reinterpret_cast(db); - return TransactionUtil::CheckKeysForConflicts(db_impl, GetTrackedKeys()); + // Since we are on the write thread and do not want to block other writers, + // we will do a cache-only conflict check. This can result in TryAgain + // getting returned if there is not sufficient memtable history to check + // for conflicts. 
+ return TransactionUtil::CheckKeysForConflicts(db_impl, GetTrackedKeys(), + true /* cache_only */); } } // namespace rocksdb diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 33eb6509b..7480ce6dd 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -312,7 +312,8 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family, column_family ? column_family : db_impl->DefaultColumnFamily(); return TransactionUtil::CheckKeyForConflicts( - db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber()); + db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber(), + false /* cache_only */); } } // namespace rocksdb diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 73f227ffb..dc69f101c 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -12,8 +12,10 @@ #include "rocksdb/options.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" +#include "table/mock_table.h" #include "util/logging.h" #include "util/testharness.h" +#include "util/testutil.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" @@ -32,6 +34,8 @@ class TransactionTest : public testing::Test { TransactionTest() { options.create_if_missing = true; options.max_write_buffer_number = 2; + options.write_buffer_size = 4 * 1024; + options.level0_file_num_compaction_trigger = 2; options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); dbname = test::TmpDir() + "/transaction_testdb"; @@ -46,6 +50,15 @@ class TransactionTest : public testing::Test { delete db; DestroyDB(dbname, options); } + + Status ReOpen() { + delete db; + DestroyDB(dbname, options); + + Status s = TransactionDB::Open(options, txn_db_options, dbname, &db); + + return s; + } }; 
TEST_F(TransactionTest, SuccessTest) { @@ -85,6 +98,43 @@ TEST_F(TransactionTest, SuccessTest) { delete txn; } +TEST_F(TransactionTest, FirstWriteTest) { + WriteOptions write_options; + + // Test conflict checking against the very first write to a db. + // The transaction's snapshot will have seq 1 and the following write + // will have sequence 1. + Status s = db->Put(write_options, "A", "a"); + + Transaction* txn = db->BeginTransaction(write_options); + txn->SetSnapshot(); + + ASSERT_OK(s); + + s = txn->Put("A", "b"); + ASSERT_OK(s); + + delete txn; +} + +TEST_F(TransactionTest, FirstWriteTest2) { + WriteOptions write_options; + + Transaction* txn = db->BeginTransaction(write_options); + txn->SetSnapshot(); + + // Test conflict checking against the very first write to a db. + // The transaction's snapshot is a seq 0 while the following write + // will have sequence 1. + Status s = db->Put(write_options, "A", "a"); + ASSERT_OK(s); + + s = txn->Put("A", "b"); + ASSERT_TRUE(s.IsBusy()); + + delete txn; +} + TEST_F(TransactionTest, WriteConflictTest) { WriteOptions write_options; ReadOptions read_options; @@ -268,68 +318,155 @@ TEST_F(TransactionTest, FlushTest) { } TEST_F(TransactionTest, FlushTest2) { - WriteOptions write_options; - ReadOptions read_options, snapshot_read_options; - TransactionOptions txn_options; - string value; - Status s; + const size_t num_tests = 3; + + for (size_t n = 0; n < num_tests; n++) { + // Test different table factories + switch (n) { + case 0: + break; + case 1: + options.table_factory.reset(new mock::MockTableFactory()); + break; + case 2: { + PlainTableOptions pt_opts; + pt_opts.hash_table_ratio = 0; + options.table_factory.reset(NewPlainTableFactory(pt_opts)); + break; + } + } - db->Put(write_options, Slice("foo"), Slice("bar")); - db->Put(write_options, Slice("foo2"), Slice("bar")); + Status s = ReOpen(); + ASSERT_OK(s); - txn_options.set_snapshot = true; - Transaction* txn = db->BeginTransaction(write_options, txn_options); - 
ASSERT_TRUE(txn); + WriteOptions write_options; + ReadOptions read_options, snapshot_read_options; + TransactionOptions txn_options; + string value; - snapshot_read_options.snapshot = txn->GetSnapshot(); + DBImpl* db_impl = reinterpret_cast(db->GetBaseDB()); - txn->GetForUpdate(snapshot_read_options, "foo", &value); - ASSERT_EQ(value, "bar"); + db->Put(write_options, Slice("foo"), Slice("bar")); + db->Put(write_options, Slice("foo2"), Slice("bar2")); + db->Put(write_options, Slice("foo3"), Slice("bar3")); - s = txn->Put(Slice("foo"), Slice("bar2")); - ASSERT_OK(s); + txn_options.set_snapshot = true; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn); - txn->GetForUpdate(snapshot_read_options, "foo", &value); - ASSERT_EQ(value, "bar2"); + snapshot_read_options.snapshot = txn->GetSnapshot(); - // Put a random key so we have a MemTable to flush - s = db->Put(write_options, "dummy", "dummy"); - ASSERT_OK(s); + txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "bar"); - // force a memtable flush - FlushOptions flush_ops; - db->Flush(flush_ops); + s = txn->Put(Slice("foo"), Slice("bar2")); + ASSERT_OK(s); - // Put a random key so we have a MemTable to flush - s = db->Put(write_options, "dummy", "dummy2"); - ASSERT_OK(s); + txn->GetForUpdate(snapshot_read_options, "foo", &value); + ASSERT_EQ(value, "bar2"); + // verify foo is locked by txn + s = db->Delete(write_options, "foo"); + ASSERT_TRUE(s.IsTimedOut()); - // force a memtable flush - db->Flush(flush_ops); + s = db->Put(write_options, "Z", "z"); + ASSERT_OK(s); + s = db->Put(write_options, "dummy", "dummy"); + ASSERT_OK(s); - s = db->Put(write_options, "dummy", "dummy3"); - ASSERT_OK(s); + s = db->Put(write_options, "S", "s"); + ASSERT_OK(s); + s = db->SingleDelete(write_options, "S"); + ASSERT_OK(s); - // force a memtable flush - // Since our test db has max_write_buffer_number=2, this flush will cause - // the first memtable to get purged from the 
MemtableList history. - db->Flush(flush_ops); + s = txn->Delete("S"); + // Should fail after encountering a write to S in memtable + ASSERT_TRUE(s.IsBusy()); - s = txn->Put("X", "Y"); - // Put should fail since MemTableList History is not older than the snapshot. - ASSERT_TRUE(s.IsTryAgain()); + // force a memtable flush + s = db_impl->TEST_FlushMemTable(true); + ASSERT_OK(s); - s = txn->Commit(); - ASSERT_OK(s); + // Put a random key so we have a MemTable to flush + s = db->Put(write_options, "dummy", "dummy2"); + ASSERT_OK(s); - // Transaction should only write the keys that succeeded. - s = db->Get(read_options, "foo", &value); - ASSERT_EQ(value, "bar2"); + // force a memtable flush + ASSERT_OK(db_impl->TEST_FlushMemTable(true)); - s = db->Get(read_options, "X", &value); - ASSERT_TRUE(s.IsNotFound()); + s = db->Put(write_options, "dummy", "dummy3"); + ASSERT_OK(s); + + // force a memtable flush + // Since our test db has max_write_buffer_number=2, this flush will cause + // the first memtable to get purged from the MemtableList history. 
+ ASSERT_OK(db_impl->TEST_FlushMemTable(true)); + + s = txn->Put("X", "Y"); + // Should succeed after verifying there is no write to X in SST file + ASSERT_OK(s); + + s = txn->Put("Z", "zz"); + // Should fail after encountering a write to Z in SST file + ASSERT_TRUE(s.IsBusy()); + + s = txn->GetForUpdate(read_options, "foo2", &value); + // should succeed since key was written before txn started + ASSERT_OK(s); + // verify foo2 is locked by txn + s = db->Delete(write_options, "foo2"); + ASSERT_TRUE(s.IsTimedOut()); + + s = txn->Delete("S"); + // Should fail after encountering a write to S in SST file + ASSERT_TRUE(s.IsBusy()); + + // Write a bunch of keys to db to force a compaction + Random rnd(47); + for (int i = 0; i < 1000; i++) { + s = db->Put(write_options, std::to_string(i), + test::CompressibleString(&rnd, 0.8, 100, &value)); + ASSERT_OK(s); + } + + s = txn->Put("X", "yy"); + // Should succeed after verifying there is no write to X in SST file + ASSERT_OK(s); + + s = txn->Put("Z", "zzz"); + // Should fail after encountering a write to Z in SST file + ASSERT_TRUE(s.IsBusy()); + + s = txn->Delete("S"); + // Should fail after encountering a write to S in SST file + ASSERT_TRUE(s.IsBusy()); + + s = txn->GetForUpdate(read_options, "foo3", &value); + // should succeed since key was written before txn started + ASSERT_OK(s); + // verify foo3 is locked by txn + s = db->Delete(write_options, "foo3"); + ASSERT_TRUE(s.IsTimedOut()); + + db_impl->TEST_WaitForCompact(); + + s = txn->Commit(); + ASSERT_OK(s); + + // Transaction should only write the keys that succeeded. 
+ s = db->Get(read_options, "foo", &value); + ASSERT_EQ(value, "bar2"); + + s = db->Get(read_options, "X", &value); + ASSERT_OK(s); + ASSERT_EQ("yy", value); + + s = db->Get(read_options, "Z", &value); + ASSERT_OK(s); + ASSERT_EQ("z", value); delete txn; + } } TEST_F(TransactionTest, NoSnapshotTest) { diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc index 413cfbbe8..0cf4c7329 100644 --- a/utilities/transactions/transaction_util.cc +++ b/utilities/transactions/transaction_util.cc @@ -25,7 +25,8 @@ namespace rocksdb { Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key, - SequenceNumber key_seq) { + SequenceNumber key_seq, + bool cache_only) { Status result; auto cfh = reinterpret_cast(column_family); @@ -41,7 +42,7 @@ Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl, SequenceNumber earliest_seq = db_impl->GetEarliestMemTableSequenceNumber(sv, true); - result = CheckKey(db_impl, sv, earliest_seq, key_seq, key); + result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only); db_impl->ReturnAndCleanupSuperVersion(cfd, sv); } @@ -51,9 +52,10 @@ Status TransactionUtil::CheckKeyForConflicts(DBImpl* db_impl, Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, SequenceNumber earliest_seq, - SequenceNumber key_seq, - const std::string& key) { + SequenceNumber key_seq, const std::string& key, + bool cache_only) { Status result; + bool need_to_read_sst = false; // Since it would be too slow to check the SST files, we will only use // the memtables to check whether there have been any recent writes @@ -63,35 +65,47 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, if (earliest_seq == kMaxSequenceNumber) { // The age of this memtable is unknown. Cannot rely on it to check // for recent writes. 
This error shouldn't happen often in practice as - // the - // Memtable should have a valid earliest sequence number except in some + // the Memtable should have a valid earliest sequence number except in some // corner cases (such as error cases during recovery). - result = Status::TryAgain( - "Transaction ould not check for conflicts as the MemTable does not " - "countain a long enough history to check write at SequenceNumber: ", - ToString(key_seq)); + need_to_read_sst = true; + if (cache_only) { + result = Status::TryAgain( + "Transaction could not check for conflicts as the MemTable does not " + "contain a long enough history to check write at SequenceNumber: ", + ToString(key_seq)); + } } else if (key_seq < earliest_seq) { - // The age of this memtable is too new to use to check for recent - // writes. - char msg[255]; - snprintf(msg, sizeof(msg), - "Transaction could not check for conflicts for opearation at " - "SequenceNumber %" PRIu64 - " as the MemTable only contains changes newer than SequenceNumber " - "%" PRIu64 - ". Increasing the value of the " - "max_write_buffer_number_to_maintain option could reduce the " - "frequency " - "of this error.", - key_seq, earliest_seq); - result = Status::TryAgain(msg); - } else { + need_to_read_sst = true; + + if (cache_only) { + // The age of this memtable is too new to use to check for recent + // writes. + char msg[255]; + snprintf(msg, sizeof(msg), + "Transaction could not check for conflicts for operation at " + "SequenceNumber %" PRIu64 + " as the MemTable only contains changes newer than " + "SequenceNumber %" PRIu64 + ". 
Increasing the value of the " + "max_write_buffer_number_to_maintain option could reduce the " + "frequency " + "of this error.", + key_seq, earliest_seq); + result = Status::TryAgain(msg); + } + } + + if (result.ok()) { SequenceNumber seq = kMaxSequenceNumber; - Status s = db_impl->GetLatestSequenceForKeyFromMemtable(sv, key, &seq); - if (!s.ok()) { + bool found_record_for_key = false; + + Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst, + &seq, &found_record_for_key); + + if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) { result = s; - } else if (seq != kMaxSequenceNumber && seq > key_seq) { + } else if (found_record_for_key && (seq > key_seq)) { // Write Conflict result = Status::Busy(); } @@ -100,8 +114,9 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, return result; } -Status TransactionUtil::CheckKeysForConflicts( - DBImpl* db_impl, const TransactionKeyMap& key_map) { +Status TransactionUtil::CheckKeysForConflicts(DBImpl* db_impl, + const TransactionKeyMap& key_map, + bool cache_only) { Status result; for (auto& key_map_iter : key_map) { @@ -124,7 +139,7 @@ Status TransactionUtil::CheckKeysForConflicts( const auto& key = key_iter.first; const SequenceNumber key_seq = key_iter.second; - result = CheckKey(db_impl, sv, earliest_seq, key_seq, key); + result = CheckKey(db_impl, sv, earliest_seq, key_seq, key, cache_only); if (!result.ok()) { break; diff --git a/utilities/transactions/transaction_util.h b/utilities/transactions/transaction_util.h index c843b0ec1..b2ce7da19 100644 --- a/utilities/transactions/transaction_util.h +++ b/utilities/transactions/transaction_util.h @@ -30,12 +30,16 @@ class TransactionUtil { // Verifies there have been no writes to this key in the db since this // sequence number. // + // If cache_only is true, then this function will not attempt to read any + // SST files. 
This will make it more likely this function will + // return an error if it is unable to determine if there are any conflicts. + // // Returns OK on success, BUSY if there is a conflicting write, or other error // status for any unexpected errors. static Status CheckKeyForConflicts(DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key, - SequenceNumber key_seq); + SequenceNumber key_seq, bool cache_only); // For each key,SequenceNumber pair in the TransactionKeyMap, this function // will verify there have been no writes to the key in the db since that @@ -47,12 +51,13 @@ class TransactionUtil { // REQUIRED: this function should only be called on the write thread or if the // mutex is held. static Status CheckKeysForConflicts(DBImpl* db_impl, - const TransactionKeyMap& keys); + const TransactionKeyMap& keys, + bool cache_only); private: static Status CheckKey(DBImpl* db_impl, SuperVersion* sv, SequenceNumber earliest_seq, SequenceNumber key_seq, - const std::string& key); + const std::string& key, bool cache_only); }; } // namespace rocksdb