From 05e8854085ca783956fd33021a9a334d77ed4d9e Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Thu, 21 Mar 2013 15:59:47 -0700 Subject: [PATCH] [Rocksdb] Support Merge operation in rocksdb Summary: This diff introduces a new Merge operation into rocksdb. The purpose of this review is mostly getting feedback from the team (everyone please) on the design. Please focus on the four files under include/leveldb/, as they spell the client visible interface change. include/leveldb/db.h include/leveldb/merge_operator.h include/leveldb/options.h include/leveldb/write_batch.h Please go over local/my_test.cc carefully, as it is a concerete use case. Please also review the impelmentation files to see if the straw man implementation makes sense. Note that, the diff does pass all make check and truly supports forward iterator over db and a version of Get that's based on iterator. Future work: - Integration with compaction - A raw Get implementation I am working on a wiki that explains the design and implementation choices, but coding comes just naturally and I think it might be a good idea to share the code earlier. The code is heavily commented. Test Plan: run all local tests Reviewers: dhruba, heyongqiang Reviewed By: dhruba CC: leveldb, zshao, sheki, emayanke, MarkCallaghan Differential Revision: https://reviews.facebook.net/D9651 --- Makefile | 7 +- db/builder.cc | 60 +++++- db/db_impl.cc | 71 ++++++- db/db_impl.h | 11 +- db/db_impl_readonly.cc | 6 +- db/db_impl_readonly.h | 4 + db/db_iter.cc | 174 ++++++++++++++--- db/db_iter.h | 3 +- db/db_test.cc | 36 +++- db/dbformat.h | 7 +- db/memtable.cc | 52 ++++- db/memtable.h | 7 +- db/memtablelist.cc | 5 +- db/memtablelist.h | 3 +- db/merge_helper.cc | 114 +++++++++++ db/merge_helper.h | 64 +++++++ db/merge_test.cc | 253 +++++++++++++++++++++++++ db/table_cache.cc | 2 +- db/table_cache.h | 5 +- db/version_set.cc | 128 ++++++++++--- db/version_set.h | 4 +- db/write_batch.cc | 28 ++- db/write_batch_test.cc | 8 + include/leveldb/db.h | 10 +- include/leveldb/merge_operator.h | 74 ++++++++ include/leveldb/options.h | 13 ++ include/leveldb/status.h | 9 +- include/leveldb/write_batch.h | 8 + table/table.cc | 21 +- table/table.h | 8 +- util/options.cc | 7 +- util/status.cc | 3 + utilities/merge_operators.h | 18 ++ utilities/merge_operators/put.cc | 35 ++++ utilities/merge_operators/uint64add.cc | 63 ++++++ utilities/ttl/db_ttl.cc | 6 + utilities/ttl/db_ttl.h | 5 + 37 files changed, 1209 insertions(+), 123 deletions(-) create mode 100644 db/merge_helper.cc create mode 100644 db/merge_helper.h create mode 100644 db/merge_test.cc create mode 100644 include/leveldb/merge_operator.h create mode 100644 utilities/merge_operators.h create mode 100644 utilities/merge_operators/put.cc create mode 100644 utilities/merge_operators/uint64add.cc diff --git a/Makefile b/Makefile index cc536b6f1..1d9f02b0e 100644 --- a/Makefile +++ b/Makefile @@ -60,8 +60,8 @@ TESTS = \ reduce_levels_test \ write_batch_test \ auto_roll_logger_test \ - filelock_test - + filelock_test \ + merge_test TOOLS = \ sst_dump \ @@ -225,6 +225,9 @@ reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS) write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) +merge_test: db/merge_test.o $(LIBOBJECTS) + $(CXX) db/merge_test.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) + $(MEMENVLIBRARY) : $(MEMENVOBJECTS) rm -f $@ $(AR) -rs $@ $(MEMENVOBJECTS) diff --git a/db/builder.cc b/db/builder.cc 
index 450fcf051..98e43195c 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -6,6 +6,7 @@ #include "db/filename.h" #include "db/dbformat.h" +#include "db/merge_helper.h" #include "db/table_cache.h" #include "db/version_edit.h" #include "leveldb/db.h" @@ -49,36 +50,75 @@ Status BuildTable(const std::string& dbname, Slice key = iter->key(); meta->smallest.DecodeFrom(key); + MergeHelper merge(user_comparator, options.merge_operator, + options.info_log.get(), + true /* internal key corruption is not ok */); + if (purge) { + ParsedInternalKey ikey; + // Ugly walkaround to avoid compiler error for release build + // TODO: find a clean way to treat in memory key corruption + ikey.type = kTypeValue; ParsedInternalKey prev_ikey; std::string prev_value; std::string prev_key; - // store first key-value - prev_key.assign(key.data(), key.size()); - prev_value.assign(iter->value().data(), iter->value().size()); - ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(prev_ikey.sequence >= earliest_seqno_in_memtable); + // Ugly walkaround to avoid compiler error for release build + // TODO: find a clean way to treat in memory key corruption + auto ok __attribute__((unused)) = ParseInternalKey(key, &ikey); + // in-memory key corruption is not ok; + assert(ok); + + if (ikey.type == kTypeMerge) { + // merge values if the first entry is of merge type + merge.MergeUntil(iter, 0 /* don't worry about snapshot */); + prev_key.assign(merge.key().data(), merge.key().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + prev_value.assign(merge.value().data(), merge.value().size()); + } else { + // store first key-value + prev_key.assign(key.data(), key.size()); + prev_value.assign(iter->value().data(), iter->value().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + assert(prev_ikey.sequence >= earliest_seqno_in_memtable); + iter->Next(); + } - for (iter->Next(); iter->Valid(); iter->Next()) { + while (iter->Valid()) { + bool iterator_at_next = false; ParsedInternalKey this_ikey; Slice key = iter->key(); - ParseInternalKey(key, &this_ikey); + ok = ParseInternalKey(key, &this_ikey); + assert(ok); assert(this_ikey.sequence >= earliest_seqno_in_memtable); if (user_comparator->Compare(prev_ikey.user_key, this_ikey.user_key)) { // This key is different from previous key. // Output prev key and remember current key builder->Add(Slice(prev_key), Slice(prev_value)); - prev_key.assign(key.data(), key.size()); - prev_value.assign(iter->value().data(), iter->value().size()); - ParseInternalKey(Slice(prev_key), &prev_ikey); + if (this_ikey.type == kTypeMerge) { + merge.MergeUntil(iter, 0 /* don't worry about snapshot */); + iterator_at_next = true; + prev_key.assign(merge.key().data(), merge.key().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + prev_value.assign(merge.value().data(), merge.value().size()); + } else { + prev_key.assign(key.data(), key.size()); + prev_value.assign(iter->value().data(), iter->value().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + } } else { // seqno within the same key are in decreasing order assert(this_ikey.sequence < prev_ikey.sequence); // This key is an earlier version of the same key in prev_key. // Skip current key. 
} + + if (!iterator_at_next) iter->Next(); } // output last key builder->Add(Slice(prev_key), Slice(prev_value)); diff --git a/db/db_impl.cc b/db/db_impl.cc index 54f6d34dd..051e9806a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -21,12 +21,14 @@ #include "db/log_writer.h" #include "db/memtable.h" #include "db/memtablelist.h" +#include "db/merge_helper.h" #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" #include "db/transaction_log_iterator_impl.h" #include "leveldb/db.h" #include "leveldb/env.h" +#include "leveldb/merge_operator.h" #include "leveldb/statistics.h" #include "leveldb/status.h" #include "leveldb/table_builder.h" @@ -497,6 +499,7 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, if (!env_->FileExists(CurrentFileName(dbname_))) { if (options_.create_if_missing) { + // TODO: add merge_operator name check s = NewDB(); if (!s.ok()) { return s; @@ -1514,11 +1517,13 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { // Employ a sequential search because the total number of // snapshots are typically small. inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( - SequenceNumber in, std::vector& snapshots) { + SequenceNumber in, std::vector& snapshots, + SequenceNumber* prev_snapshot) { SequenceNumber prev __attribute__((unused)) = 0; for (const auto cur : snapshots) { assert(prev <= cur); if (cur >= in) { + *prev_snapshot = prev; return cur; } prev = cur; // assignment @@ -1591,6 +1596,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { kMaxSequenceNumber; SequenceNumber visible_in_snapshot = kMaxSequenceNumber; std::string compaction_filter_value; + MergeHelper merge(user_comparator(), options_.merge_operator, + options_.info_log.get(), + false /* internal key corruption is expected */); for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { @@ -1617,8 +1625,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Handle key/value, add to state, etc. bool drop = false; + bool current_entry_is_merged = false; if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys + // TODO: error key stays in db forever? Figure out the intention/rationale + // v10 error v8 : we cannot hide v8 even though it's pretty obvious. current_user_key.clear(); has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; @@ -1637,15 +1648,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // If there are no snapshots, then this kv affect visibility at tip. // Otherwise, search though all existing snapshots to find // the earlist snapshot that is affected by this kv. - SequenceNumber visible = visible_at_tip ? visible_at_tip : - findEarliestVisibleSnapshot(ikey.sequence, - compact->existing_snapshots); + SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot + SequenceNumber visible = visible_at_tip ? + visible_at_tip : + findEarliestVisibleSnapshot(ikey.sequence, + compact->existing_snapshots, + &prev_snapshot); if (visible_in_snapshot == visible) { // If the earliest snapshot is which this key is visible in // is the same as the visibily of a previous instance of the // same key, then this kv is not visible in any snapshot. // Hidden by an newer entry for same user key + // TODO: why not > ? 
assert(last_sequence_for_key >= ikey.sequence); drop = true; // (A) RecordTick(options_.statistics, COMPACTION_KEY_DROP_NEWER_ENTRY); @@ -1661,6 +1676,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Therefore this deletion marker is obsolete and can be dropped. drop = true; RecordTick(options_.statistics, COMPACTION_KEY_DROP_OBSOLETE); + } else if (ikey.type == kTypeMerge) { + // We know the merge type entry is not hidden, otherwise we would + // have hit (A) + // We encapsulate the merge related state machine in a different + // object to minimize change to the existing flow. Turn out this + // logic could also be nicely re-used for memtable flush purge + // optimization in BuildTable. + merge.MergeUntil(input.get(), prev_snapshot, bottommost_level); + current_entry_is_merged = true; + // get the merge result + key = merge.key(); + ParseInternalKey(key, &ikey); + value = merge.value(); } else if (options_.CompactionFilter != nullptr && ikey.type != kTypeDeletion && ikey.sequence < earliest_snapshot) { @@ -1709,7 +1737,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. - if (bottommost_level && ikey.sequence < earliest_snapshot) { + if (bottommost_level && ikey.sequence < earliest_snapshot && + ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems // with the priority queue that is managing the input key iterator @@ -1744,7 +1773,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } } - input->Next(); + // MergeUntil has moved input to the next entry + if (!current_entry_is_merged) { + input->Next(); + } } if (status.ok() && shutting_down_.Acquire_Load()) { @@ -1906,14 +1938,17 @@ Status DBImpl::Get(const ReadOptions& options, mutex_.Unlock(); bool have_stat_update = false; Version::GetStats stats; + // First look in the memtable, then in the immutable memtable (if any). + // s is both in/out. When in, s could either be OK or MergeInProgress. + // value will contain the current merge operand in the latter case. LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s)) { + if (mem->Get(lkey, value, &s, options_)) { // Done - } else if (imm.Get(lkey, value, &s)) { + } else if (imm.Get(lkey, value, &s, options_)) { // Done } else { - s = current->Get(options, lkey, value, &stats); + current->Get(options, lkey, value, &s, &stats, options_); have_stat_update = true; } mutex_.Lock(); @@ -1934,7 +1969,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); return NewDBIterator( - &dbname_, env_, user_comparator(), internal_iter, + &dbname_, env_, options_, user_comparator(), internal_iter, (options.snapshot != nullptr ? 
reinterpret_cast(options.snapshot)->number_ : latest_snapshot)); @@ -1955,6 +1990,15 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } +Status DBImpl::Merge(const WriteOptions& o, const Slice& key, + const Slice& val) { + if (!options_.merge_operator) { + return Status::NotSupported("Provide a merge_operator when opening DB"); + } else { + return DB::Merge(o, key, val); + } +} + Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } @@ -2382,6 +2426,13 @@ Status DB::Delete(const WriteOptions& opt, const Slice& key) { return Write(opt, &batch); } +Status DB::Merge(const WriteOptions& opt, const Slice& key, + const Slice& value) { + WriteBatch batch; + batch.Merge(key, value); + return Write(opt, &batch); +} + DB::~DB() { } Status DB::Open(const Options& options, const std::string& dbname, diff --git a/db/db_impl.h b/db/db_impl.h index 2be00c9d8..2c241ba6b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -36,6 +36,8 @@ class DBImpl : public DB { // Implementations of the DB interface virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value); + virtual Status Merge(const WriteOptions&, const Slice& key, + const Slice& value); virtual Status Delete(const WriteOptions&, const Slice& key); virtual Status Write(const WriteOptions& options, WriteBatch* updates); virtual Status Get(const ReadOptions& options, @@ -339,9 +341,12 @@ class DBImpl : public DB { // dump the delayed_writes_ to the log file and reset counter. void DelayLoggingAndReset(); - // find the earliest snapshot where seqno is visible - inline SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in, - std::vector& snapshots); + // Return the earliest snapshot where seqno is visible. + // Store the snapshot right before that, if any, in prev_snapshot + inline SequenceNumber findEarliestVisibleSnapshot( + SequenceNumber in, + std::vector& snapshots, + SequenceNumber* prev_snapshot); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 2ec52970b..d4170d30f 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -54,10 +54,10 @@ Status DBImplReadOnly::Get(const ReadOptions& options, Version* current = versions_->current(); SequenceNumber snapshot = versions_->LastSequence(); LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s)) { + if (mem->Get(lkey, value, &s, options_)) { } else { Version::GetStats stats; - s = current->Get(options, lkey, value, &stats); + current->Get(options, lkey, value, &s, &stats, options_); } return s; } @@ -66,7 +66,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); return NewDBIterator( - &dbname_, env_, user_comparator(), internal_iter, + &dbname_, env_, options_, user_comparator(),internal_iter, (options.snapshot != nullptr ? 
reinterpret_cast(options.snapshot)->number_ : latest_snapshot)); diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 403b35452..317d290d0 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -37,6 +37,10 @@ public: virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value) { return Status::NotSupported("Not supported operation in read only mode."); } + virtual Status Merge(const WriteOptions&, const Slice& key, + const Slice& value) { + return Status::NotSupported("Not supported operation in read only mode."); + } virtual Status Delete(const WriteOptions&, const Slice& key) { return Status::NotSupported("Not supported operation in read only mode."); } diff --git a/db/db_iter.cc b/db/db_iter.cc index 87dca2ded..a06bde51b 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -2,12 +2,15 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include #include "db/db_iter.h" #include "db/filename.h" #include "db/dbformat.h" #include "leveldb/env.h" +#include "leveldb/options.h" #include "leveldb/iterator.h" +#include "leveldb/merge_operator.h" #include "port/port.h" #include "util/logging.h" #include "util/mutexlock.h" @@ -36,6 +39,7 @@ namespace { // numbers, deletion markers, overwrites, etc. class DBIter: public Iterator { public: + // The following is grossly complicated. TODO: clean it up // Which direction is the iterator currently moving? // (1) When moving forward, the internal iterator is positioned at // the exact entry that yields this->key(), this->value() @@ -46,15 +50,18 @@ class DBIter: public Iterator { kReverse }; - DBIter(const std::string* dbname, Env* env, + DBIter(const std::string* dbname, Env* env, const Options& options, const Comparator* cmp, Iterator* iter, SequenceNumber s) : dbname_(dbname), env_(env), + logger_(options.info_log), user_comparator_(cmp), + user_merge_operator_(options.merge_operator), iter_(iter), sequence_(s), direction_(kForward), - valid_(false) { + valid_(false), + current_entry_is_merged_(false) { } virtual ~DBIter() { delete iter_; @@ -62,11 +69,12 @@ class DBIter: public Iterator { virtual bool Valid() const { return valid_; } virtual Slice key() const { assert(valid_); - return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_; + return saved_key_; } virtual Slice value() const { assert(valid_); - return (direction_ == kForward) ? iter_->value() : saved_value_; + return (direction_ == kForward && !current_entry_is_merged_) ? 
+ iter_->value() : saved_value_; } virtual Status status() const { if (status_.ok()) { @@ -83,9 +91,10 @@ class DBIter: public Iterator { virtual void SeekToLast(); private: - void FindNextUserEntry(bool skipping, std::string* skip); + void FindNextUserEntry(bool skipping); void FindPrevUserEntry(); bool ParseKey(ParsedInternalKey* key); + void MergeValuesNewToOld(); inline void SaveKey(const Slice& k, std::string* dst) { dst->assign(k.data(), k.size()); @@ -102,15 +111,19 @@ class DBIter: public Iterator { const std::string* const dbname_; Env* const env_; + shared_ptr logger_; const Comparator* const user_comparator_; + const MergeOperator* const user_merge_operator_; Iterator* const iter_; SequenceNumber const sequence_; Status status_; std::string saved_key_; // == current key when direction_==kReverse std::string saved_value_; // == current raw value when direction_==kReverse + std::string skip_key_; Direction direction_; bool valid_; + bool current_entry_is_merged_; // No copying allowed DBIter(const DBIter&); @@ -120,6 +133,8 @@ class DBIter: public Iterator { inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { if (!ParseInternalKey(iter_->key(), ikey)) { status_ = Status::Corruption("corrupted internal key in DBIter"); + Log(logger_, "corrupted internal key in DBIter: %s", + iter_->key().ToString(true).c_str()); return false; } else { return true; @@ -146,47 +161,136 @@ void DBIter::Next() { } } - // Temporarily use saved_key_ as storage for key to skip. - std::string* skip = &saved_key_; - SaveKey(ExtractUserKey(iter_->key()), skip); - FindNextUserEntry(true, skip); + // If the current value is merged, we might already hit end of iter_ + if (!iter_->Valid()) { + valid_ = false; + return; + } + FindNextUserEntry(true /* skipping the current user key */); } -void DBIter::FindNextUserEntry(bool skipping, std::string* skip) { + +// PRE: saved_key_ has the current user key if skipping +// POST: saved_key_ should have the next user key if valid_, +// if the current entry is a result of merge +// current_entry_is_merged_ => true +// saved_value_ => the merged value +// +// NOTE: In between, saved_key_ can point to a user key that has +// a delete marker +void DBIter::FindNextUserEntry(bool skipping) { // Loop until we hit an acceptable entry to yield assert(iter_->Valid()); assert(direction_ == kForward); + current_entry_is_merged_ = false; do { ParsedInternalKey ikey; if (ParseKey(&ikey) && ikey.sequence <= sequence_) { - switch (ikey.type) { - case kTypeDeletion: - // Arrange to skip all upcoming entries for this key since - // they are hidden by this deletion. - SaveKey(ikey.user_key, skip); - skipping = true; - break; - case kTypeValue: - if (skipping && - user_comparator_->Compare(ikey.user_key, *skip) <= 0) { - // Entry hidden - } else { + if (skipping && + user_comparator_->Compare(ikey.user_key, saved_key_) <= 0) { + // skip this entry + } else { + skipping = false; + switch (ikey.type) { + case kTypeDeletion: + // Arrange to skip all upcoming entries for this key since + // they are hidden by this deletion. 
+ SaveKey(ikey.user_key, &saved_key_); + skipping = true; + break; + case kTypeValue: valid_ = true; - saved_key_.clear(); + SaveKey(ikey.user_key, &saved_key_); return; - } - break; + case kTypeMerge: + // By now, we are sure the current ikey is going to yield a value + SaveKey(ikey.user_key, &saved_key_); + current_entry_is_merged_ = true; + valid_ = true; + // Go to a different state machine + MergeValuesNewToOld(); + // TODO: what if !iter_->Valid() + return; + break; + } } } iter_->Next(); } while (iter_->Valid()); - saved_key_.clear(); valid_ = false; } +// Merge values of the same user key starting from the current iter_ position +// Scan from the newer entries to older entries. +// PRE: iter_->key() points to the first merge type entry +// saved_key_ stores the user key +// POST: saved_value_ has the merged value for the user key +// iter_ points to the next entry (or invalid) +void DBIter::MergeValuesNewToOld() { + + const Slice value = iter_->value(); + std::string operand(value.data(), value.size()); + + ParsedInternalKey ikey; + for (iter_->Next(); iter_->Valid(); iter_->Next()) { + if (!ParseKey(&ikey)) { + // skip corrupted key + continue; + } + + if (user_comparator_->Compare(ikey.user_key, saved_key_) != 0) { + // hit the next user key, stop right here + break; + } + + if (kTypeDeletion == ikey.type) { + // hit a delete with the same user key, stop right here + // iter_ is positioned after delete + iter_->Next(); + break; + } + + if (kTypeValue == ikey.type) { + // hit a put, merge the put value with operand and store it in the + // final result saved_value_. We are done! + const Slice value = iter_->value(); + user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand), + &saved_value_, logger_.get()); + // iter_ is positioned after put + iter_->Next(); + return; + } + + if (kTypeMerge == ikey.type) { + // hit a merge, merge the value with operand and continue. + // saved_value_ is used as a scratch area. The result is put + // back in operand + const Slice value = iter_->value(); + user_merge_operator_->Merge(ikey.user_key, &value, operand, + &saved_value_, logger_.get()); + swap(saved_value_, operand); + } + } + + // we either exhausted all internal keys under this user key, or hit + // a deletion marker. + // feed null as the existing value to the merge opexrator, such that + // client can differentiate this scenario and do things accordingly. + user_merge_operator_->Merge(ikey.user_key, nullptr, operand, + &saved_value_, logger_.get()); +} + void DBIter::Prev() { assert(valid_); + // TODO: support backward iteration + // Throw an exception now if merge_operator is provided + if (user_merge_operator_) { + Log(logger_, "Prev not supported yet if merge_operator is provided"); + throw std::logic_error("DBIter::Prev backward iteration not supported" + " if merge_operator is provided"); + } + if (direction_ == kForward) { // Switch directions? // iter_ is pointing at the current entry. Scan backwards until // the key changes so we can use the normal reverse scanning code. 
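To make the new iterator behavior concrete, here is a small usage sketch; it is illustrative only and not part of the patch. It assumes the UInt64AddOperator factory (MergeOperators::CreateUInt64AddOperator) added elsewhere in this diff under utilities/, plus EncodeFixed64/DecodeFixed64 from util/coding.h; the database path is made up. A forward scan yields one merged entry per user key, while Prev()/SeekToLast() currently throw when a merge operator is configured, as coded above.

    #include <cassert>
    #include <cstdint>
    #include <memory>
    #include "leveldb/db.h"
    #include "leveldb/options.h"
    #include "util/coding.h"
    #include "utilities/merge_operators.h"

    int main() {
      auto add_op = leveldb::MergeOperators::CreateUInt64AddOperator();
      leveldb::Options options;
      options.create_if_missing = true;
      options.merge_operator = add_op.get();

      leveldb::DB* db;
      assert(leveldb::DB::Open(options, "/tmp/merge_iter_demo", &db).ok());

      char buf[sizeof(uint64_t)];
      leveldb::EncodeFixed64(buf, 1);
      db->Put(leveldb::WriteOptions(), "counter", leveldb::Slice(buf, sizeof(buf)));
      leveldb::EncodeFixed64(buf, 2);
      db->Merge(leveldb::WriteOptions(), "counter", leveldb::Slice(buf, sizeof(buf)));
      leveldb::EncodeFixed64(buf, 3);
      db->Merge(leveldb::WriteOptions(), "counter", leveldb::Slice(buf, sizeof(buf)));

      // Forward iteration resolves the operand stack newest-to-oldest:
      // the single visible entry for "counter" carries 1 + 2 + 3 = 6.
      std::unique_ptr<leveldb::Iterator> it(db->NewIterator(leveldb::ReadOptions()));
      for (it->SeekToFirst(); it->Valid(); it->Next()) {
        assert(leveldb::DecodeFixed64(it->value().data()) == 6);
      }
      // it->Prev() or it->SeekToLast() would throw std::logic_error here,
      // since backward iteration is not supported yet with a merge operator.
      it.reset();
      delete db;
      return 0;
    }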
@@ -261,7 +365,7 @@ void DBIter::Seek(const Slice& target) { &saved_key_, ParsedInternalKey(target, sequence_, kValueTypeForSeek)); iter_->Seek(saved_key_); if (iter_->Valid()) { - FindNextUserEntry(false, &saved_key_ /* temporary storage */); + FindNextUserEntry(false /*not skipping */); } else { valid_ = false; } @@ -272,13 +376,21 @@ void DBIter::SeekToFirst() { ClearSavedValue(); iter_->SeekToFirst(); if (iter_->Valid()) { - FindNextUserEntry(false, &saved_key_ /* temporary storage */); + FindNextUserEntry(false /* not skipping */); } else { valid_ = false; } } void DBIter::SeekToLast() { + // TODO: support backward iteration + // throw an exception for now if merge_operator is provided + if (user_merge_operator_) { + Log(logger_, "SeekToLast not supported yet if merge_operator is provided"); + throw std::logic_error("DBIter::SeekToLast: backward iteration not" + " supported if merge_operator is provided"); + } + direction_ = kReverse; ClearSavedValue(); iter_->SeekToLast(); @@ -290,10 +402,12 @@ void DBIter::SeekToLast() { Iterator* NewDBIterator( const std::string* dbname, Env* env, - const Comparator* user_key_comparator, + const Options& options, + const Comparator *user_key_comparator, Iterator* internal_iter, const SequenceNumber& sequence) { - return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence); + return new DBIter(dbname, env, options, user_key_comparator, + internal_iter, sequence); } } // namespace leveldb diff --git a/db/db_iter.h b/db/db_iter.h index d9e1b174a..a8d849bb9 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -17,7 +17,8 @@ namespace leveldb { extern Iterator* NewDBIterator( const std::string* dbname, Env* env, - const Comparator* user_key_comparator, + const Options& options, + const Comparator *user_key_comparator, Iterator* internal_iter, const SequenceNumber& sequence); diff --git a/db/db_test.cc b/db/db_test.cc index b1ebd9004..6c042f05d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -20,6 +20,7 @@ #include "util/testharness.h" #include "util/testutil.h" #include "util/storage_options.h" +#include "utilities/merge_operators.h" namespace leveldb { @@ -209,6 +210,7 @@ class DBTest { // Sequence of option configurations to try enum OptionConfig { kDefault, + kMergePut, kFilter, kUncompressed, kNumLevel_3, @@ -220,6 +222,8 @@ class DBTest { }; int option_config_; + std::shared_ptr merge_operator_; + public: std::string dbname_; SpecialEnv* env_; @@ -228,6 +232,7 @@ class DBTest { Options last_options_; DBTest() : option_config_(kDefault), + merge_operator_(MergeOperators::CreatePutOperator()), env_(new SpecialEnv(Env::Default())) { filter_policy_ = NewBloomFilterPolicy(10); dbname_ = test::TmpDir() + "/db_test"; @@ -259,6 +264,9 @@ class DBTest { Options CurrentOptions() { Options options; switch (option_config_) { + case kMergePut: + options.merge_operator = merge_operator_.get(); + break; case kFilter: options.filter_policy = filter_policy_; break; @@ -326,8 +334,12 @@ class DBTest { return DB::Open(opts, dbname_, &db_); } - Status Put(const std::string& k, const std::string& v) { - return db_->Put(WriteOptions(), k, v); + Status Put(const Slice& k, const Slice& v) { + if (kMergePut == option_config_ ) { + return db_->Merge(WriteOptions(), k, v); + } else { + return db_->Put(WriteOptions(), k, v); + } } Status Delete(const std::string& k) { @@ -400,6 +412,10 @@ class DBTest { case kTypeValue: result += iter->value().ToString(); break; + case kTypeMerge: + // keep it the same as kTypeValue for testing kMergePut + result += 
iter->value().ToString(); + break; case kTypeDeletion: result += "DEL"; break; @@ -935,8 +951,11 @@ TEST(DBTest, IterMultiWithDelete) { Iterator* iter = db_->NewIterator(ReadOptions()); iter->Seek("c"); ASSERT_EQ(IterStatus(iter), "c->vc"); - iter->Prev(); - ASSERT_EQ(IterStatus(iter), "a->va"); + if (!CurrentOptions().merge_operator) { + // TODO: merge operator does not support backward iteration yet + iter->Prev(); + ASSERT_EQ(IterStatus(iter), "a->va"); + } delete iter; } while (ChangeOptions()); } @@ -2822,7 +2841,7 @@ static void MTThreadBody(void* arg) { // We add some padding for force compactions. snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d", key, id, static_cast(counter)); - ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf))); + ASSERT_OK(t->state->test->Put(Slice(keybuf), Slice(valbuf))); } else { // Read a value and verify that it matches the pattern written above. Status s = db->Get(ReadOptions(), Slice(keybuf), &value); @@ -2895,6 +2914,9 @@ class ModelDB: public DB { virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) { return DB::Put(o, k, v); } + virtual Status Merge(const WriteOptions& o, const Slice& k, const Slice& v) { + return DB::Merge(o, k, v); + } virtual Status Delete(const WriteOptions& o, const Slice& key) { return DB::Delete(o, key); } @@ -2930,6 +2952,10 @@ class ModelDB: public DB { virtual void Put(const Slice& key, const Slice& value) { (*map_)[key.ToString()] = value.ToString(); } + virtual void Merge(const Slice& key, const Slice& value) { + // ignore merge for now + //(*map_)[key.ToString()] = value.ToString(); + } virtual void Delete(const Slice& key) { map_->erase(key.ToString()); } diff --git a/db/dbformat.h b/db/dbformat.h index fb8525b0b..09af082e9 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -24,7 +24,8 @@ class InternalKey; // data structures. enum ValueType { kTypeDeletion = 0x0, - kTypeValue = 0x1 + kTypeValue = 0x1, + kTypeMerge = 0x2 }; // kValueTypeForSeek defines the ValueType that should be passed when // constructing a ParsedInternalKey object for seeking to a particular @@ -32,7 +33,7 @@ enum ValueType { // and the value type is embedded as the low 8 bits in the sequence // number in internal keys, we need to use the highest-numbered // ValueType, not the lowest). -static const ValueType kValueTypeForSeek = kTypeValue; +static const ValueType kValueTypeForSeek = kTypeMerge; // We leave eight bits empty at the bottom so a type and sequence# // can be packed together into 64-bits. 
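Since kValueTypeForSeek changes here, a quick reminder of why it must track the highest-numbered ValueType. The snippet below is a paraphrased sketch of the pre-existing tag packing in db/dbformat.cc, not a quote from this patch: the type occupies the low 8 bits of the 64-bit tag, and internal keys with the same user key sort by decreasing tag, so a seek key built with the largest type (now kTypeMerge) positions at or before every entry with the same user key and sequence number.

    // Paraphrased sketch of the existing packing (db/dbformat.cc).
    static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
      assert(seq <= kMaxSequenceNumber);
      assert(t <= kValueTypeForSeek);   // now kTypeMerge, the largest type
      return (seq << 8) | t;
    }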
@@ -154,7 +155,7 @@ inline bool ParseInternalKey(const Slice& internal_key, result->sequence = num >> 8; result->type = static_cast(c); result->user_key = Slice(internal_key.data(), n - 8); - return (c <= static_cast(kTypeValue)); + return (c <= static_cast(kValueTypeForSeek)); } // Update the sequence number in the internal key diff --git a/db/memtable.cc b/db/memtable.cc index efe383fe7..097b60ff5 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -7,6 +7,7 @@ #include "leveldb/comparator.h" #include "leveldb/env.h" #include "leveldb/iterator.h" +#include "leveldb/merge_operator.h" #include "util/coding.h" namespace leveldb { @@ -116,11 +117,23 @@ void MemTable::Add(SequenceNumber s, ValueType type, } } -bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { +bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, + const Options& options) { Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); - if (iter.Valid()) { + + bool merge_in_progress = false; + std::string operand; + if (s->IsMergeInProgress()) { + swap(*value, operand); + merge_in_progress = true; + } + + + auto merge_operator = options.merge_operator; + auto logger = options.info_log; + for (; iter.Valid(); iter.Next()) { // entry format is: // klength varint32 // userkey char[klength-8] @@ -141,15 +154,44 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { switch (static_cast(tag & 0xff)) { case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); - value->assign(v.data(), v.size()); + if (merge_in_progress) { + merge_operator->Merge(key.user_key(), &v, operand, + value, logger.get()); + } else { + value->assign(v.data(), v.size()); + } return true; } - case kTypeDeletion: - *s = Status::NotFound(Slice()); + case kTypeMerge: { + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + if (merge_in_progress) { + merge_operator->Merge(key.user_key(), &v, operand, + value, logger.get()); + swap(*value, operand); + } else { + assert(merge_operator); + merge_in_progress = true; + operand.assign(v.data(), v.size()); + } + break; + } + case kTypeDeletion: { + if (merge_in_progress) { + merge_operator->Merge(key.user_key(), nullptr, operand, + value, logger.get()); + } else { + *s = Status::NotFound(Slice()); + } return true; + } } } } + + if (merge_in_progress) { + swap(*value, operand); + *s = Status::MergeInProgress(""); + } return false; } diff --git a/db/memtable.h b/db/memtable.h index 61aa29205..8fb9ce943 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -62,8 +62,13 @@ class MemTable { // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error // in *status and return true. + // If memtable contains Merge operation as the most recent entry for a key, + // and the merge process does not stop (not reaching a value or delete), + // store the current merged result in value and MergeInProgress in s. + // return false // Else, return false. 
- bool Get(const LookupKey& key, std::string* value, Status* s); + bool Get(const LookupKey& key, std::string* value, Status* s, + const Options& options); // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } diff --git a/db/memtablelist.cc b/db/memtablelist.cc index c1ccda1a5..c12995726 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -174,10 +174,11 @@ size_t MemTableList::ApproximateMemoryUsage() { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. -bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s) { +bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, + const Options& options ) { for (list::iterator it = memlist_.begin(); it != memlist_.end(); ++it) { - if ((*it)->Get(key, value, s)) { + if ((*it)->Get(key, value, s, options)) { return true; } } diff --git a/db/memtablelist.h b/db/memtablelist.h index 9ab91a67a..de27150ef 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -71,7 +71,8 @@ class MemTableList { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. - bool Get(const LookupKey& key, std::string* value, Status* s); + bool Get(const LookupKey& key, std::string* value, Status* s, + const Options& options); // Returns the list of underlying memtables. void GetMemTables(std::vector* list); diff --git a/db/merge_helper.cc b/db/merge_helper.cc new file mode 100644 index 000000000..3520db15a --- /dev/null +++ b/db/merge_helper.cc @@ -0,0 +1,114 @@ +#include "merge_helper.h" +#include "db/dbformat.h" +#include "leveldb/comparator.h" +#include "leveldb/db.h" +#include "leveldb/merge_operator.h" +#include +#include + +namespace leveldb { + +// PRE: iter points to the first merge type entry +// POST: iter points to the first entry beyond the merge process (or the end) +// key_, value_ are updated to reflect the merge result +void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, + bool at_bottom) { + // get a copy of the internal key, before it's invalidated by iter->Next() + key_.assign(iter->key().data(), iter->key().size()); + // we need to parse the internal key again as the parsed key is + // backed by the internal key! + ParsedInternalKey orig_ikey; + // Assume no internal key corruption as it has been successfully parsed + // by the caller. + // TODO: determine a good alternative of assert (exception?) + ParseInternalKey(key_, &orig_ikey); + std::string operand(iter->value().data(), iter->value().size()); + + bool hit_the_next_user_key = false; + ParsedInternalKey ikey; + for (iter->Next(); iter->Valid(); iter->Next()) { + if (!ParseInternalKey(iter->key(), &ikey)) { + // stop at corrupted key + if (assert_valid_internal_key_) { + assert(!"corrupted internal key is not expected"); + } + break; + } + + if (user_comparator_->Compare(ikey.user_key, orig_ikey.user_key) != 0) { + // hit a different user key, stop right here + hit_the_next_user_key = true; + break; + } + + if (stop_before && ikey.sequence <= stop_before) { + // hit an entry that's visible by the previous snapshot, can't touch that + break; + } + + if (kTypeDeletion == ikey.type) { + // hit a delete + // => merge nullptr with operand + // => change the entry type to kTypeValue + // We are done! 
+ user_merge_operator_->Merge(ikey.user_key, nullptr, operand, + &value_, logger_); + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key_[0], key_.size(), + orig_ikey.sequence, orig_ikey.type); + // move iter to the next entry + iter->Next(); + return; + } + + if (kTypeValue == ikey.type) { + // hit a put + // => merge the put value with operand + // => change the entry type to kTypeValue + // We are done! + const Slice value = iter->value(); + user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand), + &value_, logger_); + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key_[0], key_.size(), + orig_ikey.sequence, orig_ikey.type); + // move iter to the next entry + iter->Next(); + return; + } + + if (kTypeMerge == ikey.type) { + // hit a merge + // => merge the value with operand. + // => put the result back to operand and continue + const Slice value = iter->value(); + user_merge_operator_->Merge(ikey.user_key, &value, operand, + &value_, logger_); + swap(value_, operand); + continue; + } + } + + // We have seen the root history of this key if we are at the + // bottem level and exhausted all internal keys of this user key + // NOTE: !iter->Valid() does not necessarily mean we hit the + // beginning of a user key, as versions of a user key might be + // split into multiple files and some files might not be included + // in the merge. + bool seen_the_beginning = hit_the_next_user_key && at_bottom; + + if (seen_the_beginning) { + // do a final merge with nullptr as the existing value and say + // bye to the merge type (it's now converted to a Put) + assert(kTypeMerge == orig_ikey.type); + user_merge_operator_->Merge(orig_ikey.user_key, nullptr, operand, + &value_, logger_); + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key_[0], key_.size(), + orig_ikey.sequence, orig_ikey.type); + } else { + swap(value_, operand); + } +} + +} // namespace leveldb diff --git a/db/merge_helper.h b/db/merge_helper.h new file mode 100644 index 000000000..206d7a53a --- /dev/null +++ b/db/merge_helper.h @@ -0,0 +1,64 @@ +#ifndef MERGE_HELPER_H +#define MERGE_HELPER_H + +#include "db/dbformat.h" +#include "leveldb/slice.h" +#include + +namespace leveldb { + +class Comparator; +class Iterator; +class Logger; +class MergeOperator; + +class MergeHelper { + public: + MergeHelper(const Comparator* user_comparator, + const MergeOperator* user_merge_operator, + Logger* logger, + bool assert_valid_internal_key) + : user_comparator_(user_comparator), + user_merge_operator_(user_merge_operator), + logger_(logger), + assert_valid_internal_key_(assert_valid_internal_key) {} + + // Merge entries until we hit + // - a corrupted key + // - a Put/Delete, + // - a different user key, + // - a specific sequence number (snapshot boundary), + // or - the end of iteration + // iter: (IN) points to the first merge type entry + // (OUT) points to the first entry not included in the merge process + // stop_before: (IN) a sequence number that merge should not cross. + // 0 means no restriction + // at_bottom: (IN) true if the iterator covers the bottem level, which means + // we could reach the start of the history of this user key. + void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0, + bool at_bottom = false); + + // Query the merge result + // These are valid until the next MergeUtil call + // IMPORTANT: the key type could change after the MergeUntil call. + // Put/Delete + Merge + ... + Merge => Put + // Merge + ... 
+ Merge => Merge + Slice key() { return Slice(key_); } + Slice value() { return Slice(value_); } + + private: + const Comparator* user_comparator_; + const MergeOperator* user_merge_operator_; + Logger* logger_; + Iterator* iter_; // in: the internal iterator, positioned at the first merge entry + bool assert_valid_internal_key_; // enforce no internal key corruption? + + // the scratch area that holds the result of MergeUntil + // valid up to the next MergeUntil call + std::string key_; + std::string value_; +}; + +} // namespace leveldb + +#endif diff --git a/db/merge_test.cc b/db/merge_test.cc new file mode 100644 index 000000000..2d2f6514f --- /dev/null +++ b/db/merge_test.cc @@ -0,0 +1,253 @@ +#include +#include +#include + +#include "leveldb/cache.h" +#include "leveldb/comparator.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/merge_operator.h" +#include "db/dbformat.h" +#include "utilities/merge_operators.h" + +using namespace std; +using namespace leveldb; + +auto mergeOperator = MergeOperators::CreateUInt64AddOperator(); + +std::shared_ptr OpenDb() { + DB* db; + Options options; + options.create_if_missing = true; + options.merge_operator = mergeOperator.get(); + Status s = DB::Open(options, "/tmp/testdb", &db); + if (!s.ok()) { + cerr << s.ToString() << endl; + assert(false); + } + return std::shared_ptr(db); +} + +// Imagine we are maintaining a set of uint64 counters. +// Each counter has a distinct name. And we would like +// to support four high level operations: +// set, add, get and remove +// This is a quick implementation without a Merge operation. +class Counters { + + protected: + std::shared_ptr db_; + + WriteOptions put_option_; + ReadOptions get_option_; + WriteOptions delete_option_; + + uint64_t default_; + + public: + Counters(std::shared_ptr db, uint64_t defaultCount = 0) + : db_(db), + put_option_(), + get_option_(), + delete_option_(), + default_(defaultCount) { + assert(db_); + } + + virtual ~Counters() {} + + // public interface of Counters. + // All four functions return false + // if the underlying level db operation failed. 
+ + // mapped to a levedb Put + bool set(const string& key, uint64_t value) { + // just treat the internal rep of int64 as the string + Slice slice((char *)&value, sizeof(value)); + auto s = db_->Put(put_option_, key, slice); + + if (s.ok()) { + return true; + } else { + cerr << s.ToString() << endl; + return false; + } + } + + // mapped to a leveldb Delete + bool remove(const string& key) { + auto s = db_->Delete(delete_option_, key); + + if (s.ok()) { + return true; + } else { + cerr << s.ToString() << std::endl; + return false; + } + } + + // mapped to a leveldb Get + bool get(const string& key, uint64_t *value) { + string str; + auto s = db_->Get(get_option_, key, &str); + + if (s.IsNotFound()) { + // return default value if not found; + *value = default_; + return true; + } else if (s.ok()) { + // deserialization + if (str.size() != sizeof(uint64_t)) { + cerr << "value corruption\n"; + return false; + } + *value = DecodeFixed64(&str[0]); + return true; + } else { + cerr << s.ToString() << std::endl; + return false; + } + } + + // 'add' is implemented as get -> modify -> set + // An alternative is a single merge operation, see MergeBasedCounters + virtual bool add(const string& key, uint64_t value) { + uint64_t base = default_; + return get(key, &base) && set(key, base + value); + } + + + // convenience functions for testing + void assert_set(const string& key, uint64_t value) { + assert(set(key, value)); + } + + void assert_remove(const string& key) { + assert(remove(key)); + } + + uint64_t assert_get(const string& key) { + uint64_t value = default_; + assert(get(key, &value)); + return value; + } + + void assert_add(const string& key, uint64_t value) { + assert(add(key, value)); + } +}; + +// Implement 'add' directly with the new Merge operation +class MergeBasedCounters : public Counters { + private: + WriteOptions merge_option_; // for merge + + public: + MergeBasedCounters(std::shared_ptr db, uint64_t defaultCount = 0) + : Counters(db, defaultCount), + merge_option_() { + } + + // mapped to a leveldb Merge operation + virtual bool add(const string& key, uint64_t value) override { + char encoded[sizeof(uint64_t)]; + EncodeFixed64(encoded, value); + Slice slice(encoded, sizeof(uint64_t)); + auto s = db_->Merge(merge_option_, key, slice); + + if (s.ok()) { + return true; + } else { + cerr << s.ToString() << endl; + return false; + } + } +}; + +void dumpDb(DB* db) { + auto it = unique_ptr(db->NewIterator(ReadOptions())); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + uint64_t value = DecodeFixed64(it->value().data()); + cout << it->key().ToString() << ": " << value << endl; + } + assert(it->status().ok()); // Check for any errors found during the scan +} + +void testCounters(Counters& counters, DB* db, bool test_compaction) { + + FlushOptions o; + o.wait = true; + + counters.assert_set("a", 1); + + if (test_compaction) db->Flush(o); + + assert(counters.assert_get("a") == 1); + + counters.assert_remove("b"); + + // defaut value is 0 if non-existent + assert(counters.assert_get("b") == 0); + + counters.assert_add("a", 2); + + if (test_compaction) db->Flush(o); + + // 1+2 = 3 + assert(counters.assert_get("a")== 3); + + dumpDb(db); + + std::cout << "1\n"; + + // 1+...+49 = ? 
+ uint64_t sum = 0; + for (int i = 1; i < 50; i++) { + counters.assert_add("b", i); + sum += i; + } + assert(counters.assert_get("b") == sum); + + std::cout << "2\n"; + dumpDb(db); + + std::cout << "3\n"; + + if (test_compaction) { + db->Flush(o); + + cout << "Compaction started ...\n"; + db->CompactRange(nullptr, nullptr); + cout << "Compaction ended\n"; + + dumpDb(db); + + assert(counters.assert_get("a")== 3); + assert(counters.assert_get("b") == sum); + } +} + +int main(int argc, char *argv[]) { + + auto db = OpenDb(); + + { + cout << "Test read-modify-write counters... \n"; + Counters counters(db, 0); + testCounters(counters, db.get(), true); + } + + bool compact = false; + if (argc > 1) { + compact = true; + cout << "Turn on Compaction\n"; + } + + { + cout << "Test merge-based counters... \n"; + MergeBasedCounters counters(db, 0); + testCounters(counters, db.get(), compact); + } + + return 0; +} diff --git a/db/table_cache.cc b/db/table_cache.cc index 729d47018..9af91fea6 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -100,7 +100,7 @@ Status TableCache::Get(const ReadOptions& options, uint64_t file_size, const Slice& k, void* arg, - void (*saver)(void*, const Slice&, const Slice&, bool), + bool (*saver)(void*, const Slice&, const Slice&, bool), bool* tableIO) { Cache::Handle* handle = nullptr; Status s = FindTable(storage_options_, file_number, file_size, diff --git a/db/table_cache.h b/db/table_cache.h index 7b5dcc04d..f4dc3c86a 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -40,13 +40,14 @@ class TableCache { Table** tableptr = nullptr); // If a seek to internal key "k" in specified file finds an entry, - // call (*handle_result)(arg, found_key, found_value). + // call (*handle_result)(arg, found_key, found_value) repeatedly until + // it returns false. Status Get(const ReadOptions& options, uint64_t file_number, uint64_t file_size, const Slice& k, void* arg, - void (*handle_result)(void*, const Slice&, const Slice&, bool), + bool (*handle_result)(void*, const Slice&, const Slice&, bool), bool* tableIO); // Evict any entry for the specified file number diff --git a/db/version_set.cc b/db/version_set.cc index e86b790c2..428ceaff9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -12,6 +12,7 @@ #include "db/memtable.h" #include "db/table_cache.h" #include "leveldb/env.h" +#include "leveldb/merge_operator.h" #include "leveldb/table_builder.h" #include "table/merger.h" #include "table/two_level_iterator.h" @@ -227,29 +228,78 @@ enum SaverState { kFound, kDeleted, kCorrupt, + kMerge // value contains the current merge result (the operand) }; struct Saver { SaverState state; const Comparator* ucmp; Slice user_key; std::string* value; + const MergeOperator* merge_operator; + Logger* logger; bool didIO; // did we do any disk io? }; } -static void SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ +static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ Saver* s = reinterpret_cast(arg); ParsedInternalKey parsed_key; + // TODO: didIO and Merge? s->didIO = didIO; if (!ParseInternalKey(ikey, &parsed_key)) { + // TODO: what about corrupt during Merge? s->state = kCorrupt; } else { if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { - s->state = (parsed_key.type == kTypeValue) ? 
kFound : kDeleted; - if (s->state == kFound) { - s->value->assign(v.data(), v.size()); + switch (parsed_key.type) { + case kTypeValue: + if (kNotFound == s->state) { + s->value->assign(v.data(), v.size()); + } else if (kMerge == s->state) { + std::string operand; + swap(operand, *s->value); + s->merge_operator->Merge(s->user_key, &v, operand, + s->value, s->logger); + } else { + assert(false); + } + s->state = kFound; + return false; + + case kTypeMerge: + if (kNotFound == s->state) { + s->state = kMerge; + s->value->assign(v.data(), v.size()); + } else if (kMerge == s->state) { + std::string operand; + swap(operand, *s->value); + s->merge_operator->Merge(s->user_key, &v, operand, + s->value, s->logger); + } else { + assert(false); + } + return true; + + case kTypeDeletion: + if (kNotFound == s->state) { + s->state = kDeleted; + } else if (kMerge == s->state) { + std::string operand; + swap(operand, *s->value); + s->merge_operator->Merge(s->user_key, nullptr, operand, + s->value, s->logger); + s->state = kFound; + } else { + assert(false); + } + return false; + } } } + + // s->state could be Corrupt, merge or notfound + + return false; } static bool NewestFirst(FileMetaData* a, FileMetaData* b) { @@ -269,14 +319,28 @@ Version::Version(VersionSet* vset, uint64_t version_number) files_ = new std::vector[vset->NumberLevels()]; } -Status Version::Get(const ReadOptions& options, - const LookupKey& k, - std::string* value, - GetStats* stats) { +void Version::Get(const ReadOptions& options, + const LookupKey& k, + std::string* value, + Status *status, + GetStats* stats, + const Options& db_options) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); const Comparator* ucmp = vset_->icmp_.user_comparator(); - Status s; + + auto merge_operator = db_options.merge_operator; + auto logger = db_options.info_log; + + assert(status->ok() || status->IsMergeInProgress()); + Saver saver; + saver.state = status->ok()? kNotFound : kMerge; + saver.ucmp = ucmp; + saver.user_key = user_key; + saver.value = value; + saver.merge_operator = merge_operator; + saver.logger = logger.get(); + saver.didIO = false; stats->seek_file = nullptr; stats->seek_file_level = -1; @@ -325,24 +389,21 @@ Status Version::Get(const ReadOptions& options, } else { files = &tmp2; num_files = 1; + // TODO, is level 1-n files all disjoint in user key space? 
} } } - for (uint32_t i = 0; i < num_files; ++i) { + + for (uint32_t i = 0; i < num_files; ++i) { FileMetaData* f = files[i]; - Saver saver; - saver.state = kNotFound; - saver.ucmp = ucmp; - saver.user_key = user_key; - saver.value = value; - saver.didIO = false; bool tableIO = false; - s = vset_->table_cache_->Get(options, f->number, f->file_size, - ikey, &saver, SaveValue, &tableIO); - if (!s.ok()) { - return s; + *status = vset_->table_cache_->Get(options, f->number, f->file_size, + ikey, &saver, SaveValue, &tableIO); + // TODO: examine the behavior for corrupted key + if (!status->ok()) { + return; } if (last_file_read != nullptr && stats->seek_file == nullptr) { @@ -367,18 +428,33 @@ Status Version::Get(const ReadOptions& options, case kNotFound: break; // Keep searching in other files case kFound: - return s; + return; case kDeleted: - s = Status::NotFound(Slice()); // Use empty error message for speed - return s; + *status = Status::NotFound(Slice()); // Use empty error message for speed + return; case kCorrupt: - s = Status::Corruption("corrupted key for ", user_key); - return s; + *status = Status::Corruption("corrupted key for ", user_key); + return; + case kMerge: + break; } } } - return Status::NotFound(Slice()); // Use an empty error message for speed + + if (kMerge == saver.state) { + // merge operand is in *value and we hit the beginning of the key history + // do a final merge of nullptr and operand; + std::string operand; + swap(operand, *value); + merge_operator->Merge(user_key, nullptr, operand, + value, logger.get()); + *status = Status::OK(); + return; + } else { + *status = Status::NotFound(Slice()); // Use an empty error message for speed + return; + } } bool Version::UpdateStats(const GetStats& stats) { diff --git a/db/version_set.h b/db/version_set.h index 1484adf46..e8a611384 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -73,8 +73,8 @@ class Version { FileMetaData* seek_file; int seek_file_level; }; - Status Get(const ReadOptions&, const LookupKey& key, std::string* val, - GetStats* stats); + void Get(const ReadOptions&, const LookupKey& key, std::string* val, + Status* status, GetStats* stats, const Options& db_option); // Adds "stats" into the current state. Returns true if a new // compaction may need to be triggered, false otherwise. 
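The read-path changes above (MemTable::Get, MemTableList::Get, and the SaveValue/Version::Get state machine) all fold merge operands in the same newest-to-oldest order. The standalone sketch below restates that order with invented names; ApplyMergeNewToOld is not a function in the patch, and it assumes the operands for one user key have already been collected newest first.

    #include <cassert>
    #include <string>
    #include <vector>
    #include "leveldb/merge_operator.h"
    #include "leveldb/slice.h"

    // Illustrative only: fold a run of kTypeMerge operands (newest first) onto
    // an optional base value. base_value is the value of an older kTypeValue
    // entry, or nullptr if the run ends at a deletion marker or at the start
    // of the key's history.
    std::string ApplyMergeNewToOld(const leveldb::MergeOperator* op,
                                   const leveldb::Slice& key,
                                   const leveldb::Slice* base_value,
                                   const std::vector<leveldb::Slice>& operands_new_first,
                                   leveldb::Logger* logger) {
      assert(!operands_new_first.empty());
      // The newest operand seeds the accumulator.
      std::string acc = operands_new_first.front().ToString();
      // Each older operand becomes the "existing" value; the accumulated
      // newer operands are passed as the merge operand (same argument order
      // as MergeValuesNewToOld and SaveValue above).
      for (size_t i = 1; i < operands_new_first.size(); ++i) {
        std::string merged;
        op->Merge(key, &operands_new_first[i], leveldb::Slice(acc), &merged, logger);
        acc.swap(merged);
      }
      // Final step: merge with the base value (or nullptr), which converts the
      // result into an ordinary value, mirroring the trailing Merge calls in
      // Version::Get and DBIter::MergeValuesNewToOld.
      std::string result;
      op->Merge(key, base_value, leveldb::Slice(acc), &result, logger);
      return result;
    }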
diff --git a/db/write_batch.cc b/db/write_batch.cc index 33f4a4257..2465c966e 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -7,7 +7,8 @@ // count: fixed32 // data: record[count] // record := -// kTypeValue varstring varstring | +// kTypeValue varstring varstring +// kTypeMerge varstring varstring // kTypeDeletion varstring // varstring := // len: varint32 @@ -20,6 +21,7 @@ #include "db/memtable.h" #include "db/write_batch_internal.h" #include "util/coding.h" +#include namespace leveldb { @@ -34,6 +36,10 @@ WriteBatch::~WriteBatch() { } WriteBatch::Handler::~Handler() { } +void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) { + throw std::runtime_error("Handler::Merge not implemented!"); +} + void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); @@ -68,6 +74,14 @@ Status WriteBatch::Iterate(Handler* handler) const { return Status::Corruption("bad WriteBatch Delete"); } break; + case kTypeMerge: + if (GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &value)) { + handler->Merge(key, value); + } else { + return Status::Corruption("bad WriteBatch Merge"); + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -108,6 +122,14 @@ void WriteBatch::Delete(const Slice& key) { PutLengthPrefixedSlice(&rep_, key); } +void WriteBatch::Merge(const Slice& key, const Slice& value) { + WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); + rep_.push_back(static_cast(kTypeMerge)); + PutLengthPrefixedSlice(&rep_, key); + PutLengthPrefixedSlice(&rep_, value); +} + + namespace { class MemTableInserter : public WriteBatch::Handler { public: @@ -118,6 +140,10 @@ class MemTableInserter : public WriteBatch::Handler { mem_->Add(sequence_, kTypeValue, key, value); sequence_++; } + virtual void Merge(const Slice& key, const Slice& value) { + mem_->Add(sequence_, kTypeMerge, key, value); + sequence_++; + } virtual void Delete(const Slice& key) { mem_->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 71a5e89cc..5a4a604bb 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -33,6 +33,14 @@ static std::string PrintContents(WriteBatch* b) { state.append(")"); count++; break; + case kTypeMerge: + state.append("Merge("); + state.append(ikey.user_key.ToString()); + state.append(", "); + state.append(iter->value().ToString()); + state.append(")"); + count++; + break; case kTypeDeletion: state.append("Delete("); state.append(ikey.user_key.ToString()); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 6a6f6fc89..261e2139b 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -83,6 +83,14 @@ class DB { // Note: consider setting options.sync = true. virtual Status Delete(const WriteOptions& options, const Slice& key) = 0; + // Merge the database entry for "key" with "value". Returns OK on success, + // and a non-OK status on error. The semantics of this operation is + // determined by the user provided merge_operator when opening DB. + // Note: consider setting options.sync = true. + virtual Status Merge(const WriteOptions& options, + const Slice& key, + const Slice& value) = 0; + // Apply the specified updates to the database. // Returns OK on success, non-OK on failure. // Note: consider setting options.sync = true. @@ -185,7 +193,7 @@ class DB { virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size) = 0; - // The sequence number of the most recent transaction. 
+ // The sequence number of the most recent transaction. virtual SequenceNumber GetLatestSequenceNumber() = 0; // Return's an iterator for all writes since the sequence number diff --git a/include/leveldb/merge_operator.h b/include/leveldb/merge_operator.h new file mode 100644 index 000000000..d948d3c6e --- /dev/null +++ b/include/leveldb/merge_operator.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_ +#define STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_ + +#include <string> + +namespace leveldb { + +class Slice; +class Logger; + +// The Merge Operator interface. +// Client needs to provide an object implementing this interface if Merge +// operation is accessed. +// Essentially, MergeOperator specifies the SEMANTICS of a merge, which only +// client knows. It could be numeric addition, list append, string +// concatenation, ... , anything. +// The library, on the other hand, is concerned with the exercise of this +// interface, at the right time (during get, iteration, compaction...) +// Note that, even though in principle we don't require any special property +// of the merge operator, the current rocksdb compaction order does imply that +// an associative operator could be exercised more naturally (and more +// efficiently). +// +// Refer to my_test.cc for an example of implementation +// +class MergeOperator { + public: + virtual ~MergeOperator() {} + + // Gives the client a way to express the read -> modify -> write semantics + // key: (IN) The key that's associated with this merge operation. + // Client could multiplex the merge operator based on it + // if the key space is partitioned and different subspaces + // refer to different types of data which have different + // merge operation semantics + // existing: (IN) null indicates that the key does not exist before this op + // value: (IN) The passed-in merge operand value (when Merge is issued) + // new_value:(OUT) Client is responsible for filling the merge result here + // logger: (IN) Client could use this to log errors during merge. + // + // Note: Merge does not return anything to indicate if a merge is successful + // or not. + // Rationale: If a merge failed due to, say de-serialization error, we still + // need to define a consistent merge result. Should we throw away + // the existing value? the merge operand? Or reset the merged value + // to something? The rocksdb library is not in a position to make the + // right choice. On the other hand, client knows exactly what + // happened during Merge, thus is able to make the best decision. + // Just save the final decision in new_value. logger is passed in, + // in case client wants to leave a trace of what went wrong. + virtual void Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const = 0; + + + // The name of the MergeOperator. Used to check for MergeOperator + // mismatches (i.e., a DB created with one MergeOperator is + // accessed using a different MergeOperator) + // TODO: the name is currently not stored persistently and thus + // no checking is enforced. Client is responsible for providing + // consistent MergeOperator between DB opens.
+ virtual const char* Name() const = 0; + +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 346f173d8..557303138 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -20,6 +20,7 @@ class Comparator; class Env; class FilterPolicy; class Logger; +class MergeOperator; class Snapshot; using std::shared_ptr; @@ -63,6 +64,18 @@ struct Options { // comparator provided to previous open calls on the same DB. const Comparator* comparator; + // REQUIRES: The client must provide a merge operator if the Merge operation + // needs to be accessed. Calling Merge on a DB without a merge operator + // would result in Status::NotSupported. The client must ensure that the + // merge operator supplied here has the same name and *exactly* the same + // semantics as the merge operator provided to previous open calls on + // the same DB. The only exception is reserved for upgrades, where a DB + // previously without a merge operator is introduced to the Merge operation + // for the first time. It's necessary to specify a merge operator when + // opening the DB in this case. + // Default: nullptr + const MergeOperator* merge_operator; + // If true, the database will be created if it is missing. // Default: false bool create_if_missing; diff --git a/include/leveldb/status.h b/include/leveldb/status.h index 3e4573cbe..a20351f32 100644 --- a/include/leveldb/status.h +++ b/include/leveldb/status.h @@ -47,6 +47,9 @@ class Status { static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIOError, msg, msg2); } + static Status MergeInProgress(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kMergeInProgress, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return (state_ == nullptr); } @@ -66,6 +69,9 @@ class Status { // Returns true iff the status indicates an IOError. bool IsIOError() const { return code() == kIOError; } + // Returns true iff the status indicates a MergeInProgress. + bool IsMergeInProgress() const { return code() == kMergeInProgress; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; @@ -84,7 +90,8 @@ class Status { kCorruption = 2, kNotSupported = 3, kInvalidArgument = 4, - kIOError = 5 + kIOError = 5, + kMergeInProgress = 6 }; Code code() const { diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 6546cadaa..a0f4e80ba 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -36,6 +36,10 @@ class WriteBatch { // Store the mapping "key->value" in the database. void Put(const Slice& key, const Slice& value); + // Merge "value" with the existing value of "key" in the database. + // "key->merge(existing, value)" + void Merge(const Slice& key, const Slice& value); + // If the database contains a mapping for "key", erase it. Else do nothing. void Delete(const Slice& key); @@ -47,6 +51,10 @@ class WriteBatch { public: virtual ~Handler(); virtual void Put(const Slice& key, const Slice& value) = 0; + // Merge is not pure virtual. Otherwise, we would break existing + // clients of Handler on a source code level. + // The default implementation simply throws a runtime exception.
+ virtual void Merge(const Slice& key, const Slice& value); virtual void Delete(const Slice& key) = 0; }; Status Iterate(Handler* handler) const; diff --git a/table/table.cc b/table/table.cc index b9971f464..386b9e074 100644 --- a/table/table.cc +++ b/table/table.cc @@ -285,11 +285,11 @@ Iterator* Table::NewIterator(const ReadOptions& options) const { Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, - void (*saver)(void*, const Slice&, const Slice&, bool)) { + bool (*saver)(void*, const Slice&, const Slice&, bool)) { Status s; Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); - iiter->Seek(k); - if (iiter->Valid()) { + bool done = false; + for (iiter->Seek(k); iiter->Valid() && !done; iiter->Next()) { Slice handle_value = iiter->value(); FilterBlockReader* filter = rep_->filter; BlockHandle handle; @@ -297,14 +297,20 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, handle.DecodeFrom(&handle_value).ok() && !filter->KeyMayMatch(handle.offset(), k)) { // Not found + // TODO: think about interaction with Merge. If a user key cannot + // cross one data block, we should be fine. RecordTick(rep_->options.statistics, BLOOM_FILTER_USEFUL); + break; } else { bool didIO = false; Iterator* block_iter = BlockReader(this, options, iiter->value(), &didIO); - block_iter->Seek(k); - if (block_iter->Valid()) { - (*saver)(arg, block_iter->key(), block_iter->value(), didIO); + + for (block_iter->Seek(k); block_iter->Valid(); block_iter->Next()) { + if (!(*saver)(arg, block_iter->key(), block_iter->value(), didIO)) { + done = true; + break; + } } s = block_iter->status(); delete block_iter; @@ -317,8 +323,9 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, return s; } -void SaveDidIO(void* arg, const Slice& key, const Slice& value, bool didIO) { +bool SaveDidIO(void* arg, const Slice& key, const Slice& value, bool didIO) { *reinterpret_cast<bool*>(arg) = didIO; + return false; } bool Table::TEST_KeyInCache(const ReadOptions& options, const Slice& key) { // We use InternalGet() as it has logic that checks whether we read the diff --git a/table/table.h b/table/table.h index 816748ac4..bb0bd0385 100644 --- a/table/table.h +++ b/table/table.h @@ -74,14 +74,14 @@ class Table { static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, bool* didIO); - // Calls (*handle_result)(arg, ...) with the entry found after a call - // to Seek(key). May not make such a call if filter policy says - // that key is not present. + // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found + // after a call to Seek(key), until handle_result returns false. + // May not make such a call if filter policy says that key is not present.
friend class TableCache; Status InternalGet( const ReadOptions&, const Slice& key, void* arg, - void (*handle_result)(void* arg, const Slice& k, const Slice& v, bool)); + bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool)); void ReadMeta(const Footer& footer); diff --git a/util/options.cc b/util/options.cc index 1d4836479..0fa80a06b 100644 --- a/util/options.cc +++ b/util/options.cc @@ -6,15 +6,17 @@ #include +#include "leveldb/cache.h" #include "leveldb/comparator.h" #include "leveldb/env.h" #include "leveldb/filter_policy.h" -#include "leveldb/cache.h" +#include "leveldb/merge_operator.h" namespace leveldb { Options::Options() : comparator(BytewiseComparator()), + merge_operator(nullptr), create_if_missing(false), error_if_exists(false), paranoid_checks(false), @@ -72,7 +74,8 @@ void Options::Dump(Logger* log) const { Log(log," Options.comparator: %s", comparator->Name()); - Log(log," Options.create_if_missing: %d", create_if_missing); + Log(log," Options.merge_operator: %s", + merge_operator? merge_operator->Name() : "None"); Log(log," Options.error_if_exists: %d", error_if_exists); Log(log," Options.paranoid_checks: %d", paranoid_checks); Log(log," Options.env: %p", env); diff --git a/util/status.cc b/util/status.cc index 5591381a1..2cd737579 100644 --- a/util/status.cc +++ b/util/status.cc @@ -58,6 +58,9 @@ std::string Status::ToString() const { case kIOError: type = "IO error: "; break; + case kMergeInProgress: + type = "Merge In Progress: "; + break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", static_cast<int>(code())); diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h new file mode 100644 index 000000000..c7c06422a --- /dev/null +++ b/utilities/merge_operators.h @@ -0,0 +1,18 @@ +#ifndef MERGE_OPERATORS_H +#define MERGE_OPERATORS_H + +#include <memory> + +#include "leveldb/merge_operator.h" + +namespace leveldb { + +class MergeOperators { + public: + static std::shared_ptr<MergeOperator> CreatePutOperator(); + static std::shared_ptr<MergeOperator> CreateUInt64AddOperator(); +}; + +} + +#endif diff --git a/utilities/merge_operators/put.cc b/utilities/merge_operators/put.cc new file mode 100644 index 000000000..9a610c4ce --- /dev/null +++ b/utilities/merge_operators/put.cc @@ -0,0 +1,35 @@ +#include <memory> +#include "leveldb/slice.h" +#include "leveldb/merge_operator.h" +#include "utilities/merge_operators.h" + +using namespace leveldb; + +namespace { // anonymous namespace + +// A merge operator that mimics Put semantics +class PutOperator : public MergeOperator { + public: + virtual void Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const override { + // put basically only looks at the current value + new_value->assign(value.data(), value.size()); + } + + virtual const char* Name() const override { + return "PutOperator"; + } +}; + +} // end of anonymous namespace + +namespace leveldb { + +std::shared_ptr<MergeOperator> MergeOperators::CreatePutOperator() { + return std::make_shared<PutOperator>(); +} + +} diff --git a/utilities/merge_operators/uint64add.cc b/utilities/merge_operators/uint64add.cc new file mode 100644 index 000000000..72ef29023 --- /dev/null +++ b/utilities/merge_operators/uint64add.cc @@ -0,0 +1,63 @@ +#include <memory> +#include "leveldb/env.h" +#include "leveldb/merge_operator.h" +#include "leveldb/slice.h" +#include "util/coding.h" +#include "utilities/merge_operators.h" + + +using namespace leveldb; + +namespace { // anonymous namespace + +// A 'model' merge operator with uint64 addition semantics +class
UInt64AddOperator : public MergeOperator { + public: + virtual void Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const override { + // assuming 0 if no existing value + uint64_t existing = 0; + if (existing_value) { + if (existing_value->size() == sizeof(uint64_t)) { + existing = DecodeFixed64(existing_value->data()); + } else { + // if existing_value is corrupted, treat it as 0 + Log(logger, "existing value corruption, size: %zu > %zu", + existing_value->size(), sizeof(uint64_t)); + existing = 0; + } + } + + uint64_t operand; + if (value.size() == sizeof(uint64_t)) { + operand = DecodeFixed64(value.data()); + } else { + // if operand is corrupted, treat it as 0 + Log(logger, "operand value corruption, size: %zu > %zu", + value.size(), sizeof(uint64_t)); + operand = 0; + } + + new_value->resize(sizeof(uint64_t)); + EncodeFixed64(&(*new_value)[0], existing + operand); + + return; + } + + virtual const char* Name() const override { + return "UInt64AddOperator"; + } +}; + +} + +namespace leveldb { + +std::shared_ptr<MergeOperator> MergeOperators::CreateUInt64AddOperator() { + return std::make_shared<UInt64AddOperator>(); +} + +} diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index c24719582..3af366c70 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -185,6 +185,12 @@ Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) { return db_->Delete(wopts, key); } +Status DBWithTTL::Merge(const WriteOptions& options, + const Slice& key, + const Slice& value) { + return Status::NotSupported("Merge operation not supported."); +} + Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { return db_->Write(opts, updates); } diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index e20c46e7c..e29a37b72 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -29,6 +29,11 @@ class DBWithTTL : public DB { virtual Status Delete(const WriteOptions& wopts, const Slice& key); + virtual Status Merge(const WriteOptions& options, + const Slice& key, + const Slice& value); + + virtual Status Write(const WriteOptions& opts, WriteBatch* updates); virtual Iterator* NewIterator(const ReadOptions& opts);
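To make the MergeOperator contract in include/leveldb/merge_operator.h concrete, here is a small illustrative operator (not part of this diff) that appends each operand to a comma-separated list. Note how a nullptr existing_value means the key had no prior value, and how the result always goes into new_value:

  #include <string>
  #include "leveldb/merge_operator.h"
  #include "leveldb/slice.h"

  namespace {  // illustration only

  class StringAppendOperator : public leveldb::MergeOperator {
   public:
    virtual void Merge(const leveldb::Slice& key,
                       const leveldb::Slice* existing_value,
                       const leveldb::Slice& value,
                       std::string* new_value,
                       leveldb::Logger* logger) const override {
      new_value->clear();
      if (existing_value) {
        // Start from the current list; nullptr would mean the key is new.
        new_value->assign(existing_value->data(), existing_value->size());
        new_value->append(",");
      }
      new_value->append(value.data(), value.size());
    }

    virtual const char* Name() const override {
      return "StringAppendOperator";
    }
  };

  }  // namespace

Like uint64add, this operator is associative, which is the case the header comment calls out as fitting the current compaction order most naturally.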
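Finally, a minimal end-to-end sketch of the client-visible API introduced by this diff: Options::merge_operator, DB::Merge, and WriteBatch::Merge, wired up with the uint64add operator from utilities/. The DB path, the "counter" key, and the use of util/coding.h for fixed64 encoding are illustrative only, and error handling is reduced to asserts:

  #include <cassert>
  #include <memory>
  #include <stdint.h>
  #include <string>
  #include "leveldb/db.h"
  #include "leveldb/options.h"
  #include "leveldb/write_batch.h"
  #include "util/coding.h"                 // EncodeFixed64 / DecodeFixed64
  #include "utilities/merge_operators.h"

  int main() {
    auto merge_op = leveldb::MergeOperators::CreateUInt64AddOperator();

    leveldb::Options options;
    options.create_if_missing = true;
    options.merge_operator = merge_op.get();  // raw pointer; the shared_ptr keeps it alive

    leveldb::DB* db;
    leveldb::Status s = leveldb::DB::Open(options, "/tmp/merge_demo", &db);
    assert(s.ok());

    std::string one(sizeof(uint64_t), '\0');
    leveldb::EncodeFixed64(&one[0], 1);

    // Two increments through DB::Merge ...
    s = db->Merge(leveldb::WriteOptions(), "counter", one);
    assert(s.ok());
    s = db->Merge(leveldb::WriteOptions(), "counter", one);
    assert(s.ok());

    // ... and one more through a WriteBatch.
    leveldb::WriteBatch batch;
    batch.Merge("counter", one);
    s = db->Write(leveldb::WriteOptions(), &batch);
    assert(s.ok());

    std::string value;
    s = db->Get(leveldb::ReadOptions(), "counter", &value);
    assert(s.ok() && value.size() == sizeof(uint64_t));
    assert(leveldb::DecodeFixed64(value.data()) == 3);

    delete db;
    return 0;
  }

Without options.merge_operator set, the same Merge calls would return Status::NotSupported, per the options.h comment above.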