From fe63899d1a122d74f4eedd495dd99d0c20c9d9f0 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 14 Apr 2022 17:12:16 -0700 Subject: [PATCH] Add checks to GetUpdatesSince (#9459) Summary: Make `DB::GetUpdatesSince` return early if told to scan WALs generated by transactions with write-prepared or write-unprepared policies (`seq_per_batch` is true), as indicated by API comment. Also add checks to `TransactionLogIterator` to clarify some conditions. No API change. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9459 Test Plan: make check Closing https://github.com/facebook/rocksdb/issues/1565 Reviewed By: akankshamahajan15 Differential Revision: D33821243 Pulled By: riversand963 fbshipit-source-id: c8b155d020ce0980e2d3b3b1da40b96e65b48d79 --- HISTORY.md | 1 + db/db_impl/db_impl.cc | 5 +++ db/transaction_log_impl.cc | 81 +++++++++++++------------------------- db/transaction_log_impl.h | 19 +++++---- db/wal_manager.cc | 5 +++ db/wal_manager.h | 2 +- include/rocksdb/db.h | 9 +++-- 7 files changed, 57 insertions(+), 65 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index c6d7a5da4..ab56b975b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -23,6 +23,7 @@ ### Behavior changes * Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794). +* Make DB::GetUpdatesSince() return NotSupported early for write-prepared/write-unprepared transactions, as the API contract indicates. ### Public API changes * Exposed APIs to examine results of block cache stats collections in a structured way. In particular, users of `GetMapProperty()` with property `kBlockCacheEntryStats` can now use the functions in `BlockCacheEntryStatsMapKeys` to find stats in the map. diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 9ec1677b8..dff9fce50 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3682,6 +3682,11 @@ Status DBImpl::GetUpdatesSince( SequenceNumber seq, std::unique_ptr* iter, const TransactionLogIterator::ReadOptions& read_options) { RecordTick(stats_, GET_UPDATES_SINCE_CALLS); + if (seq_per_batch_) { + return Status::NotSupported( + "This API is not yet compatible with write-prepared/write-unprepared " + "transactions"); + } if (seq > versions_->LastSequence()) { return Status::NotFound("Requested sequence not yet written in the db"); } diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index cc53b42ab..15cd8a28d 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -6,9 +6,12 @@ #ifndef ROCKSDB_LITE #include "db/transaction_log_impl.h" + #include + #include "db/write_batch_internal.h" #include "file/sequence_file_reader.h" +#include "util/defer.h" namespace ROCKSDB_NAMESPACE { @@ -24,16 +27,17 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( soptions_(soptions), starting_sequence_number_(seq), files_(std::move(files)), + versions_(versions), + seq_per_batch_(seq_per_batch), + io_tracer_(io_tracer), started_(false), is_valid_(false), current_file_index_(0), current_batch_seq_(0), - current_last_seq_(0), - versions_(versions), - seq_per_batch_(seq_per_batch), - io_tracer_(io_tracer) { + current_last_seq_(0) { assert(files_ != nullptr); assert(versions_ != nullptr); + assert(!seq_per_batch_); current_status_.PermitUncheckedError(); // Clear on start reporter_.env = options_->env; reporter_.info_log = options_->info_log.get(); @@ -94,8 +98,21 @@ void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index, Slice record; started_ = false; is_valid_ = false; + // Check invariant of TransactionLogIterator when SeekToStartSequence() + // succeeds. + const Defer defer([this]() { + if (is_valid_) { + assert(current_status_.ok()); + if (starting_sequence_number_ > current_batch_seq_) { + assert(current_batch_seq_ < current_last_seq_); + assert(current_last_seq_ >= starting_sequence_number_); + } + } + }); if (files_->size() <= start_file_index) { return; + } else if (!current_status_.ok()) { + return; } Status s = OpenLogReader(files_->at(static_cast(start_file_index)).get()); @@ -151,6 +168,9 @@ void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index, } void TransactionLogIteratorImpl::Next() { + if (!current_status_.ok()) { + return; + } return NextImpl(false); } @@ -159,7 +179,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { is_valid_ = false; if (!internal && !started_) { // Runs every time until we can seek to the start sequence - return SeekToStartSequence(); + SeekToStartSequence(); } while(true) { assert(current_log_reader_); @@ -249,55 +269,10 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { return SeekToStartSequence(current_file_index_, !seq_per_batch_); } - struct BatchCounter : public WriteBatch::Handler { - SequenceNumber sequence_; - BatchCounter(SequenceNumber sequence) : sequence_(sequence) {} - Status MarkNoop(bool empty_batch) override { - if (!empty_batch) { - sequence_++; - } - return Status::OK(); - } - Status MarkEndPrepare(const Slice&) override { - sequence_++; - return Status::OK(); - } - Status MarkCommit(const Slice&) override { - sequence_++; - return Status::OK(); - } - Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { - ++sequence_; - return Status::OK(); - } - - Status PutCF(uint32_t /*cf*/, const Slice& /*key*/, - const Slice& /*val*/) override { - return Status::OK(); - } - Status DeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override { - return Status::OK(); - } - Status SingleDeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override { - return Status::OK(); - } - Status MergeCF(uint32_t /*cf*/, const Slice& /*key*/, - const Slice& /*val*/) override { - return Status::OK(); - } - Status MarkBeginPrepare(bool) override { return Status::OK(); } - Status MarkRollback(const Slice&) override { return Status::OK(); } - }; - current_batch_seq_ = WriteBatchInternal::Sequence(batch.get()); - if (seq_per_batch_) { - BatchCounter counter(current_batch_seq_); - batch->Iterate(&counter); - current_last_seq_ = counter.sequence_; - } else { - current_last_seq_ = - current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1; - } + assert(!seq_per_batch_); + current_last_seq_ = + current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1; // currentBatchSeq_ can only change here assert(current_last_seq_ <= versions_->LastSequence()); diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index f9718f169..6ec7b14e1 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -81,6 +81,13 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const EnvOptions& soptions_; SequenceNumber starting_sequence_number_; std::unique_ptr files_; + // Used only to get latest seq. num + // TODO(icanadi) can this be just a callback? + VersionSet const* const versions_; + const bool seq_per_batch_; + std::shared_ptr io_tracer_; + + // State variables bool started_; bool is_valid_; // not valid when it starts of. Status current_status_; @@ -104,14 +111,11 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { SequenceNumber current_batch_seq_; // sequence number at start of current batch SequenceNumber current_last_seq_; // last sequence in the current batch - // Used only to get latest seq. num - // TODO(icanadi) can this be just a callback? - VersionSet const* const versions_; - const bool seq_per_batch_; // Reads from transaction log only if the writebatch record has been written bool RestrictedRead(Slice* record); - // Seeks to startingSequenceNumber reading from startFileIndex in files_. - // If strict is set,then must get a batch starting with startingSequenceNumber + // Seeks to starting_sequence_number_ reading from start_file_index in files_. + // If strict is set, then must get a batch starting with + // starting_sequence_number_. void SeekToStartSequence(uint64_t start_file_index = 0, bool strict = false); // Implementation of Next. SeekToStartSequence calls it internally with // internal=true to let it find next entry even if it has to jump gaps because @@ -120,10 +124,9 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { void NextImpl(bool internal = false); // Check if batch is expected, else return false bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq); - // Update current batch if a continuous batch is found, else return false + // Update current batch if a continuous batch is found. void UpdateCurrentWriteBatch(const Slice& record); Status OpenLogReader(const LogFile* file); - std::shared_ptr io_tracer_; }; } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/wal_manager.cc b/db/wal_manager.cc index c91d9a355..a5d59422c 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -105,6 +105,11 @@ Status WalManager::GetUpdatesSince( SequenceNumber seq, std::unique_ptr* iter, const TransactionLogIterator::ReadOptions& read_options, VersionSet* version_set) { + if (seq_per_batch_) { + return Status::NotSupported(); + } + + assert(!seq_per_batch_); // Get all sorted Wal Files. // Do binary search and open files and find the seq number. diff --git a/db/wal_manager.h b/db/wal_manager.h index 84de8ae67..743a0ce5f 100644 --- a/db/wal_manager.h +++ b/db/wal_manager.h @@ -113,7 +113,7 @@ class WalManager { // obsolete files will be deleted every this seconds if ttl deletion is // enabled and archive size_limit is disabled. - static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600; + static constexpr uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600; std::shared_ptr io_tracer_; }; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 4dd9fd1c8..59d24c64e 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1484,9 +1484,12 @@ class DB { virtual Status GetCreationTimeOfOldestFile(uint64_t* creation_time) = 0; // Note: this API is not yet consistent with WritePrepared transactions. - // Sets iter to an iterator that is positioned at a write-batch containing - // seq_number. If the sequence number is non existent, it returns an iterator - // at the first available seq_no after the requested seq_no + // + // Sets iter to an iterator that is positioned at a write-batch whose + // sequence number range [start_seq, end_seq] covers seq_number. If no such + // write-batch exists, then iter is positioned at the next write-batch whose + // start_seq > seq_number. + // // Returns Status::OK if iterator is valid // Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to // use this api, else the WAL files will get