WriteBatchWithIndex to allow different Comparators for different column families

Summary:
Previously, one single column family is given to WriteBatchWithIndex to index keys for all column families. An extra map from column family ID to comparator is maintained which can override the default comparator given in the constructor. A WriteBatchWithIndex::SetComparatorForCF() is added for user to add comparators per column family.

Also move more codes into anonymous namespace.

Test Plan: Add a unit test

Reviewers: ljin, igor

Reviewed By: igor

Subscribers: dhruba, leveldb, yhchiang

Differential Revision: https://reviews.facebook.net/D23355
main
sdong 10 years ago
parent 57a32f147f
commit d0de413f4d
  1. 13
      db/column_family.cc
  2. 4
      db/column_family.h
  3. 5
      db/write_batch_test.cc
  4. 15
      include/rocksdb/utilities/write_batch_with_index.h
  5. 79
      utilities/write_batch_with_index/write_batch_with_index.cc
  6. 114
      utilities/write_batch_with_index/write_batch_with_index_test.cc

@ -86,6 +86,10 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
return cfd()->user_comparator();
}
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
const ColumnFamilyOptions& src) {
ColumnFamilyOptions result = src;
@ -726,4 +730,13 @@ uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
return column_family_id;
}
const Comparator* GetColumnFamilyUserComparator(
ColumnFamilyHandle* column_family) {
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return cfh->user_comparator();
}
return nullptr;
}
} // namespace rocksdb

@ -49,6 +49,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }
virtual const Comparator* user_comparator() const;
virtual uint32_t GetID() const;
@ -448,4 +449,7 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
extern uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family);
extern const Comparator* GetColumnFamilyUserComparator(
ColumnFamilyHandle* column_family);
} // namespace rocksdb

@ -289,6 +289,9 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
explicit ColumnFamilyHandleImplDummy(int id)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
uint32_t GetID() const override { return id_; }
const Comparator* user_comparator() const override {
return BytewiseComparator();
}
private:
uint32_t id_;
@ -320,7 +323,7 @@ TEST(WriteBatchTest, ColumnFamiliesBatchTest) {
}
TEST(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
WriteBatchWithIndex batch(BytewiseComparator(), 20);
WriteBatchWithIndex batch;
ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
batch.Put(&zero, Slice("foo"), Slice("bar"));
batch.Put(&two, Slice("twofoo"), Slice("bar2"));

@ -11,8 +11,9 @@
#pragma once
#include "rocksdb/status.h"
#include "rocksdb/comparator.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"
namespace rocksdb {
@ -56,12 +57,14 @@ class WBWIIterator {
// A user can call NewIterator() to create an iterator.
class WriteBatchWithIndex {
public:
// index_comparator indicates the order when iterating data in the write
// batch. Technically, it doesn't have to be the same as the one used in
// the DB.
// backup_index_comparator: the backup comparator used to compare keys
// within the same column family, if column family is not given in the
// interface, or we can't find a column family from the column family handle
// passed in, backup_index_comparator will be used for the column family.
// reserved_bytes: reserved bytes in underlying WriteBatch
explicit WriteBatchWithIndex(const Comparator* index_comparator,
size_t reserved_bytes = 0);
explicit WriteBatchWithIndex(
const Comparator* backup_index_comparator = BytewiseComparator(),
size_t reserved_bytes = 0);
virtual ~WriteBatchWithIndex();
WriteBatch* GetWriteBatch();

@ -20,7 +20,6 @@ class ReadableWriteBatch : public WriteBatch {
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
Slice* value, Slice* blob) const;
};
} // namespace
// Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
struct WriteBatchIndexEntry {
@ -38,44 +37,28 @@ struct WriteBatchIndexEntry {
class WriteBatchEntryComparator {
public:
WriteBatchEntryComparator(const Comparator* comparator,
WriteBatchEntryComparator(const Comparator* default_comparator,
const ReadableWriteBatch* write_batch)
: comparator_(comparator), write_batch_(write_batch) {}
: default_comparator_(default_comparator), write_batch_(write_batch) {}
// Compare a and b. Return a negative value if a is less than b, 0 if they
// are equal, and a positive value if a is greater than b
int operator()(const WriteBatchIndexEntry* entry1,
const WriteBatchIndexEntry* entry2) const;
void SetComparatorForCF(uint32_t column_family_id,
const Comparator* comparator) {
cf_comparator_map_[column_family_id] = comparator;
}
private:
const Comparator* comparator_;
const Comparator* default_comparator_;
std::unordered_map<uint32_t, const Comparator*> cf_comparator_map_;
const ReadableWriteBatch* write_batch_;
};
typedef SkipList<WriteBatchIndexEntry*, const WriteBatchEntryComparator&>
WriteBatchEntrySkipList;
struct WriteBatchWithIndex::Rep {
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0)
: write_batch(reserved_bytes),
comparator(index_comparator, &write_batch),
skip_list(comparator, &arena) {}
ReadableWriteBatch write_batch;
WriteBatchEntryComparator comparator;
Arena arena;
WriteBatchEntrySkipList skip_list;
WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) {
return GetEntryWithCfId(GetColumnFamilyID(column_family));
}
WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) {
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
auto* index_entry = new (mem)
WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id);
return index_entry;
}
};
class WBWIIteratorImpl : public WBWIIterator {
public:
WBWIIteratorImpl(uint32_t column_family_id,
@ -138,6 +121,35 @@ class WBWIIteratorImpl : public WBWIIterator {
}
}
};
} // namespace
struct WriteBatchWithIndex::Rep {
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0)
: write_batch(reserved_bytes),
comparator(index_comparator, &write_batch),
skip_list(comparator, &arena) {}
ReadableWriteBatch write_batch;
WriteBatchEntryComparator comparator;
Arena arena;
WriteBatchEntrySkipList skip_list;
WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) {
uint32_t cf_id = GetColumnFamilyID(column_family);
const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
if (cf_cmp != nullptr) {
comparator.SetComparatorForCF(cf_id, cf_cmp);
}
return GetEntryWithCfId(cf_id);
}
WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) {
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
auto* index_entry = new (mem)
WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id);
return index_entry;
}
};
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
WriteType* type, Slice* Key,
@ -179,9 +191,9 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
return Status::OK();
}
WriteBatchWithIndex::WriteBatchWithIndex(const Comparator* index_comparator,
size_t reserved_bytes)
: rep(new Rep(index_comparator, reserved_bytes)) {}
WriteBatchWithIndex::WriteBatchWithIndex(
const Comparator* default_index_comparator, size_t reserved_bytes)
: rep(new Rep(default_index_comparator, reserved_bytes)) {}
WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }
@ -287,7 +299,14 @@ int WriteBatchEntryComparator::operator()(
key2 = *(entry2->search_key);
}
int cmp = comparator_->Compare(key1, key2);
int cmp;
auto comparator_for_cf = cf_comparator_map_.find(entry1->column_family);
if (comparator_for_cf != cf_comparator_map_.end()) {
cmp = comparator_for_cf->second->Compare(key1, key2);
} else {
cmp = default_comparator_->Compare(key1, key2);
}
if (cmp != 0) {
return cmp;
} else if (entry1->offset > entry2->offset) {

@ -19,12 +19,16 @@ namespace rocksdb {
namespace {
class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
public:
explicit ColumnFamilyHandleImplDummy(int id)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
explicit ColumnFamilyHandleImplDummy(int id, const Comparator* comparator)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr),
id_(id),
comparator_(comparator) {}
uint32_t GetID() const override { return id_; }
const Comparator* user_comparator() const override { return comparator_; }
private:
uint32_t id_;
const Comparator* comparator_;
};
struct Entry {
@ -90,8 +94,9 @@ TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
index_map[e.value].push_back(&e);
}
WriteBatchWithIndex batch(BytewiseComparator(), 20);
ColumnFamilyHandleImplDummy data(6), index(8);
WriteBatchWithIndex batch(nullptr, 20);
ColumnFamilyHandleImplDummy data(6, BytewiseComparator());
ColumnFamilyHandleImplDummy index(8, BytewiseComparator());
for (auto& e : entries) {
if (e.type == kPutRecord) {
batch.Put(&data, e.key, e.value);
@ -230,6 +235,107 @@ TEST(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) {
}
}
class ReverseComparator : public Comparator {
public:
ReverseComparator() {}
virtual const char* Name() const override {
return "rocksdb.ReverseComparator";
}
virtual int Compare(const Slice& a, const Slice& b) const override {
return 0 - BytewiseComparator()->Compare(a, b);
}
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const {}
virtual void FindShortSuccessor(std::string* key) const {}
};
TEST(WriteBatchWithIndexTest, TestComparatorForCF) {
ReverseComparator reverse_cmp;
ColumnFamilyHandleImplDummy cf1(6, nullptr);
ColumnFamilyHandleImplDummy reverse_cf(66, &reverse_cmp);
ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator());
WriteBatchWithIndex batch(BytewiseComparator(), 20);
batch.Put(&cf1, "ddd", "");
batch.Put(&cf2, "aaa", "");
batch.Put(&cf2, "eee", "");
batch.Put(&cf1, "ccc", "");
batch.Put(&reverse_cf, "a11", "");
batch.Put(&cf1, "bbb", "");
batch.Put(&reverse_cf, "a33", "");
batch.Put(&reverse_cf, "a22", "");
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1));
iter->Seek("");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bbb", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("ccc", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("ddd", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf2));
iter->Seek("");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("aaa", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("eee", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
{
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&reverse_cf));
iter->Seek("");
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("z");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a33", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a22", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a11", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("a22");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a22", iter->Entry().key.ToString());
iter->Seek("a13");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a11", iter->Entry().key.ToString());
}
}
} // namespace
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }

Loading…
Cancel
Save