From 19e3ee64ac6e1bd10bb9d289c11ca0e9d4143ede Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 7 Jan 2014 14:41:42 -0800 Subject: [PATCH] Add column family information to WAL Summary: I have added three new value types: * kTypeColumnFamilyDeletion * kTypeColumnFamilyValue * kTypeColumnFamilyMerge which include column family Varint32 before the data (value, deletion and merge). These values are used only in WAL (not in memtables yet). This endeavour required changing some WriteBatch internals. Test Plan: Added a unittest Reviewers: dhruba, haobo, sdong, kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D15045 --- db/db_impl.cc | 6 +-- db/db_iter.cc | 3 ++ db/db_test.cc | 3 ++ db/dbformat.h | 5 ++- db/memtable.cc | 3 ++ db/version_set.cc | 3 ++ db/write_batch.cc | 76 ++++++++++++++++++++++++++--------- db/write_batch_test.cc | 69 +++++++++++++++++++++++++------ include/rocksdb/write_batch.h | 44 +++++++++++--------- 9 files changed, 157 insertions(+), 55 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index e6c1b56dd..0909c5694 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3817,14 +3817,14 @@ Status DBImpl::GetDbIdentity(std::string& identity) { Status DB::Put(const WriteOptions& opt, const ColumnFamilyHandle& column_family, const Slice& key, const Slice& value) { WriteBatch batch; - batch.Put(column_family, key, value); + batch.Put(column_family.id, key, value); return Write(opt, &batch); } Status DB::Delete(const WriteOptions& opt, const ColumnFamilyHandle& column_family, const Slice& key) { WriteBatch batch; - batch.Delete(column_family, key); + batch.Delete(column_family.id, key); return Write(opt, &batch); } @@ -3832,7 +3832,7 @@ Status DB::Merge(const WriteOptions& opt, const ColumnFamilyHandle& column_family, const Slice& key, const Slice& value) { WriteBatch batch; - batch.Merge(column_family, key, value); + batch.Merge(column_family.id, key, value); return Write(opt, &batch); } diff --git a/db/db_iter.cc b/db/db_iter.cc index 596a9f651..71bb2e57c 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -226,6 +226,9 @@ void DBIter::FindNextUserEntry(bool skipping) { valid_ = true; MergeValuesNewToOld(); // Go to a different state machine return; + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; diff --git a/db/db_test.cc b/db/db_test.cc index 577655b36..3659e8d84 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -519,6 +519,9 @@ class DBTest { case kTypeDeletion: result += "DEL"; break; + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; diff --git a/db/dbformat.h b/db/dbformat.h index 64a2c9f05..82031cf5c 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -29,7 +29,10 @@ enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1, kTypeMerge = 0x2, - kTypeLogData = 0x3 + kTypeLogData = 0x3, + kTypeColumnFamilyDeletion = 0x4, + kTypeColumnFamilyValue = 0x5, + kTypeColumnFamilyMerge = 0x6, }; // kValueTypeForSeek defines the ValueType that should be passed when // constructing a ParsedInternalKey object for seeking to a particular diff --git a/db/memtable.cc b/db/memtable.cc index 675a314ff..2dba364b0 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -247,6 +247,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, } break; } + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; diff --git a/db/version_set.cc b/db/version_set.cc index fbb46404a..f65851329 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -379,6 +379,9 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ } return true; + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; diff --git a/db/write_batch.cc b/db/write_batch.cc index 9d3190579..5a5d7e278 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -15,6 +15,9 @@ // kTypeValue varstring varstring // kTypeMerge varstring varstring // kTypeDeletion varstring +// kTypeColumnFamilyValue varint32 varstring varstring +// kTypeColumnFamilyMerge varint32 varstring varstring +// kTypeColumnFamilyDeletion varint32 varstring varstring // varstring := // len: varint32 // data: uint8[len] @@ -87,28 +90,44 @@ Status WriteBatch::Iterate(Handler* handler) const { while (!input.empty() && handler->Continue()) { char tag = input[0]; input.remove_prefix(1); + uint32_t column_family = 0; // default switch (tag) { + case kTypeColumnFamilyValue: + if (!GetVarint32(&input, &column_family)) { + return Status::Corruption("bad WriteBatch Put"); + } + // intentional fallthrough case kTypeValue: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->PutCF(default_column_family, key, value); + handler->PutCF(column_family, key, value); found++; } else { return Status::Corruption("bad WriteBatch Put"); } break; + case kTypeColumnFamilyDeletion: + if (!GetVarint32(&input, &column_family)) { + return Status::Corruption("bad WriteBatch Delete"); + } + // intentional fallthrough case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { - handler->DeleteCF(default_column_family, key); + handler->DeleteCF(column_family, key); found++; } else { return Status::Corruption("bad WriteBatch Delete"); } break; + case kTypeColumnFamilyMerge: + if (!GetVarint32(&input, &column_family)) { + return Status::Corruption("bad WriteBatch Merge"); + } + // intentional fallthrough case kTypeMerge: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->MergeCF(default_column_family, key, value); + handler->MergeCF(column_family, key, value); found++; } else { return Status::Corruption("bad WriteBatch Merge"); @@ -148,33 +167,53 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } -void WriteBatch::Put(const ColumnFamilyHandle& column_family, const Slice& key, +void WriteBatch::Put(uint32_t column_family_id, const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - rep_.push_back(static_cast(kTypeValue)); + if (column_family_id == 0) { + // save some data on disk by not writing default column family + rep_.push_back(static_cast(kTypeValue)); + } else { + rep_.push_back(static_cast(kTypeColumnFamilyValue)); + PutVarint32(&rep_, column_family_id); + } PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } -void WriteBatch::Put(const ColumnFamilyHandle& column_family, - const SliceParts& key, const SliceParts& value) { +void WriteBatch::Put(uint32_t column_family_id, const SliceParts& key, + const SliceParts& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - rep_.push_back(static_cast(kTypeValue)); + if (column_family_id == 0) { + rep_.push_back(static_cast(kTypeValue)); + } else { + rep_.push_back(static_cast(kTypeColumnFamilyValue)); + PutVarint32(&rep_, column_family_id); + } PutLengthPrefixedSliceParts(&rep_, key); PutLengthPrefixedSliceParts(&rep_, value); } -void WriteBatch::Delete(const ColumnFamilyHandle& column_family, - const Slice& key) { +void WriteBatch::Delete(uint32_t column_family_id, const Slice& key) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - rep_.push_back(static_cast(kTypeDeletion)); + if (column_family_id == 0) { + rep_.push_back(static_cast(kTypeDeletion)); + } else { + rep_.push_back(static_cast(kTypeColumnFamilyDeletion)); + PutVarint32(&rep_, column_family_id); + } PutLengthPrefixedSlice(&rep_, key); } -void WriteBatch::Merge(const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { +void WriteBatch::Merge(uint32_t column_family_id, const Slice& key, + const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - rep_.push_back(static_cast(kTypeMerge)); + if (column_family_id == 0) { + rep_.push_back(static_cast(kTypeMerge)); + } else { + rep_.push_back(static_cast(kTypeColumnFamilyMerge)); + PutVarint32(&rep_, column_family_id); + } PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } @@ -207,7 +246,7 @@ class MemTableInserter : public WriteBatch::Handler { } } - virtual void PutCF(const ColumnFamilyHandle& column_family, const Slice& key, + virtual void PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { if (options_->inplace_update_support && mem_->Update(sequence_, kTypeValue, key, value)) { @@ -217,13 +256,12 @@ class MemTableInserter : public WriteBatch::Handler { } sequence_++; } - virtual void MergeCF(const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { + virtual void MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { mem_->Add(sequence_, kTypeMerge, key, value); sequence_++; } - virtual void DeleteCF(const ColumnFamilyHandle& column_family, - const Slice& key) { + virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { if (filter_deletes_) { SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index ff9aa63ee..490a4401f 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -56,6 +56,9 @@ static std::string PrintContents(WriteBatch* b) { state.append(")"); count++; break; + case kTypeColumnFamilyDeletion: + case kTypeColumnFamilyValue: + case kTypeColumnFamilyMerge: case kTypeLogData: assert(false); break; @@ -143,17 +146,34 @@ TEST(WriteBatchTest, Append) { namespace { struct TestHandler : public WriteBatch::Handler { std::string seen; - virtual void Put(const Slice& key, const Slice& value) { - seen += "Put(" + key.ToString() + ", " + value.ToString() + ")"; + virtual void PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + seen += "Put(" + key.ToString() + ", " + value.ToString() + ")"; + } else { + seen += "PutCF(" + std::to_string(column_family_id) + ", " + + key.ToString() + ", " + value.ToString() + ")"; + } } - virtual void Merge(const Slice& key, const Slice& value) { - seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")"; + virtual void MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")"; + } else { + seen += "MergeCF(" + std::to_string(column_family_id) + ", " + + key.ToString() + ", " + value.ToString() + ")"; + } } virtual void LogData(const Slice& blob) { seen += "LogData(" + blob.ToString() + ")"; } - virtual void Delete(const Slice& key) { - seen += "Delete(" + key.ToString() + ")"; + virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + if (column_family_id == 0) { + seen += "Delete(" + key.ToString() + ")"; + } else { + seen += "DeleteCF(" + std::to_string(column_family_id) + ", " + + key.ToString() + ")"; + } } }; } @@ -193,21 +213,23 @@ TEST(WriteBatchTest, Continue) { struct Handler : public TestHandler { int num_seen = 0; - virtual void Put(const Slice& key, const Slice& value) { + virtual void PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { ++num_seen; - TestHandler::Put(key, value); + TestHandler::PutCF(column_family_id, key, value); } - virtual void Merge(const Slice& key, const Slice& value) { + virtual void MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { ++num_seen; - TestHandler::Merge(key, value); + TestHandler::MergeCF(column_family_id, key, value); } virtual void LogData(const Slice& blob) { ++num_seen; TestHandler::LogData(blob); } - virtual void Delete(const Slice& key) { + virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { ++num_seen; - TestHandler::Delete(key); + TestHandler::DeleteCF(column_family_id, key); } virtual bool Continue() override { return num_seen < 3; @@ -255,6 +277,29 @@ TEST(WriteBatchTest, PutGatherSlices) { ASSERT_EQ(3, batch.Count()); } +TEST(WriteBatchTest, ColumnFamiliesBatchTest) { + WriteBatch batch; + batch.Put(0, Slice("foo"), Slice("bar")); + batch.Put(2, Slice("twofoo"), Slice("bar2")); + batch.Put(8, Slice("eightfoo"), Slice("bar8")); + batch.Delete(8, Slice("eightfoo")); + batch.Merge(3, Slice("threethree"), Slice("3three")); + batch.Put(0, Slice("foo"), Slice("bar")); + batch.Merge(Slice("omom"), Slice("nom")); + + TestHandler handler; + batch.Iterate(&handler); + ASSERT_EQ( + "Put(foo, bar)" + "PutCF(2, twofoo, bar2)" + "PutCF(8, eightfoo, bar8)" + "DeleteCF(8, eightfoo)" + "MergeCF(3, threethree, 3three)" + "Put(foo, bar)" + "Merge(omom, nom)", + handler.seen); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 9e92f21c5..bc1d63ce4 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -40,33 +40,31 @@ class WriteBatch { ~WriteBatch(); // Store the mapping "key->value" in the database. - void Put(const ColumnFamilyHandle& column_family, const Slice& key, - const Slice& value); + void Put(uint32_t column_family_id, const Slice& key, const Slice& value); void Put(const Slice& key, const Slice& value) { - Put(default_column_family, key, value); + Put(0, key, value); } // Variant of Put() that gathers output like writev(2). The key and value // that will be written to the database are concatentations of arrays of // slices. - void Put(const ColumnFamilyHandle& column_family, const SliceParts& key, + void Put(uint32_t column_family_id, const SliceParts& key, const SliceParts& value); void Put(const SliceParts& key, const SliceParts& value) { - Put(default_column_family, key, value); + Put(0, key, value); } // Merge "value" with the existing value of "key" in the database. // "key->merge(existing, value)" - void Merge(const ColumnFamilyHandle& column_family, const Slice& key, - const Slice& value); + void Merge(uint32_t column_family_id, const Slice& key, const Slice& value); void Merge(const Slice& key, const Slice& value) { - Merge(default_column_family, key, value); + Merge(0, key, value); } // If the database contains a mapping for "key", erase it. Else do nothing. - void Delete(const ColumnFamilyHandle& column_family, const Slice& key); + void Delete(uint32_t column_family_id, const Slice& key); void Delete(const Slice& key) { - Delete(default_column_family, key); + Delete(0, key); } // Append a blob of arbitrary size to the records in this batch. The blob will @@ -89,25 +87,31 @@ class WriteBatch { public: virtual ~Handler(); // default implementation will just call Put without column family for - // backwards compatibility - virtual void PutCF(const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { - Put(key, value); + // backwards compatibility. If the column family is not default, + // the function is noop + virtual void PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + Put(key, value); + } } virtual void Put(const Slice& key, const Slice& value); // Merge and LogData are not pure virtual. Otherwise, we would break // existing clients of Handler on a source code level. The default // implementation of Merge simply throws a runtime exception. - virtual void MergeCF(const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { - Merge(key, value); + virtual void MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + if (column_family_id == 0) { + Merge(key, value); + } } virtual void Merge(const Slice& key, const Slice& value); // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); - virtual void DeleteCF(const ColumnFamilyHandle& column_family, - const Slice& key) { - Delete(key); + virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + if (column_family_id == 0) { + Delete(key); + } } virtual void Delete(const Slice& key); // Continue is called by WriteBatch::Iterate. If it returns false,