From 81345b90f9cea999badc6255b4d62e84c6eda911 Mon Sep 17 00:00:00 2001 From: agiardullo Date: Mon, 23 Feb 2015 17:49:23 -0800 Subject: [PATCH] Create an abstract interface for write batches Summary: WriteBatch and WriteBatchWithIndex now both inherit from a common abstract base class. This makes it easier to write code that is agnostic toward the implementation of the particular write batch. In particular, I plan on utilizing this abstraction to allow transactions to support using either implementation of a write batch. Test Plan: modified existing WriteBatchWithIndex tests to test new functions. Running all tests. Reviewers: igor, rven, yhchiang, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D34017 --- HISTORY.md | 1 + db/slice.cc | 24 +++++ db/write_batch_base.cc | 46 ++++++++++ include/rocksdb/slice.h | 4 + .../utilities/write_batch_with_index.h | 29 +++--- include/rocksdb/write_batch.h | 36 +++++--- include/rocksdb/write_batch_base.h | 72 +++++++++++++++ src.mk | 2 + .../write_batch_with_index.cc | 14 +++ .../write_batch_with_index_test.cc | 88 +++++++++++++------ 10 files changed, 264 insertions(+), 52 deletions(-) create mode 100644 db/slice.cc create mode 100644 db/write_batch_base.cc create mode 100644 include/rocksdb/write_batch_base.h diff --git a/HISTORY.md b/HISTORY.md index ea3fe284c..5759dfba2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -17,6 +17,7 @@ * Block based table remembers whether a whole key or prefix based bloom filter is supported in SST files. Do a sanity check when reading the file with users' configuration. * Fixed a bug in ReadOnlyBackupEngine that deleted corrupted backups in some cases, even though the engine was ReadOnly * options.level_compaction_dynamic_level_bytes, a feature to allow RocksDB to pick dynamic base of bytes for levels. With this feature turned on, we will automatically adjust max bytes for each level. 
The goal of this feature is to have lower bound on size amplification. For more details, see comments in options.h. +* Added an abstract base class WriteBatchBase for write batches ### Public API changes * Deprecated skip_log_error_on_recovery option diff --git a/db/slice.cc b/db/slice.cc new file mode 100644 index 000000000..7e7245d79 --- /dev/null +++ b/db/slice.cc @@ -0,0 +1,24 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "rocksdb/slice.h" + +namespace rocksdb { + +Slice::Slice(const SliceParts& parts, std::string* buf) { + size_t length = 0; + for (int i = 0; i < parts.num_parts; ++i) { + length += parts.parts[i].size(); + } + buf->reserve(length); + + for (int i = 0; i < parts.num_parts; ++i) { + buf->append(parts.parts[i].data(), parts.parts[i].size()); + } + data_ = buf->data(); + size_ = buf->size(); +} + +} // namespace rocksdb diff --git a/db/write_batch_base.cc b/db/write_batch_base.cc new file mode 100644 index 000000000..5e3f5f08a --- /dev/null +++ b/db/write_batch_base.cc @@ -0,0 +1,46 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "rocksdb/write_batch_base.h" + +#include <string> + +#include "rocksdb/slice.h" + +namespace rocksdb { + +// Simple implementation of SliceParts variants of Put(). Child classes +// can override these methods with more performant solutions if they choose. 
+void WriteBatchBase::Put(ColumnFamilyHandle* column_family, + const SliceParts& key, const SliceParts& value) { + std::string key_buf, value_buf; + Slice key_slice(key, &key_buf); + Slice value_slice(value, &value_buf); + + Put(column_family, key_slice, value_slice); +} + +void WriteBatchBase::Put(const SliceParts& key, const SliceParts& value) { + std::string key_buf, value_buf; + Slice key_slice(key, &key_buf); + Slice value_slice(value, &value_buf); + + Put(key_slice, value_slice); +} + +void WriteBatchBase::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + std::string key_buf; + Slice key_slice(key, &key_buf); + Delete(column_family, key_slice); +} + +void WriteBatchBase::Delete(const SliceParts& key) { + std::string key_buf; + Slice key_slice(key, &key_buf); + Delete(key_slice); +} + +} // namespace rocksdb diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index 05d0f9df6..a67688c5d 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -42,6 +42,10 @@ class Slice { /* implicit */ Slice(const char* s) : data_(s), size_(strlen(s)) { } + // Create a single slice from SliceParts using buf as storage. + // buf must exist as long as the returned Slice exists. + Slice(const struct SliceParts& parts, std::string* buf); + // Return a pointer to the beginning of the referenced data const char* data() const { return data_; } diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 566934b70..dabbe1189 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -16,6 +16,7 @@ #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "rocksdb/write_batch.h" +#include "rocksdb/write_batch_base.h" namespace rocksdb { @@ -61,7 +62,7 @@ class WBWIIterator { // By calling GetWriteBatch(), a user will get the WriteBatch for the data // they inserted, which can be used for DB::Write(). 
// A user can call NewIterator() to create an iterator. -class WriteBatchWithIndex { +class WriteBatchWithIndex : public WriteBatchBase { public: // backup_index_comparator: the backup comparator used to compare keys // within the same column family, if column family is not given in the @@ -76,22 +77,30 @@ class WriteBatchWithIndex { size_t reserved_bytes = 0, bool overwrite_key = false); virtual ~WriteBatchWithIndex(); - WriteBatch* GetWriteBatch(); - + using WriteBatchBase::Put; void Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value); + const Slice& value) override; - void Put(const Slice& key, const Slice& value); + void Put(const Slice& key, const Slice& value) override; + using WriteBatchBase::Merge; void Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value); + const Slice& value) override; + + void Merge(const Slice& key, const Slice& value) override; + + using WriteBatchBase::Delete; + void Delete(ColumnFamilyHandle* column_family, const Slice& key) override; + void Delete(const Slice& key) override; - void Merge(const Slice& key, const Slice& value); + using WriteBatchBase::PutLogData; + void PutLogData(const Slice& blob) override; - void PutLogData(const Slice& blob); + using WriteBatchBase::Clear; + void Clear() override; - void Delete(ColumnFamilyHandle* column_family, const Slice& key); - void Delete(const Slice& key); + using WriteBatchBase::GetWriteBatch; + WriteBatch* GetWriteBatch() override; // Create an iterator of a column family. User can call iterator.Seek() to // search to the next entry of or after a key. 
Keys will be iterated in the diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 462a54a59..c096ae1ed 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -27,6 +27,7 @@ #include #include "rocksdb/status.h" +#include "rocksdb/write_batch_base.h" namespace rocksdb { @@ -34,15 +35,16 @@ class Slice; class ColumnFamilyHandle; struct SliceParts; -class WriteBatch { +class WriteBatch : public WriteBatchBase { public: explicit WriteBatch(size_t reserved_bytes = 0); ~WriteBatch(); + using WriteBatchBase::Put; // Store the mapping "key->value" in the database. void Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value); - void Put(const Slice& key, const Slice& value) { + const Slice& value) override; + void Put(const Slice& key, const Slice& value) override { Put(nullptr, key, value); } @@ -50,27 +52,31 @@ class WriteBatch { // that will be written to the database are concatentations of arrays of // slices. void Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value); - void Put(const SliceParts& key, const SliceParts& value) { + const SliceParts& value) override; + void Put(const SliceParts& key, const SliceParts& value) override { Put(nullptr, key, value); } + using WriteBatchBase::Merge; // Merge "value" with the existing value of "key" in the database. // "key->merge(existing, value)" void Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value); - void Merge(const Slice& key, const Slice& value) { + const Slice& value) override; + void Merge(const Slice& key, const Slice& value) override { Merge(nullptr, key, value); } + using WriteBatchBase::Delete; // If the database contains a mapping for "key", erase it. Else do nothing. 
- void Delete(ColumnFamilyHandle* column_family, const Slice& key); - void Delete(const Slice& key) { Delete(nullptr, key); } + void Delete(ColumnFamilyHandle* column_family, const Slice& key) override; + void Delete(const Slice& key) override { Delete(nullptr, key); } // variant that takes SliceParts - void Delete(ColumnFamilyHandle* column_family, const SliceParts& key); - void Delete(const SliceParts& key) { Delete(nullptr, key); } + void Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + void Delete(const SliceParts& key) override { Delete(nullptr, key); } + using WriteBatchBase::PutLogData; // Append a blob of arbitrary size to the records in this batch. The blob will // be stored in the transaction log but not in any other file. In particular, // it will not be persisted to the SST files. When iterating over this @@ -81,10 +87,11 @@ class WriteBatch { // // Example application: add timestamps to the transaction log for use in // replication. - void PutLogData(const Slice& blob); + void PutLogData(const Slice& blob) override; + using WriteBatchBase::Clear; // Clear all updates buffered in this batch. - void Clear(); + void Clear() override; // Support for iterating over the contents of a batch. class Handler { @@ -149,6 +156,9 @@ class WriteBatch { // Returns the number of updates in the batch int Count() const; + using WriteBatchBase::GetWriteBatch; + WriteBatch* GetWriteBatch() override { return this; } + // Constructor with a serialized string object explicit WriteBatch(std::string rep): rep_(rep) {} diff --git a/include/rocksdb/write_batch_base.h b/include/rocksdb/write_batch_base.h new file mode 100644 index 000000000..a218cc1de --- /dev/null +++ b/include/rocksdb/write_batch_base.h @@ -0,0 +1,72 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. 
An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +namespace rocksdb { + +class Slice; +class ColumnFamilyHandle; +class WriteBatch; +struct SliceParts; + +// Abstract base class that defines the basic interface for a write batch. +// See WriteBatch for a basic implementation and WriteBatchWithIndex for an +// indexed implementation. +class WriteBatchBase { + public: + virtual ~WriteBatchBase() {} + + // Store the mapping "key->value" in the database. + virtual void Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) = 0; + virtual void Put(const Slice& key, const Slice& value) = 0; + + // Variant of Put() that gathers output like writev(2). The key and value + // that will be written to the database are concatenations of arrays of + // slices. + virtual void Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value); + virtual void Put(const SliceParts& key, const SliceParts& value); + + // Merge "value" with the existing value of "key" in the database. + // "key->merge(existing, value)" + virtual void Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) = 0; + virtual void Merge(const Slice& key, const Slice& value) = 0; + + // If the database contains a mapping for "key", erase it. Else do nothing. + virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key) = 0; + virtual void Delete(const Slice& key) = 0; + + // variant that takes SliceParts + virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key); + virtual void Delete(const SliceParts& key); + + // Append a blob of arbitrary size to the records in this batch. 
The blob will + // be stored in the transaction log but not in any other file. In particular, + // it will not be persisted to the SST files. When iterating over this + // WriteBatch, WriteBatch::Handler::LogData will be called with the contents + // of the blob as it is encountered. Blobs, puts, deletes, and merges will be + // encountered in the same order in which they were inserted. The blob will + // NOT consume sequence number(s) and will NOT increase the count of the batch. + // + // Example application: add timestamps to the transaction log for use in + // replication. + virtual void PutLogData(const Slice& blob) = 0; + + // Clear all updates buffered in this batch. + virtual void Clear() = 0; + + // Convert this batch into a WriteBatch. This is an abstracted way of + // converting any WriteBatchBase (e.g. WriteBatchWithIndex) into a basic + // WriteBatch. + virtual WriteBatch* GetWriteBatch() = 0; +}; + +} // namespace rocksdb diff --git a/src.mk b/src.mk index a7c8eb601..138a6d725 100644 --- a/src.mk +++ b/src.mk @@ -27,6 +27,7 @@ LIB_SOURCES = \ db/merge_helper.cc \ db/merge_operator.cc \ db/repair.cc \ + db/slice.cc \ db/table_cache.cc \ db/table_properties_collector.cc \ db/transaction_log_impl.cc \ @@ -35,6 +36,7 @@ LIB_SOURCES = \ db/version_set.cc \ db/wal_manager.cc \ db/write_batch.cc \ + db/write_batch_base.cc \ db/write_controller.cc \ db/write_thread.cc \ port/stack_trace.cc \ diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index f28ad9fd7..75d9e9dd5 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -470,6 +470,9 @@ struct WriteBatchWithIndex::Rep { // Allocate an index entry pointing to the last entry in the write batch and // put it to skip list. void AddNewEntry(uint32_t column_family_id); + + // Clear all updates buffered in this batch. 
+ void Clear(); }; bool WriteBatchWithIndex::Rep::UpdateExistingEntry( @@ -523,6 +526,15 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { skip_list.Insert(index_entry); } + void WriteBatchWithIndex::Rep::Clear() { + write_batch.Clear(); + arena.~Arena(); + new (&arena) Arena(); + skip_list.~WriteBatchEntrySkipList(); + new (&skip_list) WriteBatchEntrySkipList(comparator, &arena); + last_entry_offset = 0; + } + Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, Slice* value, @@ -645,6 +657,8 @@ void WriteBatchWithIndex::Delete(const Slice& key) { rep->AddOrUpdateIndex(key); } +void WriteBatchWithIndex::Clear() { rep->Clear(); } + int WriteBatchEntryComparator::operator()( const WriteBatchIndexEntry* entry1, const WriteBatchIndexEntry* entry2) const { diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index f31beaf9d..c6da1612a 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -71,16 +71,8 @@ struct TestHandler : public WriteBatch::Handler { class WriteBatchWithIndexTest : public testing::Test {}; -TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { - Entry entries[] = {{"aaa", "0005", kPutRecord}, - {"b", "0002", kPutRecord}, - {"cdd", "0002", kMergeRecord}, - {"aab", "00001", kPutRecord}, - {"cc", "00005", kPutRecord}, - {"cdd", "0002", kPutRecord}, - {"aab", "0003", kPutRecord}, - {"cc", "00005", kDeleteRecord}, }; - +void TestValueAsSecondaryIndexHelper(std::vector entries, + WriteBatchWithIndex* batch) { // In this test, we insert to column family `data`, and // to column family `index`. Then iterator them in order // and seek them by key. 
@@ -94,32 +86,31 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { index_map[e.value].push_back(&e); } - WriteBatchWithIndex batch(nullptr, 20); ColumnFamilyHandleImplDummy data(6, BytewiseComparator()); ColumnFamilyHandleImplDummy index(8, BytewiseComparator()); for (auto& e : entries) { if (e.type == kPutRecord) { - batch.Put(&data, e.key, e.value); - batch.Put(&index, e.value, e.key); + batch->Put(&data, e.key, e.value); + batch->Put(&index, e.value, e.key); } else if (e.type == kMergeRecord) { - batch.Merge(&data, e.key, e.value); - batch.Put(&index, e.value, e.key); + batch->Merge(&data, e.key, e.value); + batch->Put(&index, e.value, e.key); } else { assert(e.type == kDeleteRecord); - std::unique_ptr iter(batch.NewIterator(&data)); + std::unique_ptr iter(batch->NewIterator(&data)); iter->Seek(e.key); ASSERT_OK(iter->status()); auto& write_entry = iter->Entry(); ASSERT_EQ(e.key, write_entry.key.ToString()); ASSERT_EQ(e.value, write_entry.value.ToString()); - batch.Delete(&data, e.key); - batch.Put(&index, e.value, ""); + batch->Delete(&data, e.key); + batch->Put(&index, e.value, ""); } } // Iterator all keys { - std::unique_ptr iter(batch.NewIterator(&data)); + std::unique_ptr iter(batch->NewIterator(&data)); for (int seek_to_first : {0, 1}) { if (seek_to_first) { iter->SeekToFirst(); @@ -160,7 +151,7 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { // Iterator all indexes { - std::unique_ptr iter(batch.NewIterator(&index)); + std::unique_ptr iter(batch->NewIterator(&index)); for (int seek_to_first : {0, 1}) { if (seek_to_first) { iter->SeekToFirst(); @@ -202,7 +193,7 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { // Seek to every key { - std::unique_ptr iter(batch.NewIterator(&data)); + std::unique_ptr iter(batch->NewIterator(&data)); // Seek the keys one by one in reverse order for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) { @@ -224,7 +215,7 @@ TEST_F(WriteBatchWithIndexTest, 
TestValueAsSecondaryIndex) { // Seek to every index { - std::unique_ptr iter(batch.NewIterator(&index)); + std::unique_ptr iter(batch->NewIterator(&index)); // Seek the keys one by one in reverse order for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) { @@ -246,12 +237,11 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { // Verify WriteBatch can be iterated TestHandler handler; - batch.GetWriteBatch()->Iterate(&handler); + batch->GetWriteBatch()->Iterate(&handler); // Verify data column family { - ASSERT_EQ(sizeof(entries) / sizeof(Entry), - handler.seen[data.GetID()].size()); + ASSERT_EQ(entries.size(), handler.seen[data.GetID()].size()); size_t i = 0; for (auto e : handler.seen[data.GetID()]) { auto write_entry = entries[i++]; @@ -265,8 +255,7 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { // Verify index column family { - ASSERT_EQ(sizeof(entries) / sizeof(Entry), - handler.seen[index.GetID()].size()); + ASSERT_EQ(entries.size(), handler.seen[index.GetID()].size()); size_t i = 0; for (auto e : handler.seen[index.GetID()]) { auto write_entry = entries[i++]; @@ -278,6 +267,42 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { } } +TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { + Entry entries[] = { + {"aaa", "0005", kPutRecord}, + {"b", "0002", kPutRecord}, + {"cdd", "0002", kMergeRecord}, + {"aab", "00001", kPutRecord}, + {"cc", "00005", kPutRecord}, + {"cdd", "0002", kPutRecord}, + {"aab", "0003", kPutRecord}, + {"cc", "00005", kDeleteRecord}, + }; + std::vector entries_list(entries, entries + 8); + + WriteBatchWithIndex batch(nullptr, 20); + + TestValueAsSecondaryIndexHelper(entries_list, &batch); + + // Clear batch and re-run test with new values + batch.Clear(); + + Entry new_entries[] = { + {"aaa", "0005", kPutRecord}, + {"e", "0002", kPutRecord}, + {"add", "0002", kMergeRecord}, + {"aab", "00001", kPutRecord}, + {"zz", "00005", kPutRecord}, + {"add", "0002", kPutRecord}, + 
{"aab", "0003", kPutRecord}, + {"zz", "00005", kDeleteRecord}, + }; + + entries_list = std::vector(new_entries, new_entries + 8); + + TestValueAsSecondaryIndexHelper(entries_list, &batch); +} + TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { ColumnFamilyHandleImplDummy cf1(6, nullptr); ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator()); @@ -290,7 +315,11 @@ TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { batch.Put(&cf1, "ccc", ""); batch.Put(&reverse_cf, "a11", ""); batch.Put(&cf1, "bbb", ""); - batch.Put(&reverse_cf, "a33", ""); + + Slice key_slices[] = {"a", "3", "3"}; + Slice value_slice = ""; + batch.Put(&reverse_cf, SliceParts(key_slices, 3), + SliceParts(&value_slice, 1)); batch.Put(&reverse_cf, "a22", ""); { @@ -379,7 +408,8 @@ TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { batch.Delete(&cf1, "ccc"); batch.Put(&reverse_cf, "a33", "a33"); batch.Put(&reverse_cf, "a11", "a11"); - batch.Delete(&reverse_cf, "a33"); + Slice slices[] = {"a", "3", "3"}; + batch.Delete(&reverse_cf, SliceParts(slices, 3)); { std::unique_ptr iter(batch.NewIterator(&cf1));