From 0307c5fe3ab30da3990032c28b217b4bc6e15dd3 Mon Sep 17 00:00:00 2001 From: Jim Paton Date: Wed, 14 Aug 2013 16:32:46 -0700 Subject: [PATCH] Implement log blobs Summary: This patch adds the ability for the user to add sequences of arbitrary data (blobs) to write batches. These blobs are saved to the log along with everything else in the write batch. You can add multiple blobs per WriteBatch and the ordering of blobs, puts, merges, and deletes are preserved. Blobs are not saves to SST files. RocksDB ignores blobs in every way except for writing them to the log. Before committing this patch, I need to add some test code. But I'm submitting it now so people can comment on the API. Test Plan: make -j32 check Reviewers: dhruba, haobo, vamsi Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D12195 --- Makefile | 20 +++++++------- db/db_iter.cc | 3 +++ db/db_test.cc | 45 +++++++++++++++++++++++++++++++ db/dbformat.h | 3 ++- db/memtable.cc | 3 +++ db/version_set.cc | 4 +++ db/write_batch.cc | 22 +++++++++++++-- db/write_batch_test.cc | 51 +++++++++++++++++++++++++++++++++-- include/leveldb/write_batch.h | 21 ++++++++++++--- 9 files changed, 153 insertions(+), 19 deletions(-) diff --git a/Makefile b/Makefile index d89c87f1e..5bf6e34fd 100644 --- a/Makefile +++ b/Makefile @@ -37,34 +37,34 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full TESTS = \ arena_test \ + auto_roll_logger_test \ + block_test \ bloom_test \ c_test \ cache_test \ coding_test \ - histogram_test \ corruption_test \ crc32c_test \ db_test \ dbformat_test \ env_test \ + filelock_test \ filename_test \ filter_block_test \ + histogram_test \ log_test \ + manual_compaction_test \ memenv_test \ + merge_test \ + redis_test \ + reduce_levels_test \ skiplist_test \ + stringappend_test \ table_test \ ttl_test \ - block_test \ version_edit_test \ version_set_test \ - reduce_levels_test \ - write_batch_test \ - auto_roll_logger_test \ - filelock_test \ - merge_test \ - redis_test \ - manual_compaction_test \ - stringappend_test + write_batch_test TOOLS = \ sst_dump \ diff --git a/db/db_iter.cc b/db/db_iter.cc index 0d6a2c846..1d8c84427 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -217,6 +217,9 @@ void DBIter::FindNextUserEntry(bool skipping) { // TODO: what if !iter_->Valid() return; break; + case kTypeLogData: + assert(false); + break; } } } diff --git a/db/db_test.cc b/db/db_test.cc index 492ae04be..8cd1dda6b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -457,6 +457,9 @@ class DBTest { case kTypeDeletion: result += "DEL"; break; + case kTypeLogData: + assert(false); + break; } } iter->Next(); @@ -3132,6 +3135,48 @@ TEST(DBTest, TransactionLogIteratorBatchOperations) { } while (ChangeCompactOptions()); } +TEST(DBTest, TransactionLogIteratorBlobs) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + { + WriteBatch batch; + batch.Put("key1", DummyString(1024)); + batch.Put("key2", DummyString(1024)); + batch.PutLogData(Slice("blob1")); + batch.Put("key3", DummyString(1024)); + batch.PutLogData(Slice("blob2")); + batch.Delete("key2"); + dbfull()->Write(WriteOptions(), &batch); + Reopen(&options); + } + + auto res = OpenTransactionLogIter(0)->GetBatch(); + struct Handler : public WriteBatch::Handler { + std::string seen; + virtual void Put(const Slice& key, const Slice& value) { + seen += "Put(" + key.ToString() + ", " + std::to_string(value.size()) + + ")"; + } + virtual void Merge(const Slice& key, const Slice& value) { + seen += "Merge(" + key.ToString() + ", " + std::to_string(value.size()) + + ")"; + } + virtual void LogData(const Slice& blob) { + seen += "LogData(" + blob.ToString() + ")"; + } + virtual void Delete(const Slice& key) { + seen += "Delete(" + key.ToString() + ")"; + } + } handler; + res.writeBatchPtr->Iterate(&handler); + ASSERT_EQ("Put(key1, 1024)" + "Put(key2, 1024)" + "LogData(blob1)" + "Put(key3, 1024)" + "LogData(blob2)" + "Delete(key2)", handler.seen); +} + TEST(DBTest, ReadCompaction) { std::string value(4096, '4'); // a string of size 4K { diff --git a/db/dbformat.h b/db/dbformat.h index 5d596ad1a..0a7b50ed1 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -25,7 +25,8 @@ class InternalKey; enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1, - kTypeMerge = 0x2 + kTypeMerge = 0x2, + kTypeLogData = 0x3 }; // kValueTypeForSeek defines the ValueType that should be passed when // constructing a ParsedInternalKey object for seeking to a particular diff --git a/db/memtable.cc b/db/memtable.cc index 4844bcd05..24d6bd8dd 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -211,6 +211,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, } break; } + case kTypeLogData: + assert(false); + break; } } else { // exit loop if user key does not match diff --git a/db/version_set.cc b/db/version_set.cc index 981d7932f..63a58f671 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -330,6 +330,10 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ } } return true; + + case kTypeLogData: + assert(false); + break; } } } diff --git a/db/write_batch.cc b/db/write_batch.cc index a4213cd97..317599542 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -43,6 +43,11 @@ void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) { throw std::runtime_error("Handler::Merge not implemented!"); } +void WriteBatch::Handler::LogData(const Slice& blob) { + // If the user has not specified something to do with blobs, then we ignore + // them. +} + void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); @@ -59,10 +64,9 @@ Status WriteBatch::Iterate(Handler* handler) const { } input.remove_prefix(kHeader); - Slice key, value; + Slice key, value, blob; int found = 0; while (!input.empty()) { - found++; char tag = input[0]; input.remove_prefix(1); switch (tag) { @@ -70,6 +74,7 @@ Status WriteBatch::Iterate(Handler* handler) const { if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { handler->Put(key, value); + found++; } else { return Status::Corruption("bad WriteBatch Put"); } @@ -77,6 +82,7 @@ Status WriteBatch::Iterate(Handler* handler) const { case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { handler->Delete(key); + found++; } else { return Status::Corruption("bad WriteBatch Delete"); } @@ -85,10 +91,18 @@ Status WriteBatch::Iterate(Handler* handler) const { if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { handler->Merge(key, value); + found++; } else { return Status::Corruption("bad WriteBatch Merge"); } break; + case kTypeLogData: + if (GetLengthPrefixedSlice(&input, &blob)) { + handler->LogData(blob); + } else { + return Status::Corruption("bad WriteBatch Blob"); + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -136,6 +150,10 @@ void WriteBatch::Merge(const Slice& key, const Slice& value) { PutLengthPrefixedSlice(&rep_, value); } +void WriteBatch::PutLogData(const Slice& blob) { + rep_.push_back(static_cast(kTypeLogData)); + PutLengthPrefixedSlice(&rep_, blob); +} namespace { class MemTableInserter : public WriteBatch::Handler { diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 945ef16bd..ce6104c7f 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -50,13 +50,16 @@ static std::string PrintContents(WriteBatch* b) { state.append(")"); count++; break; + case kTypeLogData: + assert(false); + break; } state.append("@"); state.append(NumberToString(ikey.sequence)); } delete iter; if (!s.ok()) { - state.append("ParseError()"); + state.append(s.ToString()); } else if (count != WriteBatchInternal::Count(b)) { state.append("CountMismatch()"); } @@ -97,7 +100,7 @@ TEST(WriteBatchTest, Corruption) { WriteBatchInternal::SetContents(&batch, Slice(contents.data(),contents.size()-1)); ASSERT_EQ("Put(foo, bar)@200" - "ParseError()", + "Corruption: bad WriteBatch Delete", PrintContents(&batch)); } @@ -131,6 +134,50 @@ TEST(WriteBatchTest, Append) { ASSERT_EQ(4, b1.Count()); } +TEST(WriteBatchTest, Blob) { + WriteBatch batch; + batch.Put(Slice("k1"), Slice("v1")); + batch.Put(Slice("k2"), Slice("v2")); + batch.Put(Slice("k3"), Slice("v3")); + batch.PutLogData(Slice("blob1")); + batch.Delete(Slice("k2")); + batch.PutLogData(Slice("blob2")); + batch.Merge(Slice("foo"), Slice("bar")); + ASSERT_EQ(5, batch.Count()); + ASSERT_EQ("Merge(foo, bar)@4" + "Put(k1, v1)@0" + "Delete(k2)@3" + "Put(k2, v2)@1" + "Put(k3, v3)@2", + PrintContents(&batch)); + + struct Handler : public WriteBatch::Handler { + std::string seen; + virtual void Put(const Slice& key, const Slice& value) { + seen += "Put(" + key.ToString() + ", " + value.ToString() + ")"; + } + virtual void Merge(const Slice& key, const Slice& value) { + seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")"; + } + virtual void LogData(const Slice& blob) { + seen += "LogData(" + blob.ToString() + ")"; + } + virtual void Delete(const Slice& key) { + seen += "Delete(" + key.ToString() + ")"; + } + } handler; + batch.Iterate(&handler); + ASSERT_EQ( + "Put(k1, v1)" + "Put(k2, v2)" + "Put(k3, v3)" + "LogData(blob1)" + "Delete(k2)" + "LogData(blob2)" + "Merge(foo, bar)", + handler.seen); +} + } // namespace leveldb int main(int argc, char** argv) { diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 64f4682f3..b2cfc5348 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -43,6 +43,17 @@ class WriteBatch { // If the database contains a mapping for "key", erase it. Else do nothing. void Delete(const Slice& key); + // Append a blob of arbitrary to the records in this batch. The blob will be + // stored in the transaction log but not in any other file. In particular, it + // will not be persisted to the SST files. When iterating over this + // WriteBatch, WriteBatch::Handler::LogData will be called with the contents + // of the blob as it is encountered. Blobs, puts, deletes, and merges will be + // encountered in the same order in thich they were inserted. + // + // Example application: add timestamps to the transaction log for use in + // replication. + void PutLogData(const Slice& blob); + // Clear all updates buffered in this batch. void Clear(); @@ -51,10 +62,12 @@ class WriteBatch { public: virtual ~Handler(); virtual void Put(const Slice& key, const Slice& value) = 0; - // Merge is not pure virtual. Otherwise, we would break existing - // clients of Handler on a source code level. - // The default implementation simply throws a runtime exception. + // Merge and LogData are not pure virtual. Otherwise, we would break + // existing clients of Handler on a source code level. The default + // implementation of Merge simply throws a runtime exception. virtual void Merge(const Slice& key, const Slice& value); + // The default implementation of LogData does nothing. + virtual void LogData(const Slice& blob); virtual void Delete(const Slice& key) = 0; }; Status Iterate(Handler* handler) const; @@ -66,7 +79,7 @@ class WriteBatch { int Count() const; // Constructor with a serialized string object - WriteBatch(std::string rep): rep_(rep) {} + explicit WriteBatch(std::string rep): rep_(rep) {} private: friend class WriteBatchInternal;