diff --git a/HISTORY.md b/HISTORY.md index 33721c8f4..0e6db1347 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,9 @@ ## Unreleased +### New Features +* RollbackToSavePoint() in WriteBatch/WriteBatchWithIndex + ### Public API Changes * Deprecated WriteOptions::timeout_hint_us. We no longer support write timeout. If you really need this option, talk to us and we might consider returning it. * Deprecated purge_redundant_kvs_while_flush option. diff --git a/db/write_batch.cc b/db/write_batch.cc index 53509b90f..44e1cd84d 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -23,6 +23,10 @@ // data: uint8[len] #include "rocksdb/write_batch.h" + +#include +#include + #include "rocksdb/merge_operator.h" #include "db/dbformat.h" #include "db/db_impl.h" @@ -32,7 +36,6 @@ #include "db/write_batch_internal.h" #include "util/coding.h" #include "util/statistics.h" -#include #include "util/perf_context_imp.h" namespace rocksdb { @@ -40,12 +43,26 @@ namespace rocksdb { // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. static const size_t kHeader = 12; -WriteBatch::WriteBatch(size_t reserved_bytes) { +struct SavePoint { + size_t size; // size of rep_ + int count; // count of elements in rep_ + SavePoint(size_t s, int c) : size(s), count(c) {} +}; + +struct SavePoints { + std::stack stack; +}; + +WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr) { rep_.reserve((reserved_bytes > kHeader) ? 
reserved_bytes : kHeader); Clear(); } -WriteBatch::~WriteBatch() { } +WriteBatch::~WriteBatch() { + if (save_points_ != nullptr) { + delete save_points_; + } +} WriteBatch::Handler::~Handler() { } @@ -61,6 +78,12 @@ bool WriteBatch::Handler::Continue() { void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); + + if (save_points_ != nullptr) { + while (!save_points_->stack.empty()) { + save_points_->stack.pop(); + } + } } int WriteBatch::Count() const { @@ -188,6 +211,8 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } +size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { return kHeader; } + void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); @@ -301,6 +326,38 @@ void WriteBatch::PutLogData(const Slice& blob) { PutLengthPrefixedSlice(&rep_, blob); } +void WriteBatch::SetSavePoint() { + if (save_points_ == nullptr) { + save_points_ = new SavePoints(); + } + // Record length and count of current batch of writes. 
+ save_points_->stack.push(SavePoint(GetDataSize(), Count())); +} + +Status WriteBatch::RollbackToSavePoint() { + if (save_points_ == nullptr || save_points_->stack.size() == 0) { + return Status::NotFound(); + } + + // Pop the most recent savepoint off the stack + SavePoint savepoint = save_points_->stack.top(); + save_points_->stack.pop(); + + assert(savepoint.size <= rep_.size()); + + if (savepoint.size == rep_.size()) { + // No changes to rollback + } else if (savepoint.size == 0) { + // Rollback everything + Clear(); + } else { + rep_.resize(savepoint.size); + WriteBatchInternal::SetCount(this, savepoint.count); + } + + return Status::OK(); +} + namespace { // This class can *only* be used from a single-threaded write thread, because it // calls ColumnFamilyMemTablesImpl::Seek() diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 18f106776..0718057e8 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -92,6 +92,10 @@ class WriteBatchInternal { // this batch. static void SetSequence(WriteBatch* batch, SequenceNumber seq); + // Returns the offset of the first entry in the batch. + // This offset is only valid if the batch is not empty. 
+ static size_t GetFirstOffset(WriteBatch* batch); + static Slice Contents(const WriteBatch* batch) { return Slice(batch->rep_); } diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 2ba43d2d3..077a54fb2 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -446,6 +446,111 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) { } #endif // !ROCKSDB_LITE +TEST_F(WriteBatchTest, SavePointTest) { + Status s; + WriteBatch batch; + batch.SetSavePoint(); + + batch.Put("A", "a"); + batch.Put("B", "b"); + batch.SetSavePoint(); + + batch.Put("C", "c"); + batch.Delete("A"); + batch.SetSavePoint(); + batch.SetSavePoint(); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "Delete(A)@3" + "Put(A, a)@0" + "Put(B, b)@1" + "Put(C, c)@2", + PrintContents(&batch)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "Put(A, a)@0" + "Put(B, b)@1", + PrintContents(&batch)); + + batch.Delete("A"); + batch.Put("B", "bb"); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ("", PrintContents(&batch)); + + s = batch.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("", PrintContents(&batch)); + + batch.Put("D", "d"); + batch.Delete("A"); + + batch.SetSavePoint(); + + batch.Put("A", "aaa"); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "Delete(A)@1" + "Put(D, d)@0", + PrintContents(&batch)); + + batch.SetSavePoint(); + + batch.Put("D", "d"); + batch.Delete("A"); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "Delete(A)@1" + "Put(D, d)@0", + PrintContents(&batch)); + + s = batch.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ( + "Delete(A)@1" + "Put(D, d)@0", + PrintContents(&batch)); + + WriteBatch batch2; + + s = batch2.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("", PrintContents(&batch2)); + + batch2.Delete("A"); + batch2.SetSavePoint(); + + s = batch2.RollbackToSavePoint(); + ASSERT_OK(s); + 
ASSERT_EQ("Delete(A)@0", PrintContents(&batch2)); + + batch2.Clear(); + ASSERT_EQ("", PrintContents(&batch2)); + + batch2.SetSavePoint(); + + batch2.Delete("B"); + ASSERT_EQ("Delete(B)@0", PrintContents(&batch2)); + + batch2.SetSavePoint(); + s = batch2.RollbackToSavePoint(); + ASSERT_OK(s); + ASSERT_EQ("Delete(B)@0", PrintContents(&batch2)); + + s = batch2.RollbackToSavePoint(); + ASSERT_OK(s); + ASSERT_EQ("", PrintContents(&batch2)); + + s = batch2.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("", PrintContents(&batch2)); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 22e7253c1..c1d27c17e 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -157,6 +157,22 @@ class WriteBatchWithIndex : public WriteBatchBase { ColumnFamilyHandle* column_family, const Slice& key, std::string* value); + // Records the state of the batch for future calls to RollbackToSavePoint(). + // May be called multiple times to set multiple save points. + void SetSavePoint() override; + + // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the + // most recent call to SetSavePoint() and removes the most recent save point. + // If there is no previous call to SetSavePoint(), the batch is left + // unchanged. + // + // Calling RollbackToSavePoint invalidates any open iterators on this batch. + // + // Returns Status::OK() on success, + // Status::NotFound() if no previous call to SetSavePoint(), + // or other Status on corruption. 
+ Status RollbackToSavePoint() override; + private: struct Rep; Rep* rep; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index d76c96f7f..7fb4b6e52 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -25,6 +25,7 @@ #ifndef STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ #define STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ +#include #include #include #include "rocksdb/status.h" @@ -34,6 +35,7 @@ namespace rocksdb { class Slice; class ColumnFamilyHandle; +struct SavePoints; struct SliceParts; class WriteBatch : public WriteBatchBase { @@ -101,6 +103,17 @@ class WriteBatch : public WriteBatchBase { // Clear all updates buffered in this batch. void Clear() override; + // Records the state of the batch for future calls to RollbackToSavePoint(). + // May be called multiple times to set multiple save points. + void SetSavePoint() override; + + // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the + // most recent call to SetSavePoint() and removes the most recent save point. + // If there is no previous call to SetSavePoint(), Status::NotFound() + // will be returned. + // Otherwise returns Status::OK(). + Status RollbackToSavePoint() override; + // Support for iterating over the contents of a batch. 
class Handler { public: @@ -168,10 +181,11 @@ class WriteBatch : public WriteBatchBase { WriteBatch* GetWriteBatch() override { return this; } // Constructor with a serialized string object - explicit WriteBatch(std::string rep): rep_(rep) {} + explicit WriteBatch(std::string rep) : save_points_(nullptr), rep_(rep) {} private: friend class WriteBatchInternal; + SavePoints* save_points_; protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ diff --git a/include/rocksdb/write_batch_base.h b/include/rocksdb/write_batch_base.h index cfe3ceb48..d64ffc200 100644 --- a/include/rocksdb/write_batch_base.h +++ b/include/rocksdb/write_batch_base.h @@ -11,6 +11,7 @@ namespace rocksdb { class Slice; +class Status; class ColumnFamilyHandle; class WriteBatch; struct SliceParts; @@ -72,6 +73,16 @@ class WriteBatchBase { // converting any WriteBatchBase(eg WriteBatchWithIndex) into a basic // WriteBatch. virtual WriteBatch* GetWriteBatch() = 0; + + // Records the state of the batch for future calls to RollbackToSavePoint(). + // May be called multiple times to set multiple save points. + virtual void SetSavePoint() = 0; + + // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the + // most recent call to SetSavePoint() and removes the most recent save point. + // If there is no previous call to SetSavePoint(), returns + // Status::NotFound() and leaves the batch unchanged. + virtual Status RollbackToSavePoint() = 0; }; } // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index be8d93ccf..507aff248 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -400,6 +400,11 @@ struct WriteBatchWithIndex::Rep { // Clear all updates buffered in this batch. void Clear(); + void ClearIndex(); + + // Rebuild index by reading all records from the batch. 
+ // Returns non-ok status on corruption. + Status ReBuildIndex(); }; bool WriteBatchWithIndex::Rep::UpdateExistingEntry( @@ -455,13 +460,73 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { void WriteBatchWithIndex::Rep::Clear() { write_batch.Clear(); + ClearIndex(); + } + + void WriteBatchWithIndex::Rep::ClearIndex() { + skip_list.~WriteBatchEntrySkipList(); arena.~Arena(); new (&arena) Arena(); - skip_list.~WriteBatchEntrySkipList(); new (&skip_list) WriteBatchEntrySkipList(comparator, &arena); last_entry_offset = 0; } + Status WriteBatchWithIndex::Rep::ReBuildIndex() { + Status s; + + ClearIndex(); + + if (write_batch.Count() == 0) { + // Nothing to re-index + return s; + } + + size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch); + + Slice input(write_batch.Data()); + input.remove_prefix(offset); + + // Loop through all entries in Rep and add each one to the index + int found = 0; + while (s.ok() && !input.empty()) { + Slice key, value, blob; + uint32_t column_family_id = 0; // default + char tag = 0; + + // set offset of current entry for call to AddNewEntry() + last_entry_offset = input.data() - write_batch.Data().data(); + + s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, + &value, &blob); + if (!s.ok()) { + break; + } + + switch (tag) { + case kTypeColumnFamilyValue: + case kTypeValue: + case kTypeColumnFamilyDeletion: + case kTypeDeletion: + case kTypeColumnFamilyMerge: + case kTypeMerge: + found++; + if (!UpdateExistingEntryWithCfId(column_family_id, key)) { + AddNewEntry(column_family_id); + } + break; + case kTypeLogData: + break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + } + + if (s.ok() && found != write_batch.Count()) { + s = Status::Corruption("WriteBatch has wrong count"); + } + + return s; + } WriteBatchWithIndex::WriteBatchWithIndex( const Comparator* default_index_comparator, size_t reserved_bytes, @@ -640,5 +705,17 @@ Status 
WriteBatchWithIndex::GetFromBatchAndDB(DB* db, return s; } +void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); } + +Status WriteBatchWithIndex::RollbackToSavePoint() { + Status s = rep->write_batch.RollbackToSavePoint(); + + if (s.ok()) { + s = rep->ReBuildIndex(); + } + + return s; +} + } // namespace rocksdb #endif // !ROCKSDB_LITE diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 2c0033ce1..f5c141121 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -30,7 +30,12 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, return Status::InvalidArgument("Output parameters cannot be null"); } - if (data_offset >= GetDataSize()) { + if (data_offset == GetDataSize()) { + // reached end of batch. + return Status::NotFound(); + } + + if (data_offset > GetDataSize()) { return Status::InvalidArgument("data offset exceed write batch size"); } Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 70337f885..3e509ca93 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -1364,6 +1364,150 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { } } } + +static std::string PrintContents(WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family) { + std::string result; + + WBWIIterator* iter; + if (column_family == nullptr) { + iter = batch->NewIterator(); + } else { + iter = batch->NewIterator(column_family); + } + + iter->SeekToFirst(); + while (iter->Valid()) { + WriteEntry e = iter->Entry(); + + if (e.type == kPutRecord) { + 
result.append("PUT("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else if (e.type == kMergeRecord) { + result.append("MERGE("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else { + assert(e.type == kDeleteRecord); + result.append("DEL("); + result.append(e.key.ToString()); + result.append(")"); + } + + result.append(","); + iter->Next(); + } + + delete iter; + return result; +} + +TEST_F(WriteBatchWithIndexTest, SavePointTest) { + WriteBatchWithIndex batch; + ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator()); + Status s; + + batch.Put("A", "a"); + batch.Put("B", "b"); + batch.Put("A", "aa"); + batch.Put(&cf1, "A", "a1"); + batch.Delete(&cf1, "B"); + batch.Put(&cf1, "C", "c1"); + + batch.SetSavePoint(); + + batch.Put("C", "cc"); + batch.Put("B", "bb"); + batch.Delete("A"); + batch.Put(&cf1, "B", "b1"); + batch.Delete(&cf1, "A"); + batch.SetSavePoint(); + + batch.Put("A", "aaa"); + batch.Put("A", "xxx"); + batch.Delete("B"); + batch.Put(&cf1, "B", "b2"); + batch.Delete(&cf1, "C"); + batch.SetSavePoint(); + batch.SetSavePoint(); + batch.Delete("D"); + batch.Delete(&cf1, "D"); + + ASSERT_EQ( + "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" + "B)" + ",PUT(C):cc,DEL(D),", + PrintContents(&batch, nullptr)); + + ASSERT_EQ( + "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," + "DEL(D),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" + "B)" + ",PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" + "B)" + ",PUT(C):cc,", + PrintContents(&batch, nullptr)); + + 
ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1,", + PrintContents(&batch, &cf1)); + + batch.SetSavePoint(); + batch.Put("X", "x"); + + ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,PUT(X):x,", + PrintContents(&batch, nullptr)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1,", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,", PrintContents(&batch, &cf1)); + + s = batch.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,", PrintContents(&batch, &cf1)); + + batch.SetSavePoint(); + + batch.Clear(); + ASSERT_EQ("", PrintContents(&batch, nullptr)); + ASSERT_EQ("", PrintContents(&batch, &cf1)); + + s = batch.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); +} + } // namespace int main(int argc, char** argv) {