diff --git a/db/dbformat.h b/db/dbformat.h index 51f5e4143..220232321 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -39,6 +39,11 @@ enum ValueType : unsigned char { kTypeColumnFamilyMerge = 0x6, // WAL only. kTypeSingleDeletion = 0x7, kTypeColumnFamilySingleDeletion = 0x8, // WAL only. + kTypeBeginPrepareXID = 0x9, // WAL only. + kTypeEndPrepareXID = 0xA, // WAL only. + kTypeCommitXID = 0xB, // WAL only. + kTypeRollbackXID = 0xC, // WAL only. + kTypeNoop = 0xD, // WAL only. kMaxValue = 0x7F // Not used for storing records. }; @@ -478,5 +483,5 @@ extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, // input will be advanced to after the record. extern Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, - Slice* value, Slice* blob); + Slice* value, Slice* blob, Slice* xid); } // namespace rocksdb diff --git a/db/write_batch.cc b/db/write_batch.cc index 8a54432bb..9dcd4c26f 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -20,6 +20,11 @@ // kTypeColumnFamilyDeletion varint32 varstring varstring // kTypeColumnFamilySingleDeletion varint32 varstring varstring // kTypeColumnFamilyMerge varint32 varstring varstring +// kTypeBeginPrepareXID varstring +// kTypeEndPrepareXID +// kTypeCommitXID varstring +// kTypeRollbackXID varstring +// kTypeNoop // varstring := // len: varint32 // data: uint8[len] @@ -48,11 +53,15 @@ namespace rocksdb { namespace { enum ContentFlags : uint32_t { - DEFERRED = 1, - HAS_PUT = 2, - HAS_DELETE = 4, - HAS_SINGLE_DELETE = 8, - HAS_MERGE = 16, + DEFERRED = 1 << 0, + HAS_PUT = 1 << 1, + HAS_DELETE = 1 << 2, + HAS_SINGLE_DELETE = 1 << 3, + HAS_MERGE = 1 << 4, + HAS_BEGIN_PREPARE = 1 << 5, + HAS_END_PREPARE = 1 << 6, + HAS_COMMIT = 1 << 7, + HAS_ROLLBACK = 1 << 8, }; struct BatchContentClassifier : public WriteBatch::Handler { @@ -77,6 +86,26 @@ struct BatchContentClassifier : public WriteBatch::Handler { content_flags |= ContentFlags::HAS_MERGE; return Status::OK(); } + + Status MarkBeginPrepare() override { + content_flags |= ContentFlags::HAS_BEGIN_PREPARE; + return Status::OK(); + } + + Status MarkEndPrepare(const Slice&) override { + content_flags |= ContentFlags::HAS_END_PREPARE; + return Status::OK(); + } + + Status MarkCommit(const Slice&) override { + content_flags |= ContentFlags::HAS_COMMIT; + return Status::OK(); + } + + Status MarkRollback(const Slice&) override { + content_flags |= ContentFlags::HAS_ROLLBACK; + return Status::OK(); + } }; } // anon namespace @@ -209,9 +238,25 @@ bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) { return GetLengthPrefixedSlice(input, key); } +bool WriteBatch::HasBeginPrepare() const { + return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0; +} + +bool WriteBatch::HasEndPrepare() const { + return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0; +} + +bool WriteBatch::HasCommit() const { + return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0; +} + +bool WriteBatch::HasRollback() const { + return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0; +} + Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, - Slice* value, Slice* blob) { + Slice* value, Slice* blob, Slice* xid) { assert(key != nullptr && value != nullptr); *tag = (*input)[0]; input->remove_prefix(1); @@ -257,6 +302,24 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, return Status::Corruption("bad WriteBatch Blob"); } break; + case kTypeNoop: + case kTypeBeginPrepareXID: + break; + case kTypeEndPrepareXID: + if (!GetLengthPrefixedSlice(input, xid)) { + return Status::Corruption("bad EndPrepare XID"); + } + break; + case kTypeCommitXID: + if (!GetLengthPrefixedSlice(input, xid)) { + return Status::Corruption("bad Commit XID"); + } + break; + case kTypeRollbackXID: + if (!GetLengthPrefixedSlice(input, xid)) { + return Status::Corruption("bad Rollback XID"); + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -270,7 +333,7 @@ Status WriteBatch::Iterate(Handler* handler) const { } input.remove_prefix(WriteBatchInternal::kHeader); - Slice key, value, blob; + Slice key, value, blob, xid; int found = 0; Status s; while (s.ok() && !input.empty() && handler->Continue()) { @@ -278,7 +341,7 @@ Status WriteBatch::Iterate(Handler* handler) const { uint32_t column_family = 0; // default s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value, - &blob); + &blob, &xid); if (!s.ok()) { return s; } @@ -315,6 +378,28 @@ Status WriteBatch::Iterate(Handler* handler) const { case kTypeLogData: handler->LogData(blob); break; + case kTypeBeginPrepareXID: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); + handler->MarkBeginPrepare(); + break; + case kTypeEndPrepareXID: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE)); + handler->MarkEndPrepare(xid); + break; + case kTypeCommitXID: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT)); + handler->MarkCommit(xid); + break; + case kTypeRollbackXID: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK)); + handler->MarkRollback(xid); + break; + case kTypeNoop: + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -391,6 +476,47 @@ void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); } +void WriteBatchInternal::InsertNoop(WriteBatch* b) { + b->rep_.push_back(static_cast(kTypeNoop)); +} + +void WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) { + // a manually constructed batch can only contain one prepare section + assert(b->rep_[12] == static_cast(kTypeNoop)); + + // all savepoints up to this point are cleared + if (b->save_points_ != nullptr) { + while (!b->save_points_->stack.empty()) { + b->save_points_->stack.pop(); + } + } + + // rewrite noop as begin marker + b->rep_[12] = static_cast(kTypeBeginPrepareXID); + b->rep_.push_back(static_cast(kTypeEndPrepareXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_END_PREPARE | + ContentFlags::HAS_BEGIN_PREPARE, + std::memory_order_relaxed); +} + +void WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { + b->rep_.push_back(static_cast(kTypeCommitXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_COMMIT, + std::memory_order_relaxed); +} + +void WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) { + b->rep_.push_back(static_cast(kTypeRollbackXID)); + PutLengthPrefixedSlice(&b->rep_, xid); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_ROLLBACK, + std::memory_order_relaxed); +} + void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, const Slice& key) { WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 3987645ef..80e08d1d0 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -92,6 +92,14 @@ class WriteBatchInternal { static void Merge(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key, const SliceParts& value); + static void MarkEndPrepare(WriteBatch* batch, const Slice& xid); + + static void MarkRollback(WriteBatch* batch, const Slice& xid); + + static void MarkCommit(WriteBatch* batch, const Slice& xid); + + static void InsertNoop(WriteBatch* batch); + // Return the number of entries in the batch. static int Count(const WriteBatch* batch); diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 58c7273c3..530cccf92 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -231,6 +231,22 @@ namespace { virtual void LogData(const Slice& blob) override { seen += "LogData(" + blob.ToString() + ")"; } + virtual Status MarkBeginPrepare() override { + seen += "MarkBeginPrepare()"; + return Status::OK(); + } + virtual Status MarkEndPrepare(const Slice& xid) override { + seen += "MarkEndPrepare(" + xid.ToString() + ")"; + return Status::OK(); + } + virtual Status MarkCommit(const Slice& xid) override { + seen += "MarkCommit(" + xid.ToString() + ")"; + return Status::OK(); + } + virtual Status MarkRollback(const Slice& xid) override { + seen += "MarkRollback(" + xid.ToString() + ")"; + return Status::OK(); + } }; } @@ -308,6 +324,31 @@ TEST_F(WriteBatchTest, Blob) { handler.seen); } +TEST_F(WriteBatchTest, PrepareCommit) { + WriteBatch batch; + WriteBatchInternal::InsertNoop(&batch); + batch.Put(Slice("k1"), Slice("v1")); + batch.Put(Slice("k2"), Slice("v2")); + batch.SetSavePoint(); + WriteBatchInternal::MarkEndPrepare(&batch, Slice("xid1")); + Status s = batch.RollbackToSavePoint(); + ASSERT_EQ(s, Status::NotFound()); + WriteBatchInternal::MarkCommit(&batch, Slice("xid1")); + WriteBatchInternal::MarkRollback(&batch, Slice("xid1")); + ASSERT_EQ(2, batch.Count()); + + TestHandler handler; + batch.Iterate(&handler); + ASSERT_EQ( + "MarkBeginPrepare()" + "Put(k1, v1)" + "Put(k2, v2)" + "MarkEndPrepare(xid1)" + "MarkCommit(xid1)" + "MarkRollback(xid1)", + handler.seen); +} + // It requires more than 30GB of memory to run the test. With single memory // allocation of more than 30GB. // Not all platform can run it. Also it runs a long time. So disable it. diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index aab12ba02..ccfd67e5e 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -34,7 +34,8 @@ enum WriteType { kMergeRecord, kDeleteRecord, kSingleDeleteRecord, - kLogDataRecord + kLogDataRecord, + kXIDRecord, }; // an entry for Put, Merge, Delete, or SingleDelete entry for write batches. diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index e9bd72b58..2a6ed63bd 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -186,6 +186,23 @@ class WriteBatch : public WriteBatchBase { // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); + virtual Status MarkBeginPrepare() { + return Status::InvalidArgument("MarkBeginPrepare() handler not defined."); + } + + virtual Status MarkEndPrepare(const Slice& xid) { + return Status::InvalidArgument("MarkEndPrepare() handler not defined."); + } + + virtual Status MarkRollback(const Slice& xid) { + return Status::InvalidArgument( + "MarkRollbackPrepare() handler not defined."); + } + + virtual Status MarkCommit(const Slice& xid) { + return Status::InvalidArgument("MarkCommit() handler not defined."); + } + // Continue is called by WriteBatch::Iterate. If it returns false, // iteration is halted. Otherwise, it continues iterating. The default // implementation always returns true. @@ -214,6 +231,18 @@ class WriteBatch : public WriteBatchBase { // Returns trie if MergeCF will be called during Iterate bool HasMerge() const; + // Returns true if MarkBeginPrepare will be called during Iterate + bool HasBeginPrepare() const; + + // Returns true if MarkEndPrepare will be called during Iterate + bool HasEndPrepare() const; + + // Returns trie if MarkCommit will be called during Iterate + bool HasCommit() const; + + // Returns trie if MarkRollback will be called during Iterate + bool HasRollback() const; + using WriteBatchBase::GetWriteBatch; WriteBatch* GetWriteBatch() override { return this; } diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index f0b511035..e6c1e6bfb 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1782,24 +1782,24 @@ class InMemoryHandler : public WriteBatch::Handler { return Status::OK(); } - virtual Status MarkBeginPrepare() { + virtual Status MarkBeginPrepare() override { row_ << "BEGIN_PREARE "; return Status::OK(); } - virtual Status MarkEndPrepare(const Slice& xid) { + virtual Status MarkEndPrepare(const Slice& xid) override { row_ << "END_PREPARE("; row_ << LDBCommand::StringToHex(xid.ToString()) << ") "; return Status::OK(); } - virtual Status MarkRollback(const Slice& xid) { + virtual Status MarkRollback(const Slice& xid) override { row_ << "ROLLBACK("; row_ << LDBCommand::StringToHex(xid.ToString()) << ") "; return Status::OK(); } - virtual Status MarkCommit(const Slice& xid) { + virtual Status MarkCommit(const Slice& xid) override { row_ << "COMMIT("; row_ << LDBCommand::StringToHex(xid.ToString()) << ") "; return Status::OK(); diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 83b07f4db..e16175449 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -337,13 +337,13 @@ class WBWIIteratorImpl : public WBWIIterator { virtual WriteEntry Entry() const override { WriteEntry ret; - Slice blob; + Slice blob, xid; const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); // this is guaranteed with Valid() assert(iter_entry != nullptr && iter_entry->column_family == column_family_id_); - auto s = write_batch_->GetEntryFromDataOffset(iter_entry->offset, &ret.type, - &ret.key, &ret.value, &blob); + auto s = write_batch_->GetEntryFromDataOffset( + iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid); assert(s.ok()); assert(ret.type == kPutRecord || ret.type == kDeleteRecord || ret.type == kSingleDeleteRecord || ret.type == kMergeRecord); @@ -501,7 +501,7 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { // Loop through all entries in Rep and add each one to the index int found = 0; while (s.ok() && !input.empty()) { - Slice key, value, blob; + Slice key, value, blob, xid; uint32_t column_family_id = 0; // default char tag = 0; @@ -509,7 +509,7 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { last_entry_offset = input.data() - write_batch.Data().data(); s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, - &value, &blob); + &value, &blob, &xid); if (!s.ok()) { break; } @@ -529,6 +529,11 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { } break; case kTypeLogData: + case kTypeBeginPrepareXID: + case kTypeEndPrepareXID: + case kTypeCommitXID: + case kTypeRollbackXID: + case kTypeNoop: break; default: return Status::Corruption("unknown WriteBatch tag"); diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 89114f02d..e4ea104e3 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -24,10 +24,10 @@ class Statistics; Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, - Slice* value, - Slice* blob) const { + Slice* value, Slice* blob, + Slice* xid) const { if (type == nullptr || Key == nullptr || value == nullptr || - blob == nullptr) { + blob == nullptr || xid == nullptr) { return Status::InvalidArgument("Output parameters cannot be null"); } @@ -42,8 +42,8 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); char tag; uint32_t column_family; - Status s = - ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob); + Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, + blob, xid); switch (tag) { case kTypeColumnFamilyValue: @@ -65,6 +65,12 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, case kTypeLogData: *type = kLogDataRecord; break; + case kTypeBeginPrepareXID: + case kTypeEndPrepareXID: + case kTypeCommitXID: + case kTypeRollbackXID: + *type = kXIDRecord; + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -183,7 +189,8 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( result = WriteBatchWithIndexInternal::Result::kDeleted; break; } - case kLogDataRecord: { + case kLogDataRecord: + case kXIDRecord: { // ignore break; } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index b45dcadf8..95fdf5aaa 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -58,7 +58,7 @@ class ReadableWriteBatch : public WriteBatch { // Retrieve some information from a write entry in the write batch, given // the start offset of the write entry. Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, - Slice* value, Slice* blob) const; + Slice* value, Slice* blob, Slice* xid) const; }; class WriteBatchEntryComparator {