Virtualize SkipList Interface

Summary: This diff virtualizes the skiplist interface so that users can provide their own implementation of a backing store for MemTables. Eventually, the backing store will be responsible for its own synchronization, allowing users (and us) to experiment with different lockless implementations.

Test Plan:
make clean
make -j32 check
./db_stress

Reviewers: dhruba, emayanke, haobo

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D11739
main
Jim Paton 11 years ago
parent 6fbe4e981a
commit 52d7ecfc78
  1. 13
      db/db_impl.cc
  2. 2
      db/db_impl.h
  3. 56
      db/memtable.cc
  4. 21
      db/memtable.h
  5. 3
      db/repair.cc
  6. 106
      db/skiplistrep.h
  7. 5
      db/write_batch_test.cc
  8. 91
      include/leveldb/memtablerep.h
  9. 6
      include/leveldb/options.h
  10. 13
      table/table_test.cc
  11. 5
      util/options.cc

@ -163,7 +163,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
mutex_(options.use_adaptive_mutex), mutex_(options.use_adaptive_mutex),
shutting_down_(nullptr), shutting_down_(nullptr),
bg_cv_(&mutex_), bg_cv_(&mutex_),
mem_(new MemTable(internal_comparator_, NumberLevels())), mem_rep_factory_(options_.memtable_factory),
mem_(new MemTable(internal_comparator_,
mem_rep_factory_, NumberLevels())),
logfile_number_(0), logfile_number_(0),
tmp_batch_(), tmp_batch_(),
bg_compaction_scheduled_(0), bg_compaction_scheduled_(0),
@ -688,7 +690,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
WriteBatchInternal::SetContents(&batch, record); WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) { if (mem == nullptr) {
mem = new MemTable(internal_comparator_, NumberLevels()); mem = new MemTable(internal_comparator_,
mem_rep_factory_, NumberLevels());
mem->Ref(); mem->Ref();
} }
status = WriteBatchInternal::InsertInto(&batch, mem, &options_); status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
@ -2528,7 +2531,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
log_.reset(new log::Writer(std::move(lfile))); log_.reset(new log::Writer(std::move(lfile)));
mem_->SetLogNumber(logfile_number_); mem_->SetLogNumber(logfile_number_);
imm_.Add(mem_); imm_.Add(mem_);
mem_ = new MemTable(internal_comparator_, NumberLevels()); mem_ = new MemTable(internal_comparator_,
mem_rep_factory_, NumberLevels());
mem_->Ref(); mem_->Ref();
force = false; // Do not force another compaction if have room force = false; // Do not force another compaction if have room
MaybeScheduleCompaction(); MaybeScheduleCompaction();
@ -2782,8 +2786,7 @@ Status DB::Merge(const WriteOptions& opt, const Slice& key,
DB::~DB() { } DB::~DB() { }
Status DB::Open(const Options& options, const std::string& dbname, Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
DB** dbptr) {
*dbptr = nullptr; *dbptr = nullptr;
EnvOptions soptions; EnvOptions soptions;

@ -18,6 +18,7 @@
#include "port/port.h" #include "port/port.h"
#include "util/stats_logger.h" #include "util/stats_logger.h"
#include "memtablelist.h" #include "memtablelist.h"
#include "leveldb/memtablerep.h"
#ifdef USE_SCRIBE #ifdef USE_SCRIBE
#include "scribe/scribe_logger.h" #include "scribe/scribe_logger.h"
@ -253,6 +254,7 @@ class DBImpl : public DB {
port::Mutex mutex_; port::Mutex mutex_;
port::AtomicPointer shutting_down_; port::AtomicPointer shutting_down_;
port::CondVar bg_cv_; // Signalled when background work finishes port::CondVar bg_cv_; // Signalled when background work finishes
std::shared_ptr<MemTableRepFactory> mem_rep_factory_;
MemTable* mem_; MemTable* mem_;
MemTableList imm_; // Memtable that are not changing MemTableList imm_; // Memtable that are not changing
uint64_t logfile_number_; uint64_t logfile_number_;

@ -3,6 +3,9 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/memtable.h" #include "db/memtable.h"
#include <memory>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "leveldb/comparator.h" #include "leveldb/comparator.h"
#include "leveldb/env.h" #include "leveldb/env.h"
@ -19,23 +22,28 @@ static Slice GetLengthPrefixedSlice(const char* data) {
return Slice(p, len); return Slice(p, len);
} }
MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel) MemTable::MemTable(const InternalKeyComparator& cmp,
std::shared_ptr<MemTableRepFactory> table_factory,
int numlevel)
: comparator_(cmp), : comparator_(cmp),
refs_(0), refs_(0),
table_(comparator_, &arena_), table_(table_factory->CreateMemTableRep(comparator_)),
flush_in_progress_(false), flush_in_progress_(false),
flush_completed_(false), flush_completed_(false),
file_number_(0), file_number_(0),
edit_(numlevel), edit_(numlevel),
first_seqno_(0), first_seqno_(0),
mem_logfile_number_(0) { mem_logfile_number_(0) { }
}
MemTable::~MemTable() { MemTable::~MemTable() {
assert(refs_ == 0); assert(refs_ == 0);
} }
size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } size_t MemTable::ApproximateMemoryUsage() {
// The first term is the amount of memory used by the memtable and
// the second term is the amount of memory used by the backing store
return arena_.MemoryUsage() + table_->ApproximateMemoryUsage();
}
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
const { const {
@ -57,24 +65,27 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator { class MemTableIterator: public Iterator {
public: public:
explicit MemTableIterator(MemTable::Table* table) : iter_(table) { } explicit MemTableIterator(MemTableRep* table)
: iter_(table->GetIterator()) { }
virtual bool Valid() const { return iter_.Valid(); }
virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); } virtual bool Valid() const { return iter_->Valid(); }
virtual void SeekToFirst() { iter_.SeekToFirst(); } virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToLast() { iter_.SeekToLast(); } virtual void SeekToFirst() { iter_->SeekToFirst(); }
virtual void Next() { iter_.Next(); } virtual void SeekToLast() { iter_->SeekToLast(); }
virtual void Prev() { iter_.Prev(); } virtual void Next() { iter_->Next(); }
virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); } virtual void Prev() { iter_->Prev(); }
virtual Slice key() const {
return GetLengthPrefixedSlice(iter_->key());
}
virtual Slice value() const { virtual Slice value() const {
Slice key_slice = GetLengthPrefixedSlice(iter_.key()); Slice key_slice = GetLengthPrefixedSlice(iter_->key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
} }
virtual Status status() const { return Status::OK(); } virtual Status status() const { return Status::OK(); }
private: private:
MemTable::Table::Iterator iter_; std::shared_ptr<MemTableRep::Iterator> iter_;
std::string tmp_; // For passing to EncodeKey std::string tmp_; // For passing to EncodeKey
// No copying allowed // No copying allowed
@ -83,7 +94,7 @@ class MemTableIterator: public Iterator {
}; };
Iterator* MemTable::NewIterator() { Iterator* MemTable::NewIterator() {
return new MemTableIterator(&table_); return new MemTableIterator(table_.get());
} }
void MemTable::Add(SequenceNumber s, ValueType type, void MemTable::Add(SequenceNumber s, ValueType type,
@ -109,7 +120,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
p = EncodeVarint32(p, val_size); p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size); memcpy(p, value.data(), val_size);
assert((p + val_size) - buf == (unsigned)encoded_len); assert((p + val_size) - buf == (unsigned)encoded_len);
table_.Insert(buf); table_->Insert(buf);
// The first sequence number inserted into the memtable // The first sequence number inserted into the memtable
assert(first_seqno_ == 0 || s > first_seqno_); assert(first_seqno_ == 0 || s > first_seqno_);
@ -121,8 +132,8 @@ void MemTable::Add(SequenceNumber s, ValueType type,
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
const Options& options, const bool check_presence_only) { const Options& options, const bool check_presence_only) {
Slice memkey = key.memtable_key(); Slice memkey = key.memtable_key();
Table::Iterator iter(&table_); std::shared_ptr<MemTableRep::Iterator> iter(table_.get()->GetIterator());
iter.Seek(memkey.data()); iter->Seek(memkey.data());
bool merge_in_progress = false; bool merge_in_progress = false;
std::string operand; std::string operand;
@ -131,10 +142,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
merge_in_progress = true; merge_in_progress = true;
} }
auto merge_operator = options.merge_operator; auto merge_operator = options.merge_operator;
auto logger = options.info_log; auto logger = options.info_log;
for (; iter.Valid(); iter.Next()) { for (; iter->Valid(); iter->Next()) {
// entry format is: // entry format is:
// klength varint32 // klength varint32
// userkey char[klength-8] // userkey char[klength-8]
@ -144,7 +154,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
// Check that it belongs to same user key. We do not check the // Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped // sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers. // all entries with overly large sequence numbers.
const char* entry = iter.key(); const char* entry = iter->key();
uint32_t key_length; uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length); const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
if (comparator_.comparator.user_comparator()->Compare( if (comparator_.comparator.user_comparator()->Compare(

@ -6,24 +6,31 @@
#define STORAGE_LEVELDB_DB_MEMTABLE_H_ #define STORAGE_LEVELDB_DB_MEMTABLE_H_
#include <string> #include <string>
#include <memory>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/skiplist.h" #include "db/skiplist.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "util/arena.h" #include "util/arena.h"
#include "leveldb/memtablerep.h"
namespace leveldb { namespace leveldb {
class InternalKeyComparator;
class Mutex; class Mutex;
class MemTableIterator; class MemTableIterator;
class MemTable { class MemTable {
public: public:
struct KeyComparator : public MemTableRep::KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
virtual int operator()(const char* a, const char* b) const;
};
// MemTables are reference counted. The initial reference count // MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once. // is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator, explicit MemTable(const InternalKeyComparator& comparator,
int numlevel = 7); std::shared_ptr<MemTableRepFactory> table_factory, int numlevel = 7);
// Increase reference count. // Increase reference count.
void Ref() { ++refs_; } void Ref() { ++refs_; }
@ -88,22 +95,14 @@ class MemTable {
private: private:
~MemTable(); // Private since only Unref() should be used to delete it ~MemTable(); // Private since only Unref() should be used to delete it
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
int operator()(const char* a, const char* b) const;
};
friend class MemTableIterator; friend class MemTableIterator;
friend class MemTableBackwardIterator; friend class MemTableBackwardIterator;
friend class MemTableList; friend class MemTableList;
typedef SkipList<const char*, KeyComparator> Table;
KeyComparator comparator_; KeyComparator comparator_;
int refs_; int refs_;
Arena arena_; Arena arena_;
Table table_; shared_ptr<MemTableRep> table_;
// These are used to manage memtable flushes to storage // These are used to manage memtable flushes to storage
bool flush_in_progress_; // started the flush bool flush_in_progress_; // started the flush

@ -191,7 +191,8 @@ class Repairer {
std::string scratch; std::string scratch;
Slice record; Slice record;
WriteBatch batch; WriteBatch batch;
MemTable* mem = new MemTable(icmp_, options_.num_levels); MemTable* mem = new MemTable(icmp_, options_.memtable_factory,
options_.num_levels);
mem->Ref(); mem->Ref();
int counter = 0; int counter = 0;
while (reader.ReadRecord(&record, &scratch)) { while (reader.ReadRecord(&record, &scratch)) {

@ -0,0 +1,106 @@
#ifndef STORAGE_LEVELDB_DB_SKIPLISTREP_H_
#define STORAGE_LEVELDB_DB_SKIPLISTREP_H_
#include "leveldb/memtablerep.h"
#include "db/memtable.h"
#include "db/skiplist.h"
namespace leveldb {
class Arena;
class SkipListRep : public MemTableRep {
Arena arena_;
SkipList<const char*, MemTableRep::KeyComparator&> skip_list_;
public:
explicit SkipListRep(MemTableRep::KeyComparator& compare)
: skip_list_(compare, &arena_) { }
// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
virtual void Insert(const char* key) {
skip_list_.Insert(key);
}
// Returns true iff an entry that compares equal to key is in the list.
virtual bool Contains(const char* key) const {
return skip_list_.Contains(key);
}
virtual size_t ApproximateMemoryUsage() {
return arena_.MemoryUsage();
}
virtual ~SkipListRep() { }
// Iteration over the contents of a skip list
class Iterator : public MemTableRep::Iterator {
SkipList<const char*, MemTableRep::KeyComparator&>::Iterator iter_;
public:
// Initialize an iterator over the specified list.
// The returned iterator is not valid.
explicit Iterator(
const SkipList<const char*, MemTableRep::KeyComparator&>* list
) : iter_(list) { }
virtual ~Iterator() { }
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const {
return iter_.Valid();
}
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const {
return iter_.key();
}
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next() {
iter_.Next();
}
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev() {
iter_.Prev();
}
// Advance to the first entry with a key >= target
virtual void Seek(const char* target) {
iter_.Seek(target);
}
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
virtual void SeekToFirst() {
iter_.SeekToFirst();
}
// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
virtual void SeekToLast() {
iter_.SeekToLast();
}
};
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator() {
return std::shared_ptr<MemTableRep::Iterator>(
new SkipListRep::Iterator(&skip_list_)
);
}
};
class SkipListFactory : public MemTableRepFactory {
public:
virtual std::shared_ptr<MemTableRep> CreateMemTableRep (
MemTableRep::KeyComparator& compare) {
return std::shared_ptr<MemTableRep>(new SkipListRep(compare));
}
};
}
#endif // STORAGE_LEVELDB_DB_SKIPLISTREP_H_

@ -4,6 +4,8 @@
#include "leveldb/db.h" #include "leveldb/db.h"
#include <memory>
#include "db/skiplistrep.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "leveldb/env.h" #include "leveldb/env.h"
@ -14,7 +16,8 @@ namespace leveldb {
static std::string PrintContents(WriteBatch* b) { static std::string PrintContents(WriteBatch* b) {
InternalKeyComparator cmp(BytewiseComparator()); InternalKeyComparator cmp(BytewiseComparator());
MemTable* mem = new MemTable(cmp); auto factory = std::make_shared<SkipListFactory>();
MemTable* mem = new MemTable(cmp, factory);
mem->Ref(); mem->Ref();
std::string state; std::string state;
Status s = WriteBatchInternal::InsertInto(b, mem); Status s = WriteBatchInternal::InsertInto(b, mem);

@ -0,0 +1,91 @@
// This file contains the interface that must be implemented by any collection
// to be used as the backing store for a MemTable. Such a collection must
// satisfy the following properties:
// (1) It does not store duplicate items.
// (2) It uses MemTableRep::KeyComparator to compare items for iteration and
// equality.
// (3) It can be accessed concurrently by multiple readers but need not support
// concurrent writes.
// (4) Items are never deleted.
// The liberal use of assertions is encouraged to enforce (1).
#ifndef STORAGE_LEVELDB_DB_TABLE_H_
#define STORAGE_LEVELDB_DB_TABLE_H_
#include <memory>
namespace leveldb {
// Abstract interface for a collection that can serve as the backing store of
// a MemTable. Implementations supply insertion, membership lookup, a memory
// usage estimate and an ordered iterator.
class MemTableRep {
 public:
  // KeyComparator(a, b) returns a negative value if a is less than b, 0 if
  // they are equal, and a positive value if a is greater than b.
  class KeyComparator {
   public:
    virtual int operator()(const char* a, const char* b) const = 0;

    virtual ~KeyComparator() { }
  };

  // Insert key into the collection. (The caller will pack key and value into
  // a single buffer and pass that in as the parameter to Insert)
  // REQUIRES: nothing that compares equal to key is currently in the
  // collection.
  virtual void Insert(const char* key) = 0;

  // Returns true iff an entry that compares equal to key is in the
  // collection.
  virtual bool Contains(const char* key) const = 0;

  // Returns an estimate of the number of bytes of data in use by this
  // data structure.
  virtual size_t ApproximateMemoryUsage() = 0;

  virtual ~MemTableRep() { }

  // Iteration over the contents of the collection.
  //
  // This interface declares no constructor of its own: instances are
  // obtained through MemTableRep::GetIterator(). A newly obtained iterator
  // is not valid until it has been positioned with Seek(), SeekToFirst() or
  // SeekToLast().
  class Iterator {
   public:
    virtual ~Iterator() { }

    // Returns true iff the iterator is positioned at a valid node.
    virtual bool Valid() const = 0;

    // Returns the key at the current position.
    // REQUIRES: Valid()
    virtual const char* key() const = 0;

    // Advances to the next position.
    // REQUIRES: Valid()
    virtual void Next() = 0;

    // Advances to the previous position.
    // REQUIRES: Valid()
    virtual void Prev() = 0;

    // Advance to the first entry with a key >= target
    virtual void Seek(const char* target) = 0;

    // Position at the first entry in collection.
    // Final state of iterator is Valid() iff collection is not empty.
    virtual void SeekToFirst() = 0;

    // Position at the last entry in collection.
    // Final state of iterator is Valid() iff collection is not empty.
    virtual void SeekToLast() = 0;
  };

  // Returns an iterator over the keys in this collection.
  virtual std::shared_ptr<Iterator> GetIterator() = 0;
};
// Abstract factory for MemTableRep objects. A MemTable asks its factory for
// a fresh backing store at construction time.
class MemTableRepFactory {
 public:
  virtual ~MemTableRepFactory() = default;

  // Creates a new MemTableRep that orders its entries with `compare`.
  // The comparator is passed by reference; an implementation may keep that
  // reference (the skip-list implementation does), so callers should keep
  // the comparator alive for as long as the returned object is in use.
  virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
      MemTableRep::KeyComparator& compare) = 0;
};
}
#endif // STORAGE_LEVELDB_DB_TABLE_H_

@ -12,6 +12,7 @@
#include <stdint.h> #include <stdint.h>
#include "leveldb/slice.h" #include "leveldb/slice.h"
#include "leveldb/statistics.h" #include "leveldb/statistics.h"
#include "leveldb/memtablerep.h"
namespace leveldb { namespace leveldb {
@ -474,6 +475,11 @@ struct Options {
// Default: false // Default: false
bool filter_deletes; bool filter_deletes;
// This is a factory that provides MemTableRep objects.
// Default: a factory that provides a skip-list-based implementation of
// MemTableRep.
std::shared_ptr<MemTableRepFactory> memtable_factory;
}; };
// Options that control read operations // Options that control read operations

@ -3,8 +3,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <map> #include <map>
#include <string> #include <string>
#include <memory>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/skiplistrep.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
@ -342,8 +344,9 @@ class MemTableConstructor: public Constructor {
public: public:
explicit MemTableConstructor(const Comparator* cmp) explicit MemTableConstructor(const Comparator* cmp)
: Constructor(cmp), : Constructor(cmp),
internal_comparator_(cmp) { internal_comparator_(cmp),
memtable_ = new MemTable(internal_comparator_); table_factory_(new SkipListFactory) {
memtable_ = new MemTable(internal_comparator_, table_factory_);
memtable_->Ref(); memtable_->Ref();
} }
~MemTableConstructor() { ~MemTableConstructor() {
@ -351,7 +354,7 @@ class MemTableConstructor: public Constructor {
} }
virtual Status FinishImpl(const Options& options, const KVMap& data) { virtual Status FinishImpl(const Options& options, const KVMap& data) {
memtable_->Unref(); memtable_->Unref();
memtable_ = new MemTable(internal_comparator_); memtable_ = new MemTable(internal_comparator_, table_factory_);
memtable_->Ref(); memtable_->Ref();
int seq = 1; int seq = 1;
for (KVMap::const_iterator it = data.begin(); for (KVMap::const_iterator it = data.begin();
@ -369,6 +372,7 @@ class MemTableConstructor: public Constructor {
private: private:
InternalKeyComparator internal_comparator_; InternalKeyComparator internal_comparator_;
MemTable* memtable_; MemTable* memtable_;
std::shared_ptr<SkipListFactory> table_factory_;
}; };
class DBConstructor: public Constructor { class DBConstructor: public Constructor {
@ -805,7 +809,8 @@ class MemTableTest { };
TEST(MemTableTest, Simple) { TEST(MemTableTest, Simple) {
InternalKeyComparator cmp(BytewiseComparator()); InternalKeyComparator cmp(BytewiseComparator());
MemTable* memtable = new MemTable(cmp); auto table_factory = std::make_shared<SkipListFactory>();
MemTable* memtable = new MemTable(cmp, table_factory);
memtable->Ref(); memtable->Ref();
WriteBatch batch; WriteBatch batch;
WriteBatchInternal::SetSequence(&batch, 100); WriteBatchInternal::SetSequence(&batch, 100);

@ -12,6 +12,7 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/filter_policy.h" #include "leveldb/filter_policy.h"
#include "leveldb/merge_operator.h" #include "leveldb/merge_operator.h"
#include "db/skiplistrep.h"
namespace leveldb { namespace leveldb {
@ -75,7 +76,9 @@ Options::Options()
access_hint_on_compaction_start(NORMAL), access_hint_on_compaction_start(NORMAL),
use_adaptive_mutex(false), use_adaptive_mutex(false),
bytes_per_sync(0), bytes_per_sync(0),
filter_deletes(false) { filter_deletes(false),
memtable_factory(std::shared_ptr<SkipListFactory>(new SkipListFactory)) {
assert(memtable_factory.get() != nullptr);
} }
static const char* const access_hints[] = { static const char* const access_hints[] = {

Loading…
Cancel
Save