diff --git a/db/db_impl.cc b/db/db_impl.cc
index 46b03d27e..cec473b24 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -4768,7 +4768,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       PERF_TIMER_GUARD(write_wal_time);

       WriteBatch* merged_batch = nullptr;
-      if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL()) {
+      if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
+          write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
+        // We simply write the first WriteBatch to the WAL if the group only
+        // contains one batch, that batch should be written to the WAL, and
+        // the batch does not need to be truncated at a WAL termination point.
         merged_batch = write_group[0]->batch;
         write_group[0]->log_used = logfile_number_;
       } else {
@@ -4778,7 +4782,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
         merged_batch = &tmp_batch_;
         for (auto writer : write_group) {
           if (writer->ShouldWriteToWAL()) {
-            WriteBatchInternal::Append(merged_batch, writer->batch);
+            WriteBatchInternal::Append(merged_batch, writer->batch,
+                                       /*WAL_only*/ true);
           }
           writer->log_used = logfile_number_;
         }
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index b0ca938a9..425a26b71 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -1130,6 +1130,29 @@ TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) {

 #endif  // ROCKSDB_LITE

+TEST_F(DBWALTest, WalTermTest) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  ASSERT_OK(Put(1, "foo", "bar"));
+
+  WriteOptions wo;
+  wo.sync = true;
+  wo.disableWAL = false;
+
+  WriteBatch batch;
+  batch.Put("foo", "bar");
+  batch.MarkWalTerminationPoint();
+  batch.Put("foo2", "bar2");
+
+  ASSERT_OK(dbfull()->Write(wo, &batch));
+
+  // Make sure we can re-open it.
+  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
+  ASSERT_EQ("bar", Get(1, "foo"));
+  ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
+}
 }  // namespace rocksdb

 int main(int argc, char** argv) {
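For orientation, a minimal usage sketch of the new API (not part of the patch; the function name and keys are illustrative): only the records added before MarkWalTerminationPoint() reach the WAL, while the whole batch is still applied to the memtable, which is what WalTermTest above verifies across a reopen.

#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

rocksdb::Status WritePartiallyLogged(rocksdb::DB* db) {
  rocksdb::WriteBatch batch;
  batch.Put("logged_key", "written to WAL and memtable");
  // Everything added past this point skips the WAL.
  batch.MarkWalTerminationPoint();
  batch.Put("unlogged_key", "memtable only; lost on a crash before a flush");

  rocksdb::WriteOptions wo;
  wo.sync = true;  // sync the WAL prefix, as in the tests above
  return db->Write(wo, &batch);
}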
diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc
index d4d4932aa..8200de1b6 100644
--- a/db/fault_injection_test.cc
+++ b/db/fault_injection_test.cc
@@ -503,6 +503,30 @@ TEST_P(FaultInjectionTest, ManualLogSyncTest) {
   ASSERT_EQ(value_space, val);
 }

+TEST_P(FaultInjectionTest, WriteBatchWalTerminationTest) {
+  ReadOptions ro;
+  Options options = CurrentOptions();
+  options.env = env_;
+
+  WriteOptions wo;
+  wo.sync = true;
+  wo.disableWAL = false;
+  WriteBatch batch;
+  batch.Put("cats", "dogs");
+  batch.MarkWalTerminationPoint();
+  batch.Put("boys", "girls");
+  ASSERT_OK(db_->Write(wo, &batch));
+
+  env_->SetFilesystemActive(false);
+  NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
+  ASSERT_OK(OpenDB());
+
+  std::string val;
+  ASSERT_OK(db_->Get(ro, "cats", &val));
+  ASSERT_EQ("dogs", val);
+  ASSERT_EQ(db_->Get(ro, "boys", &val), Status::NotFound());
+}
+
 INSTANTIATE_TEST_CASE_P(FaultTest, FaultInjectionTest, ::testing::Bool());

 }  // namespace rocksdb
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 2ed3cd85b..078d9e6c6 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -118,13 +118,6 @@ struct BatchContentClassifier : public WriteBatch::Handler {

 }  // anon namespace

-struct SavePoint {
-  size_t size;  // size of rep_
-  int count;    // count of elements in rep_
-  uint32_t content_flags;
-};
-
 struct SavePoints {
   std::stack<SavePoint> stack;
 };
@@ -143,11 +136,13 @@ WriteBatch::WriteBatch(const std::string& rep)

 WriteBatch::WriteBatch(const WriteBatch& src)
     : save_points_(src.save_points_),
+      wal_term_point_(src.wal_term_point_),
       content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
       rep_(src.rep_) {}

 WriteBatch::WriteBatch(WriteBatch&& src)
     : save_points_(std::move(src.save_points_)),
+      wal_term_point_(std::move(src.wal_term_point_)),
       content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
       rep_(std::move(src.rep_)) {}

@@ -191,6 +186,8 @@ void WriteBatch::Clear() {
       save_points_->stack.pop();
     }
   }
+
+  wal_term_point_.clear();
 }

 int WriteBatch::Count() const {
@@ -213,6 +210,12 @@ uint32_t WriteBatch::ComputeContentFlags() const {
   return rv;
 }

+void WriteBatch::MarkWalTerminationPoint() {
+  wal_term_point_.size = GetDataSize();
+  wal_term_point_.count = Count();
+  wal_term_point_.content_flags = content_flags_;
+}
+
 bool WriteBatch::HasPut() const {
   return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
 }
@@ -729,8 +732,8 @@ void WriteBatch::SetSavePoint() {
     save_points_ = new SavePoints();
   }
   // Record length and count of current batch of writes.
-  save_points_->stack.push(SavePoint{
-      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)});
+  save_points_->stack.push(SavePoint(
+      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
 }

 Status WriteBatch::RollbackToSavePoint() {
@@ -1252,14 +1255,29 @@ void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
   b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
 }

-void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
-  SetCount(dst, Count(dst) + Count(src));
+void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
+                                const bool wal_only) {
+  size_t src_len;
+  int src_count;
+  uint32_t src_flags;
+
+  const SavePoint& batch_end = src->GetWalTerminationPoint();
+
+  if (wal_only && !batch_end.is_cleared()) {
+    src_len = batch_end.size - WriteBatchInternal::kHeader;
+    src_count = batch_end.count;
+    src_flags = batch_end.content_flags;
+  } else {
+    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
+    src_count = Count(src);
+    src_flags = src->content_flags_.load(std::memory_order_relaxed);
+  }
+
+  SetCount(dst, Count(dst) + src_count);
   assert(src->rep_.size() >= WriteBatchInternal::kHeader);
-  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader,
-                   src->rep_.size() - WriteBatchInternal::kHeader);
+  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
   dst->content_flags_.store(
-      dst->content_flags_.load(std::memory_order_relaxed) |
-          src->content_flags_.load(std::memory_order_relaxed),
+      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
       std::memory_order_relaxed);
 }
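The WAL-only path in Append() above subtracts WriteBatchInternal::kHeader because a WriteBatch rep_ begins with a fixed 12-byte header (an 8-byte sequence number followed by a 4-byte record count), and the termination point records the rep_ size including that header. A standalone sketch of the arithmetic, separate from the patch:

#include <cassert>
#include <cstddef>

// Mirrors WriteBatchInternal::kHeader: 8-byte sequence number + 4-byte count.
constexpr size_t kWriteBatchHeader = 12;

// Number of record bytes copied when appending only the WAL-visible prefix:
// the header is excluded because the destination batch already has its own.
size_t WalPrefixPayloadBytes(size_t rep_size_at_mark) {
  assert(rep_size_at_mark >= kWriteBatchHeader);
  return rep_size_at_mark - kWriteBatchHeader;
}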
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index 1602fdbcc..77e46ecff 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -177,7 +177,8 @@ class WriteBatchInternal {
                        uint64_t log_number = 0, DB* db = nullptr,
                        bool concurrent_memtable_writes = false);

-  static void Append(WriteBatch* dst, const WriteBatch* src);
+  static void Append(WriteBatch* dst, const WriteBatch* src,
+                     const bool WAL_only = false);

   // Returns the byte size of appending a WriteBatch with ByteSize
   // leftByteSize and a WriteBatch with ByteSize rightByteSize
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc
index 05e7c0e7b..75d9e3e7f 100644
--- a/db/write_batch_test.cc
+++ b/db/write_batch_test.cc
@@ -185,6 +185,27 @@ TEST_F(WriteBatchTest, Append) {
       "Delete(foo)@203",
       PrintContents(&b1));
   ASSERT_EQ(4, b1.Count());
+  b2.Clear();
+  b2.Put("c", "cc");
+  b2.Put("d", "dd");
+  b2.MarkWalTerminationPoint();
+  b2.Put("e", "ee");
+  WriteBatchInternal::Append(&b1, &b2, /*wal only*/ true);
+  ASSERT_EQ(
+      "Put(a, va)@200"
+      "Put(b, vb)@202"
+      "Put(b, vb)@201"
+      "Put(c, cc)@204"
+      "Put(d, dd)@205"
+      "Delete(foo)@203",
+      PrintContents(&b1));
+  ASSERT_EQ(6, b1.Count());
+  ASSERT_EQ(
+      "Put(c, cc)@0"
+      "Put(d, dd)@1"
+      "Put(e, ee)@2",
+      PrintContents(&b2));
+  ASSERT_EQ(3, b2.Count());
 }

 TEST_F(WriteBatchTest, SingleDeletion) {
diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h
index 01f2108f1..89f9e5017 100644
--- a/include/rocksdb/write_batch.h
+++ b/include/rocksdb/write_batch.h
@@ -39,6 +39,25 @@ class ColumnFamilyHandle;
 struct SavePoints;
 struct SliceParts;

+struct SavePoint {
+  size_t size;  // size of rep_
+  int count;    // count of elements in rep_
+  uint32_t content_flags;
+
+  SavePoint() : size(0), count(0), content_flags(0) {}
+
+  SavePoint(size_t _size, int _count, uint32_t _flags)
+      : size(_size), count(_count), content_flags(_flags) {}
+
+  void clear() {
+    size = 0;
+    count = 0;
+    content_flags = 0;
+  }
+
+  bool is_cleared() const { return (size | count | content_flags) == 0; }
+};
+
 class WriteBatch : public WriteBatchBase {
  public:
   explicit WriteBatch(size_t reserved_bytes = 0);
@@ -280,10 +299,20 @@ class WriteBatch : public WriteBatchBase {
   WriteBatch& operator=(const WriteBatch& src);
   WriteBatch& operator=(WriteBatch&& src);

+  // Marks this point in the WriteBatch as the last record to
+  // be inserted into the WAL, provided the WAL is enabled.
+  void MarkWalTerminationPoint();
+  const SavePoint& GetWalTerminationPoint() const { return wal_term_point_; }
+
  private:
   friend class WriteBatchInternal;
   SavePoints* save_points_;

+  // When sending a WriteBatch through WriteImpl, we might want to
+  // specify that only the first x records of the batch be written to
+  // the WAL.
+  SavePoint wal_term_point_;
+
   // For HasXYZ.  Mutable to allow lazy computation of results
   mutable std::atomic<uint32_t> content_flags_;
diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc
index fe4a959b3..8676582e9 100644
--- a/utilities/transactions/transaction_impl.cc
+++ b/utilities/transactions/transaction_impl.cc
@@ -250,25 +250,23 @@ Status TransactionImpl::Commit() {
     }
   } else if (commit_prepared) {
     exec_status_.store(AWAITING_COMMIT);

-    WriteOptions write_options = write_options_;
-    // insert prepared batch into Memtable only.
-    // Memtable will ignore BeginPrepare/EndPrepare markers
-    // in non recovery mode and simply insert the values
-    write_options.disableWAL = true;
-    assert(log_number_ > 0);
-    s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
-                            nullptr, nullptr, log_number_);
-    if (!s.ok()) {
-      return s;
-    }

     // We take the commit-time batch and append the Commit marker.
-    // We then write this batch to both WAL and Memtable.
     // The Memtable will ignore the Commit marker in non-recovery mode
-    write_options.disableWAL = false;
-    WriteBatchInternal::MarkCommit(GetCommitTimeWriteBatch(), name_);
-    s = db_impl_->WriteImpl(write_options, GetCommitTimeWriteBatch());
+    WriteBatch* working_batch = GetCommitTimeWriteBatch();
+    WriteBatchInternal::MarkCommit(working_batch, name_);
+
+    // Any operations appended past this point are excluded from the WAL.
+    working_batch->MarkWalTerminationPoint();
+
+    // Insert the prepared batch into the memtable only, skipping the WAL.
+    // The memtable will ignore the BeginPrepare/EndPrepare markers
+    // in non-recovery mode and simply insert the values.
+    WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
+
+    s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
+                            log_number_);
     if (!s.ok()) {
       return s;
     }
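With the write_batch.h additions above, TransactionImpl::Commit() now issues a single WriteImpl() call instead of two: the commit marker (plus any commit-time writes) is both logged and inserted, while the prepared data, already logged at prepare time, is replayed into the memtable only. A compressed sketch of that pattern, using illustrative names rather than code from the patch:

#include <string>

#include "db/write_batch_internal.h"  // internal header; sketch only
#include "rocksdb/write_batch.h"

// Build the batch that Commit() hands to a single WriteImpl() call.
// "commit_time_batch" and "prepared_batch" are hypothetical names.
void BuildCommitBatch(rocksdb::WriteBatch* commit_time_batch,
                      const rocksdb::WriteBatch* prepared_batch,
                      const std::string& txn_name) {
  // The commit marker (plus commit-time writes) goes to WAL and memtable.
  rocksdb::WriteBatchInternal::MarkCommit(commit_time_batch, txn_name);
  // Records appended after the mark are skipped when the WAL is written.
  commit_time_batch->MarkWalTerminationPoint();
  // Prepared data was already logged at prepare time, so memtable only.
  rocksdb::WriteBatchInternal::Append(commit_time_batch, prepared_batch);
}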