diff --git a/CMakeLists.txt b/CMakeLists.txt index b4cad7e29..2e9670188 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -543,10 +543,11 @@ set(SOURCES utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/transactions/optimistic_transaction.cc - utilities/transactions/transaction_base.cc + utilities/transactions/pessimistic_transaction.cc utilities/transactions/pessimistic_transaction_db.cc + utilities/transactions/snapshot_checker.cc + utilities/transactions/transaction_base.cc utilities/transactions/transaction_db_mutex_impl.cc - utilities/transactions/pessimistic_transaction.cc utilities/transactions/transaction_lock_mgr.cc utilities/transactions/transaction_util.cc utilities/transactions/write_prepared_txn.cc diff --git a/TARGETS b/TARGETS index 1bea19bce..57a8bd2df 100644 --- a/TARGETS +++ b/TARGETS @@ -248,10 +248,11 @@ cpp_library( "utilities/table_properties_collectors/compact_on_deletion_collector.cc", "utilities/transactions/optimistic_transaction_db_impl.cc", "utilities/transactions/optimistic_transaction.cc", - "utilities/transactions/transaction_base.cc", + "utilities/transactions/pessimistic_transaction.cc", "utilities/transactions/pessimistic_transaction_db.cc", + "utilities/transactions/snapshot_checker.cc", + "utilities/transactions/transaction_base.cc", "utilities/transactions/transaction_db_mutex_impl.cc", - "utilities/transactions/pessimistic_transaction.cc", "utilities/transactions/transaction_lock_mgr.cc", "utilities/transactions/transaction_util.cc", "utilities/transactions/write_prepared_txn.cc", diff --git a/db/builder.cc b/db/builder.cc index 6f973fdbd..0c3cc0919 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -70,7 +70,7 @@ Status BuildTable( uint32_t column_family_id, const std::string& column_family_name, std::vector snapshots, SequenceNumber earliest_write_conflict_snapshot, - const CompressionType compression, + SnapshotChecker* snapshot_checker, const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, @@ -135,7 +135,7 @@ Status BuildTable( CompactionIterator c_iter( iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, - &snapshots, earliest_write_conflict_snapshot, env, + &snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, true /* internal key corruption is not ok */, range_del_agg.get()); c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { diff --git a/db/builder.h b/db/builder.h index a432a7531..17190f8fc 100644 --- a/db/builder.h +++ b/db/builder.h @@ -29,6 +29,7 @@ struct FileMetaData; class Env; struct EnvOptions; class Iterator; +class SnapshotChecker; class TableCache; class VersionEdit; class TableBuilder; @@ -71,7 +72,7 @@ extern Status BuildTable( uint32_t column_family_id, const std::string& column_family_name, std::vector snapshots, SequenceNumber earliest_write_conflict_snapshot, - const CompressionType compression, + SnapshotChecker* snapshot_checker, const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger = nullptr, int job_id = 0, diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 8eac637c4..fe29460a4 100644 --- a/db/compaction_iterator.cc +++ 
b/db/compaction_iterator.cc @@ -4,6 +4,9 @@ // (found in the LICENSE.Apache file in the root directory). #include "db/compaction_iterator.h" + +#include "db/snapshot_checker.h" +#include "port/likely.h" #include "rocksdb/listener.h" #include "table/internal_iterator.h" @@ -37,15 +40,16 @@ CompactionEventListener::CompactionListenerValueType fromInternalValueType( CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, Env* env, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, const Compaction* compaction, const CompactionFilter* compaction_filter, CompactionEventListener* compaction_listener, const std::atomic* shutting_down) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, - earliest_write_conflict_snapshot, env, expect_valid_internal_key, - range_del_agg, + earliest_write_conflict_snapshot, snapshot_checker, env, + expect_valid_internal_key, range_del_agg, std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), compaction_filter, compaction_listener, shutting_down) {} @@ -53,7 +57,8 @@ CompactionIterator::CompactionIterator( CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, Env* env, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, std::unique_ptr compaction, const CompactionFilter* compaction_filter, @@ -64,6 +69,7 @@ CompactionIterator::CompactionIterator( merge_helper_(merge_helper), snapshots_(snapshots), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), + snapshot_checker_(snapshot_checker), env_(env), expect_valid_internal_key_(expect_valid_internal_key), range_del_agg_(range_del_agg), @@ -166,6 +172,55 @@ void CompactionIterator::Next() { PrepareOutput(); } +void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, + Slice* skip_until) { + if (compaction_filter_ != nullptr && ikey_.type == kTypeValue && + (visible_at_tip_ || ikey_.sequence > latest_snapshot_ || + ignore_snapshots_)) { + // If the user has specified a compaction filter and the sequence + // number is greater than any external snapshot, then invoke the + // filter. If the return value of the compaction filter is true, + // replace the entry with a deletion marker. + CompactionFilter::Decision filter; + compaction_filter_value_.clear(); + compaction_filter_skip_until_.Clear(); + { + StopWatchNano timer(env_, true); + filter = compaction_filter_->FilterV2( + compaction_->level(), ikey_.user_key, + CompactionFilter::ValueType::kValue, value_, + &compaction_filter_value_, compaction_filter_skip_until_.rep()); + iter_stats_.total_filter_time += + env_ != nullptr ? timer.ElapsedNanos() : 0; + } + + if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil && + cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= + 0) { + // Can't skip to a key smaller than the current one. + // Keep the key as per FilterV2 documentation. 
+ filter = CompactionFilter::Decision::kKeep; + } + + if (filter == CompactionFilter::Decision::kRemove) { + // convert the current key to a delete; key_ is pointing into + // current_key_ at this point, so updating current_key_ updates key() + ikey_.type = kTypeDeletion; + current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion); + // no value associated with delete + value_.clear(); + iter_stats_.num_record_drop_user++; + } else if (filter == CompactionFilter::Decision::kChangeValue) { + value_ = compaction_filter_value_; + } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { + *need_skip = true; + compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, + kValueTypeForSeek); + *skip_until = compaction_filter_skip_until_.Encode(); + } + } +} + void CompactionIterator::NextFromInput() { at_next_ = false; valid_ = false; @@ -220,6 +275,9 @@ void CompactionIterator::NextFromInput() { has_outputted_key_ = false; current_user_key_sequence_ = kMaxSequenceNumber; current_user_key_snapshot_ = 0; + current_key_committed_ = + (snapshot_checker_ == nullptr || + snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber)); #ifndef ROCKSDB_LITE if (compaction_listener_) { @@ -227,53 +285,12 @@ void CompactionIterator::NextFromInput() { fromInternalValueType(ikey_.type), value_, ikey_.sequence, true); } -#endif // ROCKSDB_LITE - - // apply the compaction filter to the first occurrence of the user key - if (compaction_filter_ != nullptr && ikey_.type == kTypeValue && - (visible_at_tip_ || ikey_.sequence > latest_snapshot_ || - ignore_snapshots_)) { - // If the user has specified a compaction filter and the sequence - // number is greater than any external snapshot, then invoke the - // filter. If the return value of the compaction filter is true, - // replace the entry with a deletion marker. - CompactionFilter::Decision filter; - compaction_filter_value_.clear(); - compaction_filter_skip_until_.Clear(); - { - StopWatchNano timer(env_, true); - filter = compaction_filter_->FilterV2( - compaction_->level(), ikey_.user_key, - CompactionFilter::ValueType::kValue, value_, - &compaction_filter_value_, compaction_filter_skip_until_.rep()); - iter_stats_.total_filter_time += - env_ != nullptr ? timer.ElapsedNanos() : 0; - } - - if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil && - cmp_->Compare(*compaction_filter_skip_until_.rep(), - ikey_.user_key) <= 0) { - // Can't skip to a key smaller than the current one. - // Keep the key as per FilterV2 documentation. - filter = CompactionFilter::Decision::kKeep; - } +#endif // !ROCKSDB_LITE - if (filter == CompactionFilter::Decision::kRemove) { - // convert the current key to a delete; key_ is pointing into - // current_key_ at this point, so updating current_key_ updates key() - ikey_.type = kTypeDeletion; - current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion); - // no value associated with delete - value_.clear(); - iter_stats_.num_record_drop_user++; - } else if (filter == CompactionFilter::Decision::kChangeValue) { - value_ = compaction_filter_value_; - } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { - need_skip = true; - compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, - kValueTypeForSeek); - skip_until = compaction_filter_skip_until_.Encode(); - } + // Apply the compaction filter to the first committed version of the user + // key. 
+ if (current_key_committed_) { + InvokeFilterIfNeeded(&need_skip, &skip_until); } } else { #ifndef ROCKSDB_LITE @@ -292,6 +309,26 @@ void CompactionIterator::NextFromInput() { current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); key_ = current_key_.GetInternalKey(); ikey_.user_key = current_key_.GetUserKey(); + + // Note that newer version of a key is ordered before older versions. If a + // newer version of a key is committed, so as the older version. No need + // to query snapshot_checker_ in that case. + if (UNLIKELY(!current_key_committed_)) { + assert(snapshot_checker_ != nullptr); + current_key_committed_ = + snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber); + // Apply the compaction filter to the first committed version of the + // user key. + if (current_key_committed_) { + InvokeFilterIfNeeded(&need_skip, &skip_until); + } + } + } + + if (UNLIKELY(!current_key_committed_)) { + assert(snapshot_checker_ != nullptr); + valid_ = true; + break; } // If there are no snapshots, then this kv affect visibility at tip. @@ -557,6 +594,9 @@ void CompactionIterator::PrepareOutput() { // only care about sequence number larger than any active snapshots. if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) && bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ && + (snapshot_checker_ == nullptr || + LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence, + earliest_snapshot_))) && ikey_.type != kTypeMerge && !cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) { assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion); @@ -568,10 +608,11 @@ void CompactionIterator::PrepareOutput() { inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot( SequenceNumber in, SequenceNumber* prev_snapshot) { assert(snapshots_->size()); - SequenceNumber prev __attribute__((__unused__)) = kMaxSequenceNumber; + SequenceNumber prev = kMaxSequenceNumber; for (const auto cur : *snapshots_) { assert(prev == kMaxSequenceNumber || prev <= cur); - if (cur >= in) { + if (cur >= in && (snapshot_checker_ == nullptr || + snapshot_checker_->IsInSnapshot(in, cur))) { *prev_snapshot = prev == kMaxSequenceNumber ? 
0 : prev; return cur; } diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index cad238666..2556543f3 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -14,6 +14,7 @@ #include "db/merge_helper.h" #include "db/pinned_iterators_manager.h" #include "db/range_del_aggregator.h" +#include "db/snapshot_checker.h" #include "options/cf_options.h" #include "rocksdb/compaction_filter.h" @@ -59,7 +60,8 @@ class CompactionIterator { CompactionIterator(InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, Env* env, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, const Compaction* compaction = nullptr, @@ -71,7 +73,8 @@ class CompactionIterator { CompactionIterator(InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, Env* env, + SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, Env* env, bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, std::unique_ptr compaction, @@ -111,6 +114,9 @@ class CompactionIterator { // compression. void PrepareOutput(); + // Invoke compaction filter if needed. + void InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until); + // Given a sequence number, return the sequence number of the // earliest snapshot that this sequence number is visible in. // The snapshots themselves are arranged in ascending order of @@ -125,6 +131,7 @@ class CompactionIterator { MergeHelper* merge_helper_; const std::vector* snapshots_; const SequenceNumber earliest_write_conflict_snapshot_; + const SnapshotChecker* const snapshot_checker_; Env* env_; bool expect_valid_internal_key_; RangeDelAggregator* range_del_agg_; @@ -132,7 +139,7 @@ class CompactionIterator { const CompactionFilter* compaction_filter_; #ifndef ROCKSDB_LITE CompactionEventListener* compaction_listener_; -#endif // ROCKSDB_LITE +#endif // !ROCKSDB_LITE const std::atomic* shutting_down_; bool bottommost_level_; bool valid_ = false; @@ -189,6 +196,10 @@ class CompactionIterator { std::vector level_ptrs_; CompactionIterationStats iter_stats_; + // Used to avoid purging uncommitted values. The application can specify + // uncommitted values by providing a SnapshotChecker object. + bool current_key_committed_; + bool IsShuttingDown() { // This is a best-effort facility, so memory_order_relaxed is sufficient. return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index dfc413936..1cca022eb 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -181,6 +181,8 @@ class CompactionIteratorTest : public testing::Test { compaction_proxy_ = new FakeCompaction(); compaction.reset(compaction_proxy_); } + // TODO(yiwu) add a mock snapshot checker and add test for it. 
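As a reference for the TODO above, a minimal illustration of the contract such a mock would exercise; IsKeyVersionCommitted is a hypothetical helper, not a function in this patch. A null checker means every version is treated as committed; otherwise the iterator asks IsInSnapshot against kMaxSequenceNumber, which is exactly what NextFromInput() now does.

// Illustration only, not part of the patch.
#include "db/dbformat.h"          // kMaxSequenceNumber
#include "db/snapshot_checker.h"  // SnapshotChecker
#include "rocksdb/types.h"        // SequenceNumber

namespace rocksdb {

inline bool IsKeyVersionCommitted(const SnapshotChecker* checker,
                                  SequenceNumber seq) {
  // nullptr matches the non-transactional path: treat everything as committed.
  return checker == nullptr ||
         checker->IsInSnapshot(seq, kMaxSequenceNumber);
}

}  // namespace rocksdb

The harness below keeps passing nullptr for now, so the iterator behaves exactly as it did before this patch.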
+ SnapshotChecker* snapshot_checker = nullptr; merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter, nullptr, false, 0, 0, nullptr, @@ -189,8 +191,9 @@ class CompactionIteratorTest : public testing::Test { iter_->SeekToFirst(); c_iter_.reset(new CompactionIterator( iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, - kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(), - std::move(compaction), filter, nullptr, &shutting_down_)); + kMaxSequenceNumber, snapshot_checker, Env::Default(), false, + range_del_agg_.get(), std::move(compaction), filter, nullptr, + &shutting_down_)); } void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); } diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 0c4261ac2..c986b9a80 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -269,9 +269,9 @@ CompactionJob::CompactionJob( InstrumentedMutex* db_mutex, Status* db_bg_error, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, - std::shared_ptr table_cache, EventLogger* event_logger, - bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, - CompactionJobStats* compaction_job_stats) + const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, + EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, + const std::string& dbname, CompactionJobStats* compaction_job_stats) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), @@ -290,6 +290,7 @@ CompactionJob::CompactionJob( db_bg_error_(db_bg_error), existing_snapshots_(std::move(existing_snapshots)), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), + snapshot_checker_(snapshot_checker), table_cache_(std::move(table_cache)), event_logger_(event_logger), paranoid_file_checks_(paranoid_file_checks), @@ -760,9 +761,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { Status status; sub_compact->c_iter.reset(new CompactionIterator( input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), - &existing_snapshots_, earliest_write_conflict_snapshot_, env_, false, - range_del_agg.get(), sub_compact->compaction, compaction_filter, - comp_event_listener, shutting_down_)); + &existing_snapshots_, earliest_write_conflict_snapshot_, + snapshot_checker_, env_, false, range_del_agg.get(), + sub_compact->compaction, compaction_filter, comp_event_listener, + shutting_down_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && diff --git a/db/compaction_job.h b/db/compaction_job.h index 22b35a5f3..0bb45b03a 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -45,12 +45,13 @@ namespace rocksdb { +class Arena; class MemTable; +class SnapshotChecker; class TableCache; class Version; class VersionEdit; class VersionSet; -class Arena; class CompactionJob { public: @@ -63,6 +64,7 @@ class CompactionJob { Status* db_bg_error, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, + const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, @@ -149,6 +151,8 @@ class CompactionJob { // should make sure not to remove evidence that a write occurred. 
SequenceNumber earliest_write_conflict_snapshot_; + const SnapshotChecker* const snapshot_checker_; + std::shared_ptr table_cache_; EventLogger* event_logger_; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index cace1814a..4012b183e 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -250,11 +250,14 @@ class CompactionJobTest : public testing::Test { LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); mutex_.Lock(); EventLogger event_logger(db_options_.info_log.get()); - CompactionJob compaction_job( - 0, &compaction, db_options_, env_options_, versions_.get(), - &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, &mutex_, - &bg_error_, snapshots, earliest_write_conflict_snapshot, table_cache_, - &event_logger, false, false, dbname_, &compaction_job_stats_); + // TODO(yiwu) add a mock snapshot checker and add test for it. + SnapshotChecker* snapshot_checker = nullptr; + CompactionJob compaction_job(0, &compaction, db_options_, env_options_, + versions_.get(), &shutting_down_, &log_buffer, + nullptr, nullptr, nullptr, &mutex_, &bg_error_, + snapshots, earliest_write_conflict_snapshot, + snapshot_checker, table_cache_, &event_logger, + false, false, dbname_, &compaction_job_stats_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); diff --git a/db/db_impl.h b/db/db_impl.h index 0b6146e19..7b47da824 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -29,6 +29,7 @@ #include "db/internal_stats.h" #include "db/log_writer.h" #include "db/read_callback.h" +#include "db/snapshot_checker.h" #include "db/snapshot_impl.h" #include "db/version_edit.h" #include "db/wal_manager.h" @@ -53,13 +54,13 @@ namespace rocksdb { +class Arena; class ArenaWrappedDBIter; class MemTable; class TableCache; class Version; class VersionEdit; class VersionSet; -class Arena; class WriteCallback; struct JobContext; struct ExternalSstFileInfo; @@ -573,6 +574,9 @@ class DBImpl : public DB { void AddToLogsToFreeQueue(log::Writer* log_writer) { logs_to_free_queue_.push_back(log_writer); } + + void SetSnapshotChecker(SnapshotChecker* snapshot_checker); + InstrumentedMutex* mutex() { return &mutex_; } Status NewDB(); @@ -1231,6 +1235,10 @@ class DBImpl : public DB { std::unordered_map prepared_section_completed_; std::mutex prep_heap_mutex_; + // Callback for compaction to check if a key is visible to a snapshot. 
+ // REQUIRES: mutex held + std::unique_ptr snapshot_checker_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 178e886e3..7f7e23eaa 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -88,8 +88,8 @@ Status DBImpl::FlushMemTableToOutputFile( FlushJob flush_job( dbname_, cfd, immutable_db_options_, mutable_cf_options, - env_options_for_compaction_, versions_.get(), &mutex_, - &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, + env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, + snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker_.get(), job_context, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(0U), GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, @@ -534,10 +534,10 @@ Status DBImpl::CompactFilesImpl( assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), immutable_db_options_, - env_options_for_compaction_, versions_.get(), &shutting_down_, - log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, - snapshot_seqs, earliest_write_conflict_snapshot, table_cache_, + env_options_for_compaction_, versions_.get(), &shutting_down_, log_buffer, + directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()), + stats_, &mutex_, &bg_error_, snapshot_seqs, + earliest_write_conflict_snapshot, snapshot_checker_.get(), table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, nullptr); // Here we pass a nullptr for CompactionJobStats because @@ -1684,8 +1684,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, env_options_for_compaction_, versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, - &bg_error_, snapshot_seqs, - earliest_write_conflict_snapshot, table_cache_, &event_logger_, + &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, + snapshot_checker_.get(), table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats); @@ -1910,4 +1910,13 @@ void DBImpl::InstallSuperVersionAndScheduleWork( mutable_cf_options.write_buffer_size * mutable_cf_options.max_write_buffer_number; } + +void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) { + InstrumentedMutexLock l(&mutex_); + // snapshot_checker_ should only set once. If we need to set it multiple + // times, we need to make sure the old one is not deleted while it is still + // using by a compaction job. + assert(!snapshot_checker_); + snapshot_checker_.reset(snapshot_checker); +} } // namespace rocksdb diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 458106a0a..f78adb722 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -883,6 +883,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, SequenceNumber earliest_write_conflict_snapshot; std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); + // Only TransactionDB passes snapshot_checker and it creates it after db + // open. Just pass nullptr here. 
+ SnapshotChecker* snapshot_checker = nullptr; s = BuildTable( dbname_, env_, *cfd->ioptions(), mutable_cf_options, @@ -890,7 +893,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, std::unique_ptr(mem->NewRangeTombstoneIterator(ro)), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), - snapshot_seqs, earliest_write_conflict_snapshot, + snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), cfd->ioptions()->compression_opts, paranoid_file_checks, cfd->internal_stats(), TableFileCreationReason::kRecovery, diff --git a/db/flush_job.cc b/db/flush_job.cc index 6c2478058..ad844be64 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -62,8 +62,9 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, std::atomic* shutting_down, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, - JobContext* job_context, LogBuffer* log_buffer, - Directory* db_directory, Directory* output_file_directory, + SnapshotChecker* snapshot_checker, JobContext* job_context, + LogBuffer* log_buffer, Directory* db_directory, + Directory* output_file_directory, CompressionType output_compression, Statistics* stats, EventLogger* event_logger, bool measure_io_stats) : dbname_(dbname), @@ -76,6 +77,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, shutting_down_(shutting_down), existing_snapshots_(std::move(existing_snapshots)), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), + snapshot_checker_(snapshot_checker), job_context_(job_context), log_buffer_(log_buffer), db_directory_(db_directory), @@ -303,8 +305,8 @@ Status FlushJob::WriteLevel0Table() { std::move(range_del_iter), &meta_, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->GetName(), existing_snapshots_, - earliest_write_conflict_snapshot_, output_compression_, - cfd_->ioptions()->compression_opts, + earliest_write_conflict_snapshot_, snapshot_checker_, + output_compression_, cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */, current_time); diff --git a/db/flush_job.h b/db/flush_job.h index 37275581a..81a8de306 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -43,6 +43,7 @@ namespace rocksdb { class MemTable; +class SnapshotChecker; class TableCache; class Version; class VersionEdit; @@ -56,15 +57,14 @@ class FlushJob { FlushJob(const std::string& dbname, ColumnFamilyData* cfd, const ImmutableDBOptions& db_options, const MutableCFOptions& mutable_cf_options, - const EnvOptions env_options, - VersionSet* versions, InstrumentedMutex* db_mutex, - std::atomic* shutting_down, + const EnvOptions env_options, VersionSet* versions, + InstrumentedMutex* db_mutex, std::atomic* shutting_down, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, - JobContext* job_context, LogBuffer* log_buffer, - Directory* db_directory, Directory* output_file_directory, - CompressionType output_compression, Statistics* stats, - EventLogger* event_logger, bool measure_io_stats); + SnapshotChecker* snapshot_checker, JobContext* job_context, + LogBuffer* log_buffer, Directory* db_directory, + Directory* output_file_directory, CompressionType output_compression, + Statistics* stats, EventLogger* event_logger, bool 
measure_io_stats); ~FlushJob(); @@ -90,6 +90,7 @@ class FlushJob { std::atomic* shutting_down_; std::vector existing_snapshots_; SequenceNumber earliest_write_conflict_snapshot_; + SnapshotChecker* snapshot_checker_; JobContext* job_context_; LogBuffer* log_buffer_; Directory* db_directory_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 9d55b7f73..47a935127 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -92,12 +92,12 @@ TEST_F(FlushJobTest, Empty) { JobContext job_context(0); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); EventLogger event_logger(db_options_.info_log.get()); - FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), - db_options_, *cfd->GetLatestMutableCFOptions(), - env_options_, versions_.get(), &mutex_, - &shutting_down_, {}, kMaxSequenceNumber, &job_context, - nullptr, nullptr, nullptr, kNoCompression, nullptr, - &event_logger, false); + SnapshotChecker* snapshot_checker = nullptr; // not relavant + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, + &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context, + nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, false); { InstrumentedMutexLock l(&mutex_); flush_job.PickMemTable(); @@ -137,12 +137,12 @@ TEST_F(FlushJobTest, NonEmpty) { } EventLogger event_logger(db_options_.info_log.get()); - FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), - db_options_, *cfd->GetLatestMutableCFOptions(), - env_options_, versions_.get(), &mutex_, - &shutting_down_, {}, kMaxSequenceNumber, &job_context, - nullptr, nullptr, nullptr, kNoCompression, nullptr, - &event_logger, true); + SnapshotChecker* snapshot_checker = nullptr; // not relavant + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, + &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context, + nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, true); FileMetaData fd; mutex_.Lock(); flush_job.PickMemTable(); @@ -204,12 +204,13 @@ TEST_F(FlushJobTest, Snapshots) { } EventLogger event_logger(db_options_.info_log.get()); - FlushJob flush_job( - dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), env_options_, - versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, - &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr, - &event_logger, true); + SnapshotChecker* snapshot_checker = nullptr; // not relavant + FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), + db_options_, *cfd->GetLatestMutableCFOptions(), + env_options_, versions_.get(), &mutex_, &shutting_down_, + snapshots, kMaxSequenceNumber, snapshot_checker, + &job_context, nullptr, nullptr, nullptr, kNoCompression, + nullptr, &event_logger, true); mutex_.Lock(); flush_job.PickMemTable(); ASSERT_OK(flush_job.Run()); diff --git a/db/repair.cc b/db/repair.cc index 284efd4e3..8e21bead0 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -400,6 +400,10 @@ class Repairer { int64_t _current_time = 0; status = env_->GetCurrentTime(&_current_time); // ignore error const uint64_t current_time = static_cast(_current_time); + // Only TransactionDB make use of snapshot_checker and repair doesn't + // currently support TransactionDB with uncommitted prepared 
keys in WAL. + // TODO(yiwu) Support repairing TransactionDB. + SnapshotChecker* snapshot_checker = nullptr; status = BuildTable( dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), @@ -407,10 +411,11 @@ class Repairer { std::unique_ptr(mem->NewRangeTombstoneIterator(ro)), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), - {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, - nullptr /* internal_stats */, TableFileCreationReason::kRecovery, - nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, - nullptr /* table_properties */, -1 /* level */, current_time); + {}, kMaxSequenceNumber, snapshot_checker, kNoCompression, + CompressionOptions(), false, nullptr /* internal_stats */, + TableFileCreationReason::kRecovery, nullptr /* event_logger */, + 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */, + -1 /* level */, current_time); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), diff --git a/db/snapshot_checker.h b/db/snapshot_checker.h new file mode 100644 index 000000000..baea76e96 --- /dev/null +++ b/db/snapshot_checker.h @@ -0,0 +1,28 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include "rocksdb/types.h" + +namespace rocksdb { + +class WritePreparedTxnDB; + +// Callback class created by WritePreparedTxnDB to check if a key +// is visible by a snapshot. +class SnapshotChecker { + public: + explicit SnapshotChecker(WritePreparedTxnDB* txn_db); + + bool IsInSnapshot(SequenceNumber sequence, + SequenceNumber snapshot_sequence) const; + + private: +#ifndef ROCKSDB_LITE + const WritePreparedTxnDB* const txn_db_; +#endif // !ROCKSDB_LITE +}; + +} // namespace rocksdb diff --git a/src.mk b/src.mk index af0bb4b3f..ee5135830 100644 --- a/src.mk +++ b/src.mk @@ -166,7 +166,7 @@ LIB_SOURCES = \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ utilities/convenience/info_log_finder.cc \ utilities/date_tiered/date_tiered_db_impl.cc \ - utilities/debug.cc \ + utilities/debug.cc \ utilities/document/document_db.cc \ utilities/document/json_document.cc \ utilities/document/json_document_builder.cc \ @@ -193,14 +193,15 @@ LIB_SOURCES = \ utilities/spatialdb/spatial_db.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \ - utilities/transactions/optimistic_transaction.cc \ + utilities/transactions/optimistic_transaction.cc \ + utilities/transactions/pessimistic_transaction.cc \ + utilities/transactions/pessimistic_transaction_db.cc \ + utilities/transactions/snapshot_checker.cc \ utilities/transactions/transaction_base.cc \ - utilities/transactions/pessimistic_transaction_db.cc \ utilities/transactions/transaction_db_mutex_impl.cc \ - utilities/transactions/pessimistic_transaction.cc \ utilities/transactions/transaction_lock_mgr.cc \ utilities/transactions/transaction_util.cc \ - utilities/transactions/write_prepared_txn.cc \ + utilities/transactions/write_prepared_txn.cc \ utilities/ttl/db_ttl_impl.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \ diff --git 
a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 31aa629fa..4cc2f0915 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -147,6 +147,9 @@ Status WritePreparedTxnDB::Initialize( SequenceNumber prev_max = max_evicted_seq_; SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber(); AdvanceMaxEvictedSeq(prev_max, last_seq); + + db_impl_->SetSnapshotChecker(new SnapshotChecker(this)); + auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices, handles); return s; @@ -573,7 +576,11 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options, const Slice& key, PinnableSlice* value) { // We are fine with the latest committed value. This could be done by // specifying the snapshot as kMaxSequenceNumber. - WritePreparedTxnReadCallback callback(this, kMaxSequenceNumber); + SequenceNumber seq = kMaxSequenceNumber; + if (options.snapshot != nullptr) { + seq = options.snapshot->GetSequenceNumber(); + } + WritePreparedTxnReadCallback callback(this, seq); bool* dont_care = nullptr; // Note: no need to specify a snapshot for read options as no specific // snapshot is requested by the user. @@ -581,9 +588,20 @@ &callback); } +void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) { + // Advance max_evicted_seq_ no more than 100 times before the cache wraps + // around. + INC_STEP_FOR_MAX_EVICTED = + std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast(1)); + snapshot_cache_ = unique_ptr[]>( + new std::atomic[SNAPSHOT_CACHE_SIZE] {}); + commit_cache_ = unique_ptr[]>( + new std::atomic[COMMIT_CACHE_SIZE] {}); +} + // Returns true if commit_seq <= snapshot_seq bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, - uint64_t snapshot_seq) { + uint64_t snapshot_seq) const { // Here we try to infer the return value without looking into prepare list. // This would help avoiding synchronization over a shared map.
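The rule stated above, true iff the write was committed with commit_seq <= snapshot_seq, can be pictured with a toy model; commit_seq_of is a stand-in for the commit cache plus prepared set, not a structure that exists in this code.

#include <cstdint>
#include <map>

// Conceptual sketch only. The real IsInSnapshot answers the same question
// from snapshot_cache_/commit_cache_ without consulting a shared map in the
// common case.
bool ToyIsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
                     const std::map<uint64_t, uint64_t>& commit_seq_of) {
  auto it = commit_seq_of.find(prep_seq);  // prep_seq -> commit_seq
  // A write that is still only prepared is invisible to every snapshot.
  return it != commit_seq_of.end() && it->second <= snapshot_seq;
}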
// TODO(myabandeh): read your own writes @@ -734,7 +752,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b, - CommitEntry* entry) { + CommitEntry* entry) const { *entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire); bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT); return valid; diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index f0eb87857..9e9d65849 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -14,6 +14,7 @@ #include #include "db/read_callback.h" +#include "db/snapshot_checker.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" @@ -174,7 +175,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { COMMIT_CACHE_BITS(commit_cache_bits), COMMIT_CACHE_SIZE(static_cast(1ull << COMMIT_CACHE_BITS)), FORMAT(COMMIT_CACHE_BITS) { - init(txn_db_options); + Init(txn_db_options); } explicit WritePreparedTxnDB( @@ -187,7 +188,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { COMMIT_CACHE_BITS(commit_cache_bits), COMMIT_CACHE_SIZE(static_cast(1ull << COMMIT_CACHE_BITS)), FORMAT(COMMIT_CACHE_BITS) { - init(txn_db_options); + Init(txn_db_options); } virtual ~WritePreparedTxnDB() {} @@ -207,7 +208,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Check whether the transaction that wrote the value with seqeunce number seq // is visible to the snapshot with sequence number snapshot_seq - bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq); + bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const; // Add the trasnaction with prepare sequence seq to the prepared list void AddPrepared(uint64_t seq); // Rollback a prepared txn identified with prep_seq. rollback_seq is the seq @@ -312,16 +313,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; friend class WritePreparedTransactionTest_RollbackTest_Test; - void init(const TransactionDBOptions& /* unused */) { - // Adcance max_evicted_seq_ no more than 100 times before the cache wraps - // around. - INC_STEP_FOR_MAX_EVICTED = - std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast(1)); - snapshot_cache_ = unique_ptr[]>( - new std::atomic[SNAPSHOT_CACHE_SIZE] {}); - commit_cache_ = unique_ptr[]>( - new std::atomic[COMMIT_CACHE_SIZE] {}); - } + void Init(const TransactionDBOptions& /* unused */); // A heap with the amortized O(1) complexity for erase. It uses one extra heap // to keep track of erased entries that are not yet on top of the main heap. @@ -363,7 +355,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Get the commit entry with index indexed_seq from the commit table. It // returns true if such entry exists. bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b, - CommitEntry* entry); + CommitEntry* entry) const; // Rewrite the entry with the index indexed_seq in the commit table with the // commit entry . If the rewrite results into eviction, @@ -467,10 +459,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { std::atomic delayed_prepared_empty_ = {true}; // Update when old_commit_map_.empty() changes. Expected to be true normally. 
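Since IsInSnapshot() and GetCommitEntry() are now const (they are reached through a const SnapshotChecker from compaction and flush threads), the RWMutex members they lock have to become mutable, which is what the next hunk does. A stand-alone sketch of that C++ pattern, with made-up names:

#include <cstdint>
#include <mutex>

// Not RocksDB code: a logically-const query that still needs an internal
// lock. Declaring the mutex mutable lets the const accessor take it.
class SeqIndex {
 public:
  bool Contains(uint64_t seq) const {
    std::lock_guard<std::mutex> lock(mu_);  // legal because mu_ is mutable
    return seq <= last_;
  }
  void Advance(uint64_t seq) {
    std::lock_guard<std::mutex> lock(mu_);
    if (seq > last_) {
      last_ = seq;
    }
  }

 private:
  mutable std::mutex mu_;
  uint64_t last_ = 0;
};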
std::atomic old_commit_map_empty_ = {true}; - port::RWMutex prepared_mutex_; - port::RWMutex old_commit_map_mutex_; - port::RWMutex commit_cache_mutex_; - port::RWMutex snapshots_mutex_; + mutable port::RWMutex prepared_mutex_; + mutable port::RWMutex old_commit_map_mutex_; + mutable port::RWMutex commit_cache_mutex_; + mutable port::RWMutex snapshots_mutex_; }; class WritePreparedTxnReadCallback : public ReadCallback { diff --git a/utilities/transactions/snapshot_checker.cc b/utilities/transactions/snapshot_checker.cc new file mode 100644 index 000000000..724f7430a --- /dev/null +++ b/utilities/transactions/snapshot_checker.cc @@ -0,0 +1,37 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/snapshot_checker.h" + +#ifdef ROCKSDB_LITE +#include +#endif // ROCKSDB_LITE + +#include "utilities/transactions/pessimistic_transaction_db.h" + +namespace rocksdb { + +#ifdef ROCKSDB_LITE +SnapshotChecker::SnapshotChecker(WritePreparedTxnDB* txn_db) {} + +bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence, + SequenceNumber snapshot_sequence) const { + // Should never be called in LITE mode. + assert(false); + return true; +} + +#else + +SnapshotChecker::SnapshotChecker(WritePreparedTxnDB* txn_db) + : txn_db_(txn_db){}; + +bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence, + SequenceNumber snapshot_sequence) const { + return txn_db_->IsInSnapshot(sequence, snapshot_sequence); +} +#endif // ROCKSDB_LITE + +} // namespace rocksdb diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 9ea46edb4..8cae885aa 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -18,8 +18,10 @@ #include #include "db/db_impl.h" +#include "db/dbformat.h" #include "rocksdb/db.h" #include "rocksdb/options.h" +#include "rocksdb/types.h" #include "rocksdb/utilities/debug.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" @@ -602,6 +604,9 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceTest) { WriteOptions wopts; FlushOptions fopt; + options.disable_auto_compactions = true; + ReOpen(); + // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some // of the branches. This is the same as counting a binary number where i-th // bit represents whether we take branch i in the represented by the number. @@ -1363,6 +1368,274 @@ TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) { db->ReleaseSnapshot(snapshot); } +// Compaction should not remove a key if it is not committed, and should +// proceed with older versions of the key as if the new version doesn't exist. +TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) { + options.disable_auto_compactions = true; + ReOpen(); + // Snapshots to avoid keys getting evicted. + std::vector snapshots; + // Keep track of expected sequence number. + SequenceNumber expected_seq = 0; + + auto add_key = [&](std::function func) { + ASSERT_OK(func()); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + snapshots.push_back(db->GetSnapshot()); + }; + + // Each key here represents a standalone test case.
+ add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); }); + add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); }); + add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); }); + add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); }); + add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); }); + add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); }); + add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); }); + add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); }); + ASSERT_OK(db->Flush(FlushOptions())); + add_key([&]() { return db->Delete(WriteOptions(), "key6"); }); + add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); }); + + auto* transaction = db->BeginTransaction(WriteOptions()); + ASSERT_OK(transaction->SetName("txn")); + ASSERT_OK(transaction->Put("key1", "value1_2")); + ASSERT_OK(transaction->Delete("key2")); + ASSERT_OK(transaction->SingleDelete("key3")); + ASSERT_OK(transaction->Merge("key4", "value4_2")); + ASSERT_OK(transaction->Merge("key5", "value5_3")); + ASSERT_OK(transaction->Put("key6", "value6_2")); + ASSERT_OK(transaction->Put("key7", "value7_2")); + // Prepare but not commit. + ASSERT_OK(transaction->Prepare()); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + ASSERT_OK(db->Flush(FlushOptions())); + for (auto* s : snapshots) { + db->ReleaseSnapshot(s); + } + // Dummy keys to avoid compaction trivially move files and get around actual + // compaction logic. + ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); + ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); + ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + VerifyKeys({ + {"key1", "value1_1"}, + {"key2", "value2_1"}, + {"key3", "value3_1"}, + {"key4", "value4_1"}, + {"key5", "value5_1,value5_2"}, + {"key6", "NOT_FOUND"}, + {"key7", "NOT_FOUND"}, + }); + VerifyInternalKeys({ + {"key1", "value1_2", expected_seq, kTypeValue}, + {"key1", "value1_1", 0, kTypeValue}, + {"key2", "", expected_seq, kTypeDeletion}, + {"key2", "value2_1", 0, kTypeValue}, + {"key3", "", expected_seq, kTypeSingleDeletion}, + {"key3", "value3_1", 0, kTypeValue}, + {"key4", "value4_2", expected_seq, kTypeMerge}, + {"key4", "value4_1", 0, kTypeValue}, + {"key5", "value5_3", expected_seq, kTypeMerge}, + {"key5", "value5_1,value5_2", 0, kTypeValue}, + {"key6", "value6_2", expected_seq, kTypeValue}, + {"key7", "value7_2", expected_seq, kTypeValue}, + }); + ASSERT_OK(transaction->Commit()); + VerifyKeys({ + {"key1", "value1_2"}, + {"key2", "NOT_FOUND"}, + {"key3", "NOT_FOUND"}, + {"key4", "value4_1,value4_2"}, + {"key5", "value5_1,value5_2,value5_3"}, + {"key6", "value6_2"}, + {"key7", "value7_2"}, + }); + delete transaction; +} + +// Compaction should keep keys visible to a snapshot based on commit sequence, +// not just prepare sequence. +TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { + options.disable_auto_compactions = true; + ReOpen(); + // Keep track of expected sequence number. + SequenceNumber expected_seq = 0; + auto* txn1 = db->BeginTransaction(WriteOptions()); + ASSERT_OK(txn1->SetName("txn1")); + ASSERT_OK(txn1->Put("key1", "value1_1")); + ASSERT_OK(txn1->Prepare()); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + ASSERT_OK(txn1->Commit()); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + delete txn1; + // Take a snapshots to avoid keys get evicted before compaction. 
+ const Snapshot* snapshot1 = db->GetSnapshot(); + auto* txn2 = db->BeginTransaction(WriteOptions()); + ASSERT_OK(txn2->SetName("txn2")); + ASSERT_OK(txn2->Put("key2", "value2_1")); + ASSERT_OK(txn2->Prepare()); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + // txn1 commits before snapshot2, so it is visible to snapshot2. + // txn2 commits after snapshot2, so it is not visible. + const Snapshot* snapshot2 = db->GetSnapshot(); + ASSERT_OK(txn2->Commit()); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + delete txn2; + // Take a snapshot to avoid keys getting evicted before compaction. + const Snapshot* snapshot3 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2")); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + SequenceNumber seq1 = expected_seq; + ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2")); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + SequenceNumber seq2 = expected_seq; + ASSERT_OK(db->Flush(FlushOptions())); + db->ReleaseSnapshot(snapshot1); + db->ReleaseSnapshot(snapshot3); + // Dummy keys to avoid compaction trivially move files and get around actual + // compaction logic. + ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); + ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); + ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}}); + VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2); + VerifyInternalKeys({ + {"key1", "value1_2", seq1, kTypeValue}, + // "value1_1" is visible to snapshot2. Also, keys at the bottom level that + // are visible to the earliest snapshot are output with seq = 0. + {"key1", "value1_1", 0, kTypeValue}, + {"key2", "value2_2", seq2, kTypeValue}, + }); + db->ReleaseSnapshot(snapshot2); +} + +// A more complex test to verify that compaction/flush keeps keys visible +// to snapshots. +TEST_P(WritePreparedTransactionTest, + DISABLED_CompactionShouldKeepSnapshotVisibleKeysRandomized) { + constexpr size_t kNumTransactions = 10; + constexpr size_t kNumIterations = 1000; + + std::vector transactions(kNumTransactions, nullptr); + std::vector versions(kNumTransactions, 0); + std::unordered_map current_data; + std::vector snapshots; + std::vector> snapshot_data; + + Random rnd(1103); + options.disable_auto_compactions = true; + ReOpen(); + + for (size_t i = 0; i < kNumTransactions; i++) { + std::string key = "key" + ToString(i); + std::string value = "value0"; + ASSERT_OK(db->Put(WriteOptions(), key, value)); + current_data[key] = value; + } + VerifyKeys(current_data); + + for (size_t iter = 0; iter < kNumIterations; iter++) { + auto r = rnd.Next() % (kNumTransactions + 1); + if (r < kNumTransactions) { + std::string key = "key" + ToString(r); + if (transactions[r] == nullptr) { + std::string value = "value" + ToString(versions[r] + 1); + auto* txn = db->BeginTransaction(WriteOptions()); + ASSERT_OK(txn->SetName("txn" + ToString(r))); + ASSERT_OK(txn->Put(key, value)); + ASSERT_OK(txn->Prepare()); + transactions[r] = txn; + } else { + std::string value = "value" + ToString(++versions[r]); + ASSERT_OK(transactions[r]->Commit()); + delete transactions[r]; + transactions[r] = nullptr; + current_data[key] = value; + } + } else { + auto* snapshot = db->GetSnapshot(); + VerifyKeys(current_data, snapshot); + snapshots.push_back(snapshot); + snapshot_data.push_back(current_data); + } + VerifyKeys(current_data); + } + // Take one last snapshot to test compaction with an uncommitted prepared + // transaction.
+ snapshots.push_back(db->GetSnapshot()); + snapshot_data.push_back(current_data); + + assert(snapshots.size() == snapshot_data.size()); + for (size_t i = 0; i < snapshots.size(); i++) { + VerifyKeys(snapshot_data[i], snapshots[i]); + } + ASSERT_OK(db->Flush(FlushOptions())); + for (size_t i = 0; i < snapshots.size(); i++) { + VerifyKeys(snapshot_data[i], snapshots[i]); + } + // Dummy keys to avoid compaction trivially move files and get around actual + // compaction logic. + ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); + ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); + ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + for (size_t i = 0; i < snapshots.size(); i++) { + VerifyKeys(snapshot_data[i], snapshots[i]); + } + // cleanup + for (size_t i = 0; i < kNumTransactions; i++) { + if (transactions[i] == nullptr) { + continue; + } + ASSERT_OK(transactions[i]->Commit()); + delete transactions[i]; + } + for (size_t i = 0; i < snapshots.size(); i++) { + db->ReleaseSnapshot(snapshots[i]); + } +} + +// Compaction should not apply the optimization to output key with sequence +// number equal to 0 if the key is not visible to earliest snapshot, based on +// commit sequence number. +TEST_P(WritePreparedTransactionTest, + CompactionShouldKeepSequenceForUncommittedKeys) { + options.disable_auto_compactions = true; + ReOpen(); + // Keep track of expected sequence number. + SequenceNumber expected_seq = 0; + auto* transaction = db->BeginTransaction(WriteOptions()); + ASSERT_OK(transaction->SetName("txn")); + ASSERT_OK(transaction->Put("key1", "value1")); + ASSERT_OK(transaction->Prepare()); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + SequenceNumber seq1 = expected_seq; + ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); + ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + ASSERT_OK(db->Flush(FlushOptions())); + // Dummy keys to avoid compaction trivially move files and get around actual + // compaction logic. + ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); + ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); + ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + VerifyKeys({ + {"key1", "NOT_FOUND"}, + {"key2", "value2"}, + }); + VerifyInternalKeys({ + // "key1" has not been committed. It keeps its sequence number. + {"key1", "value1", seq1, kTypeValue}, + // "key2" is committed and output with seq = 0. + {"key2", "value2", 0, kTypeValue}, + }); + ASSERT_OK(transaction->Commit()); + VerifyKeys({ + {"key1", "value1"}, + {"key2", "value2"}, + }); + delete transaction; +} + } // namespace rocksdb int main(int argc, char** argv) {
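The assertions in these tests rely on two helpers defined elsewhere in this test file, VerifyKeys and VerifyInternalKeys. A rough sketch of what VerifyKeys has to do, using only the public Get API; the real helper may differ in signature and details:

// Sketch only: each key should read back the expected value, or be missing
// when the expected value is the sentinel "NOT_FOUND", optionally through a
// snapshot.
#include <map>
#include <string>

#include "rocksdb/db.h"
#include "util/testharness.h"  // ASSERT_OK, ASSERT_TRUE, ASSERT_EQ

void VerifyKeysSketch(rocksdb::DB* db,
                      const std::map<std::string, std::string>& data,
                      const rocksdb::Snapshot* snapshot = nullptr) {
  rocksdb::ReadOptions read_options;
  read_options.snapshot = snapshot;
  for (const auto& kv : data) {
    std::string value;
    rocksdb::Status s = db->Get(read_options, kv.first, &value);
    if (kv.second == "NOT_FOUND") {
      ASSERT_TRUE(s.IsNotFound());
    } else {
      ASSERT_OK(s);
      ASSERT_EQ(kv.second, value);
    }
  }
}

VerifyInternalKeys presumably iterates the raw key versions (for example via GetAllKeyVersions from rocksdb/utilities/debug.h, which this test already includes) and compares user key, value, sequence number, and type against the expected tuples.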