diff --git a/Makefile b/Makefile index d89c87f1e..5bf6e34fd 100644 --- a/Makefile +++ b/Makefile @@ -37,34 +37,34 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full TESTS = \ arena_test \ + auto_roll_logger_test \ + block_test \ bloom_test \ c_test \ cache_test \ coding_test \ - histogram_test \ corruption_test \ crc32c_test \ db_test \ dbformat_test \ env_test \ + filelock_test \ filename_test \ filter_block_test \ + histogram_test \ log_test \ + manual_compaction_test \ memenv_test \ + merge_test \ + redis_test \ + reduce_levels_test \ skiplist_test \ + stringappend_test \ table_test \ ttl_test \ - block_test \ version_edit_test \ version_set_test \ - reduce_levels_test \ - write_batch_test \ - auto_roll_logger_test \ - filelock_test \ - merge_test \ - redis_test \ - manual_compaction_test \ - stringappend_test + write_batch_test TOOLS = \ sst_dump \ diff --git a/db/db_iter.cc b/db/db_iter.cc index 0d6a2c846..1d8c84427 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -217,6 +217,9 @@ void DBIter::FindNextUserEntry(bool skipping) { // TODO: what if !iter_->Valid() return; break; + case kTypeLogData: + assert(false); + break; } } } diff --git a/db/db_test.cc b/db/db_test.cc index 492ae04be..8cd1dda6b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -457,6 +457,9 @@ class DBTest { case kTypeDeletion: result += "DEL"; break; + case kTypeLogData: + assert(false); + break; } } iter->Next(); @@ -3132,6 +3135,48 @@ TEST(DBTest, TransactionLogIteratorBatchOperations) { } while (ChangeCompactOptions()); } +TEST(DBTest, TransactionLogIteratorBlobs) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + { + WriteBatch batch; + batch.Put("key1", DummyString(1024)); + batch.Put("key2", DummyString(1024)); + batch.PutLogData(Slice("blob1")); + batch.Put("key3", DummyString(1024)); + batch.PutLogData(Slice("blob2")); + batch.Delete("key2"); + dbfull()->Write(WriteOptions(), &batch); + Reopen(&options); + } + + auto res = OpenTransactionLogIter(0)->GetBatch(); + struct Handler : public WriteBatch::Handler { + std::string seen; + virtual void Put(const Slice& key, const Slice& value) { + seen += "Put(" + key.ToString() + ", " + std::to_string(value.size()) + + ")"; + } + virtual void Merge(const Slice& key, const Slice& value) { + seen += "Merge(" + key.ToString() + ", " + std::to_string(value.size()) + + ")"; + } + virtual void LogData(const Slice& blob) { + seen += "LogData(" + blob.ToString() + ")"; + } + virtual void Delete(const Slice& key) { + seen += "Delete(" + key.ToString() + ")"; + } + } handler; + res.writeBatchPtr->Iterate(&handler); + ASSERT_EQ("Put(key1, 1024)" + "Put(key2, 1024)" + "LogData(blob1)" + "Put(key3, 1024)" + "LogData(blob2)" + "Delete(key2)", handler.seen); +} + TEST(DBTest, ReadCompaction) { std::string value(4096, '4'); // a string of size 4K { diff --git a/db/dbformat.h b/db/dbformat.h index 5d596ad1a..0a7b50ed1 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -25,7 +25,8 @@ class InternalKey; enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1, - kTypeMerge = 0x2 + kTypeMerge = 0x2, + kTypeLogData = 0x3 }; // kValueTypeForSeek defines the ValueType that should be passed when // constructing a ParsedInternalKey object for seeking to a particular diff --git a/db/memtable.cc b/db/memtable.cc index 4844bcd05..24d6bd8dd 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -211,6 +211,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, } break; } + case kTypeLogData: + assert(false); + break; } } else { // exit loop if user key does not match diff --git a/db/version_set.cc b/db/version_set.cc index 981d7932f..63a58f671 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -330,6 +330,10 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ } } return true; + + case kTypeLogData: + assert(false); + break; } } } diff --git a/db/write_batch.cc b/db/write_batch.cc index a4213cd97..317599542 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -43,6 +43,11 @@ void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) { throw std::runtime_error("Handler::Merge not implemented!"); } +void WriteBatch::Handler::LogData(const Slice& blob) { + // If the user has not specified something to do with blobs, then we ignore + // them. +} + void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); @@ -59,10 +64,9 @@ Status WriteBatch::Iterate(Handler* handler) const { } input.remove_prefix(kHeader); - Slice key, value; + Slice key, value, blob; int found = 0; while (!input.empty()) { - found++; char tag = input[0]; input.remove_prefix(1); switch (tag) { @@ -70,6 +74,7 @@ Status WriteBatch::Iterate(Handler* handler) const { if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { handler->Put(key, value); + found++; } else { return Status::Corruption("bad WriteBatch Put"); } @@ -77,6 +82,7 @@ Status WriteBatch::Iterate(Handler* handler) const { case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { handler->Delete(key); + found++; } else { return Status::Corruption("bad WriteBatch Delete"); } @@ -85,10 +91,18 @@ Status WriteBatch::Iterate(Handler* handler) const { if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { handler->Merge(key, value); + found++; } else { return Status::Corruption("bad WriteBatch Merge"); } break; + case kTypeLogData: + if (GetLengthPrefixedSlice(&input, &blob)) { + handler->LogData(blob); + } else { + return Status::Corruption("bad WriteBatch Blob"); + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -136,6 +150,10 @@ void WriteBatch::Merge(const Slice& key, const Slice& value) { PutLengthPrefixedSlice(&rep_, value); } +void WriteBatch::PutLogData(const Slice& blob) { + rep_.push_back(static_cast(kTypeLogData)); + PutLengthPrefixedSlice(&rep_, blob); +} namespace { class MemTableInserter : public WriteBatch::Handler { diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 945ef16bd..ce6104c7f 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -50,13 +50,16 @@ static std::string PrintContents(WriteBatch* b) { state.append(")"); count++; break; + case kTypeLogData: + assert(false); + break; } state.append("@"); state.append(NumberToString(ikey.sequence)); } delete iter; if (!s.ok()) { - state.append("ParseError()"); + state.append(s.ToString()); } else if (count != WriteBatchInternal::Count(b)) { state.append("CountMismatch()"); } @@ -97,7 +100,7 @@ TEST(WriteBatchTest, Corruption) { WriteBatchInternal::SetContents(&batch, Slice(contents.data(),contents.size()-1)); ASSERT_EQ("Put(foo, bar)@200" - "ParseError()", + "Corruption: bad WriteBatch Delete", PrintContents(&batch)); } @@ -131,6 +134,50 @@ TEST(WriteBatchTest, Append) { ASSERT_EQ(4, b1.Count()); } +TEST(WriteBatchTest, Blob) { + WriteBatch batch; + batch.Put(Slice("k1"), Slice("v1")); + batch.Put(Slice("k2"), Slice("v2")); + batch.Put(Slice("k3"), Slice("v3")); + batch.PutLogData(Slice("blob1")); + batch.Delete(Slice("k2")); + batch.PutLogData(Slice("blob2")); + batch.Merge(Slice("foo"), Slice("bar")); + ASSERT_EQ(5, batch.Count()); + ASSERT_EQ("Merge(foo, bar)@4" + "Put(k1, v1)@0" + "Delete(k2)@3" + "Put(k2, v2)@1" + "Put(k3, v3)@2", + PrintContents(&batch)); + + struct Handler : public WriteBatch::Handler { + std::string seen; + virtual void Put(const Slice& key, const Slice& value) { + seen += "Put(" + key.ToString() + ", " + value.ToString() + ")"; + } + virtual void Merge(const Slice& key, const Slice& value) { + seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")"; + } + virtual void LogData(const Slice& blob) { + seen += "LogData(" + blob.ToString() + ")"; + } + virtual void Delete(const Slice& key) { + seen += "Delete(" + key.ToString() + ")"; + } + } handler; + batch.Iterate(&handler); + ASSERT_EQ( + "Put(k1, v1)" + "Put(k2, v2)" + "Put(k3, v3)" + "LogData(blob1)" + "Delete(k2)" + "LogData(blob2)" + "Merge(foo, bar)", + handler.seen); +} + } // namespace leveldb int main(int argc, char** argv) { diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 64f4682f3..b2cfc5348 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -43,6 +43,17 @@ class WriteBatch { // If the database contains a mapping for "key", erase it. Else do nothing. void Delete(const Slice& key); + // Append a blob of arbitrary to the records in this batch. The blob will be + // stored in the transaction log but not in any other file. In particular, it + // will not be persisted to the SST files. When iterating over this + // WriteBatch, WriteBatch::Handler::LogData will be called with the contents + // of the blob as it is encountered. Blobs, puts, deletes, and merges will be + // encountered in the same order in thich they were inserted. + // + // Example application: add timestamps to the transaction log for use in + // replication. + void PutLogData(const Slice& blob); + // Clear all updates buffered in this batch. void Clear(); @@ -51,10 +62,12 @@ class WriteBatch { public: virtual ~Handler(); virtual void Put(const Slice& key, const Slice& value) = 0; - // Merge is not pure virtual. Otherwise, we would break existing - // clients of Handler on a source code level. - // The default implementation simply throws a runtime exception. + // Merge and LogData are not pure virtual. Otherwise, we would break + // existing clients of Handler on a source code level. The default + // implementation of Merge simply throws a runtime exception. virtual void Merge(const Slice& key, const Slice& value); + // The default implementation of LogData does nothing. + virtual void LogData(const Slice& blob); virtual void Delete(const Slice& key) = 0; }; Status Iterate(Handler* handler) const; @@ -66,7 +79,7 @@ class WriteBatch { int Count() const; // Constructor with a serialized string object - WriteBatch(std::string rep): rep_(rep) {} + explicit WriteBatch(std::string rep): rep_(rep) {} private: friend class WriteBatchInternal;