WriteBatchWithIndex: a wrapper around WriteBatch with a searchable index

Summary:
Add WriteBatchWithIndex so that a user can query data out of a WriteBatch, to support MongoDB's read-your-own-writes requirement.

WriteBatchWithIndex uses a skiplist to store a binary-searchable index. Each index entry stores the offset of the corresponding record in the write batch. When searching for a key, the record is read back from the write batch at that offset to recover its key.

Define a new iterator class for querying data out of a WriteBatchWithIndex. A user can create an iterator over the write batch for one column family, seek to a key, and keep calling Next() to see subsequent entries.
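
To illustrate the API, a minimal usage sketch (not part of this diff; it relies only on the public header added below):

#include <memory>
#include "rocksdb/comparator.h"
#include "rocksdb/utilities/write_batch_with_index.h"

using namespace rocksdb;

void Example() {
  // Build a batch; a searchable index of its keys is maintained on the side.
  WriteBatchWithIndex batch(BytewiseComparator());
  batch.Put(Slice("k1"), Slice("v1"));
  batch.Merge(Slice("k2"), Slice("m2"));
  batch.Delete(Slice("k1"));

  // Query the batch's own writes before it is committed to the DB.
  std::unique_ptr<WBWIIterator> iter(batch.NewIterator());
  for (iter->Seek("k1"); iter->Valid(); iter->Next()) {
    const WriteEntry& entry = iter->Entry();
    if (entry.type == kDeleteRecord) {
      // Both updates to "k1" are visible, in the order they were made.
    }
  }

  // The wrapped batch is still what gets committed:
  //   db->Write(WriteOptions(), batch.GetWriteBatch());
}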

I will add more unit tests if people are OK with this API.

Test Plan:
make all check
Add unit tests.

Reviewers: yhchiang, igor, MarkCallaghan, ljin

Reviewed By: ljin

Subscribers: dhruba, leveldb, xjin

Differential Revision: https://reviews.facebook.net/D21381
Branch: main
Author: sdong
Parent: 5585e00279
Commit: 28b5c76004
Changed files (lines changed):
  1. HISTORY.md (1)
  2. Makefile (6)
  3. db/column_family.cc (9)
  4. db/column_family.h (2)
  5. db/db_test.cc (26)
  6. db/dbformat.h (8)
  7. db/write_batch.cc (118)
  8. db/write_batch_test.cc (83)
  9. include/rocksdb/utilities/write_batch_with_index.h (102)
  10. include/rocksdb/write_batch.h (1)
  11. utilities/write_batch_with_index/write_batch_with_index.cc (301)
  12. utilities/write_batch_with_index/write_batch_with_index_test.cc (235)

HISTORY.md
@@ -2,6 +2,7 @@
 ### Unreleased
 ### New Features
+* Add include/rocksdb/utilities/write_batch_with_index.h, providing a utility class to query data out of WriteBatch when building it.
 ### Public API changes
 * The Prefix Extractor used with V2 compaction filters is now passed user key to SliceTransform::Transform instead of unparsed RocksDB key.

Makefile
@@ -121,7 +121,8 @@ TESTS = \
 	options_test \
 	cuckoo_table_builder_test \
 	cuckoo_table_reader_test \
-	cuckoo_table_db_test
+	cuckoo_table_db_test \
+	write_batch_with_index_test

 TOOLS = \
 	sst_dump \
@@ -375,6 +376,9 @@ spatial_db_test: utilities/spatialdb/spatial_db_test.o $(LIBOBJECTS) $(TESTHARNE
 ttl_test: utilities/ttl/ttl_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) utilities/ttl/ttl_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

+write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CXX) utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
+
 dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

db/column_family.cc
@@ -631,4 +631,13 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
   return &handle_;
 }

+uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
+  uint32_t column_family_id = 0;
+  if (column_family != nullptr) {
+    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+    column_family_id = cfh->GetID();
+  }
+  return column_family_id;
+}
+
 }  // namespace rocksdb

db/column_family.h
@@ -457,4 +457,6 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
   ColumnFamilyHandleInternal handle_;
 };

+extern uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family);
+
 }  // namespace rocksdb

db/db_test.cc
@@ -30,6 +30,7 @@
 #include "rocksdb/table.h"
 #include "rocksdb/options.h"
 #include "rocksdb/table_properties.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
 #include "table/block_based_table_factory.h"
 #include "table/plain_table_factory.h"
 #include "util/hash.h"
@@ -6429,13 +6430,26 @@ static void MTThreadBody(void* arg) {
       // into each of the CFs
       // We add some padding for force compactions.
      int unique_id = rnd.Uniform(1000000);
-      WriteBatch batch;
-      for (int cf = 0; cf < kColumnFamilies; ++cf) {
-        snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
-                 static_cast<int>(counter), cf, unique_id);
-        batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
+
+      // Half of the time directly use WriteBatch. Half of the time use
+      // WriteBatchWithIndex.
+      if (rnd.OneIn(2)) {
+        WriteBatch batch;
+        for (int cf = 0; cf < kColumnFamilies; ++cf) {
+          snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
+                   static_cast<int>(counter), cf, unique_id);
+          batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
+        }
+        ASSERT_OK(db->Write(WriteOptions(), &batch));
+      } else {
+        WriteBatchWithIndex batch(db->GetOptions().comparator);
+        for (int cf = 0; cf < kColumnFamilies; ++cf) {
+          snprintf(valbuf, sizeof(valbuf), "%d.%d.%d.%d.%-1000d", key, id,
+                   static_cast<int>(counter), cf, unique_id);
+          batch.Put(t->state->test->handles_[cf], Slice(keybuf), Slice(valbuf));
+        }
+        ASSERT_OK(db->Write(WriteOptions(), batch.GetWriteBatch()));
       }
-      ASSERT_OK(db->Write(WriteOptions(), &batch));
     } else {
       // Read a value and verify that it matches the pattern written above
       // and that writes to all column families were atomic (unique_id is the

db/dbformat.h
@@ -401,4 +401,12 @@ class InternalKeySliceTransform : public SliceTransform {
   const SliceTransform* const transform_;
 };

+// Read record from a write batch piece from input.
+// tag, column_family, key, value and blob are return values. Callers own the
+// Slice they point to.
+// Tag is defined as ValueType.
+// input will be advanced to after the record.
+extern Status ReadRecordFromWriteBatch(Slice* input, char* tag,
+                                       uint32_t* column_family, Slice* key,
+                                       Slice* value, Slice* blob);
+
 }  // namespace rocksdb
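
As context for the declaration above, a hedged sketch of driving this helper over a serialized batch (it assumes the 12-byte header of sequence number and count described in write_batch.cc; illustrative, not part of this diff):

#include "db/dbformat.h"
#include "rocksdb/write_batch.h"

rocksdb::Status WalkRecords(const rocksdb::WriteBatch& batch) {
  rocksdb::Slice input(batch.Data());
  if (input.size() < 12) {
    return rocksdb::Status::Corruption("malformed WriteBatch (too small)");
  }
  input.remove_prefix(12);  // skip the 8-byte sequence + 4-byte count header
  while (!input.empty()) {
    char tag = 0;
    uint32_t column_family = 0;
    rocksdb::Slice key, value, blob;
    rocksdb::Status s = rocksdb::ReadRecordFromWriteBatch(
        &input, &tag, &column_family, &key, &value, &blob);
    if (!s.ok()) {
      return s;  // corrupt record
    }
    // key/value/blob now point at this record's fields; input has been
    // advanced to the next record.
  }
  return rocksdb::Status::OK();
}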

db/write_batch.cc
@@ -27,6 +27,7 @@
 #include "rocksdb/merge_operator.h"
 #include "db/dbformat.h"
 #include "db/db_impl.h"
+#include "db/column_family.h"
 #include "db/memtable.h"
 #include "db/snapshot.h"
 #include "db/write_batch_internal.h"
@@ -80,6 +81,58 @@ int WriteBatch::Count() const {
   return WriteBatchInternal::Count(this);
 }

+Status ReadRecordFromWriteBatch(Slice* input, char* tag,
+                                uint32_t* column_family, Slice* key,
+                                Slice* value, Slice* blob) {
+  assert(key != nullptr && value != nullptr);
+  *tag = (*input)[0];
+  input->remove_prefix(1);
+  *column_family = 0;  // default
+  switch (*tag) {
+    case kTypeColumnFamilyValue:
+      if (!GetVarint32(input, column_family)) {
+        return Status::Corruption("bad WriteBatch Put");
+      }
+    // intentional fallthrough
+    case kTypeValue:
+      if (!GetLengthPrefixedSlice(input, key) ||
+          !GetLengthPrefixedSlice(input, value)) {
+        return Status::Corruption("bad WriteBatch Put");
+      }
+      break;
+    case kTypeColumnFamilyDeletion:
+      if (!GetVarint32(input, column_family)) {
+        return Status::Corruption("bad WriteBatch Delete");
+      }
+    // intentional fallthrough
+    case kTypeDeletion:
+      if (!GetLengthPrefixedSlice(input, key)) {
+        return Status::Corruption("bad WriteBatch Delete");
+      }
+      break;
+    case kTypeColumnFamilyMerge:
+      if (!GetVarint32(input, column_family)) {
+        return Status::Corruption("bad WriteBatch Merge");
+      }
+    // intentional fallthrough
+    case kTypeMerge:
+      if (!GetLengthPrefixedSlice(input, key) ||
+          !GetLengthPrefixedSlice(input, value)) {
+        return Status::Corruption("bad WriteBatch Merge");
+      }
+      break;
+    case kTypeLogData:
+      assert(blob != nullptr);
+      if (!GetLengthPrefixedSlice(input, blob)) {
+        return Status::Corruption("bad WriteBatch Blob");
+      }
+      break;
+    default:
+      return Status::Corruption("unknown WriteBatch tag");
+  }
+  return Status::OK();
+}
+
 Status WriteBatch::Iterate(Handler* handler) const {
   Slice input(rep_);
   if (input.size() < kHeader) {
@@ -91,57 +144,33 @@ Status WriteBatch::Iterate(Handler* handler) const {
   int found = 0;
   Status s;
   while (s.ok() && !input.empty() && handler->Continue()) {
-    char tag = input[0];
-    input.remove_prefix(1);
+    char tag = 0;
     uint32_t column_family = 0;  // default
+    s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
+                                 &blob);
+    if (!s.ok()) {
+      return s;
+    }
     switch (tag) {
       case kTypeColumnFamilyValue:
-        if (!GetVarint32(&input, &column_family)) {
-          return Status::Corruption("bad WriteBatch Put");
-        }
-      // intentional fallthrough
       case kTypeValue:
-        if (GetLengthPrefixedSlice(&input, &key) &&
-            GetLengthPrefixedSlice(&input, &value)) {
-          s = handler->PutCF(column_family, key, value);
-          found++;
-        } else {
-          return Status::Corruption("bad WriteBatch Put");
-        }
+        s = handler->PutCF(column_family, key, value);
+        found++;
         break;
       case kTypeColumnFamilyDeletion:
-        if (!GetVarint32(&input, &column_family)) {
-          return Status::Corruption("bad WriteBatch Delete");
-        }
-      // intentional fallthrough
       case kTypeDeletion:
-        if (GetLengthPrefixedSlice(&input, &key)) {
-          s = handler->DeleteCF(column_family, key);
-          found++;
-        } else {
-          return Status::Corruption("bad WriteBatch Delete");
-        }
+        s = handler->DeleteCF(column_family, key);
+        found++;
         break;
       case kTypeColumnFamilyMerge:
-        if (!GetVarint32(&input, &column_family)) {
-          return Status::Corruption("bad WriteBatch Merge");
-        }
-      // intentional fallthrough
       case kTypeMerge:
-        if (GetLengthPrefixedSlice(&input, &key) &&
-            GetLengthPrefixedSlice(&input, &value)) {
-          s = handler->MergeCF(column_family, key, value);
-          found++;
-        } else {
-          return Status::Corruption("bad WriteBatch Merge");
-        }
+        s = handler->MergeCF(column_family, key, value);
+        found++;
         break;
       case kTypeLogData:
-        if (GetLengthPrefixedSlice(&input, &blob)) {
-          handler->LogData(blob);
-        } else {
-          return Status::Corruption("bad WriteBatch Blob");
-        }
+        handler->LogData(blob);
         break;
       default:
         return Status::Corruption("unknown WriteBatch tag");
@@ -186,17 +215,6 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
   PutLengthPrefixedSlice(&b->rep_, value);
 }

-namespace {
-inline uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
-  uint32_t column_family_id = 0;
-  if (column_family != nullptr) {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    column_family_id = cfh->GetID();
-  }
-  return column_family_id;
-}
-}  // namespace
-
 void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                      const Slice& value) {
   WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);

db/write_batch_test.cc
@@ -15,6 +15,7 @@
 #include "db/write_batch_internal.h"
 #include "rocksdb/env.h"
 #include "rocksdb/memtablerep.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
 #include "util/logging.h"
 #include "util/testharness.h"
@@ -316,6 +317,88 @@ TEST(WriteBatchTest, ColumnFamiliesBatchTest) {
             handler.seen);
 }

+TEST(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
+  WriteBatchWithIndex batch(BytewiseComparator(), 20);
+  ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
+  batch.Put(&zero, Slice("foo"), Slice("bar"));
+  batch.Put(&two, Slice("twofoo"), Slice("bar2"));
+  batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
+  batch.Delete(&eight, Slice("eightfoo"));
+  batch.Merge(&three, Slice("threethree"), Slice("3three"));
+  batch.Put(&zero, Slice("foo"), Slice("bar"));
+  batch.Merge(Slice("omom"), Slice("nom"));
+
+  std::unique_ptr<WBWIIterator> iter;
+
+  iter.reset(batch.NewIterator(&eight));
+  iter->Seek("eightfoo");
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
+  ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
+  ASSERT_EQ("bar8", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kDeleteRecord, iter->Entry().type);
+  ASSERT_EQ("eightfoo", iter->Entry().key.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(!iter->Valid());
+
+  iter.reset(batch.NewIterator());
+  iter->Seek("gggg");
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
+  ASSERT_EQ("omom", iter->Entry().key.ToString());
+  ASSERT_EQ("nom", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(!iter->Valid());
+
+  iter.reset(batch.NewIterator(&zero));
+  iter->Seek("foo");
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
+  ASSERT_EQ("foo", iter->Entry().key.ToString());
+  ASSERT_EQ("bar", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
+  ASSERT_EQ("foo", iter->Entry().key.ToString());
+  ASSERT_EQ("bar", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(iter->Valid());
+  ASSERT_EQ(WriteType::kMergeRecord, iter->Entry().type);
+  ASSERT_EQ("omom", iter->Entry().key.ToString());
+  ASSERT_EQ("nom", iter->Entry().value.ToString());
+
+  iter->Next();
+  ASSERT_OK(iter->status());
+  ASSERT_TRUE(!iter->Valid());
+
+  TestHandler handler;
+  batch.GetWriteBatch()->Iterate(&handler);
+  ASSERT_EQ(
+      "Put(foo, bar)"
+      "PutCF(2, twofoo, bar2)"
+      "PutCF(8, eightfoo, bar8)"
+      "DeleteCF(8, eightfoo)"
+      "MergeCF(3, threethree, 3three)"
+      "Put(foo, bar)"
+      "Merge(omom, nom)",
+      handler.seen);
+}
+
 }  // namespace rocksdb

 int main(int argc, char** argv) {

include/rocksdb/utilities/write_batch_with_index.h (new file)
@@ -0,0 +1,102 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// A WriteBatchWithIndex with a binary searchable index built for all the keys
// inserted.
#pragma once
#include "rocksdb/status.h"
#include "rocksdb/slice.h"
#include "rocksdb/write_batch.h"
namespace rocksdb {
class ColumnFamilyHandle;
struct SliceParts;
class Comparator;
enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord };
// An entry for a Put, Merge or Delete in a write batch. Used in
// WBWIIterator.
struct WriteEntry {
WriteType type;
Slice key;
Slice value;
};
// Iterator of one column family out of a WriteBatchWithIndex.
class WBWIIterator {
public:
virtual ~WBWIIterator() {}
virtual bool Valid() const = 0;
virtual void Seek(const Slice& key) = 0;
virtual void Next() = 0;
virtual const WriteEntry& Entry() const = 0;
virtual Status status() const = 0;
};
// A WriteBatchWithIndex with a binary searchable index built for all the keys
// inserted.
// In Put(), Merge() or Delete(), the same function of the wrapped WriteBatch
// will be called. At the same time, indexes will be built.
// By calling GetWriteBatch(), a user will get the WriteBatch for the data
// they inserted, which can be used for DB::Write().
// A user can call NewIterator() to create an iterator.
class WriteBatchWithIndex {
public:
// index_comparator indicates the order when iterating data in the write
// batch. Technically, it doesn't have to be the same as the one used in
// the DB.
// reserved_bytes: reserved bytes in underlying WriteBatch
explicit WriteBatchWithIndex(const Comparator* index_comparator,
size_t reserved_bytes = 0);
virtual ~WriteBatchWithIndex();
WriteBatch* GetWriteBatch();
virtual void Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
virtual void Put(const Slice& key, const Slice& value);
virtual void Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value);
virtual void Merge(const Slice& key, const Slice& value);
virtual void PutLogData(const Slice& blob);
virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key);
virtual void Delete(const Slice& key);
virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key);
virtual void Delete(const SliceParts& key);
// Create an iterator of a column family. User can call iterator.Seek() to
// search to the next entry of or after a key. Keys will be iterated in the
// order given by index_comparator. For multiple updates on the same key,
// each update will be returned as a separate entry, in the order of update
// time.
virtual WBWIIterator* NewIterator(ColumnFamilyHandle* column_family);
// Create an iterator of the default column family.
virtual WBWIIterator* NewIterator();
private:
struct Rep;
Rep* rep;
};
} // namespace rocksdb
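
A small sketch of the multiple-updates-per-key behavior documented for NewIterator() above (illustrative only, not part of this diff):

#include <memory>
#include "rocksdb/comparator.h"
#include "rocksdb/utilities/write_batch_with_index.h"

void DuplicateKeyExample() {
  using namespace rocksdb;
  WriteBatchWithIndex batch(BytewiseComparator());
  batch.Put(Slice("k"), Slice("v1"));
  batch.Delete(Slice("k"));
  batch.Put(Slice("k"), Slice("v2"));

  std::unique_ptr<WBWIIterator> iter(batch.NewIterator());
  iter->Seek("k");
  // Yields three separate entries for "k", in update order:
  //   kPutRecord("k", "v1"), kDeleteRecord("k"), kPutRecord("k", "v2").
}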

include/rocksdb/write_batch.h
@@ -152,6 +152,7 @@ class WriteBatch {
  private:
   friend class WriteBatchInternal;

+ protected:
   std::string rep_;  // See comment in write_batch.cc for the format of rep_

   // Intentionally copyable
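
The switch from private to protected above is what allows utilities to subclass WriteBatch and inspect the serialized buffer directly (ReadableWriteBatch in write_batch_with_index.cc relies on it). A minimal sketch of the pattern; PeekableWriteBatch is a hypothetical name:

#include "rocksdb/write_batch.h"

class PeekableWriteBatch : public rocksdb::WriteBatch {
 public:
  // rep_ is now visible to subclasses, so the serialized records can be
  // inspected in place without copying.
  size_t PeekDataSize() const { return rep_.size(); }
};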

utilities/write_batch_with_index/write_batch_with_index.cc (new file)
@@ -0,0 +1,301 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/comparator.h"
#include "db/column_family.h"
#include "db/skiplist.h"
#include "util/arena.h"
namespace rocksdb {
namespace {
class ReadableWriteBatch : public WriteBatch {
public:
explicit ReadableWriteBatch(size_t reserved_bytes = 0)
: WriteBatch(reserved_bytes) {}
// Retrieve some information from a write entry in the write batch, given
// the start offset of the write entry.
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* key,
Slice* value, Slice* blob) const;
};
} // namespace
// Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
struct WriteBatchIndexEntry {
WriteBatchIndexEntry(size_t o, uint32_t c)
: offset(o), column_family(c), search_key(nullptr) {}
WriteBatchIndexEntry(const Slice* sk, uint32_t c)
: offset(0), column_family(c), search_key(sk) {}
size_t offset; // offset of an entry in write batch's string buffer.
uint32_t column_family; // column family of the entry
const Slice* search_key; // if not null, instead of reading keys from
// write batch, use it to compare. This is used
// for lookup key.
};
class WriteBatchEntryComparator {
public:
WriteBatchEntryComparator(const Comparator* comparator,
const ReadableWriteBatch* write_batch)
: comparator_(comparator), write_batch_(write_batch) {}
// Compare a and b. Return a negative value if a is less than b, 0 if they
// are equal, and a positive value if a is greater than b
int operator()(const WriteBatchIndexEntry* entry1,
const WriteBatchIndexEntry* entry2) const;
private:
const Comparator* comparator_;
const ReadableWriteBatch* write_batch_;
};
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
WriteBatchEntrySkipList;
struct WriteBatchWithIndex::Rep {
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0)
: write_batch(reserved_bytes),
comparator(index_comparator, &write_batch),
skip_list(comparator, &arena) {}
ReadableWriteBatch write_batch;
WriteBatchEntryComparator comparator;
Arena arena;
WriteBatchEntrySkipList skip_list;
WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) {
return GetEntryWithCfId(GetColumnFamilyID(column_family));
}
WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) {
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
auto* index_entry = new (mem)
WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id);
return index_entry;
}
};
class WBWIIteratorImpl : public WBWIIterator {
public:
WBWIIteratorImpl(uint32_t column_family_id,
WriteBatchEntrySkipList* skip_list,
const ReadableWriteBatch* write_batch)
: column_family_id_(column_family_id),
skip_list_iter_(skip_list),
write_batch_(write_batch),
valid_(false) {}
virtual ~WBWIIteratorImpl() {}
virtual bool Valid() const override { return valid_; }
virtual void Seek(const Slice& key) override {
valid_ = true;
WriteBatchIndexEntry search_entry(&key, column_family_id_);
skip_list_iter_.Seek(&search_entry);
ReadEntry();
}
virtual void Next() override {
skip_list_iter_.Next();
ReadEntry();
}
virtual const WriteEntry& Entry() const override { return current_; }
virtual Status status() const override { return status_; }
private:
uint32_t column_family_id_;
WriteBatchEntrySkipList::Iterator skip_list_iter_;
const ReadableWriteBatch* write_batch_;
Status status_;
bool valid_;
WriteEntry current_;
void ReadEntry() {
if (!status_.ok() || !skip_list_iter_.Valid()) {
valid_ = false;
return;
}
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
if (iter_entry == nullptr ||
iter_entry->column_family != column_family_id_) {
valid_ = false;
return;
}
Slice blob;
status_ = write_batch_->GetEntryFromDataOffset(
iter_entry->offset, &current_.type, &current_.key, &current_.value,
&blob);
if (!status_.ok()) {
valid_ = false;
} else if (current_.type != kPutRecord && current_.type != kDeleteRecord &&
current_.type != kMergeRecord) {
valid_ = false;
status_ = Status::Corruption("write batch index is corrupted");
}
}
};
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
WriteType* type, Slice* key,
Slice* value,
Slice* blob) const {
if (type == nullptr || key == nullptr || value == nullptr ||
blob == nullptr) {
return Status::InvalidArgument("Output parameters cannot be null");
}
if (data_offset >= GetDataSize()) {
return Status::InvalidArgument("data offset exceeds write batch size");
}
Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
char tag;
uint32_t column_family;
Status s =
ReadRecordFromWriteBatch(&input, &tag, &column_family, key, value, blob);
if (!s.ok()) {
return s;
}
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
*type = kPutRecord;
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
*type = kDeleteRecord;
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
*type = kMergeRecord;
break;
case kTypeLogData:
*type = kLogDataRecord;
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
return Status::OK();
}
WriteBatchWithIndex::WriteBatchWithIndex(const Comparator* index_comparator,
size_t reserved_bytes)
: rep(new Rep(index_comparator, reserved_bytes)) {}
WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
WBWIIterator* WriteBatchWithIndex::NewIterator() {
return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}
WBWIIterator* WriteBatchWithIndex::NewIterator(
ColumnFamilyHandle* column_family) {
return new WBWIIteratorImpl(GetColumnFamilyID(column_family),
&(rep->skip_list), &rep->write_batch);
}
void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
auto* index_entry = rep->GetEntry(column_family);
rep->write_batch.Put(column_family, key, value);
rep->skip_list.Insert(index_entry);
}
void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
auto* index_entry = rep->GetEntryWithCfId(0);
rep->write_batch.Put(key, value);
rep->skip_list.Insert(index_entry);
}
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
auto* index_entry = rep->GetEntry(column_family);
rep->write_batch.Merge(column_family, key, value);
rep->skip_list.Insert(index_entry);
}
void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
auto* index_entry = rep->GetEntryWithCfId(0);
rep->write_batch.Merge(key, value);
rep->skip_list.Insert(index_entry);
}
void WriteBatchWithIndex::PutLogData(const Slice& blob) {
rep->write_batch.PutLogData(blob);
}
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
auto* index_entry = rep->GetEntry(column_family);
rep->write_batch.Delete(column_family, key);
rep->skip_list.Insert(index_entry);
}
void WriteBatchWithIndex::Delete(const Slice& key) {
auto* index_entry = rep->GetEntryWithCfId(0);
rep->write_batch.Delete(key);
rep->skip_list.Insert(index_entry);
}
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
auto* index_entry = rep->GetEntry(column_family);
rep->write_batch.Delete(column_family, key);
rep->skip_list.Insert(index_entry);
}
void WriteBatchWithIndex::Delete(const SliceParts& key) {
auto* index_entry = rep->GetEntryWithCfId(0);
rep->write_batch.Delete(key);
rep->skip_list.Insert(index_entry);
}
int WriteBatchEntryComparator::operator()(
const WriteBatchIndexEntry* entry1,
const WriteBatchIndexEntry* entry2) const {
if (entry1->column_family > entry2->column_family) {
return 1;
} else if (entry1->column_family < entry2->column_family) {
return -1;
}
Status s;
Slice key1, key2;
if (entry1->search_key == nullptr) {
Slice value, blob;
WriteType write_type;
s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1,
&value, &blob);
if (!s.ok()) {
return 1;
}
} else {
key1 = *(entry1->search_key);
}
if (entry2->search_key == nullptr) {
Slice value, blob;
WriteType write_type;
s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2,
&value, &blob);
if (!s.ok()) {
return -1;
}
} else {
key2 = *(entry2->search_key);
}
int cmp = comparator_->Compare(key1, key2);
if (cmp != 0) {
return cmp;
} else if (entry1->offset > entry2->offset) {
return 1;
} else if (entry1->offset < entry2->offset) {
return -1;
}
return 0;
}
} // namespace rocksdb
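
A self-contained sketch (not RocksDB code) of the offset trick used by Rep::GetEntryWithCfId() above: the index entry captures the batch's data size before the record is appended, which is exactly where that record will begin.

#include <cstddef>
#include <string>
#include <vector>

struct IndexEntry {
  size_t offset;  // where the record starts in the buffer
};

int main() {
  std::string buffer;             // stand-in for WriteBatch::rep_
  std::vector<IndexEntry> index;  // stand-in for the skip list

  auto append_record = [&](const std::string& record) {
    index.push_back({buffer.size()});  // capture the offset first
    buffer += record;                  // the record begins at that offset
  };

  append_record("put:k1=v1;");
  append_record("del:k1;");
  // index[i].offset now locates record i inside buffer, just as
  // WriteBatchIndexEntry::offset locates a record inside the write batch.
  return 0;
}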

utilities/write_batch_with_index/write_batch_with_index_test.cc (new file)
@@ -0,0 +1,235 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <memory>
#include <map>
#include "db/column_family.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/testharness.h"
namespace rocksdb {
namespace {
class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
public:
explicit ColumnFamilyHandleImplDummy(int id)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
uint32_t GetID() const override { return id_; }
private:
uint32_t id_;
};
struct Entry {
std::string key;
std::string value;
WriteType type;
};
struct TestHandler : public WriteBatch::Handler {
std::map<uint32_t, std::vector<Entry>> seen;
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
Entry e;
e.key = key.ToString();
e.value = value.ToString();
e.type = kPutRecord;
seen[column_family_id].push_back(e);
return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
Entry e;
e.key = key.ToString();
e.value = value.ToString();
e.type = kMergeRecord;
seen[column_family_id].push_back(e);
return Status::OK();
}
virtual void LogData(const Slice& blob) {}
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
Entry e;
e.key = key.ToString();
e.value = "";
e.type = kDeleteRecord;
seen[column_family_id].push_back(e);
return Status::OK();
}
};
} // namespace anonymous
class WriteBatchWithIndexTest {};
TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
Entry entries[] = {{"aaa", "0005", kPutRecord},
{"b", "0002", kPutRecord},
{"cdd", "0002", kMergeRecord},
{"aab", "00001", kPutRecord},
{"cc", "00005", kPutRecord},
{"cdd", "0002", kPutRecord},
{"aab", "0003", kPutRecord},
{"cc", "00005", kDeleteRecord}, };
// In this test, we insert <key, value> to column family `data`, and
// <value, key> to column family `index`. Then we iterate them in order
// and seek them by key.
// Sort entries by key
std::map<std::string, std::vector<Entry*>> data_map;
// Sort entries by value
std::map<std::string, std::vector<Entry*>> index_map;
for (auto& e : entries) {
data_map[e.key].push_back(&e);
index_map[e.value].push_back(&e);
}
WriteBatchWithIndex batch(BytewiseComparator(), 20);
ColumnFamilyHandleImplDummy data(6), index(8);
for (auto& e : entries) {
if (e.type == kPutRecord) {
batch.Put(&data, e.key, e.value);
batch.Put(&index, e.value, e.key);
} else if (e.type == kMergeRecord) {
batch.Merge(&data, e.key, e.value);
batch.Put(&index, e.value, e.key);
} else {
assert(e.type == kDeleteRecord);
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&data));
iter->Seek(e.key);
ASSERT_OK(iter->status());
auto& write_entry = iter->Entry();
ASSERT_EQ(e.key, write_entry.key.ToString());
ASSERT_EQ(e.value, write_entry.value.ToString());
batch.Delete(&data, e.key);
batch.Put(&index, e.value, "");
}
}
// Iterate over all keys
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&data));
iter->Seek("");
for (auto pair : data_map) {
for (auto v : pair.second) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
ASSERT_EQ(pair.first, write_entry.key.ToString());
ASSERT_EQ(v->type, write_entry.type);
if (write_entry.type != kDeleteRecord) {
ASSERT_EQ(v->value, write_entry.value.ToString());
}
iter->Next();
}
}
ASSERT_TRUE(!iter->Valid());
}
// Iterate over all indexes
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&index));
iter->Seek("");
for (auto pair : index_map) {
for (auto v : pair.second) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
ASSERT_EQ(pair.first, write_entry.key.ToString());
if (v->type != kDeleteRecord) {
ASSERT_EQ(v->key, write_entry.value.ToString());
ASSERT_EQ(v->value, write_entry.key.ToString());
}
iter->Next();
}
}
ASSERT_TRUE(!iter->Valid());
}
// Seek to every key
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&data));
// Seek the keys one by one in reverse order
for (auto pair = data_map.rbegin(); pair != data_map.rend(); ++pair) {
iter->Seek(pair->first);
ASSERT_OK(iter->status());
for (auto v : pair->second) {
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString());
ASSERT_EQ(v->type, write_entry.type);
if (write_entry.type != kDeleteRecord) {
ASSERT_EQ(v->value, write_entry.value.ToString());
}
iter->Next();
ASSERT_OK(iter->status());
}
}
}
// Seek to every index
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&index));
// Seek the keys one by one in reverse order
for (auto pair = index_map.rbegin(); pair != index_map.rend(); ++pair) {
iter->Seek(pair->first);
ASSERT_OK(iter->status());
for (auto v : pair->second) {
ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString());
ASSERT_EQ(v->value, write_entry.key.ToString());
if (v->type != kDeleteRecord) {
ASSERT_EQ(v->key, write_entry.value.ToString());
}
iter->Next();
ASSERT_OK(iter->status());
}
}
}
// Verify WriteBatch can be iterated
TestHandler handler;
batch.GetWriteBatch()->Iterate(&handler);
// Verify data column family
{
ASSERT_EQ(sizeof(entries) / sizeof(Entry),
handler.seen[data.GetID()].size());
size_t i = 0;
for (auto e : handler.seen[data.GetID()]) {
auto write_entry = entries[i++];
ASSERT_EQ(e.type, write_entry.type);
ASSERT_EQ(e.key, write_entry.key);
if (e.type != kDeleteRecord) {
ASSERT_EQ(e.value, write_entry.value);
}
}
}
// Verify index column family
{
ASSERT_EQ(sizeof(entries) / sizeof(Entry),
handler.seen[index.GetID()].size());
size_t i = 0;
for (auto e : handler.seen[index.GetID()]) {
auto write_entry = entries[i++];
ASSERT_EQ(e.key, write_entry.value);
if (write_entry.type != kDeleteRecord) {
ASSERT_EQ(e.value, write_entry.key);
}
}
}
}
} // namespace
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }