From 28b5c760046a1341dc5471254fc8d80bfb9853d7 Mon Sep 17 00:00:00 2001
From: sdong
Date: Mon, 18 Aug 2014 15:19:17 -0700
Subject: [PATCH] WriteBatchWithIndex: a wrapper of WriteBatch, with a searchable index

Summary:
Add WriteBatchWithIndex so that a user can query data out of a WriteBatch,
to support MongoDB's read-your-own-writes.

WriteBatchWithIndex uses a skip list to store the binary index. The index
stores the offset of each entry in the write batch. When searching for a
key, the entry's key is recovered by reading the entry from the write batch
at the stored offset.

Define a new iterator class for querying data out of WriteBatchWithIndex. A
user can create an iterator of the write batch for one column family, seek
to a key and keep calling Next() to see subsequent entries.

I will add more unit tests if people are OK with this API.

Test Plan:
make all check
Add unit tests.

Reviewers: yhchiang, igor, MarkCallaghan, ljin

Reviewed By: ljin

Subscribers: dhruba, leveldb, xjin

Differential Revision: https://reviews.facebook.net/D21381
---
 HISTORY.md                              |   1 +
 Makefile                                |   6 +-
 db/column_family.cc                     |   9 +
 db/column_family.h                      |   2 +
 db/db_test.cc                           |  26 +-
 db/dbformat.h                           |   8 +
 db/write_batch.cc                       | 118 ++++---
 db/write_batch_test.cc                  |  83 +++++
 .../utilities/write_batch_with_index.h  | 102 ++++++
 include/rocksdb/write_batch.h           |   1 +
 .../write_batch_with_index.cc           | 301 ++++++++++++++++++
 .../write_batch_with_index_test.cc      | 235 ++++++++++++++
 12 files changed, 835 insertions(+), 57 deletions(-)
 create mode 100644 include/rocksdb/utilities/write_batch_with_index.h
 create mode 100644 utilities/write_batch_with_index/write_batch_with_index.cc
 create mode 100644 utilities/write_batch_with_index/write_batch_with_index_test.cc

diff --git a/HISTORY.md b/HISTORY.md
index f8b8c548d..c9d3c5663 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -2,6 +2,7 @@
 ### Unreleased
 ### New Features
+* Add include/utilities/write_batch_with_index.h, providing a utility class to query data out of a WriteBatch while building it.
 ### Public API changes
 * The Prefix Extractor used with V2 compaction filters is now passed user key to SliceTransform::Transform instead of unparsed RocksDB key.
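Note for reviewers: a minimal usage sketch of the new API (assuming an already-open rocksdb::DB* db; class and method names are taken from the header added in this patch):

    #include <memory>
    #include "rocksdb/comparator.h"
    #include "rocksdb/db.h"
    #include "rocksdb/utilities/write_batch_with_index.h"

    // Build a batch, read an uncommitted key back through the index, then
    // commit the whole batch atomically through the regular write path.
    void Example(rocksdb::DB* db) {
      rocksdb::WriteBatchWithIndex batch(rocksdb::BytewiseComparator());
      batch.Put("key1", "value1");
      batch.Delete("key2");

      std::unique_ptr<rocksdb::WBWIIterator> iter(batch.NewIterator());
      iter->Seek("key1");  // finds the Put above without touching the DB
      if (iter->Valid() && iter->Entry().type == rocksdb::kPutRecord) {
        // iter->Entry().value is "value1" here
      }

      db->Write(rocksdb::WriteOptions(), batch.GetWriteBatch());
    }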
diff --git a/Makefile b/Makefile
index 1bd202bc9..b380c6f58 100644
--- a/Makefile
+++ b/Makefile
@@ -121,7 +121,8 @@ TESTS = \
 	options_test \
 	cuckoo_table_builder_test \
 	cuckoo_table_reader_test \
-	cuckoo_table_db_test
+	cuckoo_table_db_test \
+	write_batch_with_index_test
 
 TOOLS = \
 	sst_dump \
@@ -375,6 +376,9 @@ spatial_db_test: utilities/spatialdb/spatial_db_test.o $(LIBOBJECTS) $(TESTHARNE
 ttl_test: utilities/ttl/ttl_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) utilities/ttl/ttl_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
 
+write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CXX) utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
+
 dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
diff --git a/db/column_family.cc b/db/column_family.cc
index d4467eea0..ac0150b83 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -631,4 +631,13 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
   return &handle_;
 }
 
+uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
+  uint32_t column_family_id = 0;
+  if (column_family != nullptr) {
+    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+    column_family_id = cfh->GetID();
+  }
+  return column_family_id;
+}
+
 }  // namespace rocksdb
diff --git a/db/column_family.h b/db/column_family.h
index ecd9f21fb..b49ef2f97 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -457,4 +457,6 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
   ColumnFamilyHandleInternal handle_;
 };
 
+extern uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family);
+
 }  // namespace rocksdb
diff --git a/db/db_test.cc b/db/db_test.cc
index ee84ba975..c3dbaabaa 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -30,6 +30,7 @@
 #include "rocksdb/table.h"
 #include "rocksdb/options.h"
 #include "rocksdb/table_properties.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
 #include "table/block_based_table_factory.h"
 #include "table/plain_table_factory.h"
 #include "util/hash.h"
@@ -6429,13 +6430,26 @@ static void MTThreadBody(void* arg) {
       // into each of the CFs
       // We add some padding for force compactions.
       int unique_id = rnd.Uniform(1000000);
-      WriteBatch batch;
-      for (int cf = 0; cf < kColumnFamilies; ++cf) {
-        snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
-                 static_cast<int>(counter), cf, unique_id);
-        batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
+
+      // Half of the time directly use WriteBatch. Half of the time use
+      // WriteBatchWithIndex.
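+      // Either way the data reaches the DB through db->Write(), so the
+      // read path below must observe the same values for both.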
+      if (rnd.OneIn(2)) {
+        WriteBatch batch;
+        for (int cf = 0; cf < kColumnFamilies; ++cf) {
+          snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
+                   static_cast<int>(counter), cf, unique_id);
+          batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
+        }
+        ASSERT_OK(db->Write(WriteOptions(), &batch));
+      } else {
+        WriteBatchWithIndex batch(db->GetOptions().comparator);
+        for (int cf = 0; cf < kColumnFamilies; ++cf) {
+          snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
+                   static_cast<int>(counter), cf, unique_id);
+          batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
+        }
+        ASSERT_OK(db->Write(WriteOptions(), batch.GetWriteBatch()));
       }
-      ASSERT_OK(db->Write(WriteOptions(), &batch));
     } else {
       // Read a value and verify that it matches the pattern written above
       // and that writes to all column families were atomic (unique_id is the
diff --git a/db/dbformat.h b/db/dbformat.h
index b6a6c7a35..6b15c47c4 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -401,4 +401,12 @@ class InternalKeySliceTransform : public SliceTransform {
   const SliceTransform* const transform_;
 };
 
+// Read a record out of serialized write batch data in input.
+// tag, column_family, key, value and blob are return values. The Slices
+// filled in point into the write batch's buffer.
+// Tag is defined as ValueType.
+// input will be advanced to after the record.
+extern Status ReadRecordFromWriteBatch(Slice* input, char* tag,
+                                       uint32_t* column_family, Slice* key,
+                                       Slice* value, Slice* blob);
 }  // namespace rocksdb
diff --git a/db/write_batch.cc b/db/write_batch.cc
index dc72a1138..fdc0e2c6e 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -27,6 +27,7 @@
 #include "rocksdb/merge_operator.h"
 #include "db/dbformat.h"
 #include "db/db_impl.h"
+#include "db/column_family.h"
 #include "db/memtable.h"
 #include "db/snapshot.h"
 #include "db/write_batch_internal.h"
@@ -80,6 +81,58 @@ int WriteBatch::Count() const {
   return WriteBatchInternal::Count(this);
 }
 
+Status ReadRecordFromWriteBatch(Slice* input, char* tag,
+                                uint32_t* column_family, Slice* key,
+                                Slice* value, Slice* blob) {
+  assert(key != nullptr && value != nullptr);
+  *tag = (*input)[0];
+  input->remove_prefix(1);
+  *column_family = 0;  // default
+  switch (*tag) {
+    case kTypeColumnFamilyValue:
+      if (!GetVarint32(input, column_family)) {
+        return Status::Corruption("bad WriteBatch Put");
+      }
+    // intentional fallthrough
+    case kTypeValue:
+      if (!GetLengthPrefixedSlice(input, key) ||
+          !GetLengthPrefixedSlice(input, value)) {
+        return Status::Corruption("bad WriteBatch Put");
+      }
+      break;
+    case kTypeColumnFamilyDeletion:
+      if (!GetVarint32(input, column_family)) {
+        return Status::Corruption("bad WriteBatch Delete");
+      }
+    // intentional fallthrough
+    case kTypeDeletion:
+      if (!GetLengthPrefixedSlice(input, key)) {
+        return Status::Corruption("bad WriteBatch Delete");
+      }
+      break;
+    case kTypeColumnFamilyMerge:
+      if (!GetVarint32(input, column_family)) {
+        return Status::Corruption("bad WriteBatch Merge");
+      }
+    // intentional fallthrough
+    case kTypeMerge:
+      if (!GetLengthPrefixedSlice(input, key) ||
+          !GetLengthPrefixedSlice(input, value)) {
+        return Status::Corruption("bad WriteBatch Merge");
+      }
+      break;
+    case kTypeLogData:
+      assert(blob != nullptr);
+      if (!GetLengthPrefixedSlice(input, blob)) {
+        return Status::Corruption("bad WriteBatch Blob");
+      }
+      break;
+    default:
+      return Status::Corruption("unknown WriteBatch tag");
+  }
+  return Status::OK();
+}
+
 Status WriteBatch::Iterate(Handler* handler) const {
   Slice input(rep_);
   if (input.size() < kHeader) {
@@ -91,57 +144,33 @@ Status WriteBatch::Iterate(Handler* handler) const {
   int found = 0;
   Status s;
   while (s.ok() && !input.empty() && handler->Continue()) {
-    char tag = input[0];
-    input.remove_prefix(1);
+    char tag = 0;
     uint32_t column_family = 0;  // default
+
+    s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
+                                 &blob);
+    if (!s.ok()) {
+      return s;
+    }
+
     switch (tag) {
       case kTypeColumnFamilyValue:
-        if (!GetVarint32(&input, &column_family)) {
-          return Status::Corruption("bad WriteBatch Put");
-        }
-      // intentional fallthrough
       case kTypeValue:
-        if (GetLengthPrefixedSlice(&input, &key) &&
-            GetLengthPrefixedSlice(&input, &value)) {
-          s = handler->PutCF(column_family, key, value);
-          found++;
-        } else {
-          return Status::Corruption("bad WriteBatch Put");
-        }
+        s = handler->PutCF(column_family, key, value);
+        found++;
         break;
       case kTypeColumnFamilyDeletion:
-        if (!GetVarint32(&input, &column_family)) {
-          return Status::Corruption("bad WriteBatch Delete");
-        }
-      // intentional fallthrough
       case kTypeDeletion:
-        if (GetLengthPrefixedSlice(&input, &key)) {
-          s = handler->DeleteCF(column_family, key);
-          found++;
-        } else {
-          return Status::Corruption("bad WriteBatch Delete");
-        }
+        s = handler->DeleteCF(column_family, key);
+        found++;
         break;
       case kTypeColumnFamilyMerge:
-        if (!GetVarint32(&input, &column_family)) {
-          return Status::Corruption("bad WriteBatch Merge");
-        }
-      // intentional fallthrough
       case kTypeMerge:
-        if (GetLengthPrefixedSlice(&input, &key) &&
-            GetLengthPrefixedSlice(&input, &value)) {
-          s = handler->MergeCF(column_family, key, value);
-          found++;
-        } else {
-          return Status::Corruption("bad WriteBatch Merge");
-        }
+        s = handler->MergeCF(column_family, key, value);
+        found++;
        break;
       case kTypeLogData:
-        if (GetLengthPrefixedSlice(&input, &blob)) {
-          handler->LogData(blob);
-        } else {
-          return Status::Corruption("bad WriteBatch Blob");
-        }
+        handler->LogData(blob);
         break;
       default:
         return Status::Corruption("unknown WriteBatch tag");
@@ -186,17 +215,6 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
   PutLengthPrefixedSlice(&b->rep_, value);
 }
 
-namespace {
-inline uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
-  uint32_t column_family_id = 0;
-  if (column_family != nullptr) {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    column_family_id = cfh->GetID();
-  }
-  return column_family_id;
-}
-}  // namespace
-
 void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                      const Slice& value) {
   WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc
index febd35c05..1d30552b3 100644
--- a/db/write_batch_test.cc
+++ b/db/write_batch_test.cc
@@ -15,6 +15,7 @@
 #include "db/write_batch_internal.h"
 #include "rocksdb/env.h"
 #include "rocksdb/memtablerep.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
 #include "util/logging.h"
 #include "util/testharness.h"
 
@@ -316,6 +317,88 @@ TEST(WriteBatchTest, ColumnFamiliesBatchTest) {
       handler.seen);
 }
 
+TEST(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
+  WriteBatchWithIndex batch(BytewiseComparator(), 20);
+  ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
+  batch.Put(&zero, Slice("foo"), Slice("bar"));
+  batch.Put(&two, Slice("twofoo"), Slice("bar2"));
+  batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
+  batch.Delete(&eight, Slice("eightfoo"));
+  batch.Merge(&three, Slice("threethree"), Slice("3three"));
+  batch.Put(&zero, Slice("foo"), Slice("bar"));
+  batch.Merge(Slice("omom"), Slice("nom"));
+
+  std::unique_ptr<WBWIIterator> iter;
+
+  iter.reset(batch.NewIterator(&eight));
+  iter->Seek("eightfoo");
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
+  ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
+  ASSERT_EQ("bar8", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kDeleteRecord, iter->Entry().type);
+  ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(!iter->Valid());
+
+  iter.reset(batch.NewIterator());
+  iter->Seek("gggg");
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
+  ASSERT_EQ("omom", iter->Entry().key.ToString());
+  ASSERT_EQ("nom", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(!iter->Valid());
+
+  iter.reset(batch.NewIterator(&zero));
+  iter->Seek("foo");
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
+  ASSERT_EQ("foo", iter->Entry().key.ToString());
+  ASSERT_EQ("bar", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
+  ASSERT_EQ("foo", iter->Entry().key.ToString());
+  ASSERT_EQ("bar", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
+  ASSERT_EQ("omom", iter->Entry().key.ToString());
+  ASSERT_EQ("nom", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(!iter->Valid());
+
+  TestHandler handler;
+  batch.GetWriteBatch()->Iterate(&handler);
+  ASSERT_EQ(
+      "Put(foo, bar)"
+      "PutCF(2, twofoo, bar2)"
+      "PutCF(8, eightfoo, bar8)"
+      "DeleteCF(8, eightfoo)"
+      "MergeCF(3, threethree, 3three)"
+      "Put(foo, bar)"
+      "Merge(omom, nom)",
+      handler.seen);
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h
new file mode 100644
index 000000000..c09f53d11
--- /dev/null
+++ b/include/rocksdb/utilities/write_batch_with_index.h
@@ -0,0 +1,102 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// A WriteBatchWithIndex with a binary searchable index built for all the keys
+// inserted.
+
+#pragma once
+
+#include "rocksdb/status.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/write_batch.h"
+
+namespace rocksdb {
+
+class ColumnFamilyHandle;
+struct SliceParts;
+class Comparator;
+
+enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord };
+
+// An entry of Put, Merge or Delete in a write batch. Used in WBWIIterator.
+struct WriteEntry {
+  WriteType type;
+  Slice key;
+  Slice value;
+};
+
+// Iterator over one column family of a WriteBatchWithIndex.
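+// Entries are returned in the order defined by the index comparator;
+// Entry() is only meaningful while Valid() is true.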
+class WBWIIterator {
+ public:
+  virtual ~WBWIIterator() {}
+
+  virtual bool Valid() const = 0;
+
+  virtual void Seek(const Slice& key) = 0;
+
+  virtual void Next() = 0;
+
+  virtual const WriteEntry& Entry() const = 0;
+
+  virtual Status status() const = 0;
+};
+
+// A WriteBatchWithIndex with a binary searchable index built for all the keys
+// inserted.
+// In Put(), Merge() or Delete(), the same function of the wrapped WriteBatch
+// will be called. At the same time, indexes will be built.
+// By calling GetWriteBatch(), a user will get the WriteBatch for the data
+// they inserted, which can be used for DB::Write().
+// A user can call NewIterator() to create an iterator.
+class WriteBatchWithIndex {
+ public:
+  // index_comparator indicates the order when iterating data in the write
+  // batch. Technically, it doesn't have to be the same as the one used in
+  // the DB.
+  // reserved_bytes: reserved bytes in underlying WriteBatch
+  explicit WriteBatchWithIndex(const Comparator* index_comparator,
+                               size_t reserved_bytes = 0);
+  virtual ~WriteBatchWithIndex();
+
+  WriteBatch* GetWriteBatch();
+
+  virtual void Put(ColumnFamilyHandle* column_family, const Slice& key,
+                   const Slice& value);
+
+  virtual void Put(const Slice& key, const Slice& value);
+
+  virtual void Merge(ColumnFamilyHandle* column_family, const Slice& key,
+                     const Slice& value);
+
+  virtual void Merge(const Slice& key, const Slice& value);
+
+  virtual void PutLogData(const Slice& blob);
+
+  virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key);
+  virtual void Delete(const Slice& key);
+
+  virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key);
+
+  virtual void Delete(const SliceParts& key);
+
+  // Create an iterator of a column family. Users can call iterator.Seek() to
+  // search to the next entry of or after a key. Keys will be iterated in the
+  // order given by index_comparator. For multiple updates on the same key,
+  // each update will be returned as a separate entry, in the order of update
+  // time.
+  virtual WBWIIterator* NewIterator(ColumnFamilyHandle* column_family);
+  // Create an iterator of the default column family.
+  virtual WBWIIterator* NewIterator();
+
+ private:
+  struct Rep;
+  Rep* rep;
+};
+
+}  // namespace rocksdb
diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h
index 3272fd2f9..db440be02 100644
--- a/include/rocksdb/write_batch.h
+++ b/include/rocksdb/write_batch.h
@@ -152,6 +152,7 @@ class WriteBatch {
 
  private:
   friend class WriteBatchInternal;
+ protected:
   std::string rep_;  // See comment in write_batch.cc for the format of rep_
 
   // Intentionally copyable
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
new file mode 100644
index 000000000..68b3d3970
--- /dev/null
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -0,0 +1,301 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
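+//
+// The index is a skip list of (offset, column family id) entries. Keys are
+// not copied into the index; they are re-read from the wrapped WriteBatch at
+// the stored offset whenever a comparison or lookup needs them.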
+
+#include "rocksdb/utilities/write_batch_with_index.h"
+#include "rocksdb/comparator.h"
+#include "db/column_family.h"
+#include "db/skiplist.h"
+#include "util/arena.h"
+
+namespace rocksdb {
+namespace {
+class ReadableWriteBatch : public WriteBatch {
+ public:
+  explicit ReadableWriteBatch(size_t reserved_bytes = 0)
+      : WriteBatch(reserved_bytes) {}
+  // Retrieve some information from a write entry in the write batch, given
+  // the start offset of the write entry.
+  Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
+                                Slice* value, Slice* blob) const;
+};
+}  // namespace
+
+// Key used by skip list, as the binary searchable index of
+// WriteBatchWithIndex.
+struct WriteBatchIndexEntry {
+  WriteBatchIndexEntry(size_t o, uint32_t c)
+      : offset(o), column_family(c), search_key(nullptr) {}
+  WriteBatchIndexEntry(const Slice* sk, uint32_t c)
+      : offset(0), column_family(c), search_key(sk) {}
+
+  size_t offset;            // offset of an entry in write batch's string buffer.
+  uint32_t column_family;   // column family of the entry
+  const Slice* search_key;  // if not null, instead of reading keys from
+                            // write batch, use it to compare. This is used
+                            // for lookup key.
+};
+
+class WriteBatchEntryComparator {
+ public:
+  WriteBatchEntryComparator(const Comparator* comparator,
+                            const ReadableWriteBatch* write_batch)
+      : comparator_(comparator), write_batch_(write_batch) {}
+  // Compare a and b. Return a negative value if a is less than b, 0 if they
+  // are equal, and a positive value if a is greater than b
+  int operator()(const WriteBatchIndexEntry* entry1,
+                 const WriteBatchIndexEntry* entry2) const;
+
+ private:
+  const Comparator* comparator_;
+  const ReadableWriteBatch* write_batch_;
+};
+
+typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
+    WriteBatchEntrySkipList;
+
+struct WriteBatchWithIndex::Rep {
+  Rep(const Comparator* index_comparator, size_t reserved_bytes = 0)
+      : write_batch(reserved_bytes),
+        comparator(index_comparator, &write_batch),
+        skip_list(comparator, &arena) {}
+  ReadableWriteBatch write_batch;
+  WriteBatchEntryComparator comparator;
+  Arena arena;
+  WriteBatchEntrySkipList skip_list;
+
+  WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) {
+    return GetEntryWithCfId(GetColumnFamilyID(column_family));
+  }
+
+  WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) {
+    auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
+    auto* index_entry = new (mem)
+        WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id);
+    return index_entry;
+  }
+};
+
+class WBWIIteratorImpl : public WBWIIterator {
+ public:
+  WBWIIteratorImpl(uint32_t column_family_id,
+                   WriteBatchEntrySkipList* skip_list,
+                   const ReadableWriteBatch* write_batch)
+      : column_family_id_(column_family_id),
+        skip_list_iter_(skip_list),
+        write_batch_(write_batch),
+        valid_(false) {}
+
+  virtual ~WBWIIteratorImpl() {}
+
+  virtual bool Valid() const override { return valid_; }
+
+  virtual void Seek(const Slice& key) override {
+    valid_ = true;
+    WriteBatchIndexEntry search_entry(&key, column_family_id_);
+    skip_list_iter_.Seek(&search_entry);
+    ReadEntry();
+  }
+
+  virtual void Next() override {
+    skip_list_iter_.Next();
+    ReadEntry();
+  }
+
+  virtual const WriteEntry& Entry() const override { return current_; }
+
+  virtual Status status() const override { return status_; }
+
+ private:
+  uint32_t column_family_id_;
+  WriteBatchEntrySkipList::Iterator skip_list_iter_;
+  const ReadableWriteBatch* write_batch_;
+  Status status_;
+  bool valid_;
+  WriteEntry current_;
+
+  void ReadEntry() {
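+    // Invalidate the iterator at the end of the skip list, or at the first
+    // node that belongs to a different column family; otherwise decode the
+    // record found at the offset stored in the index entry.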
+    if (!status_.ok() || !skip_list_iter_.Valid()) {
+      valid_ = false;
+      return;
+    }
+    const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
+    if (iter_entry == nullptr ||
+        iter_entry->column_family != column_family_id_) {
+      valid_ = false;
+      return;
+    }
+    Slice blob;
+    status_ = write_batch_->GetEntryFromDataOffset(
+        iter_entry->offset, &current_.type, &current_.key, &current_.value,
+        &blob);
+    if (!status_.ok()) {
+      valid_ = false;
+    } else if (current_.type != kPutRecord && current_.type != kDeleteRecord &&
+               current_.type != kMergeRecord) {
+      valid_ = false;
+      status_ = Status::Corruption("write batch index is corrupted");
+    }
+  }
+};
+
+Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
+                                                  WriteType* type, Slice* Key,
+                                                  Slice* value,
+                                                  Slice* blob) const {
+  if (type == nullptr || Key == nullptr || value == nullptr ||
+      blob == nullptr) {
+    return Status::InvalidArgument("Output parameters cannot be null");
+  }
+
+  if (data_offset >= GetDataSize()) {
+    return Status::InvalidArgument("data offset exceeds write batch size");
+  }
+  Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
+  char tag;
+  uint32_t column_family;
+  Status s =
+      ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob);
+
+  switch (tag) {
+    case kTypeColumnFamilyValue:
+    case kTypeValue:
+      *type = kPutRecord;
+      break;
+    case kTypeColumnFamilyDeletion:
+    case kTypeDeletion:
+      *type = kDeleteRecord;
+      break;
+    case kTypeColumnFamilyMerge:
+    case kTypeMerge:
+      *type = kMergeRecord;
+      break;
+    case kTypeLogData:
+      *type = kLogDataRecord;
+      break;
+    default:
+      return Status::Corruption("unknown WriteBatch tag");
+  }
+  return Status::OK();
+}
+
+WriteBatchWithIndex::WriteBatchWithIndex(const Comparator* index_comparator,
+                                         size_t reserved_bytes)
+    : rep(new Rep(index_comparator, reserved_bytes)) {}
+
+WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }
+
+WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
+
+WBWIIterator* WriteBatchWithIndex::NewIterator() {
+  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
+}
+
+WBWIIterator* WriteBatchWithIndex::NewIterator(
+    ColumnFamilyHandle* column_family) {
+  return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
+                              &(rep->skip_list), &rep->write_batch);
+}
+
+void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
+                              const Slice& key, const Slice& value) {
+  auto* index_entry = rep->GetEntry(column_family);
+  rep->write_batch.Put(column_family, key, value);
+  rep->skip_list.Insert(index_entry);
+}
+
+void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
+  auto* index_entry = rep->GetEntryWithCfId(0);
+  rep->write_batch.Put(key, value);
+  rep->skip_list.Insert(index_entry);
+}
+
+void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
+                                const Slice& key, const Slice& value) {
+  auto* index_entry = rep->GetEntry(column_family);
+  rep->write_batch.Merge(column_family, key, value);
+  rep->skip_list.Insert(index_entry);
+}
+
+void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
+  auto* index_entry = rep->GetEntryWithCfId(0);
+  rep->write_batch.Merge(key, value);
+  rep->skip_list.Insert(index_entry);
+}
+
+void WriteBatchWithIndex::PutLogData(const Slice& blob) {
+  rep->write_batch.PutLogData(blob);
+}
+
+void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
+                                 const Slice& key) {
+  auto* index_entry = rep->GetEntry(column_family);
+  rep->write_batch.Delete(column_family, key);
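+  // index_entry was allocated before the record was appended, so its offset
+  // (captured via GetDataSize()) points at the record just written.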
+  rep->skip_list.Insert(index_entry);
+}
+
+void WriteBatchWithIndex::Delete(const Slice& key) {
+  auto* index_entry = rep->GetEntryWithCfId(0);
+  rep->write_batch.Delete(key);
+  rep->skip_list.Insert(index_entry);
+}
+
+void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
+                                 const SliceParts& key) {
+  auto* index_entry = rep->GetEntry(column_family);
+  rep->write_batch.Delete(column_family, key);
+  rep->skip_list.Insert(index_entry);
+}
+
+void WriteBatchWithIndex::Delete(const SliceParts& key) {
+  auto* index_entry = rep->GetEntryWithCfId(0);
+  rep->write_batch.Delete(key);
+  rep->skip_list.Insert(index_entry);
+}
+
+int WriteBatchEntryComparator::operator()(
+    const WriteBatchIndexEntry* entry1,
+    const WriteBatchIndexEntry* entry2) const {
+  if (entry1->column_family > entry2->column_family) {
+    return 1;
+  } else if (entry1->column_family < entry2->column_family) {
+    return -1;
+  }
+
+  Status s;
+  Slice key1, key2;
+  if (entry1->search_key == nullptr) {
+    Slice value, blob;
+    WriteType write_type;
+    s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1,
+                                             &value, &blob);
+    if (!s.ok()) {
+      return 1;
+    }
+  } else {
+    key1 = *(entry1->search_key);
+  }
+  if (entry2->search_key == nullptr) {
+    Slice value, blob;
+    WriteType write_type;
+    s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2,
+                                             &value, &blob);
+    if (!s.ok()) {
+      return -1;
+    }
+  } else {
+    key2 = *(entry2->search_key);
+  }
+
+  int cmp = comparator_->Compare(key1, key2);
+  if (cmp != 0) {
+    return cmp;
+  } else if (entry1->offset > entry2->offset) {
+    return 1;
+  } else if (entry1->offset < entry2->offset) {
+    return -1;
+  }
+  return 0;
+}
+
+}  // namespace rocksdb
diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc
new file mode 100644
index 000000000..fdceed4c4
--- /dev/null
+++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc
@@ -0,0 +1,235 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <memory>
+#include <map>
+#include "db/column_family.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
+#include "util/testharness.h"
+
+namespace rocksdb {
+
+namespace {
+class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
+ public:
+  explicit ColumnFamilyHandleImplDummy(int id)
+      : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
+  uint32_t GetID() const override { return id_; }
+
+ private:
+  uint32_t id_;
+};
+
+struct Entry {
+  std::string key;
+  std::string value;
+  WriteType type;
+};
+
+struct TestHandler : public WriteBatch::Handler {
+  std::map<uint32_t, std::vector<Entry>> seen;
+  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
+                       const Slice& value) {
+    Entry e;
+    e.key = key.ToString();
+    e.value = value.ToString();
+    e.type = kPutRecord;
+    seen[column_family_id].push_back(e);
+    return Status::OK();
+  }
+  virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
+                         const Slice& value) {
+    Entry e;
+    e.key = key.ToString();
+    e.value = value.ToString();
+    e.type = kMergeRecord;
+    seen[column_family_id].push_back(e);
+    return Status::OK();
+  }
+  virtual void LogData(const Slice& blob) {}
+  virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
+    Entry e;
+    e.key = key.ToString();
+    e.value = "";
+    e.type = kDeleteRecord;
+    seen[column_family_id].push_back(e);
+    return Status::OK();
+  }
+};
+}  // namespace anonymous
+
+class WriteBatchWithIndexTest {};
+
+TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
+  Entry entries[] = {{"aaa", "0005", kPutRecord},
+                     {"b", "0002", kPutRecord},
+                     {"cdd", "0002", kMergeRecord},
+                     {"aab", "00001", kPutRecord},
+                     {"cc", "00005", kPutRecord},
+                     {"cdd", "0002", kPutRecord},
+                     {"aab", "0003", kPutRecord},
+                     {"cc", "00005", kDeleteRecord}, };
+
+  // In this test, we insert <key, value> to column family `data`, and
+  // <value, key> to column family `index`. Then iterate them in order
+  // and seek them by key.
+
+  // Sort entries by key
+  std::map<std::string, std::vector<Entry*>> data_map;
+  // Sort entries by value
+  std::map<std::string, std::vector<Entry*>> index_map;
+  for (auto& e : entries) {
+    data_map[e.key].push_back(&e);
+    index_map[e.value].push_back(&e);
+  }
+
+  WriteBatchWithIndex batch(BytewiseComparator(), 20);
+  ColumnFamilyHandleImplDummy data(6), index(8);
+  for (auto& e : entries) {
+    if (e.type == kPutRecord) {
+      batch.Put(&data, e.key, e.value);
+      batch.Put(&index, e.value, e.key);
+    } else if (e.type == kMergeRecord) {
+      batch.Merge(&data, e.key, e.value);
+      batch.Put(&index, e.value, e.key);
+    } else {
+      assert(e.type == kDeleteRecord);
+      std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&data));
+      iter->Seek(e.key);
+      ASSERT_OK(iter->status());
+      auto& write_entry = iter->Entry();
+      ASSERT_EQ(e.key, write_entry.key.ToString());
+      ASSERT_EQ(e.value, write_entry.value.ToString());
+      batch.Delete(&data, e.key);
+      batch.Put(&index, e.value, "");
+    }
+  }
+
+  // Iterate all keys
+  {
+    std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&data));
+    iter->Seek("");
+    for (auto pair : data_map) {
+      for (auto v : pair.second) {
+        ASSERT_OK(iter->status());
+        ASSERT_TRUE(iter->Valid());
+        auto& write_entry = iter->Entry();
+        ASSERT_EQ(pair.first, write_entry.key.ToString());
+        ASSERT_EQ(v->type, write_entry.type);
+        if (write_entry.type != kDeleteRecord) {
+          ASSERT_EQ(v->value, write_entry.value.ToString());
+        }
+        iter->Next();
+      }
+    }
+    ASSERT_TRUE(!iter->Valid());
+  }
+
+  // Iterate all indexes
+  {
+    std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&index));
+    iter->Seek("");
+    for (auto pair : index_map) {
+      for (auto v : pair.second) {
+        ASSERT_OK(iter->status());
+        ASSERT_TRUE(iter->Valid());
+        auto& write_entry = iter->Entry();
+        ASSERT_EQ(pair.first, write_entry.key.ToString());
+        if (v->type != kDeleteRecord) {
+          ASSERT_EQ(v->key, write_entry.value.ToString());
+          ASSERT_EQ(v->value, write_entry.key.ToString());
+        }
+        iter->Next();
+      }
+    }
+    ASSERT_TRUE(!iter->Valid());
+  }
+
+  // Seek to every key
+  {
+    std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&data));
+
+    // Seek the keys one by one in reverse order
+    for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) {
+      iter->Seek(pair->first);
+      ASSERT_OK(iter->status());
+      for (auto v : pair->second) {
+        ASSERT_TRUE(iter->Valid());
+        auto& write_entry = iter->Entry();
+        ASSERT_EQ(pair->first, write_entry.key.ToString());
+        ASSERT_EQ(v->type, write_entry.type);
+        if (write_entry.type != kDeleteRecord) {
+          ASSERT_EQ(v->value, write_entry.value.ToString());
+        }
+        iter->Next();
+        ASSERT_OK(iter->status());
+      }
+    }
+  }
+
+  // Seek to every index
+  {
+    std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&index));
+
+    // Seek the keys one by one in reverse order
+    for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) {
+      iter->Seek(pair->first);
+      ASSERT_OK(iter->status());
+      for (auto v : pair->second) {
+        ASSERT_TRUE(iter->Valid());
+        auto& write_entry = iter->Entry();
+        ASSERT_EQ(pair->first, write_entry.key.ToString());
+        ASSERT_EQ(v->value, write_entry.key.ToString());
+        if (v->type != kDeleteRecord) {
+          ASSERT_EQ(v->key, write_entry.value.ToString());
+        }
+        iter->Next();
+        ASSERT_OK(iter->status());
+      }
+    }
+  }
+
+  // Verify WriteBatch can be iterated
+  TestHandler handler;
+  batch.GetWriteBatch()->Iterate(&handler);
+
+  // Verify data column family
+  {
+    ASSERT_EQ(sizeof(entries) / sizeof(Entry),
+              handler.seen[data.GetID()].size());
+    size_t i = 0;
+    for (auto e : handler.seen[data.GetID()]) {
+      auto write_entry = entries[i++];
+      ASSERT_EQ(e.type, write_entry.type);
+      ASSERT_EQ(e.key, write_entry.key);
+      if (e.type != kDeleteRecord) {
+        ASSERT_EQ(e.value, write_entry.value);
+      }
+    }
+  }
+
+  // Verify index column family
+  {
+    ASSERT_EQ(sizeof(entries) / sizeof(Entry),
+              handler.seen[index.GetID()].size());
+    size_t i = 0;
+    for (auto e : handler.seen[index.GetID()]) {
+      auto write_entry = entries[i++];
+      ASSERT_EQ(e.key, write_entry.value);
+      if (write_entry.type != kDeleteRecord) {
+        ASSERT_EQ(e.value, write_entry.key);
+      }
+    }
+  }
+}
+
+}  // namespace
+
+int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }
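A closing note on the ordering contract the tests above exercise: within a column family, index entries sort by key under index_comparator, and multiple updates to the same key sort by their offset in the batch, i.e. in update order. A minimal standalone sketch (hypothetical snippet, using only the API added by this patch):

    #include <cassert>
    #include <memory>
    #include "rocksdb/comparator.h"
    #include "rocksdb/utilities/write_batch_with_index.h"

    int main() {
      rocksdb::WriteBatchWithIndex batch(rocksdb::BytewiseComparator());
      batch.Put("k", "v1");    // earlier offset in the batch
      batch.Merge("k", "v2");  // later offset, same key

      std::unique_ptr<rocksdb::WBWIIterator> iter(batch.NewIterator());
      iter->Seek("k");
      assert(iter->Entry().type == rocksdb::kPutRecord);    // first update
      iter->Next();
      assert(iter->Entry().type == rocksdb::kMergeRecord);  // second update
      return 0;
    }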