From aafb377bb5f999d4e065995437cc01627289f904 Mon Sep 17 00:00:00 2001 From: Anvesh Komuravelli Date: Thu, 28 Apr 2022 14:42:00 -0700 Subject: [PATCH] Update protection info on recovered logs data (#9875) Summary: Update protection info on recovered logs data Pull Request resolved: https://github.com/facebook/rocksdb/pull/9875 Test Plan: - Benchmark setup: `TEST_TMPDIR=/dev/shm/100MB_WAL_DB/ ./db_bench -benchmarks=fillrandom -write_buffer_size=1048576000` - Benchmark command: `TEST_TMPDIR=/dev/shm/100MB_WAL_DB/ /usr/bin/time ./db_bench -use_existing_db=true -benchmarks=overwrite -write_buffer_size=1048576000 -writes=1 -report_open_timing=true` - Results before this PR ``` OpenDb: 2350.14 milliseconds OpenDb: 2296.94 milliseconds OpenDb: 2184.29 milliseconds OpenDb: 2167.59 milliseconds OpenDb: 2231.24 milliseconds OpenDb: 2109.57 milliseconds OpenDb: 2197.71 milliseconds OpenDb: 2120.8 milliseconds OpenDb: 2148.12 milliseconds OpenDb: 2207.95 milliseconds ``` - Results after this PR ``` OpenDb: 2424.52 milliseconds OpenDb: 2359.84 milliseconds OpenDb: 2317.68 milliseconds OpenDb: 2339.4 milliseconds OpenDb: 2325.36 milliseconds OpenDb: 2321.06 milliseconds OpenDb: 2353.98 milliseconds OpenDb: 2344.64 milliseconds OpenDb: 2384.09 milliseconds OpenDb: 2428.58 milliseconds ``` Mean regressed 7.2% (2201.4 -> 2359.9) Reviewed By: ajkr Differential Revision: D36012787 Pulled By: akomurav fbshipit-source-id: d2aba09f29c6beb2fd0fe8e1e359be910b4ef02a --- db/db_impl/db_impl_open.cc | 6 +- db/write_batch.cc | 196 +++++++++++++++--- include/rocksdb/write_batch.h | 20 +- tools/ldb_cmd.cc | 5 +- utilities/transactions/write_prepared_txn.cc | 4 +- .../transactions/write_prepared_txn_db.h | 4 +- 6 files changed, 204 insertions(+), 31 deletions(-) diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 0377a7a5c..7a2ff8f49 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -947,7 +947,6 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, // Read all the records and add to a memtable std::string scratch; Slice record; - WriteBatch batch; TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal", /*arg=*/nullptr); @@ -961,10 +960,15 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, continue; } + // We create a new batch and initialize with a valid prot_info_ to store + // the data checksums + WriteBatch batch(0, 0, 8, 0); + status = WriteBatchInternal::SetContents(&batch, record); if (!status.ok()) { return status; } + SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); if (immutable_db_options_.wal_recovery_mode == diff --git a/db/write_batch.cc b/db/write_batch.cc index 8a590a7f2..77e91504e 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -152,14 +152,6 @@ struct SavePoints { std::stack> stack; }; -WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes) - : content_flags_(0), max_bytes_(max_bytes), rep_() { - rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) - ? reserved_bytes - : WriteBatchInternal::kHeader); - rep_.resize(WriteBatchInternal::kHeader); -} - WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t protection_bytes_per_key, size_t default_cf_ts_sz) : content_flags_(0), @@ -580,14 +572,16 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb, s = handler->MarkBeginPrepare(); assert(s.ok()); empty_batch = false; - if (!handler->WriteAfterCommit()) { + if (handler->WriteAfterCommit() == + WriteBatch::Handler::OptionState::kDisabled) { s = Status::NotSupported( "WriteCommitted txn tag when write_after_commit_ is disabled (in " "WritePrepared/WriteUnprepared mode). If it is not due to " "corruption, the WAL must be emptied before changing the " "WritePolicy."); } - if (handler->WriteBeforePrepare()) { + if (handler->WriteBeforePrepare() == + WriteBatch::Handler::OptionState::kEnabled) { s = Status::NotSupported( "WriteCommitted txn tag when write_before_prepare_ is enabled " "(in WriteUnprepared mode). If it is not due to corruption, the " @@ -600,7 +594,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb, s = handler->MarkBeginPrepare(); assert(s.ok()); empty_batch = false; - if (handler->WriteAfterCommit()) { + if (handler->WriteAfterCommit() == + WriteBatch::Handler::OptionState::kEnabled) { s = Status::NotSupported( "WritePrepared/WriteUnprepared txn tag when write_after_commit_ " "is enabled (in default WriteCommitted mode). If it is not due " @@ -614,13 +609,15 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb, s = handler->MarkBeginPrepare(true /* unprepared */); assert(s.ok()); empty_batch = false; - if (handler->WriteAfterCommit()) { + if (handler->WriteAfterCommit() == + WriteBatch::Handler::OptionState::kEnabled) { s = Status::NotSupported( "WriteUnprepared txn tag when write_after_commit_ is enabled (in " "default WriteCommitted mode). If it is not due to corruption, " "the WAL must be emptied before changing the WritePolicy."); } - if (!handler->WriteBeforePrepare()) { + if (handler->WriteBeforePrepare() == + WriteBatch::Handler::OptionState::kDisabled) { s = Status::NotSupported( "WriteUnprepared txn tag when write_before_prepare_ is disabled " "(in WriteCommitted/WritePrepared mode). If it is not due to " @@ -1494,6 +1491,8 @@ Status WriteBatch::UpdateTimestamps( return s; } +namespace { + class MemTableInserter : public WriteBatch::Handler { SequenceNumber sequence_; @@ -1581,9 +1580,24 @@ class MemTableInserter : public WriteBatch::Handler { return res; } + void DecrementProtectionInfoIdxForTryAgain() { + if (prot_info_ != nullptr) --prot_info_idx_; + } + + void ResetProtectionInfo() { + prot_info_idx_ = 0; + prot_info_ = nullptr; + } + protected: - bool WriteBeforePrepare() const override { return write_before_prepare_; } - bool WriteAfterCommit() const override { return write_after_commit_; } + Handler::OptionState WriteBeforePrepare() const override { + return write_before_prepare_ ? Handler::OptionState::kEnabled + : Handler::OptionState::kDisabled; + } + Handler::OptionState WriteAfterCommit() const override { + return write_after_commit_ ? Handler::OptionState::kEnabled + : Handler::OptionState::kDisabled; + } public: // cf_mems should not be shared with concurrent inserters @@ -1871,15 +1885,25 @@ class MemTableInserter : public WriteBatch::Handler { Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { const auto* kv_prot_info = NextProtectionInfo(); + Status ret_status; if (kv_prot_info != nullptr) { // Memtable needs seqno, doesn't need CF ID auto mem_kv_prot_info = kv_prot_info->StripC(column_family_id).ProtectS(sequence_); - return PutCFImpl(column_family_id, key, value, kTypeValue, - &mem_kv_prot_info); + ret_status = PutCFImpl(column_family_id, key, value, kTypeValue, + &mem_kv_prot_info); + } else { + ret_status = PutCFImpl(column_family_id, key, value, kTypeValue, + nullptr /* kv_prot_info */); } - return PutCFImpl(column_family_id, key, value, kTypeValue, - nullptr /* kv_prot_info */); + // TODO: this assumes that if TryAgain status is returned to the caller, + // the operation is actually tried again. The proper way to do this is to + // pass a `try_again` parameter to the operation itself and decrement + // prot_info_idx_ based on that + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; } Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key, @@ -1926,6 +1950,9 @@ class MemTableInserter : public WriteBatch::Handler { } else if (ret_status.ok()) { MaybeAdvanceSeq(false /* batch_boundary */); } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } return ret_status; } @@ -1957,6 +1984,9 @@ class MemTableInserter : public WriteBatch::Handler { ret_status = WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } return ret_status; } @@ -1985,6 +2015,9 @@ class MemTableInserter : public WriteBatch::Handler { } else if (ret_status.ok()) { MaybeAdvanceSeq(false /* batch_boundary */); } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } return ret_status; } assert(ret_status.ok()); @@ -2009,6 +2042,9 @@ class MemTableInserter : public WriteBatch::Handler { ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } return ret_status; } @@ -2038,6 +2074,9 @@ class MemTableInserter : public WriteBatch::Handler { } else if (ret_status.ok()) { MaybeAdvanceSeq(false /* batch_boundary */); } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } return ret_status; } assert(ret_status.ok()); @@ -2092,6 +2131,9 @@ class MemTableInserter : public WriteBatch::Handler { ret_status = WriteBatchInternal::DeleteRange( rebuilding_trx_, column_family_id, begin_key, end_key); } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } return ret_status; } @@ -2121,6 +2163,9 @@ class MemTableInserter : public WriteBatch::Handler { } else if (ret_status.ok()) { MaybeAdvanceSeq(false /* batch_boundary */); } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } return ret_status; } assert(ret_status.ok()); @@ -2242,23 +2287,31 @@ class MemTableInserter : public WriteBatch::Handler { ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } return ret_status; } Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { const auto* kv_prot_info = NextProtectionInfo(); + Status ret_status; if (kv_prot_info != nullptr) { // Memtable needs seqno, doesn't need CF ID auto mem_kv_prot_info = kv_prot_info->StripC(column_family_id).ProtectS(sequence_); // Same as PutCF except for value type. - return PutCFImpl(column_family_id, key, value, kTypeBlobIndex, - &mem_kv_prot_info); + ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex, + &mem_kv_prot_info); } else { - return PutCFImpl(column_family_id, key, value, kTypeBlobIndex, - nullptr /* kv_prot_info */); + ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex, + nullptr /* kv_prot_info */); } + if (UNLIKELY(ret_status.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return ret_status; } void CheckMemtableFull() { @@ -2401,6 +2454,7 @@ class MemTableInserter : public WriteBatch::Handler { const auto& batch_info = trx->batches_.begin()->second; // all inserts must reference this trx log number log_number_ref_ = batch_info.log_number_; + ResetProtectionInfo(); s = batch_info.batch_->Iterate(this); log_number_ref_ = 0; } @@ -2422,6 +2476,10 @@ class MemTableInserter : public WriteBatch::Handler { const bool batch_boundry = true; MaybeAdvanceSeq(batch_boundry); + if (UNLIKELY(s.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return s; } @@ -2466,6 +2524,7 @@ class MemTableInserter : public WriteBatch::Handler { return ucmp->timestamp_size(); }); if (s.ok()) { + ResetProtectionInfo(); s = batch_info.batch_->Iterate(this); log_number_ref_ = 0; } @@ -2488,6 +2547,10 @@ class MemTableInserter : public WriteBatch::Handler { constexpr bool batch_boundary = true; MaybeAdvanceSeq(batch_boundary); + if (UNLIKELY(s.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + return s; } @@ -2523,6 +2586,8 @@ class MemTableInserter : public WriteBatch::Handler { } }; +} // namespace + // This function can only be called in these conditions: // 1) During Recovery() // 2) During Write(), in a single-threaded write thread @@ -2613,11 +2678,94 @@ Status WriteBatchInternal::InsertInto( return s; } +namespace { + +// This class updates protection info for a WriteBatch. +class ProtectionInfoUpdater : public WriteBatch::Handler { + public: + explicit ProtectionInfoUpdater(WriteBatch::ProtectionInfo* prot_info) + : prot_info_(prot_info) {} + + ~ProtectionInfoUpdater() override {} + + Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override { + return UpdateProtInfo(cf, key, val, kTypeValue); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + return UpdateProtInfo(cf, key, "", kTypeDeletion); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return UpdateProtInfo(cf, key, "", kTypeSingleDeletion); + } + + Status DeleteRangeCF(uint32_t cf, const Slice& begin_key, + const Slice& end_key) override { + return UpdateProtInfo(cf, begin_key, end_key, kTypeRangeDeletion); + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override { + return UpdateProtInfo(cf, key, val, kTypeMerge); + } + + Status PutBlobIndexCF(uint32_t cf, const Slice& key, + const Slice& val) override { + return UpdateProtInfo(cf, key, val, kTypeBlobIndex); + } + + Status MarkBeginPrepare(bool /* unprepare */) override { + return Status::OK(); + } + + Status MarkEndPrepare(const Slice& /* xid */) override { + return Status::OK(); + } + + Status MarkCommit(const Slice& /* xid */) override { return Status::OK(); } + + Status MarkCommitWithTimestamp(const Slice& /* xid */, + const Slice& /* ts */) override { + return Status::OK(); + } + + Status MarkRollback(const Slice& /* xid */) override { return Status::OK(); } + + Status MarkNoop(bool /* empty_batch */) override { return Status::OK(); } + + private: + Status UpdateProtInfo(uint32_t cf, const Slice& key, const Slice& val, + const ValueType op_type) { + if (prot_info_) { + prot_info_->entries_.emplace_back( + ProtectionInfo64().ProtectKVO(key, val, op_type).ProtectC(cf)); + } + return Status::OK(); + } + + // No copy or move. + ProtectionInfoUpdater(const ProtectionInfoUpdater&) = delete; + ProtectionInfoUpdater(ProtectionInfoUpdater&&) = delete; + ProtectionInfoUpdater& operator=(const ProtectionInfoUpdater&) = delete; + ProtectionInfoUpdater& operator=(ProtectionInfoUpdater&&) = delete; + + WriteBatch::ProtectionInfo* const prot_info_ = nullptr; +}; + +} // namespace + Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= WriteBatchInternal::kHeader); - assert(b->prot_info_ == nullptr); + b->rep_.assign(contents.data(), contents.size()); b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); + + // If we have a prot_info_, update protection info entries for the batch. + if (b->prot_info_) { + ProtectionInfoUpdater prot_info_updater(b->prot_info_.get()); + return b->Iterate(&prot_info_updater); + } + return Status::OK(); } diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index d39727fac..4078ceeaa 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -63,7 +63,9 @@ struct SavePoint { class WriteBatch : public WriteBatchBase { public: - explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0); + explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0) + : WriteBatch(reserved_bytes, max_bytes, 0, 0) {} + // `protection_bytes_per_key` is the number of bytes used to store // protection information for each key entry. Currently supported values are // zero (disabled) and eight. @@ -318,8 +320,17 @@ class WriteBatch : public WriteBatchBase { protected: friend class WriteBatchInternal; - virtual bool WriteAfterCommit() const { return true; } - virtual bool WriteBeforePrepare() const { return false; } + enum class OptionState { + kUnknown, + kDisabled, + kEnabled, + }; + virtual OptionState WriteAfterCommit() const { + return OptionState::kUnknown; + } + virtual OptionState WriteBeforePrepare() const { + return OptionState::kUnknown; + } }; Status Iterate(Handler* handler) const; @@ -402,6 +413,9 @@ class WriteBatch : public WriteBatchBase { struct ProtectionInfo; size_t GetProtectionBytesPerKey() const; + // Clears prot_info_ if there are no entries. + void ClearProtectionInfoIfEmpty(); + private: friend class WriteBatchInternal; friend class LocalSavePoint; diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 638fbe262..2228ea47e 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -2538,7 +2538,10 @@ class InMemoryHandler : public WriteBatch::Handler { ~InMemoryHandler() override {} protected: - bool WriteAfterCommit() const override { return write_after_commit_; } + Handler::OptionState WriteAfterCommit() const override { + return write_after_commit_ ? Handler::OptionState::kEnabled + : Handler::OptionState::kDisabled; + } private: std::stringstream& row_; diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 97a964a23..ce2975354 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -374,7 +374,9 @@ Status WritePreparedTxn::RollbackInternal() { } protected: - bool WriteAfterCommit() const override { return false; } + Handler::OptionState WriteAfterCommit() const override { + return Handler::OptionState::kDisabled; + } } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch, *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(), wpt_db_->txn_db_options_.rollback_merge_operands, diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 5ae29b3f3..105dfe09f 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -1082,7 +1082,9 @@ struct SubBatchCounter : public WriteBatch::Handler { } Status MarkBeginPrepare(bool) override { return Status::OK(); } Status MarkRollback(const Slice&) override { return Status::OK(); } - bool WriteAfterCommit() const override { return false; } + Handler::OptionState WriteAfterCommit() const override { + return Handler::OptionState::kDisabled; + } }; SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,