diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 85d25202c..3bafda9e1 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -144,6 +144,7 @@ class CompactionJobTest : public testing::Test { void SetLastSequence(const SequenceNumber sequence_number) { versions_->SetLastAllocatedSequence(sequence_number + 1); + versions_->SetLastPublishedSequence(sequence_number + 1); versions_->SetLastSequence(sequence_number + 1); } diff --git a/db/db_impl.cc b/db/db_impl.cc index a2c72f0d9..b08e30e5f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -196,16 +196,17 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, two_write_queues_(options.two_write_queues), manual_wal_flush_(options.manual_wal_flush), seq_per_batch_(seq_per_batch), - // When two_write_queues_ and seq_per_batch_ are both enabled we - // sometimes allocate a seq also to indicate the commit timestmamp of a - // transaction. In such cases last_sequence_ would not indicate the last - // visible sequence number in memtable and should not be used for - // snapshots. It should use last_allocated_sequence_ instaed but also - // needs other mechanisms to exclude the data that after last_sequence_ - // and before last_allocated_sequence_ from the snapshot. In - // WritePreparedTxn this property is ensured since such data are not - // committed yet. - allocate_seq_only_for_data_(!(seq_per_batch && options.two_write_queues)), + // last_sequence_ is always maintained by the main queue that also writes + // to the memtable. When two_write_queues_ is disabled, the last seq in the + // memtable is the same as the last seq published to the readers. When it is + // enabled but seq_per_batch_ is disabled, the last seq in the memtable still + // indicates the last published seq, since wal-only writes that go to the 2nd + // queue do not consume a sequence number. Otherwise writes performed by + // the 2nd queue could change what is visible to the readers. In that case, + // last_seq_same_as_publish_seq_==false, and the 2nd queue maintains a + // separate variable to indicate the last published sequence. + last_seq_same_as_publish_seq_( + !(seq_per_batch && options.two_write_queues)), // Since seq_per_batch_ is currently set only by WritePreparedTxn which // requires a custom gc for compaction, we use that to set use_custom_gc_ // as well. @@ -765,8 +766,8 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const { return versions_->LastSequence(); } -SequenceNumber DBImpl::IncAndFetchSequenceNumber() { - return versions_->FetchAddLastAllocatedSequence(1ull) + 1ull; +void DBImpl::SetLastPublishedSequence(SequenceNumber seq) { + versions_->SetLastPublishedSequence(seq); } bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) { @@ -992,8 +993,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // super versipon because a flush happening in between may compact // away data for the snapshot, but the snapshot is earlier than the // data overwriting it, so users may see wrong results. - snapshot = allocate_seq_only_for_data_ ? versions_->LastSequence() - : versions_->LastAllocatedSequence(); + snapshot = last_seq_same_as_publish_seq_ + ? versions_->LastSequence() + : versions_->LastPublishedSequence(); } TEST_SYNC_POINT("DBImpl::GetImpl:3"); TEST_SYNC_POINT("DBImpl::GetImpl:4"); @@ -1084,8 +1086,9 @@ std::vector<Status> DBImpl::MultiGet( snapshot = reinterpret_cast<const SnapshotImpl*>( read_options.snapshot)->number_; } else { - snapshot = allocate_seq_only_for_data_ ? 
versions_->LastSequence() - : versions_->LastAllocatedSequence(); + snapshot = last_seq_same_as_publish_seq_ + ? versions_->LastSequence() + : versions_->LastPublishedSequence(); } for (auto mgd_iter : multiget_cf_data) { mgd_iter.second->super_version = @@ -1492,7 +1495,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, #endif } else { // Note: no need to consider the special case of - // allocate_seq_only_for_data_==false since NewIterator is overridden in + // last_seq_same_as_publish_seq_==false since NewIterator is overridden in // WritePreparedTxnDB auto snapshot = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() @@ -1610,7 +1613,7 @@ Status DBImpl::NewIterators( #endif } else { // Note: no need to consider the special case of - // allocate_seq_only_for_data_==false since NewIterators is overridden in + // last_seq_same_as_publish_seq_==false since NewIterators is overridden in // WritePreparedTxnDB auto snapshot = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() @@ -1645,9 +1648,9 @@ const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { delete s; return nullptr; } - auto snapshot_seq = allocate_seq_only_for_data_ + auto snapshot_seq = last_seq_same_as_publish_seq_ ? versions_->LastSequence() - : versions_->LastAllocatedSequence(); + : versions_->LastPublishedSequence(); return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); } @@ -1658,9 +1661,9 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { snapshots_.Delete(casted_s); uint64_t oldest_snapshot; if (snapshots_.empty()) { - oldest_snapshot = allocate_seq_only_for_data_ + oldest_snapshot = last_seq_same_as_publish_seq_ ? versions_->LastSequence() - : versions_->LastAllocatedSequence(); + : versions_->LastPublishedSequence(); } else { oldest_snapshot = snapshots_.oldest()->number_; } diff --git a/db/db_impl.h b/db/db_impl.h index 4519b7169..3db65048b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -28,6 +28,7 @@ #include "db/flush_scheduler.h" #include "db/internal_stats.h" #include "db/log_writer.h" +#include "db/pre_release_callback.h" #include "db/read_callback.h" #include "db/snapshot_checker.h" #include "db/snapshot_impl.h" @@ -220,8 +221,8 @@ class DBImpl : public DB { virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; - virtual SequenceNumber IncAndFetchSequenceNumber(); - // Returns LastSequence in allocate_seq_only_for_data_ + virtual void SetLastPublishedSequence(SequenceNumber seq); + // Returns LastSequence in last_seq_same_as_publish_seq_ // mode and LastAllocatedSequence otherwise. This is useful when visiblility // depends also on data written to the WAL but not to the memtable. 
SequenceNumber TEST_GetLastVisibleSequence() const; @@ -671,7 +672,8 @@ class DBImpl : public DB { Status WriteImpl(const WriteOptions& options, WriteBatch* updates, WriteCallback* callback = nullptr, uint64_t* log_used = nullptr, uint64_t log_ref = 0, - bool disable_memtable = false, uint64_t* seq_used = nullptr); + bool disable_memtable = false, uint64_t* seq_used = nullptr, + PreReleaseCallback* pre_release_callback = nullptr); Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates, WriteCallback* callback = nullptr, @@ -682,7 +684,8 @@ class DBImpl : public DB { Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates, WriteCallback* callback = nullptr, uint64_t* log_used = nullptr, uint64_t log_ref = 0, - uint64_t* seq_used = nullptr); + uint64_t* seq_used = nullptr, + PreReleaseCallback* pre_release_callback = nullptr); uint64_t FindMinLogContainingOutstandingPrep(); uint64_t FindMinPrepLogReferencedByMemTable(); @@ -705,6 +708,7 @@ class DBImpl : public DB { friend class CompactedDBImpl; #ifndef NDEBUG friend class DBTest2_ReadCallbackTest_Test; + friend class WriteCallbackTest_WriteWithCallbackTest_Test; friend class XFTransactionWriteHandler; friend class DBBlobIndexTest; #endif @@ -1344,10 +1348,9 @@ class DBImpl : public DB { // // Default: false const bool seq_per_batch_; - // A sequence number is allocated only for data written to DB. Otherwise it - // could also be allocated for operational purposes such as commit timestamp - // of a transaction. - const bool allocate_seq_only_for_data_; + // If set, LastSequence also indicates the last published sequence visible + // to the readers. Otherwise LastPublishedSequence should be used. + const bool last_seq_same_as_publish_seq_; // It indicates that a customized gc algorithm must be used for // flush/compaction and if it is not provided vis SnapshotChecker, we should // disable gc to be safe. diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 8a88b9020..ca6772074 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -210,7 +210,7 @@ int DBImpl::TEST_BGFlushesAllowed() const { } SequenceNumber DBImpl::TEST_GetLastVisibleSequence() const { - if (allocate_seq_only_for_data_) { + if (last_seq_same_as_publish_seq_) { return versions_->LastSequence(); } else { return versions_->LastAllocatedSequence(); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index c462839df..c9487b2b7 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -591,12 +591,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, // consecutive, we continue recovery despite corruption. This could // happen when we open and write to a corrupted DB, where sequence id // will start from the last sequence id we recovered. - if (sequence == *next_sequence || - // With seq_per_batch_, if previous run was with two_write_queues_ - // then allocate_seq_only_for_data_ was disabled and a gap in the - // sequence numbers in the log is expected by the commits without - // prepares. 
- (seq_per_batch_ && sequence >= *next_sequence)) { + if (sequence == *next_sequence) { stop_replay_for_corruption = false; } if (stop_replay_for_corruption) { @@ -762,6 +757,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, if ((*next_sequence != kMaxSequenceNumber) && (versions_->LastSequence() <= last_sequence)) { versions_->SetLastAllocatedSequence(last_sequence); + versions_->SetLastPublishedSequence(last_sequence); versions_->SetLastSequence(last_sequence); } } diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 2560c38bd..7900be853 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -57,10 +57,14 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options, } #endif // ROCKSDB_LITE +// The main write queue. This is the only write queue that updates +// LastSequence. When using one write queue, the same sequence also indicates +// the last published sequence. Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, uint64_t log_ref, - bool disable_memtable, uint64_t* seq_used) { + bool disable_memtable, uint64_t* seq_used, + PreReleaseCallback* pre_release_callback) { if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } @@ -89,7 +93,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (two_write_queues_ && disable_memtable) { return WriteImplWALOnly(write_options, my_batch, callback, log_used, - log_ref, seq_used); + log_ref, seq_used, pre_release_callback); } if (immutable_db_options_.enable_pipelined_write) { @@ -99,7 +103,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, log_ref, - disable_memtable); + disable_memtable, pre_release_callback); if (!write_options.disableWAL) { RecordTick(stats_, WRITE_WITH_WAL); @@ -123,6 +127,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (write_thread_.CompleteParallelMemTableWriter(&w)) { // we're responsible for exit batch group + for (auto* writer : *(w.write_group)) { + if (!writer->CallbackFailed() && writer->pre_release_callback) { + assert(writer->sequence != kMaxSequenceNumber); + Status ws = writer->pre_release_callback->Callback(writer->sequence); + if (!ws.ok()) { + status = ws; + break; + } + } + } + // TODO(myabandeh): propagate status to write_group auto last_sequence = w.write_group->last_sequence; versions_->SetLastSequence(last_sequence); MemTableInsertStatusCheck(w.status); @@ -345,6 +360,16 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } if (should_exit_batch_group) { if (status.ok()) { + for (auto* writer : write_group) { + if (!writer->CallbackFailed() && writer->pre_release_callback) { + assert(writer->sequence != kMaxSequenceNumber); + Status ws = writer->pre_release_callback->Callback(writer->sequence); + if (!ws.ok()) { + status = ws; + break; + } + } + } versions_->SetLastSequence(last_sequence); } MemTableInsertStatusCheck(w.status); @@ -484,14 +509,18 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, return w.FinalStatus(); } +// The 2nd write queue. If enabled, it will be used only for WAL-only writes. +// This is the only queue that updates LastPublishedSequence, which is only +// applicable in a two-queue setting. 
Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, uint64_t log_ref, - uint64_t* seq_used) { + uint64_t* seq_used, + PreReleaseCallback* pre_release_callback) { Status status; PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, log_ref, - true /* disable_memtable */); + true /* disable_memtable */, pre_release_callback); if (write_options.disableWAL) { return status; } @@ -577,6 +606,18 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, if (!w.CallbackFailed()) { WriteCallbackStatusCheck(status); } + if (status.ok()) { + for (auto* writer : write_group) { + if (!writer->CallbackFailed() && writer->pre_release_callback) { + assert(writer->sequence != kMaxSequenceNumber); + Status ws = writer->pre_release_callback->Callback(writer->sequence); + if (!ws.ok()) { + status = ws; + break; + } + } + } + } nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status); if (status.ok()) { status = w.FinalStatus(); @@ -883,13 +924,18 @@ Status DBImpl::WriteRecoverableState() { log_write_mutex_.Lock(); } SequenceNumber seq = versions_->LastSequence(); - WriteBatchInternal::SetSequence(&cached_recoverable_state_, ++seq); + WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1); auto status = WriteBatchInternal::InsertInto( &cached_recoverable_state_, column_family_memtables_.get(), &flush_scheduler_, true, 0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, seq_per_batch_); - versions_->SetLastSequence(--next_seq); + auto last_seq = next_seq - 1; + if (two_write_queues_) { + versions_->FetchAddLastAllocatedSequence(last_seq - seq); + } + versions_->SetLastSequence(last_seq); + versions_->SetLastPublishedSequence(last_seq); if (two_write_queues_) { log_write_mutex_.Unlock(); } diff --git a/db/db_iter.cc b/db/db_iter.cc index b070c7caa..129af5064 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -1384,9 +1384,9 @@ Status ArenaWrappedDBIter::Refresh() { return Status::NotSupported("Creating renew iterator is not allowed."); } assert(db_iter_ != nullptr); - // TODO(yiwu): For allocate_seq_only_for_data_==false, this is not the correct - // behavior. Will be corrected automatically when we take a snapshot here for - // the case of WritePreparedTxnDB. + // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the + // correct behavior. Will be corrected automatically when we take a snapshot + // here for the case of WritePreparedTxnDB. 
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); uint64_t cur_sv_number = cfd_->GetSuperVersionNumber(); if (sv_number_ != cur_sv_number) { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 3238167ca..796ef251c 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -731,6 +731,7 @@ class RecoveryTestHelper { WriteBatchInternal::SetSequence(&batch, seq); current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch)); versions->SetLastAllocatedSequence(seq); + versions->SetLastPublishedSequence(seq); versions->SetLastSequence(seq); } } diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index e86488919..898939afc 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -200,6 +200,7 @@ Status ExternalSstFileIngestionJob::Run() { if (consumed_seqno) { versions_->SetLastAllocatedSequence(last_seqno + 1); + versions_->SetLastPublishedSequence(last_seqno + 1); versions_->SetLastSequence(last_seqno + 1); } diff --git a/db/pre_release_callback.h b/db/pre_release_callback.h new file mode 100644 index 000000000..fdc4d50c5 --- /dev/null +++ b/db/pre_release_callback.h @@ -0,0 +1,31 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/status.h" +#include "rocksdb/types.h" + +namespace rocksdb { + +class DB; + +class PreReleaseCallback { + public: + virtual ~PreReleaseCallback() {} + + // Will be called while on the write thread after the write and before the + // release of the sequence number. This is useful if any operation needs to + // be done before the write becomes visible to the readers, or if we want to + // reduce the overhead of locking by updating something sequentially while we + // are on the write thread. If the callback returns a non-OK status, the + // sequence number will not be released, and the same status will be + // propagated to all the writers in the write group. + // seq is the sequence number that is used for this write and will be + // released. 
+ virtual Status Callback(SequenceNumber seq) = 0; +}; + +} // namespace rocksdb diff --git a/db/repair.cc b/db/repair.cc index 3ab0a9e08..583a3dbe7 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -549,6 +549,7 @@ class Repairer { } } vset_.SetLastAllocatedSequence(max_sequence); + vset_.SetLastPublishedSequence(max_sequence); vset_.SetLastSequence(max_sequence); for (const auto& cf_id_and_tables : cf_id_to_tables) { diff --git a/db/version_set.cc b/db/version_set.cc index b2ec82fba..46ef1941a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2399,6 +2399,7 @@ VersionSet::VersionSet(const std::string& dbname, pending_manifest_file_number_(0), last_sequence_(0), last_allocated_sequence_(0), + last_published_sequence_(0), prev_log_number_(0), current_version_number_(0), manifest_file_size_(0), @@ -3058,6 +3059,7 @@ Status VersionSet::Recover( manifest_file_size_ = current_manifest_file_size; next_file_number_.store(next_file + 1); last_allocated_sequence_ = last_sequence; + last_published_sequence_ = last_sequence; last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; @@ -3429,6 +3431,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, next_file_number_.store(next_file + 1); last_allocated_sequence_ = last_sequence; + last_published_sequence_ = last_sequence; last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; diff --git a/db/version_set.h b/db/version_set.h index 583e2d995..ea6e4e88a 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -765,6 +765,11 @@ class VersionSet { return last_allocated_sequence_.load(std::memory_order_seq_cst); } + // Note: memory_order_acquire must be sufficient. + uint64_t LastPublishedSequence() const { + return last_published_sequence_.load(std::memory_order_seq_cst); + } + // Set the last sequence number to s. void SetLastSequence(uint64_t s) { assert(s >= last_sequence_); @@ -773,6 +778,12 @@ class VersionSet { last_sequence_.store(s, std::memory_order_release); } + // Note: memory_order_release must be sufficient + void SetLastPublishedSequence(uint64_t s) { + assert(s >= last_published_sequence_); + last_published_sequence_.store(s, std::memory_order_seq_cst); + } + // Note: memory_order_release must be sufficient void SetLastAllocatedSequence(uint64_t s) { assert(s >= last_allocated_sequence_); @@ -888,11 +899,21 @@ class VersionSet { uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t pending_manifest_file_number_; - // The last seq visible to reads + // The last seq visible to reads. It normally indicates the last sequence in + // the memtable but when using two write queues it could also indicate the + // last sequence in the WAL visible to reads. std::atomic<uint64_t> last_sequence_; - // The last seq that is already allocated. The seq might or might not have - // appreated in memtable. + // The last seq that is already allocated. It is applicable only when we have + // two write queues. In that case the seq might or might not have appeared in + // the memtable but it is expected to appear in the WAL. + // We have last_sequence_ <= last_allocated_sequence_ std::atomic<uint64_t> last_allocated_sequence_; + // The last allocated sequence that is also published to the readers. This is + // applicable only when last_seq_same_as_publish_seq_ is not set. Otherwise + // last_sequence_ also indicates the last published seq. 
+ // We have last_sequence_ <= last_published_sequence_ <= + // last_allocated_sequence_ + std::atomic<uint64_t> last_published_sequence_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted // Opened lazily diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index ba90e13fb..fe54b84cb 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -68,6 +68,7 @@ class WalManagerTest : public testing::Test { WriteBatchInternal::SetSequence(&batch, seq); current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)); versions_->SetLastAllocatedSequence(seq); + versions_->SetLastPublishedSequence(seq); versions_->SetLastSequence(seq); } diff --git a/db/write_batch.cc b/db/write_batch.cc index f87fd6f43..e4c05eeae 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -454,7 +454,8 @@ Status WriteBatch::Iterate(Handler* handler) const { break; case kTypeLogData: handler->LogData(blob); - empty_batch = true; + // A batch might have nothing but LogData. It is still a batch. + empty_batch = false; break; case kTypeBeginPrepareXID: assert(content_flags_.load(std::memory_order_relaxed) & diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 3d40150f9..0ca830ceb 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -290,8 +290,24 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { WriteOptions woptions; woptions.disableWAL = !enable_WAL; woptions.sync = enable_WAL; - Status s = db_impl->WriteWithCallback( - woptions, &write_op.write_batch_, &write_op.callback_); + Status s; + if (seq_per_batch && two_queues) { + class PublishSeqCallback : public PreReleaseCallback { + public: + PublishSeqCallback(DBImpl* db_impl) : db_impl_(db_impl) {} + virtual Status Callback(SequenceNumber last_seq) { + db_impl_->SetLastPublishedSequence(last_seq); + return Status::OK(); + } + DBImpl* db_impl_; + } publish_seq_callback(db_impl); + s = db_impl->WriteImpl(woptions, &write_op.write_batch_, + &write_op.callback_, nullptr, 0, false, + nullptr, &publish_seq_callback); + } else { + s = db_impl->WriteWithCallback( + woptions, &write_op.write_batch_, &write_op.callback_); + } if (write_op.callback_.should_fail_) { ASSERT_TRUE(s.IsBusy()); diff --git a/db/write_thread.h b/db/write_thread.h index 67150a858..3d5e2f9e6 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -15,6 +15,7 @@ #include #include "db/dbformat.h" +#include "db/pre_release_callback.h" #include "db/write_callback.h" #include "monitoring/instrumented_mutex.h" #include "rocksdb/options.h" @@ -117,6 +118,7 @@ class WriteThread { bool no_slowdown; bool disable_wal; bool disable_memtable; + PreReleaseCallback* pre_release_callback; uint64_t log_used; // log number that this batch was inserted into uint64_t log_ref; // log number that memtable insert should reference WriteCallback* callback; @@ -137,6 +139,7 @@ class WriteThread { no_slowdown(false), disable_wal(false), disable_memtable(false), + pre_release_callback(nullptr), log_used(0), log_ref(0), callback(nullptr), @@ -148,12 +151,14 @@ class WriteThread { link_newer(nullptr) {} Writer(const WriteOptions& write_options, WriteBatch* _batch, - WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable) + WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable, + PreReleaseCallback* _pre_release_callback = nullptr) : batch(_batch), sync(write_options.sync), no_slowdown(write_options.no_slowdown), disable_wal(write_options.disableWAL), disable_memtable(_disable_memtable), + 
pre_release_callback(_pre_release_callback), log_used(0), log_ref(_log_ref), callback(_callback), diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index d90a4d4f9..61b6b17f7 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -369,7 +369,8 @@ void LDBCommand::OpenDB() { if (column_families_.empty()) { st = DB::Open(options_, db_path_, &db_); } else { - st = DB::Open(options_, db_path_, column_families_, &handles_opened, &db_); + st = DB::Open(options_, db_path_, column_families_, &handles_opened, + &db_); } } } diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index ffa356e86..203dd2071 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -512,7 +512,7 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family, if (tracked_at_seq == kMaxSequenceNumber) { // Since we haven't checked a snapshot, we only know this key has not // been modified since after we locked it. - // Note: when allocate_seq_only_for_data_==false this is less than the + // Note: when last_seq_same_as_publish_seq_==false this is less than the // latest allocated seq but it is ok since i) this is just a heuristic // used only as a hint to avoid actual check for conflicts, ii) this would // cause a false positive only if the snapthot is taken right after the diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 400ca0106..bec5d9d98 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -143,10 +143,8 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; - if (options.two_write_queues) { // Consume one seq for commit exp_seq++; - } } }; std::function<void(size_t)> txn_t0 = [&](size_t index) { @@ -168,10 +166,8 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; - if (options.two_write_queues) { // Consume one seq for commit exp_seq++; - } } ASSERT_OK(s); }; @@ -196,10 +192,8 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; - if (options.two_write_queues) { // Consume one seq for commit exp_seq++; - } } auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); pdb->UnregisterTransaction(txn); @@ -264,10 +258,8 @@ class TransactionTest : public ::testing::TestWithParam< exp_seq++; // Consume one seq per rollback batch exp_seq++; - if (options.two_write_queues) { // Consume one seq for rollback commit exp_seq++; - } } delete txn; }; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index b1e0fdd58..7fede0e51 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -727,26 +727,6 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); - // The latest seq might be due to a commit without prepare and hence not - // persisted in the WAL. We need to discount such seqs if they are not - // continued by any seq consued by a value write. 
- if (options.two_write_queues) { - WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); - MutexLock l(&wp_db->seq_for_metadata_mutex_); - auto& vec = wp_db->seq_for_metadata; - std::sort(vec.begin(), vec.end()); - // going backward discount any last seq consumed for metadata until we see - // a seq that is consumed for actualy key/values. - auto rit = vec.rbegin(); - for (; rit != vec.rend(); ++rit) { - if (*rit == exp_seq) { - exp_seq--; - } else { - break; - } - } - } - // Check if recovery preserves the last sequence number db_impl->FlushWAL(true); ReOpenNoDelete(); @@ -1084,11 +1064,10 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { } } -void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v, - Slice key) { +void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s, + PinnableSlice& exp_v, Slice key) { Status s; PinnableSlice v; - ReadOptions roptions; s = db->Get(roptions, db->DefaultColumnFamily(), key, &v); ASSERT_TRUE(exp_s == s); ASSERT_TRUE(s.ok() || s.IsNotFound()); @@ -1110,6 +1089,11 @@ void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v, } } +void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v, + Slice key) { + ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key); +} + TEST_P(WritePreparedTransactionTest, RollbackTest) { ReadOptions roptions; WriteOptions woptions; @@ -1296,9 +1280,7 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) { VerifyKeys({{"foo", v}}); seq++; // one for the key/value KeyVersion kv = {"foo", v, seq, kTypeValue}; - if (options.two_write_queues) { seq++; // one for the commit - } versions.emplace_back(kv); } std::reverse(std::begin(versions), std::end(versions)); @@ -1344,9 +1326,7 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) { auto add_key = [&](std::function<Status()> func) { ASSERT_OK(func()); expected_seq++; - if (options.two_write_queues) { expected_seq++; // 1 for commit - } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); snapshots.push_back(db->GetSnapshot()); }; @@ -1455,16 +1435,12 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2")); expected_seq++; // 1 for write SequenceNumber seq1 = expected_seq; - if (options.two_write_queues) { expected_seq++; // 1 for commit - } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2")); expected_seq++; // 1 for write SequenceNumber seq2 = expected_seq; - if (options.two_write_queues) { expected_seq++; // 1 for commit - } ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Flush(FlushOptions())); db->ReleaseSnapshot(snapshot1); @@ -1587,7 +1563,10 @@ TEST_P(WritePreparedTransactionTest, ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); SequenceNumber seq1 = expected_seq; ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); - ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + expected_seq++; // one for data + expected_seq++; // one for commit + ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_OK(db->Flush(FlushOptions())); // Dummy keys to avoid compaction trivially move files and get around actual // compaction logic. 
@@ -1667,6 +1646,50 @@ TEST_P(WritePreparedTransactionTest, Iterate) { delete transaction; } +// Test that updating the commit map will not affect the existing snapshots +TEST_P(WritePreparedTransactionTest, AtomicCommit) { + for (bool skip_prepare : {true, false}) { + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"WritePreparedTxnDB::AddCommitted:start", + "AtomicCommit::GetSnapshot:start"}, + {"AtomicCommit::Get:end", + "WritePreparedTxnDB::AddCommitted:start:pause"}, + {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"}, + {"AtomicCommit::Get2:end", + "WritePreparedTxnDB::AddCommitted:end:pause"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + rocksdb::port::Thread write_thread([&]() { + if (skip_prepare) { + db->Put(WriteOptions(), Slice("key"), Slice("value")); + } else { + Transaction* txn = + db->BeginTransaction(WriteOptions(), TransactionOptions()); + ASSERT_OK(txn->SetName("xid")); + ASSERT_OK(txn->Put(Slice("key"), Slice("value"))); + ASSERT_OK(txn->Prepare()); + ASSERT_OK(txn->Commit()); + delete txn; + } + }); + rocksdb::port::Thread read_thread([&]() { + ReadOptions roptions; + TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start"); + roptions.snapshot = db->GetSnapshot(); + PinnableSlice val; + auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val); + TEST_SYNC_POINT("AtomicCommit::Get:end"); + TEST_SYNC_POINT("AtomicCommit::Get2:start"); + ASSERT_SAME(roptions, db, s, val, "key"); + TEST_SYNC_POINT("AtomicCommit::Get2:end"); + db->ReleaseSnapshot(roptions.snapshot); + }); + read_thread.join(); + write_thread.join(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + } +} + // Test that we can change write policy from WriteCommitted to WritePrepared // after a clean shutdown (which would empty the WAL) TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 6e601235f..0d2710d54 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -61,10 +61,10 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, Status WritePreparedTxn::PrepareInternal() { WriteOptions write_options = write_options_; write_options.disableWAL = false; - const bool write_after_commit = true; + const bool WRITE_AFTER_COMMIT = true; WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, - !write_after_commit); - const bool disable_memtable = true; + !WRITE_AFTER_COMMIT); + const bool DISABLE_MEMTABLE = true; uint64_t seq_used = kMaxSequenceNumber; bool collapsed = GetWriteBatch()->Collapse(); if (collapsed) { @@ -74,7 +74,7 @@ Status WritePreparedTxn::PrepareInternal() { Status s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), /*callback*/ nullptr, &log_number_, /*log ref*/ 0, - !disable_memtable, &seq_used); + !DISABLE_MEMTABLE, &seq_used); assert(seq_used != kMaxSequenceNumber); auto prepare_seq = seq_used; SetId(prepare_seq); @@ -91,34 +91,29 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { return CommitBatchInternal(GetWriteBatch()->GetWriteBatch()); } -SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) { - if (db_impl_->immutable_db_options().two_write_queues) { - auto s = db_impl_->IncAndFetchSequenceNumber(); -#ifndef NDEBUG - MutexLock l(&wpt_db_->seq_for_metadata_mutex_); - wpt_db_->seq_for_metadata.push_back(s); -#endif - return s; - } else { - return prep_seq; 
- } -} - Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { // TODO(myabandeh): handle the duplicate keys in the batch // In the absence of Prepare markers, use Noop as a batch separator WriteBatchInternal::InsertNoop(batch); - const bool disable_memtable = true; + const bool DISABLE_MEMTABLE = true; const uint64_t no_log_ref = 0; uint64_t seq_used = kMaxSequenceNumber; auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr, - no_log_ref, !disable_memtable, &seq_used); + no_log_ref, !DISABLE_MEMTABLE, &seq_used); assert(seq_used != kMaxSequenceNumber); uint64_t& prepare_seq = seq_used; - uint64_t commit_seq = GetACommitSeqNumber(prepare_seq); - // TODO(myabandeh): skip AddPrepared - wpt_db_->AddPrepared(prepare_seq); - wpt_db_->AddCommitted(prepare_seq, commit_seq); + // Commit the batch by writing an empty batch to the queue that will release + // the commit sequence number to readers. + WritePreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, prepare_seq); + WriteBatch empty_batch; + empty_batch.PutLogData(Slice()); + // In the absence of Prepare markers, use Noop as a batch separator + WriteBatchInternal::InsertNoop(&empty_batch); + s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, + no_log_ref, DISABLE_MEMTABLE, &seq_used, + &update_commit_map); + assert(seq_used != kMaxSequenceNumber); return s; } @@ -137,28 +132,22 @@ Status WritePreparedTxn::CommitInternal() { WriteBatchInternal::SetAsLastestPersistentState(working_batch); } - const bool disable_memtable = true; + // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode + // commit_seq. This happens if prep_seq <<< commit_seq. + auto prepare_seq = GetId(); + const bool includes_data = !empty && !for_recovery; + WritePreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, prepare_seq, includes_data); + const bool disable_memtable = !includes_data; uint64_t seq_used = kMaxSequenceNumber; // Since the prepared batch is directly written to memtable, there is already // a connection between the memtable and its WAL, so there is no need to // redundantly reference the log that contains the prepared data. const uint64_t zero_log_number = 0ull; - auto s = db_impl_->WriteImpl( - write_options_, working_batch, nullptr, nullptr, zero_log_number, - empty || for_recovery ? disable_memtable : !disable_memtable, &seq_used); + auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, + zero_log_number, disable_memtable, &seq_used, + &update_commit_map); assert(seq_used != kMaxSequenceNumber); - uint64_t& commit_seq = seq_used; - // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode - // commit_seq. This happens if prep_seq <<< commit_seq. 
- auto prepare_seq = GetId(); - wpt_db_->AddCommitted(prepare_seq, commit_seq); - if (!empty && !for_recovery) { - // Commit the data that is accompnaied with the commit marker - // TODO(myabandeh): skip AddPrepared - wpt_db_->AddPrepared(commit_seq); - uint64_t commit_seq_2 = GetACommitSeqNumber(commit_seq); - wpt_db_->AddCommitted(commit_seq, commit_seq_2); - } return s; } @@ -232,19 +221,31 @@ Status WritePreparedTxn::RollbackInternal() { } // The Rollback marker will be used as a batch separator WriteBatchInternal::MarkRollback(&rollback_batch, name_); - const bool disable_memtable = true; + const bool DISABLE_MEMTABLE = true; const uint64_t no_log_ref = 0; uint64_t seq_used = kMaxSequenceNumber; s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, - no_log_ref, !disable_memtable, &seq_used); + no_log_ref, !DISABLE_MEMTABLE, &seq_used); assert(seq_used != kMaxSequenceNumber); + if (!s.ok()) { + return s; + } uint64_t& prepare_seq = seq_used; - uint64_t commit_seq = GetACommitSeqNumber(prepare_seq); - // TODO(myabandeh): skip AddPrepared - wpt_db_->AddPrepared(prepare_seq); - wpt_db_->AddCommitted(prepare_seq, commit_seq); + // Commit the batch by writing an empty batch to the queue that will release + // the commit sequence number to readers. + WritePreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, prepare_seq); + WriteBatch empty_batch; + empty_batch.PutLogData(Slice()); + // In the absence of Prepare markers, use Noop as a batch separator + WriteBatchInternal::InsertNoop(&empty_batch); + s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, + no_log_ref, DISABLE_MEMTABLE, &seq_used, + &update_commit_map); + assert(seq_used != kMaxSequenceNumber); // Mark the txn as rolled back - wpt_db_->RollbackPrepared(GetId(), commit_seq); + uint64_t& rollback_seq = seq_used; + wpt_db_->RollbackPrepared(GetId(), rollback_seq); return s; } diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index c1e20042c..17f7d0c96 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -46,16 +46,16 @@ class WritePreparedTxn : public PessimisticTransaction { virtual ~WritePreparedTxn() {} // To make WAL commit markers visible, the snapshot will be based on the last - // seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the - // memtable. + // seq in the WAL that is also published, LastPublishedSequence, as opposed to + // the last seq in the memtable. using Transaction::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; // To make WAL commit markers visible, the snapshot will be based on the last - // seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the - // memtable. + // seq in the WAL that is also published, LastPublishedSequence, as opposed to + // the last seq in the memtable. 
using Transaction::GetIterator; virtual Iterator* GetIterator(const ReadOptions& options) override; virtual Iterator* GetIterator(const ReadOptions& options, @@ -64,8 +64,6 @@ class WritePreparedTxn : public PessimisticTransaction { private: friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; - SequenceNumber GetACommitSeqNumber(SequenceNumber prep_seq); - Status PrepareInternal() override; Status CommitWithoutPrepareInternal() override; @@ -75,9 +73,9 @@ class WritePreparedTxn : public PessimisticTransaction { // Since the data is already written to memtables at the Prepare phase, the // commit entails writing only a commit marker in the WAL. The sequence number // of the commit marker is then the commit timestamp of the transaction. To - // make the commit timestamp visible to readers, their snapshot is based on - // the last seq in the WAL, LastAllocatedSequence, as opposed to the last seq - // in the memtable. + // make WAL commit markers visible, the snapshot will be based on the last seq + // in the WAL that is also published, LastPublishedSequence, as opposed to the + // last seq in the memtable. Status CommitInternal() override; Status RollbackInternal() override; diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index c2b18b326..ad34e92b1 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -283,6 +283,8 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq) { ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, prepare_seq, commit_seq); + TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start"); + TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause"); auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE; CommitEntry64b evicted_64b; CommitEntry evicted; @@ -319,6 +321,8 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, } } } + TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end"); + TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause"); } bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq, diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 454d7f827..a67f5ac25 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -14,6 +14,7 @@ #include #include "db/db_iter.h" +#include "db/pre_release_callback.h" #include "db/read_callback.h" #include "db/snapshot_checker.h" #include "rocksdb/db.h" @@ -186,12 +187,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Struct to hold ownership of snapshot and read callback for cleanup. struct IteratorState; -#ifndef NDEBUG - // For unit tests we can track of the seq numbers that are used for metadata as opposed to actual key/values - std::vector<uint64_t> seq_for_metadata; - mutable port::Mutex seq_for_metadata_mutex_; -#endif - private: friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; @@ -373,5 +368,41 @@ class WritePreparedTxnReadCallback : public ReadCallback { SequenceNumber snapshot_; }; +class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { + public: + // includes_data indicates that the commit also writes a non-empty + // CommitTimeWriteBatch to the memtable, which needs to be committed separately. 
+ WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db, + DBImpl* db_impl, + SequenceNumber prep_seq, + bool includes_data = false) + : db_(db), + db_impl_(db_impl), + prep_seq_(prep_seq), + includes_data_(includes_data) {} + + virtual Status Callback(SequenceNumber commit_seq) { + db_->AddCommitted(prep_seq_, commit_seq); + if (includes_data_) { + // Commit the data that is accompanied with the commit marker + // TODO(myabandeh): skip AddPrepared + db_->AddPrepared(commit_seq); + db_->AddCommitted(commit_seq, commit_seq); + } + // Publish the sequence number. We can do that here assuming the callback + // is invoked only from one write queue, which would guarantee that the + // published sequence numbers will be in order, i.e., once a seq is + // published, all the seqs prior to it are also publishable. + db_impl_->SetLastPublishedSequence(commit_seq); + return Status::OK(); + } + + private: + WritePreparedTxnDB* db_; + DBImpl* db_impl_; + SequenceNumber prep_seq_; + bool includes_data_; +}; + } // namespace rocksdb #endif // ROCKSDB_LITE
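
For reference, a minimal sketch of how a PreReleaseCallback is wired into the write path, mirroring the PublishSeqCallback used in write_callback_test.cc above. This is an illustration rather than part of the patch; it assumes a DBImpl* obtained via GetRootDB():

#include "db/db_impl.h"
#include "db/pre_release_callback.h"

namespace rocksdb {

class PublishSeqCallback : public PreReleaseCallback {
 public:
  explicit PublishSeqCallback(DBImpl* db_impl) : db_impl_(db_impl) {}
  // Runs on the write thread after the WAL write and before the sequence is
  // released, so readers can never observe last_seq before it is published.
  virtual Status Callback(SequenceNumber last_seq) override {
    db_impl_->SetLastPublishedSequence(last_seq);
    return Status::OK();
  }

 private:
  DBImpl* db_impl_;
};

}  // namespace rocksdb

A caller then passes the callback as the last argument of DBImpl::WriteImpl, e.g. WriteImpl(woptions, &batch, nullptr, nullptr, 0, false, nullptr, &publish_seq_callback).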
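
The invariant that ties the three VersionSet counters together, and the reader-side rule used by DBImpl::GetImpl, MultiGet, and GetSnapshotImpl above, can be condensed into a short sketch (the helper name is illustrative, not from the patch):

#include "db/version_set.h"

namespace rocksdb {

// Invariant: last_sequence_ <= last_published_sequence_ <=
// last_allocated_sequence_. Snapshots must never read a counter that is
// ahead of what has been published.
inline SequenceNumber SnapshotSeq(const VersionSet* versions,
                                  bool last_seq_same_as_publish_seq) {
  // One write queue (or WAL-only writes that consume no seq): LastSequence
  // is both the last seq in the memtable and the last published seq.
  // Otherwise the 2nd queue advances LastPublishedSequence separately, and
  // data between the two lives only in the WAL.
  return last_seq_same_as_publish_seq ? versions->LastSequence()
                                      : versions->LastPublishedSequence();
}

}  // namespace rocksdb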
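
Putting the pieces together, the WritePrepared commit path after this patch reduces to the following condensed sketch of WritePreparedTxn::CommitInternal (error handling and the empty/for_recovery plumbing are elided). The pre-release callback updates the commit map and publishes the commit seq before the sequence is released, so a concurrent snapshot observes either none or all of the commit:

#include "db/db_impl.h"
#include "utilities/transactions/write_prepared_txn_db.h"

namespace rocksdb {

Status CommitSketch(DBImpl* db_impl, WritePreparedTxnDB* wpt_db,
                    WriteBatch* working_batch, SequenceNumber prepare_seq,
                    bool includes_data) {
  // Runs on the write queue before the commit seq becomes visible: adds
  // (prepare_seq, commit_seq) to the commit map, then publishes commit_seq.
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db, db_impl, prepare_seq, includes_data);
  uint64_t seq_used = kMaxSequenceNumber;
  // The memtable write is skipped unless the commit carries a
  // CommitTimeWriteBatch; the commit marker itself goes to the WAL only.
  return db_impl->WriteImpl(WriteOptions(), working_batch,
                            /*callback*/ nullptr, /*log_used*/ nullptr,
                            /*log ref*/ 0,
                            /*disable_memtable*/ !includes_data, &seq_used,
                            &update_commit_map);
}

}  // namespace rocksdb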