diff --git a/db/db_impl.cc b/db/db_impl.cc index b765c0f4b..17c4466a3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -163,7 +163,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mutex_(options.use_adaptive_mutex), shutting_down_(nullptr), bg_cv_(&mutex_), - mem_(new MemTable(internal_comparator_, NumberLevels())), + mem_rep_factory_(options_.memtable_factory), + mem_(new MemTable(internal_comparator_, + mem_rep_factory_, NumberLevels())), logfile_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), @@ -688,7 +690,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, WriteBatchInternal::SetContents(&batch, record); if (mem == nullptr) { - mem = new MemTable(internal_comparator_, NumberLevels()); + mem = new MemTable(internal_comparator_, + mem_rep_factory_, NumberLevels()); mem->Ref(); } status = WriteBatchInternal::InsertInto(&batch, mem, &options_); @@ -2528,7 +2531,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { log_.reset(new log::Writer(std::move(lfile))); mem_->SetLogNumber(logfile_number_); imm_.Add(mem_); - mem_ = new MemTable(internal_comparator_, NumberLevels()); + mem_ = new MemTable(internal_comparator_, + mem_rep_factory_, NumberLevels()); mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); @@ -2782,8 +2786,7 @@ Status DB::Merge(const WriteOptions& opt, const Slice& key, DB::~DB() { } -Status DB::Open(const Options& options, const std::string& dbname, - DB** dbptr) { +Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { *dbptr = nullptr; EnvOptions soptions; diff --git a/db/db_impl.h b/db/db_impl.h index 5229cf3e2..b7f48dcac 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -18,6 +18,7 @@ #include "port/port.h" #include "util/stats_logger.h" #include "memtablelist.h" +#include "leveldb/memtablerep.h" #ifdef USE_SCRIBE #include "scribe/scribe_logger.h" @@ -253,6 +254,7 @@ class DBImpl : public DB { port::Mutex mutex_; port::AtomicPointer shutting_down_; port::CondVar bg_cv_; // Signalled when background work finishes + std::shared_ptr mem_rep_factory_; MemTable* mem_; MemTableList imm_; // Memtable that are not changing uint64_t logfile_number_; diff --git a/db/memtable.cc b/db/memtable.cc index cfd2bed04..8ccf49df0 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -3,6 +3,9 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/memtable.h" + +#include + #include "db/dbformat.h" #include "leveldb/comparator.h" #include "leveldb/env.h" @@ -19,23 +22,28 @@ static Slice GetLengthPrefixedSlice(const char* data) { return Slice(p, len); } -MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel) +MemTable::MemTable(const InternalKeyComparator& cmp, + std::shared_ptr table_factory, + int numlevel) : comparator_(cmp), refs_(0), - table_(comparator_, &arena_), + table_(table_factory->CreateMemTableRep(comparator_)), flush_in_progress_(false), flush_completed_(false), file_number_(0), edit_(numlevel), first_seqno_(0), - mem_logfile_number_(0) { -} + mem_logfile_number_(0) { } MemTable::~MemTable() { assert(refs_ == 0); } -size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } +size_t MemTable::ApproximateMemoryUsage() { + // The first term is the amount of memory used by the memtable and + // the second term is the amount of memory used by the backing store + return arena_.MemoryUsage() + table_->ApproximateMemoryUsage(); +} int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) const { @@ -57,24 +65,27 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: - explicit MemTableIterator(MemTable::Table* table) : iter_(table) { } - - virtual bool Valid() const { return iter_.Valid(); } - virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); } - virtual void SeekToFirst() { iter_.SeekToFirst(); } - virtual void SeekToLast() { iter_.SeekToLast(); } - virtual void Next() { iter_.Next(); } - virtual void Prev() { iter_.Prev(); } - virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); } + explicit MemTableIterator(MemTableRep* table) + : iter_(table->GetIterator()) { } + + virtual bool Valid() const { return iter_->Valid(); } + virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); } + virtual void SeekToFirst() { iter_->SeekToFirst(); } + virtual void SeekToLast() { iter_->SeekToLast(); } + virtual void Next() { iter_->Next(); } + virtual void Prev() { iter_->Prev(); } + virtual Slice key() const { + return GetLengthPrefixedSlice(iter_->key()); + } virtual Slice value() const { - Slice key_slice = GetLengthPrefixedSlice(iter_.key()); + Slice key_slice = GetLengthPrefixedSlice(iter_->key()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); } virtual Status status() const { return Status::OK(); } private: - MemTable::Table::Iterator iter_; + std::shared_ptr iter_; std::string tmp_; // For passing to EncodeKey // No copying allowed @@ -83,7 +94,7 @@ class MemTableIterator: public Iterator { }; Iterator* MemTable::NewIterator() { - return new MemTableIterator(&table_); + return new MemTableIterator(table_.get()); } void MemTable::Add(SequenceNumber s, ValueType type, @@ -109,7 +120,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((p + val_size) - buf == (unsigned)encoded_len); - table_.Insert(buf); + table_->Insert(buf); // The first sequence number inserted into the memtable assert(first_seqno_ == 0 || s > first_seqno_); @@ -119,10 +130,10 @@ void MemTable::Add(SequenceNumber s, ValueType type, } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only) { + const Options& options, const bool check_presence_only) { Slice memkey = key.memtable_key(); - Table::Iterator iter(&table_); - iter.Seek(memkey.data()); + std::shared_ptr iter(table_.get()->GetIterator()); + iter->Seek(memkey.data()); bool merge_in_progress = false; std::string operand; @@ -131,10 +142,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, merge_in_progress = true; } - auto merge_operator = options.merge_operator; auto logger = options.info_log; - for (; iter.Valid(); iter.Next()) { + for (; iter->Valid(); iter->Next()) { // entry format is: // klength varint32 // userkey char[klength-8] @@ -144,7 +154,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. - const char* entry = iter.key(); + const char* entry = iter->key(); uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length); if (comparator_.comparator.user_comparator()->Compare( diff --git a/db/memtable.h b/db/memtable.h index def3a5d3d..2ffe4b913 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -6,24 +6,31 @@ #define STORAGE_LEVELDB_DB_MEMTABLE_H_ #include +#include #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" #include "db/version_set.h" #include "util/arena.h" +#include "leveldb/memtablerep.h" namespace leveldb { -class InternalKeyComparator; class Mutex; class MemTableIterator; class MemTable { public: + struct KeyComparator : public MemTableRep::KeyComparator { + const InternalKeyComparator comparator; + explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } + virtual int operator()(const char* a, const char* b) const; + }; + // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator, - int numlevel = 7); + std::shared_ptr table_factory, int numlevel = 7); // Increase reference count. void Ref() { ++refs_; } @@ -88,22 +95,14 @@ class MemTable { private: ~MemTable(); // Private since only Unref() should be used to delete it - - struct KeyComparator { - const InternalKeyComparator comparator; - explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } - int operator()(const char* a, const char* b) const; - }; friend class MemTableIterator; friend class MemTableBackwardIterator; friend class MemTableList; - typedef SkipList Table; - KeyComparator comparator_; int refs_; Arena arena_; - Table table_; + shared_ptr table_; // These are used to manage memtable flushes to storage bool flush_in_progress_; // started the flush diff --git a/db/repair.cc b/db/repair.cc index d1c0c4525..049aabb3d 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -191,7 +191,8 @@ class Repairer { std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = new MemTable(icmp_, options_.num_levels); + MemTable* mem = new MemTable(icmp_, options_.memtable_factory, + options_.num_levels); mem->Ref(); int counter = 0; while (reader.ReadRecord(&record, &scratch)) { diff --git a/db/skiplistrep.h b/db/skiplistrep.h new file mode 100644 index 000000000..0f7523b6e --- /dev/null +++ b/db/skiplistrep.h @@ -0,0 +1,106 @@ +#ifndef STORAGE_LEVELDB_DB_SKIPLISTREP_H_ +#define STORAGE_LEVELDB_DB_SKIPLISTREP_H_ + +#include "leveldb/memtablerep.h" +#include "db/memtable.h" +#include "db/skiplist.h" + +namespace leveldb { + +class Arena; + +class SkipListRep : public MemTableRep { + Arena arena_; + SkipList skip_list_; +public: + explicit SkipListRep(MemTableRep::KeyComparator& compare) + : skip_list_(compare, &arena_) { } + + // Insert key into the list. + // REQUIRES: nothing that compares equal to key is currently in the list. + virtual void Insert(const char* key) { + skip_list_.Insert(key); + } + + // Returns true iff an entry that compares equal to key is in the list. + virtual bool Contains(const char* key) const { + return skip_list_.Contains(key); + } + + virtual size_t ApproximateMemoryUsage() { + return arena_.MemoryUsage(); + } + + virtual ~SkipListRep() { } + + // Iteration over the contents of a skip list + class Iterator : public MemTableRep::Iterator { + SkipList::Iterator iter_; + public: + // Initialize an iterator over the specified list. + // The returned iterator is not valid. + explicit Iterator( + const SkipList* list + ) : iter_(list) { } + + virtual ~Iterator() { } + + // Returns true iff the iterator is positioned at a valid node. + virtual bool Valid() const { + return iter_.Valid(); + } + + // Returns the key at the current position. + // REQUIRES: Valid() + virtual const char* key() const { + return iter_.key(); + } + + // Advances to the next position. + // REQUIRES: Valid() + virtual void Next() { + iter_.Next(); + } + + // Advances to the previous position. + // REQUIRES: Valid() + virtual void Prev() { + iter_.Prev(); + } + + // Advance to the first entry with a key >= target + virtual void Seek(const char* target) { + iter_.Seek(target); + } + + // Position at the first entry in list. + // Final state of iterator is Valid() iff list is not empty. + virtual void SeekToFirst() { + iter_.SeekToFirst(); + } + + // Position at the last entry in list. + // Final state of iterator is Valid() iff list is not empty. + virtual void SeekToLast() { + iter_.SeekToLast(); + } + }; + + virtual std::shared_ptr GetIterator() { + return std::shared_ptr( + new SkipListRep::Iterator(&skip_list_) + ); + } +}; + +class SkipListFactory : public MemTableRepFactory { +public: + virtual std::shared_ptr CreateMemTableRep ( + MemTableRep::KeyComparator& compare) { + return std::shared_ptr(new SkipListRep(compare)); + } +}; + +} + +#endif // STORAGE_LEVELDB_DB_SKIPLISTREP_H_ diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index d17a08e8e..945ef16bd 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -4,6 +4,8 @@ #include "leveldb/db.h" +#include +#include "db/skiplistrep.h" #include "db/memtable.h" #include "db/write_batch_internal.h" #include "leveldb/env.h" @@ -14,7 +16,8 @@ namespace leveldb { static std::string PrintContents(WriteBatch* b) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable* mem = new MemTable(cmp); + auto factory = std::make_shared(); + MemTable* mem = new MemTable(cmp, factory); mem->Ref(); std::string state; Status s = WriteBatchInternal::InsertInto(b, mem); diff --git a/include/leveldb/memtablerep.h b/include/leveldb/memtablerep.h new file mode 100644 index 000000000..cb5a6ed35 --- /dev/null +++ b/include/leveldb/memtablerep.h @@ -0,0 +1,91 @@ +// This file contains the interface that must be implemented by any collection +// to be used as the backing store for a MemTable. Such a collection must +// satisfy the following properties: +// (1) It does not store duplicate items. +// (2) It uses MemTableRep::KeyComparator to compare items for iteration and +// equality. +// (3) It can be accessed concurrently by multiple readers but need not support +// concurrent writes. +// (4) Items are never deleted. +// The liberal use of assertions is encouraged to enforce (1). + +#ifndef STORAGE_LEVELDB_DB_TABLE_H_ +#define STORAGE_LEVELDB_DB_TABLE_H_ + +#include + +namespace leveldb { + +class MemTableRep { + public: + // KeyComparator(a, b) returns a negative value if a is less than b, 0 if they + // are equal, and a positive value if b is greater than a + class KeyComparator { + public: + virtual int operator()(const char* a, const char* b) const = 0; + virtual ~KeyComparator() { } + }; + + // Insert key into the collection. (The caller will pack key and value into a + // single buffer and pass that in as the parameter to Insert) + // REQUIRES: nothing that compares equal to key is currently in the + // collection. + virtual void Insert(const char* key) = 0; + + // Returns true iff an entry that compares equal to key is in the collection. + virtual bool Contains(const char* key) const = 0; + + // Returns an estimate of the number of bytes of data in use by this + // data structure. + virtual size_t ApproximateMemoryUsage() = 0; + + virtual ~MemTableRep() { } + + // Iteration over the contents of a skip collection + class Iterator { + public: + // Initialize an iterator over the specified collection. + // The returned iterator is not valid. + // explicit Iterator(const MemTableRep* collection); + virtual ~Iterator() { }; + + // Returns true iff the iterator is positioned at a valid node. + virtual bool Valid() const = 0; + + // Returns the key at the current position. + // REQUIRES: Valid() + virtual const char* key() const = 0; + + // Advances to the next position. + // REQUIRES: Valid() + virtual void Next() = 0; + + // Advances to the previous position. + // REQUIRES: Valid() + virtual void Prev() = 0; + + // Advance to the first entry with a key >= target + virtual void Seek(const char* target) = 0; + + // Position at the first entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToFirst() = 0; + + // Position at the last entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToLast() = 0; + }; + + virtual std::shared_ptr GetIterator() = 0; +}; + +class MemTableRepFactory { + public: + virtual ~MemTableRepFactory() { }; + virtual std::shared_ptr CreateMemTableRep( + MemTableRep::KeyComparator&) = 0; +}; + +} + +#endif // STORAGE_LEVELDB_DB_TABLE_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index c178ddeee..f754fd3bf 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -12,6 +12,7 @@ #include #include "leveldb/slice.h" #include "leveldb/statistics.h" +#include "leveldb/memtablerep.h" namespace leveldb { @@ -474,6 +475,11 @@ struct Options { // Default: false bool filter_deletes; + // This is a factory that provides MemTableRep objects. + // Default: a factory that provides a skip-list-based implementation of + // MemTableRep. + std::shared_ptr memtable_factory; + }; // Options that control read operations diff --git a/table/table_test.cc b/table/table_test.cc index a2bba940d..118ffa232 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3,8 +3,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include #include +#include #include "db/dbformat.h" #include "db/memtable.h" +#include "db/skiplistrep.h" #include "db/write_batch_internal.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -342,8 +344,9 @@ class MemTableConstructor: public Constructor { public: explicit MemTableConstructor(const Comparator* cmp) : Constructor(cmp), - internal_comparator_(cmp) { - memtable_ = new MemTable(internal_comparator_); + internal_comparator_(cmp), + table_factory_(new SkipListFactory) { + memtable_ = new MemTable(internal_comparator_, table_factory_); memtable_->Ref(); } ~MemTableConstructor() { @@ -351,7 +354,7 @@ class MemTableConstructor: public Constructor { } virtual Status FinishImpl(const Options& options, const KVMap& data) { memtable_->Unref(); - memtable_ = new MemTable(internal_comparator_); + memtable_ = new MemTable(internal_comparator_, table_factory_); memtable_->Ref(); int seq = 1; for (KVMap::const_iterator it = data.begin(); @@ -369,6 +372,7 @@ class MemTableConstructor: public Constructor { private: InternalKeyComparator internal_comparator_; MemTable* memtable_; + std::shared_ptr table_factory_; }; class DBConstructor: public Constructor { @@ -805,7 +809,8 @@ class MemTableTest { }; TEST(MemTableTest, Simple) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable* memtable = new MemTable(cmp); + auto table_factory = std::make_shared(); + MemTable* memtable = new MemTable(cmp, table_factory); memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100); diff --git a/util/options.cc b/util/options.cc index e884d87d7..912c53923 100644 --- a/util/options.cc +++ b/util/options.cc @@ -12,6 +12,7 @@ #include "leveldb/env.h" #include "leveldb/filter_policy.h" #include "leveldb/merge_operator.h" +#include "db/skiplistrep.h" namespace leveldb { @@ -75,7 +76,9 @@ Options::Options() access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), bytes_per_sync(0), - filter_deletes(false) { + filter_deletes(false), + memtable_factory(std::shared_ptr(new SkipListFactory)) { + assert(memtable_factory.get() != nullptr); } static const char* const access_hints[] = {