From 857adf388fd1de81b8749bf1e5fe20edf6f8a8c8 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Fri, 10 Nov 2017 17:18:01 -0800 Subject: [PATCH] WritePrepared Txn: Refactor conf params Summary: Summary of changes: - Move seq_per_batch out of Options - Rename concurrent_prepare to two_write_queues - Add allocate_seq_only_for_data_ Closes https://github.com/facebook/rocksdb/pull/3136 Differential Revision: D6304458 Pulled By: maysamyabandeh fbshipit-source-id: 08e685bfa82bbc41b5b1c5eb7040a8ca6e05e58c --- db/compaction_job_test.cc | 2 +- db/db_impl.cc | 63 +++++++++++-------- db/db_impl.h | 46 ++++++++++---- db/db_impl_debug.cc | 8 +-- db/db_impl_files.cc | 4 +- db/db_impl_open.cc | 28 ++++++--- db/db_impl_write.cc | 42 ++++++------- db/db_test_util.cc | 2 +- db/db_wal_test.cc | 2 +- db/external_sst_file_ingestion_job.cc | 4 +- db/repair.cc | 2 +- db/transaction_log_impl.cc | 14 +++-- db/transaction_log_impl.h | 5 +- db/version_set.cc | 20 +++--- db/version_set.h | 22 +++---- db/wal_manager.cc | 2 +- db/wal_manager.h | 7 ++- db/wal_manager_test.cc | 2 +- db/write_callback_test.cc | 5 +- include/rocksdb/options.h | 12 +--- options/db_options.cc | 10 ++- options/db_options.h | 3 +- options/options_helper.h | 14 ++--- options/options_settable_test.cc | 1 + .../pessimistic_transaction_db.cc | 7 +-- utilities/transactions/transaction_test.cc | 20 +++--- utilities/transactions/transaction_test.h | 17 +++-- .../write_prepared_transaction_test.cc | 26 ++++---- utilities/transactions/write_prepared_txn.cc | 2 +- utilities/transactions/write_prepared_txn.h | 6 +- 30 files changed, 215 insertions(+), 183 deletions(-) diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 9e0da91c5..85d25202c 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -143,7 +143,7 @@ class CompactionJobTest : public testing::Test { } void SetLastSequence(const SequenceNumber sequence_number) { - versions_->SetLastToBeWrittenSequence(sequence_number + 1); + versions_->SetLastAllocatedSequence(sequence_number + 1); versions_->SetLastSequence(sequence_number + 1); } diff --git a/db/db_impl.cc b/db/db_impl.cc index ad77d99bf..35383aada 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -136,7 +136,8 @@ void DumpSupportInfo(Logger* logger) { int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024; } // namespace -DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) +DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, + const bool seq_per_batch) : env_(options.env), dbname_(dbname), initial_db_options_(SanitizeOptions(dbname, options)), @@ -185,18 +186,30 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) env_options_, immutable_db_options_)), num_running_ingest_file_(0), #ifndef ROCKSDB_LITE - wal_manager_(immutable_db_options_, env_options_), + wal_manager_(immutable_db_options_, env_options_, seq_per_batch), #endif // ROCKSDB_LITE event_logger_(immutable_db_options_.info_log.get()), bg_work_paused_(0), bg_compaction_paused_(0), refitting_level_(false), opened_successfully_(false), - concurrent_prepare_(options.concurrent_prepare), + two_write_queues_(options.two_write_queues), manual_wal_flush_(options.manual_wal_flush), - seq_per_batch_(options.seq_per_batch), - // TODO(myabandeh): revise this when we change options.seq_per_batch - use_custom_gc_(options.seq_per_batch), + seq_per_batch_(seq_per_batch), + // When two_write_queues_ and seq_per_batch_ are both enabled we + // sometimes allocate a seq also to indicate the commit timestamp of a
+ // transaction. In such cases last_sequence_ would not indicate the last + // visible sequence number in memtable and should not be used for + // snapshots. Snapshots should use last_allocated_sequence_ instead, but + // also need other mechanisms to exclude the data that is after + // last_sequence_ and before last_allocated_sequence_ from the snapshot. + // In WritePreparedTxn this property is ensured since such data are not + // committed yet. + allocate_seq_only_for_data_(!(seq_per_batch && options.two_write_queues)), + // Since seq_per_batch_ is currently set only by WritePreparedTxn which + // requires a custom gc for compaction, we use that to set use_custom_gc_ + // as well. + use_custom_gc_(seq_per_batch), preserve_deletes_(options.preserve_deletes) { env_->GetAbsolutePath(dbname, &db_absolute_path_); @@ -751,7 +764,7 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const { } SequenceNumber DBImpl::IncAndFetchSequenceNumber() { - return versions_->FetchAddLastToBeWrittenSequence(1ull) + 1ull; + return versions_->FetchAddLastAllocatedSequence(1ull) + 1ull; } bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) { @@ -977,9 +990,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // super version because a flush happening in between may compact // away data for the snapshot, but the snapshot is earlier than the // data overwriting it, so users may see wrong results. - snapshot = concurrent_prepare_ && seq_per_batch_ - ? versions_->LastToBeWrittenSequence() - : versions_->LastSequence(); + snapshot = allocate_seq_only_for_data_ ? versions_->LastSequence() : versions_->LastAllocatedSequence(); } TEST_SYNC_POINT("DBImpl::GetImpl:3"); TEST_SYNC_POINT("DBImpl::GetImpl:4"); @@ -1070,9 +1082,8 @@ std::vector<Status> DBImpl::MultiGet( snapshot = reinterpret_cast<const SnapshotImpl*>( read_options.snapshot)->number_; } else { - snapshot = concurrent_prepare_ && seq_per_batch_ - ? versions_->LastToBeWrittenSequence() - : versions_->LastSequence(); + snapshot = allocate_seq_only_for_data_ ? versions_->LastSequence() + : versions_->LastAllocatedSequence(); } for (auto mgd_iter : multiget_cf_data) { mgd_iter.second->super_version = @@ -1478,8 +1489,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, read_callback); #endif } else { - // Note: no need to consider the special case of concurrent_prepare_ && - // seq_per_batch_ since NewIterator is overridden in WritePreparedTxnDB + // Note: no need to consider the special case of + // allocate_seq_only_for_data_==false since NewIterator is overridden in + // WritePreparedTxnDB auto snapshot = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() : versions_->LastSequence(); @@ -1595,8 +1607,9 @@ Status DBImpl::NewIterators( } #endif } else { - // Note: no need to consider the special case of concurrent_prepare_ && - // seq_per_batch_ since NewIterators is overridden in WritePreparedTxnDB + // Note: no need to consider the special case of + // allocate_seq_only_for_data_==false since NewIterators is overridden in + // WritePreparedTxnDB auto snapshot = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() : versions_->LastSequence(); @@ -1630,9 +1643,9 @@ const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { delete s; return nullptr; } - auto snapshot_seq = concurrent_prepare_ && seq_per_batch_ - ? versions_->LastToBeWrittenSequence() - : versions_->LastSequence(); + auto snapshot_seq = allocate_seq_only_for_data_ + ?
versions_->LastSequence() + : versions_->LastAllocatedSequence(); return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); } @@ -1643,9 +1656,9 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { snapshots_.Delete(casted_s); uint64_t oldest_snapshot; if (snapshots_.empty()) { - oldest_snapshot = concurrent_prepare_ && seq_per_batch_ - ? versions_->LastToBeWrittenSequence() - : versions_->LastSequence(); + oldest_snapshot = allocate_seq_only_for_data_ + ? versions_->LastSequence() + : versions_->LastAllocatedSequence(); } else { oldest_snapshot = snapshots_.oldest()->number_; } @@ -2753,7 +2766,7 @@ Status DBImpl::IngestExternalFile( WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); WriteThread::Writer nonmem_w; - if (concurrent_prepare_) { + if (two_write_queues_) { nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } @@ -2796,7 +2809,7 @@ Status DBImpl::IngestExternalFile( } // Resume writes to the DB - if (concurrent_prepare_) { + if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); } write_thread_.ExitUnbatched(&w); diff --git a/db/db_impl.h b/db/db_impl.h index f26e54344..a7eddba24 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -68,7 +68,8 @@ struct MemTableInfo; class DBImpl : public DB { public: - DBImpl(const DBOptions& options, const std::string& dbname); + DBImpl(const DBOptions& options, const std::string& dbname, + const bool seq_per_batch = false); virtual ~DBImpl(); // Implementations of the DB interface @@ -220,10 +221,10 @@ class DBImpl : public DB { virtual SequenceNumber GetLatestSequenceNumber() const override; virtual SequenceNumber IncAndFetchSequenceNumber(); - // Returns LastToBeWrittenSequence in concurrent_prepare_ && seq_per_batch_ - // mode and LastSequence otherwise. This is useful when visiblility depends - // also on data written to the WAL but not to the memtable. - SequenceNumber TEST_GetLatestVisibleSequenceNumber() const; + // Returns LastSequence in allocate_seq_only_for_data_ + // mode and LastAllocatedSequence otherwise. This is useful when visibility + // depends also on data written to the WAL but not to the memtable. + SequenceNumber TEST_GetLastVisibleSequence() const; virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override; @@ -606,6 +607,12 @@ class DBImpl : public DB { Status NewDB(); + // This is to be used only by internal rocksdb classes. + static Status Open(const DBOptions& db_options, const std::string& name, + const std::vector<ColumnFamilyDescriptor>& column_families, + std::vector<ColumnFamilyHandle*>* handles, DB** dbptr, + const bool seq_per_batch); + protected: Env* const env_; const std::string dbname_; @@ -905,12 +912,12 @@ class DBImpl : public DB { FileLock* db_lock_; // In addition to mutex_, log_write_mutex_ protects writes to logs_ and - // logfile_number_. With concurrent_prepare it also protects alive_log_files_, + // logfile_number_. With two_write_queues it also protects alive_log_files_, // and log_empty_. Refer to the definition of each variable below for more // details. InstrumentedMutex log_write_mutex_; // State below is protected by mutex_ - // With concurrent_prepare enabled, some of the variables that accessed during + // With two_write_queues enabled, some of the variables that are accessed during // WriteToWAL need different synchronization: log_empty_, alive_log_files_, // logs_, logfile_number_. Refer to the definition of each variable below for // more description.
@@ -935,10 +942,10 @@ class DBImpl : public DB { std::deque<uint64_t> log_recycle_files; // a list of log files that we can recycle bool log_dir_synced_; - // Without concurrent_prepare, read and writes to log_empty_ are protected by + // Without two_write_queues, reads and writes to log_empty_ are protected by // mutex_. Since it is currently updated/read only in write_thread_, it can be // accessed from the same write_thread_ without any locks. With - // concurrent_prepare writes, where it can be updated in different threads, + // two_write_queues writes, where it can be updated in different threads, // reads and writes are protected by log_write_mutex_ instead. This is to avoid // expensive mutex_ lock during WAL write, which updates log_empty_. bool log_empty_; @@ -975,10 +982,10 @@ class DBImpl : public DB { // true for some prefix of logs_ bool getting_synced = false; }; - // Without concurrent_prepare, read and writes to alive_log_files_ are + // Without two_write_queues, reads and writes to alive_log_files_ are // protected by mutex_. However since back() is never popped, and push_back() // is done only from write_thread_, the same thread can access the item - // reffered by back() without mutex_. With concurrent_prepare_, writes + // referred to by back() without mutex_. With two_write_queues_, writes // are protected by locking both mutex_ and log_write_mutex_, and reads must // be under either mutex_ or log_write_mutex_. std::deque<LogFileNumberSize> alive_log_files_; @@ -1003,7 +1010,7 @@ class DBImpl : public DB { // memtable on normal writes and hence improving the throughput. Each new // write of the state will replace the previous state entirely even if the // keys in the two consecutive states do not overlap. - // It is protected by log_write_mutex_ when concurrent_prepare_ is enabled. + // It is protected by log_write_mutex_ when two_write_queues_ is enabled. // Otherwise only the head of write_thread_ can access it. WriteBatch cached_recoverable_state_; std::atomic<bool> cached_recoverable_state_empty_ = {true}; @@ -1322,9 +1329,22 @@ class DBImpl : public DB { // When set, we use a separate queue for writes that don't write to memtable. In // 2PC these are the writes at Prepare phase. - const bool concurrent_prepare_; + const bool two_write_queues_; const bool manual_wal_flush_; + // Increase the sequence number after writing each batch, whether memtable is + // disabled for that or not. Otherwise the sequence number is increased after + // writing each key into memtable. This implies that when disable_memtable is + // set, the seq is not increased at all. + // + // Default: false const bool seq_per_batch_; + // A sequence number is allocated only for data written to DB. Otherwise it + // could also be allocated for operational purposes such as commit timestamp + // of a transaction. + const bool allocate_seq_only_for_data_; + // It indicates that a customized gc algorithm must be used for + // flush/compaction and if it is not provided via SnapshotChecker, we should + // disable gc to be safe.
const bool use_custom_gc_; // Clients must periodically call SetPreserveDeletesSequenceNumber() diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index e45fc6fcd..8a88b9020 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -209,11 +209,11 @@ int DBImpl::TEST_BGFlushesAllowed() const { return GetBGJobLimits().max_flushes; } -SequenceNumber DBImpl::TEST_GetLatestVisibleSequenceNumber() const { - if (concurrent_prepare_ && seq_per_batch_) { - return versions_->LastToBeWrittenSequence(); - } else { +SequenceNumber DBImpl::TEST_GetLastVisibleSequence() const { + if (allocate_seq_only_for_data_) { return versions_->LastSequence(); + } else { + return versions_->LastAllocatedSequence(); } } diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 66030fd7e..a854091f6 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -252,11 +252,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } job_context->size_log_to_delete += earliest.size; total_log_size_ -= earliest.size; - if (concurrent_prepare_) { + if (two_write_queues_) { log_write_mutex_.Lock(); } alive_log_files_.pop_front(); - if (concurrent_prepare_) { + if (two_write_queues_) { log_write_mutex_.Unlock(); } // Current log should always stay alive since it can't have diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 5186c90ab..24e6e262f 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -592,9 +592,10 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, // happen when we open and write to a corrupted DB, where sequence id // will start from the last sequence id we recovered. if (sequence == *next_sequence || - // With seq_per_batch_, if previous run was with concurrent_prepare_ - // then gap in the sequence numbers is expected by the commits - // without prepares. + // With seq_per_batch_, if the previous run was with two_write_queues_ + // then allocate_seq_only_for_data_ was disabled and a gap in the + // sequence numbers in the log is expected, caused by commits without + // prepares.
(seq_per_batch_ && sequence >= *next_sequence)) { stop_replay_for_corruption = false; } @@ -754,7 +755,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, auto last_sequence = *next_sequence - 1; if ((*next_sequence != kMaxSequenceNumber) && (versions_->LastSequence() <= last_sequence)) { - versions_->SetLastToBeWrittenSequence(last_sequence); + versions_->SetLastAllocatedSequence(last_sequence); versions_->SetLastSequence(last_sequence); } } @@ -845,13 +846,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, if (data_seen && !flushed) { // Mark these as alive so they'll be considered for deletion later by // FindObsoleteFiles() - if (concurrent_prepare_) { + if (two_write_queues_) { log_write_mutex_.Lock(); } for (auto log_number : log_numbers) { alive_log_files_.push_back(LogFileNumberSize(log_number)); } - if (concurrent_prepare_) { + if (two_write_queues_) { log_write_mutex_.Unlock(); } } @@ -967,6 +968,15 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { Status DB::Open(const DBOptions& db_options, const std::string& dbname, const std::vector<ColumnFamilyDescriptor>& column_families, std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) { + const bool seq_per_batch = true; + return DBImpl::Open(db_options, dbname, column_families, handles, dbptr, + !seq_per_batch); +} + +Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, + const std::vector<ColumnFamilyDescriptor>& column_families, + std::vector<ColumnFamilyHandle*>* handles, DB** dbptr, + const bool seq_per_batch) { Status s = SanitizeOptionsByTable(db_options, column_families); if (!s.ok()) { return s; } @@ -986,7 +996,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, std::max(max_write_buffer_size, cf.options.write_buffer_size); } - DBImpl* impl = new DBImpl(db_options, dbname); + DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch); s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir); if (s.ok()) { for (auto db_path : impl->immutable_db_options_.db_paths) { @@ -1073,12 +1083,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); } sv_context.Clean(); - if (impl->concurrent_prepare_) { + if (impl->two_write_queues_) { impl->log_write_mutex_.Lock(); } impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); - if (impl->concurrent_prepare_) { + if (impl->two_write_queues_) { impl->log_write_mutex_.Unlock(); } impl->DeleteObsoleteFiles(); diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index decbf215f..51434ba8c 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -67,7 +67,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (write_options.sync && write_options.disableWAL) { return Status::InvalidArgument("Sync writes has to enable WAL."); } - if (concurrent_prepare_ && immutable_db_options_.enable_pipelined_write) { + if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) { return Status::NotSupported( "pipelined_writes is not compatible with concurrent prepares"); } @@ -87,7 +87,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } - if (concurrent_prepare_ && disable_memtable) { + if (two_write_queues_ && disable_memtable) { return WriteImplWALOnly(write_options, my_batch, callback, log_used, log_ref, seq_used); } @@ -154,7 +154,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteThread::WriteGroup write_group; bool in_parallel_group = false; uint64_t last_sequence = kMaxSequenceNumber; - if
(!concurrent_prepare_) { + if (!two_write_queues_) { last_sequence = versions_->LastSequence(); } @@ -162,7 +162,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, bool need_log_sync = write_options.sync; bool need_log_dir_sync = need_log_sync && !log_dir_synced_; - if (!concurrent_prepare_ || !disable_memtable) { + if (!two_write_queues_ || !disable_memtable) { // With concurrent writes we do preprocess only in the write thread that // also does write to memtable to avoid sync issue on shared data structure // with the other thread @@ -209,7 +209,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } size_t seq_inc = seq_per_batch_ ? write_group.size : total_count; - const bool concurrent_update = concurrent_prepare_; + const bool concurrent_update = two_write_queues_; // Update stats while we are an exclusive group leader, so we know // that nobody else can be writing to these particular stats. // We're optimistic, updating the stats before we successfully @@ -237,7 +237,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_STOP(write_pre_and_post_process_time); - if (!concurrent_prepare_) { + if (!two_write_queues_) { if (status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); status = WriteToWAL(write_group, log_writer, log_used, need_log_sync, } else { if (status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); - // LastToBeWrittenSequence is increased inside WriteToWAL under + // LastAllocatedSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); } else { // Otherwise we inc seq number for memtable writes - last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc); + last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); } } assert(last_sequence != kMaxSequenceNumber); @@ -310,9 +310,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.Lock(); MarkLogsSynced(logfile_number_, need_log_dir_sync, status); mutex_.Unlock(); - // Requesting sync with concurrent_prepare_ is expected to be very rare. We + // Requesting sync with two_write_queues_ is expected to be very rare. We // hence provide a simple implementation that is not necessarily efficient. - if (concurrent_prepare_) { + if (two_write_queues_) { if (manual_wal_flush_) { status = FlushWAL(true); } else { @@ -532,7 +532,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_wal_time); - // LastToBeWrittenSequence is increased inside WriteToWAL under + // LastAllocatedSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL size_t seq_inc = seq_per_batch_ ? write_group.size : 0 /*total_count*/; status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); @@ -548,7 +548,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, } } if (status.ok() && write_options.sync) { - // Requesting sync with concurrent_prepare_ is expected to be very rare. We + // Requesting sync with two_write_queues_ is expected to be very rare. We // hence provide a simple implementation that is not necessarily efficient.
if (manual_wal_flush_) { status = FlushWAL(true); @@ -719,7 +719,7 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, return merged_batch; } -// When concurrent_prepare_ is disabled, this function is called from the only +// When two_write_queues_ is disabled, this function is called from the only // write thread. Otherwise this must be called holding log_write_mutex_. Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, @@ -828,7 +828,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, writer->log_used = logfile_number_; } } - *last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc); + *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); auto sequence = *last_sequence + 1; WriteBatchInternal::SetSequence(merged_batch, sequence); @@ -858,7 +858,7 @@ Status DBImpl::WriteRecoverableState() { if (!cached_recoverable_state_empty_) { bool dont_care_bool; SequenceNumber next_seq; - if (concurrent_prepare_) { + if (two_write_queues_) { log_write_mutex_.Lock(); } SequenceNumber seq = versions_->LastSequence(); @@ -869,7 +869,7 @@ Status DBImpl::WriteRecoverableState() { false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, seq_per_batch_); versions_->SetLastSequence(--next_seq); - if (concurrent_prepare_) { + if (two_write_queues_) { log_write_mutex_.Unlock(); } if (status.ok()) { @@ -1109,7 +1109,7 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd, Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld(); WriteThread::Writer nonmem_w; - if (concurrent_prepare_) { + if (two_write_queues_) { // SwitchMemtable is a rare event. To simplify the reasoning, we make sure // that there is no concurrent thread writing to WAL. nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } @@ -1135,11 +1135,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { // Attempt to switch to a new memtable and trigger flush of old. // Do this without holding the dbmutex lock.
assert(versions_->prev_log_number() == 0); - if (concurrent_prepare_) { + if (two_write_queues_) { log_write_mutex_.Lock(); } bool creating_new_log = !log_empty_; - if (concurrent_prepare_) { + if (two_write_queues_) { log_write_mutex_.Unlock(); } uint64_t recycle_log_number = 0; @@ -1226,7 +1226,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { assert(creating_new_log); assert(!new_mem); assert(!new_log); - if (concurrent_prepare_) { + if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); } return s; @@ -1266,7 +1266,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { cfd->SetMemtable(new_mem); InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, mutable_cf_options); - if (concurrent_prepare_) { + if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); } return s; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 7060568c4..68f58b705 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -486,7 +486,7 @@ Options DBTestBase::GetOptions( } case kConcurrentWALWrites: { // These options optimize the 2PC commit path - options.concurrent_prepare = true; + options.two_write_queues = true; options.manual_wal_flush = true; break; } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 507d8ffbe..3238167ca 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -730,7 +730,7 @@ class RecoveryTestHelper { batch.Put(key, value); WriteBatchInternal::SetSequence(&batch, seq); current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch)); - versions->SetLastToBeWrittenSequence(seq); + versions->SetLastAllocatedSequence(seq); versions->SetLastSequence(seq); } } diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 58fa35446..d52a496da 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -164,7 +164,7 @@ Status ExternalSstFileIngestionJob::Run() { // if they don't overlap with any ranges since we have snapshots force_global_seqno = true; } - // It is safe to use this instead of LastToBeWrittenSequence since we are + // It is safe to use this instead of LastAllocatedSequence since we are // the only active writer, and hence they are equal const SequenceNumber last_seqno = versions_->LastSequence(); SuperVersion* super_version = cfd_->GetSuperVersion(); @@ -199,7 +199,7 @@ Status ExternalSstFileIngestionJob::Run() { } if (consumed_seqno) { - versions_->SetLastToBeWrittenSequence(last_seqno + 1); + versions_->SetLastAllocatedSequence(last_seqno + 1); versions_->SetLastSequence(last_seqno + 1); } diff --git a/db/repair.cc b/db/repair.cc index 069c5adc9..3ab0a9e08 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -548,7 +548,7 @@ class Repairer { max_sequence = tables_[i].max_sequence; } } - vset_.SetLastToBeWrittenSequence(max_sequence); + vset_.SetLastAllocatedSequence(max_sequence); vset_.SetLastSequence(max_sequence); for (const auto& cf_id_and_tables : cf_id_to_tables) { diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 349bf1d5d..1dbba7de5 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -19,7 +19,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( const std::string& dir, const ImmutableDBOptions* options, const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seq, - std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions) + std::unique_ptr<VectorLogPtr> files, VersionSet const* const
versions, + const bool seq_per_batch) : dir_(dir), options_(options), read_options_(read_options), @@ -31,7 +32,8 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( currentFileIndex_(0), currentBatchSeq_(0), currentLastSeq_(0), - versions_(versions) { + versions_(versions), + seq_per_batch_(seq_per_batch) { assert(files_ != nullptr); assert(versions_ != nullptr); @@ -241,12 +243,12 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { } startingSequenceNumber_ = expectedSeq; // currentStatus_ will be set to Ok if reseek succeeds - // Note: this is still ok in seq_pre_batch_ && concurrent_preparep_ mode + // Note: this is still ok in seq_per_batch_ && two_write_queues_ mode // that allows gaps in the WAL since it will still skip over the gap. currentStatus_ = Status::NotFound("Gap in sequence numbers"); - // In seq_per_batch mode, gaps in the seq are possible so the strict mode + // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode // should be disabled - return SeekToStartSequence(currentFileIndex_, !options_->seq_per_batch); + return SeekToStartSequence(currentFileIndex_, !seq_per_batch_); } struct BatchCounter : public WriteBatch::Handler { @@ -284,7 +286,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { }; currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get()); - if (options_->seq_per_batch) { + if (seq_per_batch_) { BatchCounter counter(currentBatchSeq_); batch->Iterate(&counter); currentLastSeq_ = counter.sequence_; diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index 769d8339b..b6762bf5f 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -62,7 +62,8 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const std::string& dir, const ImmutableDBOptions* options, const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seqNum, - std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions); + std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions, + const bool seq_per_batch); virtual bool Valid() override; @@ -103,7 +104,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { // Used only to get latest seq. num // TODO(icanadi) can this be just a callback? VersionSet const* const versions_; - + const bool seq_per_batch_; // Reads from transaction log only if the writebatch record has been written bool RestrictedRead(Slice* record, std::string* scratch); // Seeks to startingSequenceNumber reading from startFileIndex in files_. diff --git a/db/version_set.cc b/db/version_set.cc index b52eae03a..e494f231f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2414,7 +2414,7 @@ VersionSet::VersionSet(const std::string& dbname, manifest_file_number_(0), // Filled by Recover() pending_manifest_file_number_(0), last_sequence_(0), - last_to_be_written_sequence_(0), + last_allocated_sequence_(0), prev_log_number_(0), current_version_number_(0), manifest_file_size_(0), @@ -2754,10 +2754,9 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { // updated the last_sequence_ yet. It is also possible that the log is // expecting some new data that is not written yet. Since LastSequence is an // upper bound on the sequence, it is ok to record - // last_to_be_written_sequence_ as the last sequence. - edit->SetLastSequence(db_options_->concurrent_prepare - ? last_to_be_written_sequence_ - : last_sequence_); + // last_allocated_sequence_ as the last sequence.
+ edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_ + : last_sequence_); if (edit->is_column_family_drop_) { // if we drop column family, we have to make sure to save max column family, // so that we don't reuse existing ID @@ -2784,10 +2783,9 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, // updated the last_sequence_ yet. It is also possible that the log is // expecting some new data that is not written yet. Since LastSequence is an // upper bound on the sequence, it is ok to record - // last_to_be_written_sequence_ as the last sequence. - edit->SetLastSequence(db_options_->concurrent_prepare - ? last_to_be_written_sequence_ - : last_sequence_); + // last_allocated_sequence_ as the last sequence. + edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_ + : last_sequence_); builder->Apply(edit); } @@ -3077,7 +3075,7 @@ Status VersionSet::Recover( manifest_file_size_ = current_manifest_file_size; next_file_number_.store(next_file + 1); - last_to_be_written_sequence_ = last_sequence; + last_allocated_sequence_ = last_sequence; last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; @@ -3448,7 +3446,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, } next_file_number_.store(next_file + 1); - last_to_be_written_sequence_ = last_sequence; + last_allocated_sequence_ = last_sequence; last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; diff --git a/db/version_set.h b/db/version_set.h index f83099f23..59378d878 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -765,28 +765,27 @@ class VersionSet { } // Note: memory_order_acquire must be sufficient. - uint64_t LastToBeWrittenSequence() const { - return last_to_be_written_sequence_.load(std::memory_order_seq_cst); + uint64_t LastAllocatedSequence() const { + return last_allocated_sequence_.load(std::memory_order_seq_cst); } // Set the last sequence number to s. void SetLastSequence(uint64_t s) { assert(s >= last_sequence_); // Last visible sequence must always be less than last written seq - assert(!db_options_->concurrent_prepare || - s <= last_to_be_written_sequence_); + assert(!db_options_->two_write_queues || s <= last_allocated_sequence_); last_sequence_.store(s, std::memory_order_release); } // Note: memory_order_release must be sufficient - void SetLastToBeWrittenSequence(uint64_t s) { - assert(s >= last_to_be_written_sequence_); - last_to_be_written_sequence_.store(s, std::memory_order_seq_cst); + void SetLastAllocatedSequence(uint64_t s) { + assert(s >= last_allocated_sequence_); + last_allocated_sequence_.store(s, std::memory_order_seq_cst); } // Note: memory_order_release must be sufficient - uint64_t FetchAddLastToBeWrittenSequence(uint64_t s) { - return last_to_be_written_sequence_.fetch_add(s, std::memory_order_seq_cst); + uint64_t FetchAddLastAllocatedSequence(uint64_t s) { + return last_allocated_sequence_.fetch_add(s, std::memory_order_seq_cst); } // Mark the specified file number as used. @@ -894,8 +893,9 @@ class VersionSet { uint64_t pending_manifest_file_number_; // The last seq visible to reads std::atomic<uint64_t> last_sequence_; - // The last seq with which a writer has written/will write. - std::atomic<uint64_t> last_to_be_written_sequence_; + // The last seq that is already allocated. The seq might or might not have + // appeared in the memtable.
+ std::atomic<uint64_t> last_allocated_sequence_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted // Opened lazily diff --git a/db/wal_manager.cc b/db/wal_manager.cc index 4a9ecbfdd..235b74987 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -115,7 +115,7 @@ Status WalManager::GetUpdatesSince( } iter->reset(new TransactionLogIteratorImpl( db_options_.wal_dir, &db_options_, read_options, env_options_, seq, - std::move(wal_files), version_set)); + std::move(wal_files), version_set, seq_per_batch_)); return (*iter)->status(); } diff --git a/db/wal_manager.h b/db/wal_manager.h index aa62d793b..09a8d49c7 100644 --- a/db/wal_manager.h +++ b/db/wal_manager.h @@ -31,11 +31,12 @@ namespace rocksdb { class WalManager { public: WalManager(const ImmutableDBOptions& db_options, - const EnvOptions& env_options) + const EnvOptions& env_options, const bool seq_per_batch = false) : db_options_(db_options), env_options_(env_options), env_(db_options.env), - purge_wal_files_last_run_(0) {} + purge_wal_files_last_run_(0), + seq_per_batch_(seq_per_batch) {} Status GetSortedWalFiles(VectorLogPtr& files); @@ -86,6 +87,8 @@ class WalManager { // last time when PurgeObsoleteWALFiles ran. uint64_t purge_wal_files_last_run_; + bool seq_per_batch_; + // obsolete files will be deleted every this many seconds if ttl deletion is // enabled and archive size_limit is disabled. static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600; diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 9f5cf273d..ba90e13fb 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -67,7 +67,7 @@ class WalManagerTest : public testing::Test { batch.Put(key, value); WriteBatchInternal::SetSequence(&batch, seq); current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)); - versions_->SetLastToBeWrittenSequence(seq); + versions_->SetLastAllocatedSequence(seq); versions_->SetLastSequence(seq); } diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 431ceca16..fbe8fcc99 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -136,9 +136,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { options.create_if_missing = true; options.allow_concurrent_memtable_write = allow_parallel; options.enable_pipelined_write = enable_pipelined_write; - options.concurrent_prepare = two_queues; - if (options.enable_pipelined_write && - options.concurrent_prepare) { + options.two_write_queues = two_queues; + if (options.enable_pipelined_write && options.two_write_queues) { // This combination is not supported continue; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index c67c44c86..5ac1aac91 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -904,22 +904,12 @@ struct DBOptions { // allows the memtable writes not to lag behind other writes. It can be used // to optimize MySQL 2PC in which only the commits, which are serial, write to // memtable. - bool concurrent_prepare = false; + bool two_write_queues = false; // If true WAL is not flushed automatically after each write. Instead it // relies on manual invocation of FlushWAL to write the WAL buffer to its // file. bool manual_wal_flush = false; - - // Increase the sequence number after writing each batch, whether memtable is - // disabled for that or not. Otherwise the sequence number is increased after - // writing each key into memtable. This implies that when memtable_disable is - // set, the seq is not increased at all.
- // - // Default: false - // Note: This option is experimental and meant to be used only for internal - // projects. - bool seq_per_batch = false; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/options/db_options.cc b/options/db_options.cc index 723327288..a052a01b0 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -85,9 +85,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) avoid_flush_during_recovery(options.avoid_flush_during_recovery), allow_ingest_behind(options.allow_ingest_behind), preserve_deletes(options.preserve_deletes), - concurrent_prepare(options.concurrent_prepare), - manual_wal_flush(options.manual_wal_flush), - seq_per_batch(options.seq_per_batch) { + two_write_queues(options.two_write_queues), + manual_wal_flush(options.manual_wal_flush) { } void ImmutableDBOptions::Dump(Logger* log) const { @@ -217,11 +216,10 @@ void ImmutableDBOptions::Dump(Logger* log) const { allow_ingest_behind); ROCKS_LOG_HEADER(log, " Options.preserve_deletes: %d", preserve_deletes); - ROCKS_LOG_HEADER(log, " Options.concurrent_prepare: %d", - concurrent_prepare); + ROCKS_LOG_HEADER(log, " Options.two_write_queues: %d", + two_write_queues); ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d", manual_wal_flush); - ROCKS_LOG_HEADER(log, " Options.seq_per_batch: %d", seq_per_batch); } MutableDBOptions::MutableDBOptions() diff --git a/options/db_options.h b/options/db_options.h index 8027ad04a..399b38443 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -77,9 +77,8 @@ struct ImmutableDBOptions { bool avoid_flush_during_recovery; bool allow_ingest_behind; bool preserve_deletes; - bool concurrent_prepare; + bool two_write_queues; bool manual_wal_flush; - bool seq_per_batch; }; struct MutableDBOptions { diff --git a/options/options_helper.h b/options/options_helper.h index 0ee64daee..ebcec7b2a 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -357,21 +357,21 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = { OptionVerificationType::kNormal, false, offsetof(struct ImmutableDBOptions, allow_ingest_behind)}}, {"preserve_deletes", - {offsetof(struct DBOptions, preserve_deletes), OptionType::kBoolean, + {offsetof(struct DBOptions, preserve_deletes), OptionType::kBoolean, OptionVerificationType::kNormal, false, offsetof(struct ImmutableDBOptions, preserve_deletes)}}, - {"concurrent_prepare", - {offsetof(struct DBOptions, concurrent_prepare), OptionType::kBoolean, + {"concurrent_prepare", // Deprecated by two_write_queues + {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, 0}}, + {"two_write_queues", + {offsetof(struct DBOptions, two_write_queues), OptionType::kBoolean, OptionVerificationType::kNormal, false, - offsetof(struct ImmutableDBOptions, concurrent_prepare)}}, + offsetof(struct ImmutableDBOptions, two_write_queues)}}, {"manual_wal_flush", {offsetof(struct DBOptions, manual_wal_flush), OptionType::kBoolean, OptionVerificationType::kNormal, false, offsetof(struct ImmutableDBOptions, manual_wal_flush)}}, {"seq_per_batch", - {offsetof(struct DBOptions, seq_per_batch), OptionType::kBoolean, - OptionVerificationType::kNormal, false, - offsetof(struct ImmutableDBOptions, seq_per_batch)}}}; + {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, 0}}}; // offset_of is used to get the offset of a class data member // ex: offset_of(&ColumnFamilyOptions::num_levels) diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index
8c3872240..1d9b176e3 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -284,6 +284,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "allow_ingest_behind=false;" "preserve_deletes=false;" "concurrent_prepare=false;" + "two_write_queues=false;" "manual_wal_flush=false;" "seq_per_batch=false;", new_options)); diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 335f05dc6..7edeb9a81 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -189,12 +189,11 @@ Status TransactionDB::Open( std::vector<ColumnFamilyDescriptor> column_families_copy = column_families; std::vector<size_t> compaction_enabled_cf_indices; DBOptions db_options_2pc = db_options; - if (txn_db_options.write_policy == WRITE_PREPARED) { - db_options_2pc.seq_per_batch = true; - } PrepareWrap(&db_options_2pc, &column_families_copy, &compaction_enabled_cf_indices); - s = DB::Open(db_options_2pc, dbname, column_families_copy, handles, &db); + const bool use_seq_per_batch = txn_db_options.write_policy == WRITE_PREPARED; + s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db, + use_seq_per_batch); if (s.ok()) { s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, dbptr); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 0c9ce53ff..6b9914daa 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -4862,12 +4862,12 @@ TEST_P(TransactionTest, SeqAdvanceTest) { auto seq = db_impl->GetLatestSequenceNumber(); exp_seq = seq; txn_t0(0); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { db_impl->Flush(fopt); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (branch_do(n, &branch)) { @@ -4880,16 +4880,16 @@ TEST_P(TransactionTest, SeqAdvanceTest) { // Doing it twice might detect some bugs txn_t0(1); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); txn_t1(0); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { db_impl->Flush(fopt); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (branch_do(n, &branch)) { @@ -4901,12 +4901,12 @@ TEST_P(TransactionTest, SeqAdvanceTest) { } txn_t3(0); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { db_impl->Flush(fopt); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (branch_do(n, &branch)) { @@ -4918,16 +4918,16 @@ TEST_P(TransactionTest, SeqAdvanceTest) { } txn_t0(0); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); txn_t2(0); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { db_impl->Flush(fopt); - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq =
db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); } if (branch_do(n, &branch)) { diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 6485e4893..260b904b5 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -54,7 +54,7 @@ class TransactionTest : public ::testing::TestWithParam< options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); env = new FaultInjectionTestEnv(Env::Default()); options.env = env; - options.concurrent_prepare = std::get<1>(GetParam()); + options.two_write_queues = std::get<1>(GetParam()); dbname = test::TmpDir() + "/transaction_testdb"; DestroyDB(dbname, options); @@ -113,11 +113,10 @@ class TransactionTest : public ::testing::TestWithParam< std::vector<ColumnFamilyHandle*> handles; DB* root_db; Options options_copy(options); - if (txn_db_options.write_policy == WRITE_PREPARED) { - options_copy.seq_per_batch = true; - } - Status s = - DB::Open(options_copy, dbname, column_families, &handles, &root_db); + const bool use_seq_per_batch = + txn_db_options.write_policy == WRITE_PREPARED; + Status s = DBImpl::Open(options_copy, dbname, column_families, &handles, + &root_db, use_seq_per_batch); if (s.ok()) { assert(handles.size() == 1); s = TransactionDB::WrapStackableDB( @@ -144,7 +143,7 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; - if (options.concurrent_prepare) { + if (options.two_write_queues) { // Consume one seq for commit exp_seq++; } @@ -169,7 +168,7 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; - if (options.concurrent_prepare) { + if (options.two_write_queues) { // Consume one seq for commit exp_seq++; } @@ -197,7 +196,7 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; - if (options.concurrent_prepare) { + if (options.two_write_queues) { // Consume one seq for commit exp_seq++; } diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 84ea6cdf3..cec6c193f 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -625,7 +625,7 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n); } DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); - auto seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + auto seq = db_impl->TEST_GetLastVisibleSequence(); exp_seq = seq; // This is increased before writing the batch for commit commit_writes = 0; @@ -693,17 +693,17 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { for (auto& t : threads) { t.join(); } - if (options.concurrent_prepare) { + if (options.two_write_queues) { // In this case none of the above scheduling tricks to deterministically // form merged batches works because the writes go to separate queues. // This would result in different write groups in each run of the test. We // still keep the test since, although non-deterministic and hard to debug, // it is still useful to have.
- // TODO(myabandeh): Add a deterministic unit test for concurrent_prepare + // TODO(myabandeh): Add a deterministic unit test for two_write_queues } // Check if memtable inserts advanced seq number as expected - seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + seq = db_impl->TEST_GetLastVisibleSequence(); ASSERT_EQ(exp_seq, seq); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); @@ -1258,7 +1258,7 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) { VerifyKeys({{"foo", v}}); seq++; // one for the key/value KeyVersion kv = {"foo", v, seq, kTypeValue}; - if (options.concurrent_prepare) { + if (options.two_write_queues) { seq++; // one for the commit } versions.emplace_back(kv); @@ -1306,10 +1306,10 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) { auto add_key = [&](std::function<Status()> func) { ASSERT_OK(func()); expected_seq++; - if (options.concurrent_prepare) { + if (options.two_write_queues) { expected_seq++; // 1 for commit } - ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); + ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); snapshots.push_back(db->GetSnapshot()); }; @@ -1397,7 +1397,7 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); ASSERT_OK(txn1->Commit()); DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); - ASSERT_EQ(++expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); + ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence()); delete txn1; // Take a snapshot to avoid keys getting evicted before compaction. const Snapshot* snapshot1 = db->GetSnapshot(); @@ -1410,24 +1410,24 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { // txn2 commit after snapshot2 and it is not visible. const Snapshot* snapshot2 = db->GetSnapshot(); ASSERT_OK(txn2->Commit()); - ASSERT_EQ(++expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); + ASSERT_EQ(++expected_seq, db_impl->TEST_GetLastVisibleSequence()); delete txn2; // Take a snapshot to avoid keys getting evicted before compaction.
const Snapshot* snapshot3 = db->GetSnapshot(); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2")); expected_seq++; // 1 for write SequenceNumber seq1 = expected_seq; - if (options.concurrent_prepare) { + if (options.two_write_queues) { expected_seq++; // 1 for commit } - ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); + ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2")); expected_seq++; // 1 for write SequenceNumber seq2 = expected_seq; - if (options.concurrent_prepare) { + if (options.two_write_queues) { expected_seq++; // 1 for commit } - ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); + ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Flush(FlushOptions())); db->ReleaseSnapshot(snapshot1); db->ReleaseSnapshot(snapshot3); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index d11d667c6..58e433255 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -90,7 +90,7 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { } SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) { - if (db_impl_->immutable_db_options().concurrent_prepare) { + if (db_impl_->immutable_db_options().two_write_queues) { return db_impl_->IncAndFetchSequenceNumber(); } else { return prep_seq; diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index f9ae1300f..5573de681 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -46,7 +46,7 @@ class WritePreparedTxn : public PessimisticTransaction { virtual ~WritePreparedTxn() {} // To make WAL commit markers visible, the snapshot will be based on the last - // seq in the WAL, LastToBeWrittenSquence, as opposed to the last seq in the + // seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the // memtable. using Transaction::Get; virtual Status Get(const ReadOptions& options, @@ -54,7 +54,7 @@ class WritePreparedTxn : public PessimisticTransaction { PinnableSlice* value) override; // To make WAL commit markers visible, the snapshot will be based on the last - // seq in the WAL, LastToBeWrittenSquence, as opposed to the last seq in the + // seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the // memtable. using Transaction::GetIterator; virtual Iterator* GetIterator(const ReadOptions& options) override; @@ -76,7 +76,7 @@ class WritePreparedTxn : public PessimisticTransaction { // commit entails writing only a commit marker in the WAL. The sequence number // of the commit marker is then the commit timestamp of the transaction. To // make the commit timestamp visible to readers, their snapshot is based on - // the last seq in the WAL, LastToBeWrittenSquence, as opposed to the last seq + // the last seq in the WAL, LastAllocatedSequence, as opposed to the last seq // in the memtable. Status CommitInternal() override;
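
Editor's note (not part of the original patch): a minimal usage sketch of the refactored configuration. After this change, seq_per_batch is no longer a public DBOption; it is enabled internally when a TransactionDB is opened with the WRITE_PREPARED policy, while the renamed two_write_queues flag remains user-settable. TransactionDB::Open, TransactionDBOptions, and TxnDBWritePolicy are the existing public APIs; the database path and option values below are hypothetical.

#include <cassert>
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Formerly options.concurrent_prepare; renamed by this patch. The old
  // name is still accepted in option strings but is marked kDeprecated.
  options.two_write_queues = true;

  rocksdb::TransactionDBOptions txn_db_options;
  // WRITE_PREPARED makes TransactionDB::Open route through the internal
  // DBImpl::Open overload with seq_per_batch=true; plain DB::Open always
  // passes seq_per_batch=false.
  txn_db_options.write_policy = rocksdb::TxnDBWritePolicy::WRITE_PREPARED;

  rocksdb::TransactionDB* db = nullptr;
  rocksdb::Status s = rocksdb::TransactionDB::Open(
      options, txn_db_options, "/tmp/write_prepared_example", &db);
  assert(s.ok());
  delete db;  // Close the database.
  return 0;
}

With this split, a WritePrepared transaction consumes one sequence number per batch plus, when two_write_queues is set, one more for the commit marker, which is why the tests above increment exp_seq twice on the two-queue path.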