Killing Transform Rep

Summary:
Let's get rid of TransformRep and it's children. We have confirmed that HashSkipListRep works better with multifeed, so there is no benefit to keeping this around.

This diff is mostly just deleting references to obsoleted functions. I also have a diff for fbcode that we'll need to push when we switch to new release.

I had to expose HashSkipListRepFactory in the client header files because db_impl.cc needs access to GetTransform() function for SanitizeOptions.

Test Plan: make check

Reviewers: dhruba, haobo, kailiu, sdong

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14397
main
Igor Canadi 11 years ago
parent 043fc14c3e
commit eb12e47e0e
  1. 18
      db/db_bench.cc
  2. 5
      db/db_impl.cc
  3. 164
      db/db_test.cc
  4. 4
      db/perf_context_test.cc
  5. 11
      db/prefix_test.cc
  6. 109
      include/rocksdb/memtablerep.h
  7. 28
      tools/db_stress.cc
  8. 33
      util/hash_skiplist_rep.cc
  9. 38
      util/hash_skiplist_rep.h
  10. 19
      util/stl_wrappers.h
  11. 422
      util/transformrep.cc

@ -431,12 +431,11 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) {
}
return true;
}
DEFINE_int32(prefix_size, 0, "Control the prefix size for PrefixHashRep");
DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipList");
enum RepFactory {
kSkipList,
kPrefixHash,
kUnsorted,
kVectorRep
};
enum RepFactory StringToRepFactory(const char* ctype) {
@ -446,8 +445,6 @@ enum RepFactory StringToRepFactory(const char* ctype) {
return kSkipList;
else if (!strcasecmp(ctype, "prefix_hash"))
return kPrefixHash;
else if (!strcasecmp(ctype, "unsorted"))
return kUnsorted;
else if (!strcasecmp(ctype, "vector"))
return kVectorRep;
@ -803,9 +800,6 @@ class Benchmark {
case kSkipList:
fprintf(stdout, "Memtablerep: skip_list\n");
break;
case kUnsorted:
fprintf(stdout, "Memtablerep: unsorted\n");
break;
case kVectorRep:
fprintf(stdout, "Memtablerep: vector\n");
break;
@ -1328,14 +1322,8 @@ class Benchmark {
}
switch (FLAGS_rep_factory) {
case kPrefixHash:
options.memtable_factory.reset(
new PrefixHashRepFactory(NewFixedPrefixTransform(FLAGS_prefix_size))
);
break;
case kUnsorted:
options.memtable_factory.reset(
new UnsortedRepFactory
);
options.memtable_factory.reset(NewHashSkipListRepFactory(
NewFixedPrefixTransform(FLAGS_prefix_size)));
break;
case kSkipList:
// no need to do anything

@ -50,6 +50,7 @@
#include "util/auto_roll_logger.h"
#include "util/build_version.h"
#include "util/coding.h"
#include "util/hash_skiplist_rep.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
@ -162,10 +163,10 @@ Options SanitizeOptions(const std::string& dbname,
Log(result.info_log, "Compaction filter specified, ignore factory");
}
if (result.prefix_extractor) {
// If a prefix extractor has been supplied and a PrefixHashRepFactory is
// If a prefix extractor has been supplied and a HashSkipListRepFactory is
// being used, make sure that the latter uses the former as its transform
// function.
auto factory = dynamic_cast<PrefixHashRepFactory*>(
auto factory = dynamic_cast<HashSkipListRepFactory*>(
result.memtable_factory.get());
if (factory &&
factory->GetTransform() != result.prefix_extractor) {

@ -244,7 +244,6 @@ class DBTest {
enum OptionConfig {
kDefault,
kVectorRep,
kUnsortedRep,
kMergePut,
kFilter,
kUncompressed,
@ -255,7 +254,7 @@ class DBTest {
kCompactOnFlush,
kPerfOptions,
kDeletesFilterFirst,
kPrefixHashRep,
kHashSkipList,
kUniversalCompaction,
kCompressedBlockCache,
kEnd
@ -339,9 +338,9 @@ class DBTest {
Options CurrentOptions() {
Options options;
switch (option_config_) {
case kPrefixHashRep:
options.memtable_factory.reset(new
PrefixHashRepFactory(NewFixedPrefixTransform(1)));
case kHashSkipList:
options.memtable_factory.reset(
NewHashSkipListRepFactory(NewFixedPrefixTransform(1)));
break;
case kMergePut:
options.merge_operator = MergeOperators::CreatePutOperator();
@ -375,9 +374,6 @@ class DBTest {
case kDeletesFilterFirst:
options.filter_deletes = true;
break;
case kUnsortedRep:
options.memtable_factory.reset(new UnsortedRepFactory);
break;
case kVectorRep:
options.memtable_factory.reset(new VectorRepFactory(100));
break;
@ -4600,7 +4596,7 @@ TEST(DBTest, Randomized) {
// TODO(sanjay): Test Get() works
int p = rnd.Uniform(100);
int minimum = 0;
if (option_config_ == kPrefixHashRep) {
if (option_config_ == kHashSkipList) {
minimum = 1;
}
if (p < 45) { // Put
@ -4770,90 +4766,82 @@ void PrefixScanInit(DBTest *dbtest) {
}
TEST(DBTest, PrefixScan) {
for (int it = 0; it < 2; ++it) {
ReadOptions ro = ReadOptions();
int count;
Slice prefix;
Slice key;
char buf[100];
Iterator* iter;
snprintf(buf, sizeof(buf), "03______:");
prefix = Slice(buf, 8);
key = Slice(buf, 9);
auto prefix_extractor = NewFixedPrefixTransform(8);
// db configs
env_->count_random_reads_ = true;
Options options = CurrentOptions();
options.env = env_;
options.no_block_cache = true;
options.filter_policy = NewBloomFilterPolicy(10);
options.prefix_extractor = prefix_extractor;
options.whole_key_filtering = false;
options.disable_auto_compactions = true;
options.max_background_compactions = 2;
options.create_if_missing = true;
options.disable_seek_compaction = true;
if (it == 0) {
options.memtable_factory.reset(NewHashSkipListRepFactory(
prefix_extractor));
} else {
options.memtable_factory = std::make_shared<PrefixHashRepFactory>(
prefix_extractor);
}
ReadOptions ro = ReadOptions();
int count;
Slice prefix;
Slice key;
char buf[100];
Iterator* iter;
snprintf(buf, sizeof(buf), "03______:");
prefix = Slice(buf, 8);
key = Slice(buf, 9);
auto prefix_extractor = NewFixedPrefixTransform(8);
// db configs
env_->count_random_reads_ = true;
Options options = CurrentOptions();
options.env = env_;
options.no_block_cache = true;
options.filter_policy = NewBloomFilterPolicy(10);
options.prefix_extractor = prefix_extractor;
options.whole_key_filtering = false;
options.disable_auto_compactions = true;
options.max_background_compactions = 2;
options.create_if_missing = true;
options.disable_seek_compaction = true;
options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor));
// prefix specified, with blooms: 2 RAND I/Os
// SeekToFirst
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
ro.prefix = &prefix;
iter = db_->NewIterator(ro);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
assert(iter->key().starts_with(prefix));
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 2);
// prefix specified, with blooms: 2 RAND I/Os
// SeekToFirst
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
ro.prefix = &prefix;
iter = db_->NewIterator(ro);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
assert(iter->key().starts_with(prefix));
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 2);
// prefix specified, with blooms: 2 RAND I/Os
// Seek
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
ro.prefix = &prefix;
iter = db_->NewIterator(ro);
for (iter->Seek(key); iter->Valid(); iter->Next()) {
assert(iter->key().starts_with(prefix));
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 2);
// prefix specified, with blooms: 2 RAND I/Os
// Seek
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
ro.prefix = &prefix;
iter = db_->NewIterator(ro);
for (iter->Seek(key); iter->Valid(); iter->Next()) {
assert(iter->key().starts_with(prefix));
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 2);
// no prefix specified: 11 RAND I/Os
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
iter = db_->NewIterator(ReadOptions());
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
if (! iter->key().starts_with(prefix)) {
break;
}
count++;
// no prefix specified: 11 RAND I/Os
DestroyAndReopen(&options);
PrefixScanInit(this);
count = 0;
env_->random_read_counter_.Reset();
iter = db_->NewIterator(ReadOptions());
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
if (! iter->key().starts_with(prefix)) {
break;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 11);
Close();
delete options.filter_policy;
count++;
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 11);
Close();
delete options.filter_policy;
}
std::string MakeKey(unsigned int num) {

@ -38,8 +38,8 @@ std::shared_ptr<DB> OpenDb() {
if (FLAGS_use_set_based_memetable) {
auto prefix_extractor = rocksdb::NewFixedPrefixTransform(0);
options.memtable_factory =
std::make_shared<rocksdb::PrefixHashRepFactory>(prefix_extractor);
options.memtable_factory.reset(
NewHashSkipListRepFactory(prefix_extractor));
}
Status s = DB::Open(options, kDbName, &db);

@ -11,7 +11,6 @@
#include "util/testharness.h"
DEFINE_bool(use_prefix_hash_memtable, true, "");
DEFINE_bool(use_nolock_version, true, "");
DEFINE_bool(trigger_deadlock, false,
"issue delete in range scan to trigger PrefixHashMap deadlock");
DEFINE_uint64(bucket_count, 100000, "number of buckets");
@ -109,14 +108,8 @@ class PrefixTest {
if (FLAGS_use_prefix_hash_memtable) {
auto prefix_extractor = NewFixedPrefixTransform(8);
options.prefix_extractor = prefix_extractor;
if (FLAGS_use_nolock_version) {
options.memtable_factory.reset(NewHashSkipListRepFactory(
prefix_extractor, FLAGS_bucket_count));
} else {
options.memtable_factory =
std::make_shared<rocksdb::PrefixHashRepFactory>(
prefix_extractor, FLAGS_bucket_count, FLAGS_num_locks);
}
options.memtable_factory.reset(NewHashSkipListRepFactory(
prefix_extractor, FLAGS_bucket_count));
}
Status s = DB::Open(options, kDbName, &db);

@ -17,21 +17,13 @@
// The factory will be passed an Arena object when a new MemTableRep is
// requested. The API for this object is in rocksdb/arena.h.
//
// Users can implement their own memtable representations. We include four
// Users can implement their own memtable representations. We include three
// types built in:
// - SkipListRep: This is the default; it is backed by a skip list.
// - TransformRep: This is backed by an custom hash map.
// On construction, they are given a SliceTransform object. This
// object is applied to the user key of stored items which indexes into the
// hash map to yield a skiplist containing all records that share the same
// user key under the transform function.
// - UnsortedRep: A subclass of TransformRep where the transform function is
// the identity function. Optimized for point lookups.
// - PrefixHashRep: A subclass of TransformRep where the transform function is
// a fixed-size prefix extractor. If you use PrefixHashRepFactory, the transform
// must be identical to options.prefix_extractor, otherwise it will be discarded
// and the default will be used. It is optimized for ranged scans over a
// prefix.
// - HashSkipListRep: The memtable rep that is best used for keys that are
// structured like "prefix:suffix" where iteration withing a prefix is
// common and iteration across different prefixes is rare. It is backed by
// a hash map where each bucket is a skip list.
// - VectorRep: This is backed by an unordered std::vector. On iteration, the
// vector is sorted. It is intelligent about sorting; once the MarkReadOnly()
// has been called, the vector will only be sorted once. It is optimized for
@ -186,88 +178,23 @@ public:
}
};
// TransformReps are backed by an unordered map of buffers to buckets. When
// looking up a key, the user key is extracted and a user-supplied transform
// function (see rocksdb/slice_transform.h) is applied to get the key into the
// unordered map. This allows the user to bin user keys based on arbitrary
// criteria. Two example implementations are UnsortedRepFactory and
// PrefixHashRepFactory.
// HashSkipListRep is backed by hash map of buckets. Each bucket is a skip
// list. All the keys with the same prefix will be in the same bucket.
// The prefix is determined using user supplied SliceTransform. It has
// to match prefix_extractor in options.prefix_extractor.
//
// Iteration over the entire collection is implemented by dumping all the keys
// into an std::set. Thus, these data structures are best used when iteration
// over the entire collection is rare.
// into a separate skip list. Thus, these data structures are best used when
// iteration over the entire collection is rare.
//
// Parameters:
// transform: The SliceTransform to bucket user keys on. TransformRepFactory
// owns the pointer.
// bucket_count: Passed to the constructor of the underlying
// std::unordered_map of each TransformRep. On initialization, the
// underlying array will be at least bucket_count size.
// num_locks: Number of read-write locks to have for the rep. Each bucket is
// hashed onto a read-write lock which controls access to that lock. More
// locks means finer-grained concurrency but more memory overhead.
class TransformRepFactory : public MemTableRepFactory {
public:
explicit TransformRepFactory(const SliceTransform* transform,
size_t bucket_count, size_t num_locks = 1000)
: transform_(transform),
bucket_count_(bucket_count),
num_locks_(num_locks) { }
virtual ~TransformRepFactory() { delete transform_; }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator&, Arena*) override;
virtual const char* Name() const override {
return "TransformRepFactory";
}
const SliceTransform* GetTransform() { return transform_; }
protected:
const SliceTransform* transform_;
const size_t bucket_count_;
const size_t num_locks_;
};
// UnsortedReps bin user keys based on an identity function transform -- that
// is, transform(key) = key. This optimizes for point look-ups.
//
// Parameters: See TransformRepFactory.
class UnsortedRepFactory : public TransformRepFactory {
public:
explicit UnsortedRepFactory(size_t bucket_count = 0, size_t num_locks = 1000)
: TransformRepFactory(NewNoopTransform(),
bucket_count,
num_locks) { }
virtual const char* Name() const override {
return "UnsortedRepFactory";
}
};
// PrefixHashReps bin user keys based on a fixed-size prefix. This optimizes for
// short ranged scans over a given prefix.
//
// Parameters: See TransformRepFactory.
class PrefixHashRepFactory : public TransformRepFactory {
public:
explicit PrefixHashRepFactory(const SliceTransform* prefix_extractor,
size_t bucket_count = 0, size_t num_locks = 1000)
: TransformRepFactory(prefix_extractor, bucket_count, num_locks)
{ }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator&, Arena*) override;
virtual const char* Name() const override {
return "PrefixHashRepFactory";
}
};
// The same as TransformRepFactory except it doesn't use locks.
// Experimental, will replace TransformRepFactory once we are sure
// it performs better
// transform: The prefix extractor that returns prefix when supplied a user
// key. Has to match options.prefix_extractor
// bucket_count: Number of buckets in a hash_map. Each bucket needs
// 8 bytes. By default, we set buckets to one million, which
// will take 8MB of memory. If you know the number of keys you'll
// keep in hash map, set bucket count to be approximately twice
// the number of keys
extern MemTableRepFactory* NewHashSkipListRepFactory(
const SliceTransform* transform, size_t bucket_count = 1000000);

@ -305,8 +305,7 @@ DEFINE_bool(filter_deletes, false, "On true, deletes use KeyMayExist to drop"
enum RepFactory {
kSkipList,
kPrefixHash,
kUnsorted,
kHashSkipList,
kVectorRep
};
enum RepFactory StringToRepFactory(const char* ctype) {
@ -315,9 +314,7 @@ enum RepFactory StringToRepFactory(const char* ctype) {
if (!strcasecmp(ctype, "skip_list"))
return kSkipList;
else if (!strcasecmp(ctype, "prefix_hash"))
return kPrefixHash;
else if (!strcasecmp(ctype, "unsorted"))
return kUnsorted;
return kHashSkipList;
else if (!strcasecmp(ctype, "vector"))
return kVectorRep;
@ -335,7 +332,7 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) {
}
return true;
}
DEFINE_int32(prefix_size, 0, "Control the prefix size for PrefixHashRep");
DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipListRep");
static const bool FLAGS_prefix_size_dummy =
google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
@ -1338,12 +1335,9 @@ class StressTest {
case kSkipList:
memtablerep = "skip_list";
break;
case kPrefixHash:
case kHashSkipList:
memtablerep = "prefix_hash";
break;
case kUnsorted:
memtablerep = "unsorted";
break;
case kVectorRep:
memtablerep = "vector";
break;
@ -1393,21 +1387,15 @@ class StressTest {
FLAGS_delete_obsolete_files_period_micros;
options.max_manifest_file_size = 1024;
options.filter_deletes = FLAGS_filter_deletes;
if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) {
if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) {
fprintf(stderr,
"prefix_size should be non-zero iff memtablerep == prefix_hash\n");
exit(1);
}
switch (FLAGS_rep_factory) {
case kPrefixHash:
options.memtable_factory.reset(
new PrefixHashRepFactory(NewFixedPrefixTransform(FLAGS_prefix_size))
);
break;
case kUnsorted:
options.memtable_factory.reset(
new UnsortedRepFactory()
);
case kHashSkipList:
options.memtable_factory.reset(NewHashSkipListRepFactory(
NewFixedPrefixTransform(FLAGS_prefix_size)));
break;
case kSkipList:
// no need to do anything

@ -4,6 +4,8 @@
// of patent rights can be found in the PATENTS file in the same directory.
//
#include "util/hash_skiplist_rep.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/arena.h"
#include "rocksdb/slice.h"
@ -296,31 +298,12 @@ std::shared_ptr<MemTableRep::Iterator>
} // anon namespace
class HashSkipListRepFactory : public MemTableRepFactory {
public:
explicit HashSkipListRepFactory(const SliceTransform* transform,
size_t bucket_count = 1000000)
: transform_(transform),
bucket_count_(bucket_count) { }
virtual ~HashSkipListRepFactory() { delete transform_; }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) override {
return std::make_shared<HashSkipListRep>(compare, arena, transform_,
bucket_count_);
}
virtual const char* Name() const override {
return "HashSkipListRepFactory";
}
const SliceTransform* GetTransform() { return transform_; }
private:
const SliceTransform* transform_;
const size_t bucket_count_;
};
std::shared_ptr<MemTableRep>
HashSkipListRepFactory::CreateMemTableRep(MemTableRep::KeyComparator &compare,
Arena *arena) {
return std::make_shared<HashSkipListRep>(compare, arena, transform_,
bucket_count_);
}
MemTableRepFactory* NewHashSkipListRepFactory(
const SliceTransform* transform, size_t bucket_count) {

@ -0,0 +1,38 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/slice_transform.h"
#include "rocksdb/memtablerep.h"
namespace rocksdb {
class HashSkipListRepFactory : public MemTableRepFactory {
public:
explicit HashSkipListRepFactory(const SliceTransform* transform,
size_t bucket_count = 1000000)
: transform_(transform),
bucket_count_(bucket_count) { }
virtual ~HashSkipListRepFactory() { delete transform_; }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) override;
virtual const char* Name() const override {
return "HashSkipListRepFactory";
}
const SliceTransform* GetTransform() { return transform_; }
private:
const SliceTransform* transform_;
const size_t bucket_count_;
};
}

@ -28,24 +28,5 @@ namespace stl_wrappers {
}
};
struct Hash {
inline size_t operator()(const char* buf) const {
Slice internal_key = GetLengthPrefixedSlice(buf);
Slice value =
GetLengthPrefixedSlice(internal_key.data() + internal_key.size());
unsigned int hval = MurmurHash(internal_key.data(), internal_key.size(),
0);
hval = MurmurHash(value.data(), value.size(), hval);
return hval;
}
};
struct KeyEqual : private Base {
explicit KeyEqual(const MemTableRep::KeyComparator& compare)
: Base(compare) { }
inline bool operator()(const char* a, const char* b) const {
return this->compare_(a, b) == 0;
}
};
}
}

@ -1,422 +0,0 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include <unordered_map>
#include <set>
#include <vector>
#include <algorithm>
#include <iostream>
#include "rocksdb/memtablerep.h"
#include "rocksdb/arena.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "port/port.h"
#include "util/mutexlock.h"
#include "util/murmurhash.h"
#include "util/stl_wrappers.h"
namespace std {
template <>
struct hash<rocksdb::Slice> {
size_t operator()(const rocksdb::Slice& slice) const {
return MurmurHash(slice.data(), slice.size(), 0);
}
};
}
namespace rocksdb {
namespace {
using namespace stl_wrappers;
class TransformRep : public MemTableRep {
public:
TransformRep(const KeyComparator& compare, Arena* arena,
const SliceTransform* transform, size_t bucket_size,
size_t num_locks);
virtual void Insert(const char* key) override;
virtual bool Contains(const char* key) const override;
virtual size_t ApproximateMemoryUsage() override;
virtual ~TransformRep() { }
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator() override;
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator(
const Slice& slice) override;
virtual std::shared_ptr<MemTableRep::Iterator> GetDynamicPrefixIterator()
override {
return std::make_shared<DynamicPrefixIterator>(*this);
}
std::shared_ptr<MemTableRep::Iterator> GetTransformIterator(
const Slice& transformed);
private:
friend class DynamicPrefixIterator;
typedef std::set<const char*, Compare> Bucket;
typedef std::unordered_map<Slice, std::shared_ptr<Bucket>> BucketMap;
// Maps slices (which are transformed user keys) to buckets of keys sharing
// the same transform.
BucketMap buckets_;
// rwlock_ protects access to the buckets_ data structure itself. Each bucket
// has its own read-write lock as well.
mutable port::RWMutex rwlock_;
// Keep track of approximately how much memory is being used.
size_t memory_usage_ = 0;
// The user-supplied transform whose domain is the user keys.
const SliceTransform* transform_;
// Get a bucket from buckets_. If the bucket hasn't been initialized yet,
// initialize it before returning. Must be externally synchronized.
std::shared_ptr<Bucket>& GetBucket(const Slice& transformed);
port::RWMutex* GetLock(const Slice& transformed) const;
mutable std::vector<port::RWMutex> locks_;
const KeyComparator& compare_;
class Iterator : public MemTableRep::Iterator {
public:
explicit Iterator(std::shared_ptr<Bucket> items);
virtual ~Iterator() { };
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const;
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const;
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next();
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev();
// Advance to the first entry with a key >= target
virtual void Seek(const char* target);
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst();
// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToLast();
private:
std::shared_ptr<Bucket> items_;
Bucket::const_iterator cit_;
};
class EmptyIterator : public MemTableRep::Iterator {
// This is used when there wasn't a bucket. It is cheaper than
// instantiating an empty bucket over which to iterate.
public:
virtual bool Valid() const {
return false;
}
virtual const char* key() const {
assert(false);
return nullptr;
}
virtual void Next() { }
virtual void Prev() { }
virtual void Seek(const char* target) { }
virtual void SeekToFirst() { }
virtual void SeekToLast() { }
static std::shared_ptr<EmptyIterator> GetInstance();
private:
static std::shared_ptr<EmptyIterator> instance;
EmptyIterator() { }
};
class TransformIterator : public Iterator {
public:
explicit TransformIterator(std::shared_ptr<Bucket> items,
port::RWMutex* rwlock);
virtual ~TransformIterator() { }
private:
const ReadLock l_;
};
class DynamicPrefixIterator : public MemTableRep::Iterator {
private:
// the underlying memtable rep
const TransformRep& memtable_rep_;
// the result of a prefix seek
std::unique_ptr<MemTableRep::Iterator> bucket_iterator_;
public:
explicit DynamicPrefixIterator(const TransformRep& memtable_rep)
: memtable_rep_(memtable_rep) {}
virtual ~DynamicPrefixIterator() { };
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const {
return bucket_iterator_ && bucket_iterator_->Valid();
}
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const {
assert(Valid());
return bucket_iterator_->key();
}
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next() {
assert(Valid());
bucket_iterator_->Next();
}
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev() {
assert(Valid());
bucket_iterator_->Prev();
}
// Advance to the first entry with a key >= target within the
// same bucket as target
virtual void Seek(const char* target) {
Slice prefix = memtable_rep_.transform_->Transform(
memtable_rep_.UserKey(target));
ReadLock l(&memtable_rep_.rwlock_);
auto bucket = memtable_rep_.buckets_.find(prefix);
if (bucket == memtable_rep_.buckets_.end()) {
bucket_iterator_.reset(nullptr);
} else {
bucket_iterator_.reset(
new TransformIterator(bucket->second, memtable_rep_.GetLock(prefix)));
bucket_iterator_->Seek(target);
}
}
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst() {
// Prefix iterator does not support total order.
// We simply set the iterator to invalid state
bucket_iterator_.reset(nullptr);
}
// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToLast() {
// Prefix iterator does not support total order.
// We simply set the iterator to invalid state
bucket_iterator_.reset(nullptr);
}
};
};
class PrefixHashRep : public TransformRep {
public:
PrefixHashRep(const KeyComparator& compare, Arena* arena,
const SliceTransform* transform, size_t bucket_size,
size_t num_locks)
: TransformRep(compare, arena, transform,
bucket_size, num_locks) { }
virtual std::shared_ptr<MemTableRep::Iterator> GetPrefixIterator(
const Slice& prefix) override;
};
std::shared_ptr<TransformRep::Bucket>& TransformRep::GetBucket(
const Slice& transformed) {
WriteLock l(&rwlock_);
auto& bucket = buckets_[transformed];
if (!bucket) {
bucket.reset(
new decltype(buckets_)::mapped_type::element_type(Compare(compare_)));
// To memory_usage_ we add the size of the std::set and the size of the
// std::pair (decltype(buckets_)::value_type) which includes the
// Slice and the std::shared_ptr
memory_usage_ += sizeof(*bucket) +
sizeof(decltype(buckets_)::value_type);
}
return bucket;
}
port::RWMutex* TransformRep::GetLock(const Slice& transformed) const {
return &locks_[std::hash<Slice>()(transformed) % locks_.size()];
}
TransformRep::TransformRep(const KeyComparator& compare, Arena* arena,
const SliceTransform* transform, size_t bucket_size,
size_t num_locks)
: buckets_(bucket_size),
transform_(transform),
locks_(num_locks),
compare_(compare) { }
void TransformRep::Insert(const char* key) {
assert(!Contains(key));
auto transformed = transform_->Transform(UserKey(key));
auto& bucket = GetBucket(transformed);
WriteLock bl(GetLock(transformed));
bucket->insert(key);
memory_usage_ += sizeof(key);
}
bool TransformRep::Contains(const char* key) const {
ReadLock l(&rwlock_);
auto transformed = transform_->Transform(UserKey(key));
auto bucket = buckets_.find(transformed);
if (bucket == buckets_.end()) {
return false;
}
ReadLock bl(GetLock(transformed));
return bucket->second->count(key) != 0;
}
size_t TransformRep::ApproximateMemoryUsage() {
return memory_usage_;
}
std::shared_ptr<TransformRep::EmptyIterator>
TransformRep::EmptyIterator::GetInstance() {
if (!instance) {
instance.reset(new TransformRep::EmptyIterator);
}
return instance;
}
TransformRep::Iterator::Iterator(std::shared_ptr<Bucket> items)
: items_(items),
cit_(items_->begin()) { }
// Returns true iff the iterator is positioned at a valid node.
bool TransformRep::Iterator::Valid() const {
return cit_ != items_->end();
}
// Returns the key at the current position.
// REQUIRES: Valid()
const char* TransformRep::Iterator::key() const {
assert(Valid());
return *cit_;
}
// Advances to the next position.
// REQUIRES: Valid()
void TransformRep::Iterator::Next() {
assert(Valid());
if (cit_ == items_->end()) {
return;
}
++cit_;
}
// Advances to the previous position.
// REQUIRES: Valid()
void TransformRep::Iterator::Prev() {
assert(Valid());
if (cit_ == items_->begin()) {
// If you try to go back from the first element, the iterator should be
// invalidated. So we set it to past-the-end. This means that you can
// treat the container circularly.
cit_ = items_->end();
} else {
--cit_;
}
}
// Advance to the first entry with a key >= target
void TransformRep::Iterator::Seek(const char* target) {
cit_ = items_->lower_bound(target);
}
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
void TransformRep::Iterator::SeekToFirst() {
cit_ = items_->begin();
}
void TransformRep::Iterator::SeekToLast() {
cit_ = items_->end();
if (items_->size() != 0) {
--cit_;
}
}
TransformRep::TransformIterator::TransformIterator(
std::shared_ptr<Bucket> items, port::RWMutex* rwlock)
: Iterator(items), l_(rwlock) { }
std::shared_ptr<MemTableRep::Iterator> TransformRep::GetIterator() {
auto items = std::make_shared<Bucket>(Compare(compare_));
// Hold read locks on all locks
ReadLock l(&rwlock_);
std::for_each(locks_.begin(), locks_.end(), [] (port::RWMutex& lock) {
lock.ReadLock();
});
for (auto& bucket : buckets_) {
items->insert(bucket.second->begin(), bucket.second->end());
}
std::for_each(locks_.begin(), locks_.end(), [] (port::RWMutex& lock) {
lock.Unlock();
});
return std::make_shared<Iterator>(std::move(items));
}
std::shared_ptr<MemTableRep::Iterator> TransformRep::GetTransformIterator(
const Slice& transformed) {
ReadLock l(&rwlock_);
auto bucket = buckets_.find(transformed);
if (bucket == buckets_.end()) {
return EmptyIterator::GetInstance();
}
return std::make_shared<TransformIterator>(bucket->second,
GetLock(transformed));
}
std::shared_ptr<MemTableRep::Iterator> TransformRep::GetIterator(
const Slice& slice) {
auto transformed = transform_->Transform(slice);
return GetTransformIterator(transformed);
}
std::shared_ptr<TransformRep::EmptyIterator>
TransformRep::EmptyIterator::instance;
} // anon namespace
std::shared_ptr<MemTableRep> TransformRepFactory::CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) {
return std::make_shared<TransformRep>(compare, arena, transform_,
bucket_count_, num_locks_);
}
std::shared_ptr<MemTableRep> PrefixHashRepFactory::CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) {
return std::make_shared<PrefixHashRep>(compare, arena, transform_,
bucket_count_, num_locks_);
}
std::shared_ptr<MemTableRep::Iterator> PrefixHashRep::GetPrefixIterator(
const Slice& prefix) {
return TransformRep::GetTransformIterator(prefix);
}
} // namespace rocksdb
Loading…
Cancel
Save