Create an abstract interface for write batches

Summary: WriteBatch and WriteBatchWithIndex now both inherit from a common abstract base class.  This makes it easier to write code that is agnostic toward the implementation of the particular write batch.  In particular, I plan on utilizing this abstraction to allow transactions to support using either implementation of a write batch.

Test Plan: modified existing WriteBatchWithIndex tests to test new functions.  Running all tests.

Reviewers: igor, rven, yhchiang, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D34017
main
agiardullo 9 years ago
parent 46214df4a1
commit 81345b90f9
  1. 1
      HISTORY.md
  2. 24
      db/slice.cc
  3. 46
      db/write_batch_base.cc
  4. 4
      include/rocksdb/slice.h
  5. 29
      include/rocksdb/utilities/write_batch_with_index.h
  6. 36
      include/rocksdb/write_batch.h
  7. 72
      include/rocksdb/write_batch_base.h
  8. 2
      src.mk
  9. 14
      utilities/write_batch_with_index/write_batch_with_index.cc
  10. 88
      utilities/write_batch_with_index/write_batch_with_index_test.cc

@ -17,6 +17,7 @@
* Block based table remembers whether a whole key or prefix based bloom filter is supported in SST files. Do a sanity check when reading the file with users' configuration.
* Fixed a bug in ReadOnlyBackupEngine that deleted corrupted backups in some cases, even though the engine was ReadOnly
* options.level_compaction_dynamic_level_bytes, a feature to allow RocksDB to pick dynamic base of bytes for levels. With this feature turned on, we will automatically adjust max bytes for each level. The goal of this feature is to have lower bound on size amplification. For more details, see comments in options.h.
* Added an abstract base class WriteBatchBase for write batches
### Public API changes
* Deprecated skip_log_error_on_recovery option

@ -0,0 +1,24 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/slice.h"
namespace rocksdb {
Slice::Slice(const SliceParts& parts, std::string* buf) {
size_t length = 0;
for (int i = 0; i < parts.num_parts; ++i) {
length += parts.parts[i].size();
}
buf->reserve(length);
for (int i = 0; i < parts.num_parts; ++i) {
buf->append(parts.parts[i].data(), parts.parts[i].size());
}
data_ = buf->data();
size_ = buf->size();
}
} // namespace rocksdb

@ -0,0 +1,46 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/write_batch_base.h"
#include <string>
#include "rocksdb/slice.h"
namespace rocksdb {
// Simple implementation of SlicePart variants of Put(). Child classes
// can override these method with more performant solutions if they choose.
void WriteBatchBase::Put(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value) {
std::string key_buf, value_buf;
Slice key_slice(key, &key_buf);
Slice value_slice(value, &value_buf);
Put(column_family, key_slice, value_slice);
}
void WriteBatchBase::Put(const SliceParts& key, const SliceParts& value) {
std::string key_buf, value_buf;
Slice key_slice(key, &key_buf);
Slice value_slice(value, &value_buf);
Put(key_slice, value_slice);
}
void WriteBatchBase::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
std::string key_buf;
Slice key_slice(key, &key_buf);
Delete(column_family, key_slice);
}
void WriteBatchBase::Delete(const SliceParts& key) {
std::string key_buf;
Slice key_slice(key, &key_buf);
Delete(key_slice);
}
} // namespace rocksdb

@ -42,6 +42,10 @@ class Slice {
/* implicit */
Slice(const char* s) : data_(s), size_(strlen(s)) { }
// Create a single slice from SliceParts using buf as storage.
// buf must exist as long as the returned Slice exists.
Slice(const struct SliceParts& parts, std::string* buf);
// Return a pointer to the beginning of the referenced data
const char* data() const { return data_; }

@ -16,6 +16,7 @@
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/write_batch_base.h"
namespace rocksdb {
@ -61,7 +62,7 @@ class WBWIIterator {
// By calling GetWriteBatch(), a user will get the WriteBatch for the data
// they inserted, which can be used for DB::Write().
// A user can call NewIterator() to create an iterator.
class WriteBatchWithIndex {
class WriteBatchWithIndex : public WriteBatchBase {
public:
// backup_index_comparator: the backup comparator used to compare keys
// within the same column family, if column family is not given in the
@ -76,22 +77,30 @@ class WriteBatchWithIndex {
size_t reserved_bytes = 0, bool overwrite_key = false);
virtual ~WriteBatchWithIndex();
WriteBatch* GetWriteBatch();
using WriteBatchBase::Put;
void Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
const Slice& value) override;
void Put(const Slice& key, const Slice& value);
void Put(const Slice& key, const Slice& value) override;
using WriteBatchBase::Merge;
void Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
const Slice& value) override;
void Merge(const Slice& key, const Slice& value) override;
using WriteBatchBase::Delete;
void Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
void Delete(const Slice& key) override;
void Merge(const Slice& key, const Slice& value);
using WriteBatchBase::PutLogData;
void PutLogData(const Slice& blob) override;
void PutLogData(const Slice& blob);
using WriteBatchBase::Clear;
void Clear() override;
void Delete(ColumnFamilyHandle* column_family, const Slice& key);
void Delete(const Slice& key);
using WriteBatchBase::GetWriteBatch;
WriteBatch* GetWriteBatch() override;
// Create an iterator of a column family. User can call iterator.Seek() to
// search to the next entry of or after a key. Keys will be iterated in the

@ -27,6 +27,7 @@
#include <string>
#include "rocksdb/status.h"
#include "rocksdb/write_batch_base.h"
namespace rocksdb {
@ -34,15 +35,16 @@ class Slice;
class ColumnFamilyHandle;
struct SliceParts;
class WriteBatch {
class WriteBatch : public WriteBatchBase {
public:
explicit WriteBatch(size_t reserved_bytes = 0);
~WriteBatch();
using WriteBatchBase::Put;
// Store the mapping "key->value" in the database.
void Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
void Put(const Slice& key, const Slice& value) {
const Slice& value) override;
void Put(const Slice& key, const Slice& value) override {
Put(nullptr, key, value);
}
@ -50,27 +52,31 @@ class WriteBatch {
// that will be written to the database are concatentations of arrays of
// slices.
void Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value);
void Put(const SliceParts& key, const SliceParts& value) {
const SliceParts& value) override;
void Put(const SliceParts& key, const SliceParts& value) override {
Put(nullptr, key, value);
}
using WriteBatchBase::Merge;
// Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)"
void Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
void Merge(const Slice& key, const Slice& value) {
const Slice& value) override;
void Merge(const Slice& key, const Slice& value) override {
Merge(nullptr, key, value);
}
using WriteBatchBase::Delete;
// If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(ColumnFamilyHandle* column_family, const Slice& key);
void Delete(const Slice& key) { Delete(nullptr, key); }
void Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
void Delete(const Slice& key) override { Delete(nullptr, key); }
// variant that takes SliceParts
void Delete(ColumnFamilyHandle* column_family, const SliceParts& key);
void Delete(const SliceParts& key) { Delete(nullptr, key); }
void Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
void Delete(const SliceParts& key) override { Delete(nullptr, key); }
using WriteBatchBase::PutLogData;
// Append a blob of arbitrary size to the records in this batch. The blob will
// be stored in the transaction log but not in any other file. In particular,
// it will not be persisted to the SST files. When iterating over this
@ -81,10 +87,11 @@ class WriteBatch {
//
// Example application: add timestamps to the transaction log for use in
// replication.
void PutLogData(const Slice& blob);
void PutLogData(const Slice& blob) override;
using WriteBatchBase::Clear;
// Clear all updates buffered in this batch.
void Clear();
void Clear() override;
// Support for iterating over the contents of a batch.
class Handler {
@ -149,6 +156,9 @@ class WriteBatch {
// Returns the number of updates in the batch
int Count() const;
using WriteBatchBase::GetWriteBatch;
WriteBatch* GetWriteBatch() override { return this; }
// Constructor with a serialized string object
explicit WriteBatch(std::string rep): rep_(rep) {}

@ -0,0 +1,72 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
namespace rocksdb {
class Slice;
class ColumnFamilyHandle;
class WriteBatch;
struct SliceParts;
// Abstract base class that defines the basic interface for a write batch.
// See WriteBatch for a basic implementation and WrithBatchWithIndex for an
// indexed implemenation.
class WriteBatchBase {
public:
virtual ~WriteBatchBase() {}
// Store the mapping "key->value" in the database.
virtual void Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
virtual void Put(const Slice& key, const Slice& value) = 0;
// Variant of Put() that gathers output like writev(2). The key and value
// that will be written to the database are concatentations of arrays of
// slices.
virtual void Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value);
virtual void Put(const SliceParts& key, const SliceParts& value);
// Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)"
virtual void Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
virtual void Merge(const Slice& key, const Slice& value) = 0;
// If the database contains a mapping for "key", erase it. Else do nothing.
virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key) = 0;
virtual void Delete(const Slice& key) = 0;
// variant that takes SliceParts
virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key);
virtual void Delete(const SliceParts& key);
// Append a blob of arbitrary size to the records in this batch. The blob will
// be stored in the transaction log but not in any other file. In particular,
// it will not be persisted to the SST files. When iterating over this
// WriteBatch, WriteBatch::Handler::LogData will be called with the contents
// of the blob as it is encountered. Blobs, puts, deletes, and merges will be
// encountered in the same order in thich they were inserted. The blob will
// NOT consume sequence number(s) and will NOT increase the count of the batch
//
// Example application: add timestamps to the transaction log for use in
// replication.
virtual void PutLogData(const Slice& blob) = 0;
// Clear all updates buffered in this batch.
virtual void Clear() = 0;
// Covert this batch into a WriteBatch. This is an abstracted way of
// converting any WriteBatchBase(eg WriteBatchWithIndex) into a basic
// WriteBatch.
virtual WriteBatch* GetWriteBatch() = 0;
};
} // namespace rocksdb

@ -27,6 +27,7 @@ LIB_SOURCES = \
db/merge_helper.cc \
db/merge_operator.cc \
db/repair.cc \
db/slice.cc \
db/table_cache.cc \
db/table_properties_collector.cc \
db/transaction_log_impl.cc \
@ -35,6 +36,7 @@ LIB_SOURCES = \
db/version_set.cc \
db/wal_manager.cc \
db/write_batch.cc \
db/write_batch_base.cc \
db/write_controller.cc \
db/write_thread.cc \
port/stack_trace.cc \

@ -470,6 +470,9 @@ struct WriteBatchWithIndex::Rep {
// Allocate an index entry pointing to the last entry in the write batch and
// put it to skip list.
void AddNewEntry(uint32_t column_family_id);
// Clear all updates buffered in this batch.
void Clear();
};
bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
@ -523,6 +526,15 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
skip_list.Insert(index_entry);
}
void WriteBatchWithIndex::Rep::Clear() {
write_batch.Clear();
arena.~Arena();
new (&arena) Arena();
skip_list.~WriteBatchEntrySkipList();
new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
last_entry_offset = 0;
}
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
WriteType* type, Slice* Key,
Slice* value,
@ -645,6 +657,8 @@ void WriteBatchWithIndex::Delete(const Slice& key) {
rep->AddOrUpdateIndex(key);
}
void WriteBatchWithIndex::Clear() { rep->Clear(); }
int WriteBatchEntryComparator::operator()(
const WriteBatchIndexEntry* entry1,
const WriteBatchIndexEntry* entry2) const {

@ -71,16 +71,8 @@ struct TestHandler : public WriteBatch::Handler {
class WriteBatchWithIndexTest : public testing::Test {};
TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
Entry entries[] = {{"aaa", "0005", kPutRecord},
{"b", "0002", kPutRecord},
{"cdd", "0002", kMergeRecord},
{"aab", "00001", kPutRecord},
{"cc", "00005", kPutRecord},
{"cdd", "0002", kPutRecord},
{"aab", "0003", kPutRecord},
{"cc", "00005", kDeleteRecord}, };
void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
WriteBatchWithIndex* batch) {
// In this test, we insert <key, value> to column family `data`, and
// <value, key> to column family `index`. Then iterator them in order
// and seek them by key.
@ -94,32 +86,31 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
index_map[e.value].push_back(&e);
}
WriteBatchWithIndex batch(nullptr, 20);
ColumnFamilyHandleImplDummy data(6, BytewiseComparator());
ColumnFamilyHandleImplDummy index(8, BytewiseComparator());
for (auto& e : entries) {
if (e.type == kPutRecord) {
batch.Put(&data, e.key, e.value);
batch.Put(&index, e.value, e.key);
batch->Put(&data, e.key, e.value);
batch->Put(&index, e.value, e.key);
} else if (e.type == kMergeRecord) {
batch.Merge(&data, e.key, e.value);
batch.Put(&index, e.value, e.key);
batch->Merge(&data, e.key, e.value);
batch->Put(&index, e.value, e.key);
} else {
assert(e.type == kDeleteRecord);
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&data));
std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
iter->Seek(e.key);
ASSERT_OK(iter->status());
auto& write_entry = iter->Entry();
ASSERT_EQ(e.key, write_entry.key.ToString());
ASSERT_EQ(e.value, write_entry.value.ToString());
batch.Delete(&data, e.key);
batch.Put(&index, e.value, "");
batch->Delete(&data, e.key);
batch->Put(&index, e.value, "");
}
}
// Iterator all keys
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&data));
std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
for (int seek_to_first : {0, 1}) {
if (seek_to_first) {
iter->SeekToFirst();
@ -160,7 +151,7 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
// Iterator all indexes
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&index));
std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&index));
for (int seek_to_first : {0, 1}) {
if (seek_to_first) {
iter->SeekToFirst();
@ -202,7 +193,7 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
// Seek to every key
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&data));
std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
// Seek the keys one by one in reverse order
for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) {
@ -224,7 +215,7 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
// Seek to every index
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&index));
std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&index));
// Seek the keys one by one in reverse order
for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) {
@ -246,12 +237,11 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
// Verify WriteBatch can be iterated
TestHandler handler;
batch.GetWriteBatch()->Iterate(&handler);
batch->GetWriteBatch()->Iterate(&handler);
// Verify data column family
{
ASSERT_EQ(sizeof(entries) / sizeof(Entry),
handler.seen[data.GetID()].size());
ASSERT_EQ(entries.size(), handler.seen[data.GetID()].size());
size_t i = 0;
for (auto e : handler.seen[data.GetID()]) {
auto write_entry = entries[i++];
@ -265,8 +255,7 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
// Verify index column family
{
ASSERT_EQ(sizeof(entries) / sizeof(Entry),
handler.seen[index.GetID()].size());
ASSERT_EQ(entries.size(), handler.seen[index.GetID()].size());
size_t i = 0;
for (auto e : handler.seen[index.GetID()]) {
auto write_entry = entries[i++];
@ -278,6 +267,42 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
}
}
TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
Entry entries[] = {
{"aaa", "0005", kPutRecord},
{"b", "0002", kPutRecord},
{"cdd", "0002", kMergeRecord},
{"aab", "00001", kPutRecord},
{"cc", "00005", kPutRecord},
{"cdd", "0002", kPutRecord},
{"aab", "0003", kPutRecord},
{"cc", "00005", kDeleteRecord},
};
std::vector<Entry> entries_list(entries, entries + 8);
WriteBatchWithIndex batch(nullptr, 20);
TestValueAsSecondaryIndexHelper(entries_list, &batch);
// Clear batch and re-run test with new values
batch.Clear();
Entry new_entries[] = {
{"aaa", "0005", kPutRecord},
{"e", "0002", kPutRecord},
{"add", "0002", kMergeRecord},
{"aab", "00001", kPutRecord},
{"zz", "00005", kPutRecord},
{"add", "0002", kPutRecord},
{"aab", "0003", kPutRecord},
{"zz", "00005", kDeleteRecord},
};
entries_list = std::vector<Entry>(new_entries, new_entries + 8);
TestValueAsSecondaryIndexHelper(entries_list, &batch);
}
TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) {
ColumnFamilyHandleImplDummy cf1(6, nullptr);
ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator());
@ -290,7 +315,11 @@ TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) {
batch.Put(&cf1, "ccc", "");
batch.Put(&reverse_cf, "a11", "");
batch.Put(&cf1, "bbb", "");
batch.Put(&reverse_cf, "a33", "");
Slice key_slices[] = {"a", "3", "3"};
Slice value_slice = "";
batch.Put(&reverse_cf, SliceParts(key_slices, 3),
SliceParts(&value_slice, 1));
batch.Put(&reverse_cf, "a22", "");
{
@ -379,7 +408,8 @@ TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) {
batch.Delete(&cf1, "ccc");
batch.Put(&reverse_cf, "a33", "a33");
batch.Put(&reverse_cf, "a11", "a11");
batch.Delete(&reverse_cf, "a33");
Slice slices[] = {"a", "3", "3"};
batch.Delete(&reverse_cf, SliceParts(slices, 3));
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1));

Loading…
Cancel
Save