[Rocksdb] Support Merge operation in rocksdb

Summary:
This diff introduces a new Merge operation into rocksdb.
The purpose of this review is mostly getting feedback from the team (everyone please) on the design.

Please focus on the four files under include/leveldb/, as they spell the client visible interface change.
include/leveldb/db.h
include/leveldb/merge_operator.h
include/leveldb/options.h
include/leveldb/write_batch.h

Please go over local/my_test.cc carefully, as it is a concerete use case.

Please also review the impelmentation files to see if the straw man implementation makes sense.

Note that, the diff does pass all make check and truly supports forward iterator over db and a version
of Get that's based on iterator.

Future work:
- Integration with compaction
- A raw Get implementation

I am working on a wiki that explains the design and implementation choices, but coding comes
just naturally and I think it might be a good idea to share the code earlier. The code is
heavily commented.

Test Plan: run all local tests

Reviewers: dhruba, heyongqiang

Reviewed By: dhruba

CC: leveldb, zshao, sheki, emayanke, MarkCallaghan

Differential Revision: https://reviews.facebook.net/D9651
main
Haobo Xu 11 years ago
parent 37e97b1297
commit 05e8854085
  1. 7
      Makefile
  2. 60
      db/builder.cc
  3. 71
      db/db_impl.cc
  4. 11
      db/db_impl.h
  5. 6
      db/db_impl_readonly.cc
  6. 4
      db/db_impl_readonly.h
  7. 174
      db/db_iter.cc
  8. 3
      db/db_iter.h
  9. 36
      db/db_test.cc
  10. 7
      db/dbformat.h
  11. 52
      db/memtable.cc
  12. 7
      db/memtable.h
  13. 5
      db/memtablelist.cc
  14. 3
      db/memtablelist.h
  15. 114
      db/merge_helper.cc
  16. 64
      db/merge_helper.h
  17. 253
      db/merge_test.cc
  18. 2
      db/table_cache.cc
  19. 5
      db/table_cache.h
  20. 128
      db/version_set.cc
  21. 4
      db/version_set.h
  22. 28
      db/write_batch.cc
  23. 8
      db/write_batch_test.cc
  24. 10
      include/leveldb/db.h
  25. 74
      include/leveldb/merge_operator.h
  26. 13
      include/leveldb/options.h
  27. 9
      include/leveldb/status.h
  28. 8
      include/leveldb/write_batch.h
  29. 21
      table/table.cc
  30. 8
      table/table.h
  31. 7
      util/options.cc
  32. 3
      util/status.cc
  33. 18
      utilities/merge_operators.h
  34. 35
      utilities/merge_operators/put.cc
  35. 63
      utilities/merge_operators/uint64add.cc
  36. 6
      utilities/ttl/db_ttl.cc
  37. 5
      utilities/ttl/db_ttl.h

@ -60,8 +60,8 @@ TESTS = \
reduce_levels_test \
write_batch_test \
auto_roll_logger_test \
filelock_test
filelock_test \
merge_test
TOOLS = \
sst_dump \
@ -225,6 +225,9 @@ reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS)
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
merge_test: db/merge_test.o $(LIBOBJECTS)
$(CXX) db/merge_test.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
$(MEMENVLIBRARY) : $(MEMENVOBJECTS)
rm -f $@
$(AR) -rs $@ $(MEMENVOBJECTS)

@ -6,6 +6,7 @@
#include "db/filename.h"
#include "db/dbformat.h"
#include "db/merge_helper.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "leveldb/db.h"
@ -49,36 +50,75 @@ Status BuildTable(const std::string& dbname,
Slice key = iter->key();
meta->smallest.DecodeFrom(key);
MergeHelper merge(user_comparator, options.merge_operator,
options.info_log.get(),
true /* internal key corruption is not ok */);
if (purge) {
ParsedInternalKey ikey;
// Ugly walkaround to avoid compiler error for release build
// TODO: find a clean way to treat in memory key corruption
ikey.type = kTypeValue;
ParsedInternalKey prev_ikey;
std::string prev_value;
std::string prev_key;
// store first key-value
prev_key.assign(key.data(), key.size());
prev_value.assign(iter->value().data(), iter->value().size());
ParseInternalKey(Slice(prev_key), &prev_ikey);
assert(prev_ikey.sequence >= earliest_seqno_in_memtable);
// Ugly walkaround to avoid compiler error for release build
// TODO: find a clean way to treat in memory key corruption
auto ok __attribute__((unused)) = ParseInternalKey(key, &ikey);
// in-memory key corruption is not ok;
assert(ok);
if (ikey.type == kTypeMerge) {
// merge values if the first entry is of merge type
merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
prev_key.assign(merge.key().data(), merge.key().size());
ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
assert(ok);
prev_value.assign(merge.value().data(), merge.value().size());
} else {
// store first key-value
prev_key.assign(key.data(), key.size());
prev_value.assign(iter->value().data(), iter->value().size());
ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
assert(ok);
assert(prev_ikey.sequence >= earliest_seqno_in_memtable);
iter->Next();
}
for (iter->Next(); iter->Valid(); iter->Next()) {
while (iter->Valid()) {
bool iterator_at_next = false;
ParsedInternalKey this_ikey;
Slice key = iter->key();
ParseInternalKey(key, &this_ikey);
ok = ParseInternalKey(key, &this_ikey);
assert(ok);
assert(this_ikey.sequence >= earliest_seqno_in_memtable);
if (user_comparator->Compare(prev_ikey.user_key, this_ikey.user_key)) {
// This key is different from previous key.
// Output prev key and remember current key
builder->Add(Slice(prev_key), Slice(prev_value));
prev_key.assign(key.data(), key.size());
prev_value.assign(iter->value().data(), iter->value().size());
ParseInternalKey(Slice(prev_key), &prev_ikey);
if (this_ikey.type == kTypeMerge) {
merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
iterator_at_next = true;
prev_key.assign(merge.key().data(), merge.key().size());
ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
assert(ok);
prev_value.assign(merge.value().data(), merge.value().size());
} else {
prev_key.assign(key.data(), key.size());
prev_value.assign(iter->value().data(), iter->value().size());
ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
assert(ok);
}
} else {
// seqno within the same key are in decreasing order
assert(this_ikey.sequence < prev_ikey.sequence);
// This key is an earlier version of the same key in prev_key.
// Skip current key.
}
if (!iterator_at_next) iter->Next();
}
// output last key
builder->Add(Slice(prev_key), Slice(prev_value));

@ -21,12 +21,14 @@
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtablelist.h"
#include "db/merge_helper.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/transaction_log_iterator_impl.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/merge_operator.h"
#include "leveldb/statistics.h"
#include "leveldb/status.h"
#include "leveldb/table_builder.h"
@ -497,6 +499,7 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) {
// TODO: add merge_operator name check
s = NewDB();
if (!s.ok()) {
return s;
@ -1514,11 +1517,13 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
// Employ a sequential search because the total number of
// snapshots are typically small.
inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
SequenceNumber in, std::vector<SequenceNumber>& snapshots) {
SequenceNumber in, std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot) {
SequenceNumber prev __attribute__((unused)) = 0;
for (const auto cur : snapshots) {
assert(prev <= cur);
if (cur >= in) {
*prev_snapshot = prev;
return cur;
}
prev = cur; // assignment
@ -1591,6 +1596,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
kMaxSequenceNumber;
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
std::string compaction_filter_value;
MergeHelper merge(user_comparator(), options_.merge_operator,
options_.info_log.get(),
false /* internal key corruption is expected */);
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
@ -1617,8 +1625,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Handle key/value, add to state, etc.
bool drop = false;
bool current_entry_is_merged = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
// TODO: error key stays in db forever? Figure out the intention/rationale
// v10 error v8 : we cannot hide v8 even though it's pretty obvious.
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
@ -1637,15 +1648,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// If there are no snapshots, then this kv affect visibility at tip.
// Otherwise, search though all existing snapshots to find
// the earlist snapshot that is affected by this kv.
SequenceNumber visible = visible_at_tip ? visible_at_tip :
findEarliestVisibleSnapshot(ikey.sequence,
compact->existing_snapshots);
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
SequenceNumber visible = visible_at_tip ?
visible_at_tip :
findEarliestVisibleSnapshot(ikey.sequence,
compact->existing_snapshots,
&prev_snapshot);
if (visible_in_snapshot == visible) {
// If the earliest snapshot is which this key is visible in
// is the same as the visibily of a previous instance of the
// same key, then this kv is not visible in any snapshot.
// Hidden by an newer entry for same user key
// TODO: why not > ?
assert(last_sequence_for_key >= ikey.sequence);
drop = true; // (A)
RecordTick(options_.statistics, COMPACTION_KEY_DROP_NEWER_ENTRY);
@ -1661,6 +1676,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
RecordTick(options_.statistics, COMPACTION_KEY_DROP_OBSOLETE);
} else if (ikey.type == kTypeMerge) {
// We know the merge type entry is not hidden, otherwise we would
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow. Turn out this
// logic could also be nicely re-used for memtable flush purge
// optimization in BuildTable.
merge.MergeUntil(input.get(), prev_snapshot, bottommost_level);
current_entry_is_merged = true;
// get the merge result
key = merge.key();
ParseInternalKey(key, &ikey);
value = merge.value();
} else if (options_.CompactionFilter != nullptr &&
ikey.type != kTypeDeletion &&
ikey.sequence < earliest_snapshot) {
@ -1709,7 +1737,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero.
if (bottommost_level && ikey.sequence < earliest_snapshot) {
if (bottommost_level && ikey.sequence < earliest_snapshot &&
ikey.type != kTypeMerge) {
assert(ikey.type != kTypeDeletion);
// make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator
@ -1744,7 +1773,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
}
input->Next();
// MergeUntil has moved input to the next entry
if (!current_entry_is_merged) {
input->Next();
}
}
if (status.ok() && shutting_down_.Acquire_Load()) {
@ -1906,14 +1938,17 @@ Status DBImpl::Get(const ReadOptions& options,
mutex_.Unlock();
bool have_stat_update = false;
Version::GetStats stats;
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// value will contain the current merge operand in the latter case.
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
if (mem->Get(lkey, value, &s, options_)) {
// Done
} else if (imm.Get(lkey, value, &s)) {
} else if (imm.Get(lkey, value, &s, options_)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
current->Get(options, lkey, value, &s, &stats, options_);
have_stat_update = true;
}
mutex_.Lock();
@ -1934,7 +1969,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
return NewDBIterator(
&dbname_, env_, user_comparator(), internal_iter,
&dbname_, env_, options_, user_comparator(), internal_iter,
(options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
@ -1955,6 +1990,15 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
Status DBImpl::Merge(const WriteOptions& o, const Slice& key,
const Slice& val) {
if (!options_.merge_operator) {
return Status::NotSupported("Provide a merge_operator when opening DB");
} else {
return DB::Merge(o, key, val);
}
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
@ -2382,6 +2426,13 @@ Status DB::Delete(const WriteOptions& opt, const Slice& key) {
return Write(opt, &batch);
}
Status DB::Merge(const WriteOptions& opt, const Slice& key,
const Slice& value) {
WriteBatch batch;
batch.Merge(key, value);
return Write(opt, &batch);
}
DB::~DB() { }
Status DB::Open(const Options& options, const std::string& dbname,

@ -36,6 +36,8 @@ class DBImpl : public DB {
// Implementations of the DB interface
virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value);
virtual Status Merge(const WriteOptions&, const Slice& key,
const Slice& value);
virtual Status Delete(const WriteOptions&, const Slice& key);
virtual Status Write(const WriteOptions& options, WriteBatch* updates);
virtual Status Get(const ReadOptions& options,
@ -339,9 +341,12 @@ class DBImpl : public DB {
// dump the delayed_writes_ to the log file and reset counter.
void DelayLoggingAndReset();
// find the earliest snapshot where seqno is visible
inline SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in,
std::vector<SequenceNumber>& snapshots);
// Return the earliest snapshot where seqno is visible.
// Store the snapshot right before that, if any, in prev_snapshot
inline SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in,
std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
};
// Sanitize db options. The caller should delete result.info_log if

@ -54,10 +54,10 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
Version* current = versions_->current();
SequenceNumber snapshot = versions_->LastSequence();
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
if (mem->Get(lkey, value, &s, options_)) {
} else {
Version::GetStats stats;
s = current->Get(options, lkey, value, &stats);
current->Get(options, lkey, value, &s, &stats, options_);
}
return s;
}
@ -66,7 +66,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
return NewDBIterator(
&dbname_, env_, user_comparator(), internal_iter,
&dbname_, env_, options_, user_comparator(),internal_iter,
(options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));

@ -37,6 +37,10 @@ public:
virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Merge(const WriteOptions&, const Slice& key,
const Slice& value) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Delete(const WriteOptions&, const Slice& key) {
return Status::NotSupported("Not supported operation in read only mode.");
}

@ -2,12 +2,15 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <stdexcept>
#include "db/db_iter.h"
#include "db/filename.h"
#include "db/dbformat.h"
#include "leveldb/env.h"
#include "leveldb/options.h"
#include "leveldb/iterator.h"
#include "leveldb/merge_operator.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/mutexlock.h"
@ -36,6 +39,7 @@ namespace {
// numbers, deletion markers, overwrites, etc.
class DBIter: public Iterator {
public:
// The following is grossly complicated. TODO: clean it up
// Which direction is the iterator currently moving?
// (1) When moving forward, the internal iterator is positioned at
// the exact entry that yields this->key(), this->value()
@ -46,15 +50,18 @@ class DBIter: public Iterator {
kReverse
};
DBIter(const std::string* dbname, Env* env,
DBIter(const std::string* dbname, Env* env, const Options& options,
const Comparator* cmp, Iterator* iter, SequenceNumber s)
: dbname_(dbname),
env_(env),
logger_(options.info_log),
user_comparator_(cmp),
user_merge_operator_(options.merge_operator),
iter_(iter),
sequence_(s),
direction_(kForward),
valid_(false) {
valid_(false),
current_entry_is_merged_(false) {
}
virtual ~DBIter() {
delete iter_;
@ -62,11 +69,12 @@ class DBIter: public Iterator {
virtual bool Valid() const { return valid_; }
virtual Slice key() const {
assert(valid_);
return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_;
return saved_key_;
}
virtual Slice value() const {
assert(valid_);
return (direction_ == kForward) ? iter_->value() : saved_value_;
return (direction_ == kForward && !current_entry_is_merged_) ?
iter_->value() : saved_value_;
}
virtual Status status() const {
if (status_.ok()) {
@ -83,9 +91,10 @@ class DBIter: public Iterator {
virtual void SeekToLast();
private:
void FindNextUserEntry(bool skipping, std::string* skip);
void FindNextUserEntry(bool skipping);
void FindPrevUserEntry();
bool ParseKey(ParsedInternalKey* key);
void MergeValuesNewToOld();
inline void SaveKey(const Slice& k, std::string* dst) {
dst->assign(k.data(), k.size());
@ -102,15 +111,19 @@ class DBIter: public Iterator {
const std::string* const dbname_;
Env* const env_;
shared_ptr<Logger> logger_;
const Comparator* const user_comparator_;
const MergeOperator* const user_merge_operator_;
Iterator* const iter_;
SequenceNumber const sequence_;
Status status_;
std::string saved_key_; // == current key when direction_==kReverse
std::string saved_value_; // == current raw value when direction_==kReverse
std::string skip_key_;
Direction direction_;
bool valid_;
bool current_entry_is_merged_;
// No copying allowed
DBIter(const DBIter&);
@ -120,6 +133,8 @@ class DBIter: public Iterator {
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
if (!ParseInternalKey(iter_->key(), ikey)) {
status_ = Status::Corruption("corrupted internal key in DBIter");
Log(logger_, "corrupted internal key in DBIter: %s",
iter_->key().ToString(true).c_str());
return false;
} else {
return true;
@ -146,47 +161,136 @@ void DBIter::Next() {
}
}
// Temporarily use saved_key_ as storage for key to skip.
std::string* skip = &saved_key_;
SaveKey(ExtractUserKey(iter_->key()), skip);
FindNextUserEntry(true, skip);
// If the current value is merged, we might already hit end of iter_
if (!iter_->Valid()) {
valid_ = false;
return;
}
FindNextUserEntry(true /* skipping the current user key */);
}
void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
// PRE: saved_key_ has the current user key if skipping
// POST: saved_key_ should have the next user key if valid_,
// if the current entry is a result of merge
// current_entry_is_merged_ => true
// saved_value_ => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
// a delete marker
void DBIter::FindNextUserEntry(bool skipping) {
// Loop until we hit an acceptable entry to yield
assert(iter_->Valid());
assert(direction_ == kForward);
current_entry_is_merged_ = false;
do {
ParsedInternalKey ikey;
if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
switch (ikey.type) {
case kTypeDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
SaveKey(ikey.user_key, skip);
skipping = true;
break;
case kTypeValue:
if (skipping &&
user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
// Entry hidden
} else {
if (skipping &&
user_comparator_->Compare(ikey.user_key, saved_key_) <= 0) {
// skip this entry
} else {
skipping = false;
switch (ikey.type) {
case kTypeDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
SaveKey(ikey.user_key, &saved_key_);
skipping = true;
break;
case kTypeValue:
valid_ = true;
saved_key_.clear();
SaveKey(ikey.user_key, &saved_key_);
return;
}
break;
case kTypeMerge:
// By now, we are sure the current ikey is going to yield a value
SaveKey(ikey.user_key, &saved_key_);
current_entry_is_merged_ = true;
valid_ = true;
// Go to a different state machine
MergeValuesNewToOld();
// TODO: what if !iter_->Valid()
return;
break;
}
}
}
iter_->Next();
} while (iter_->Valid());
saved_key_.clear();
valid_ = false;
}
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_->key() points to the first merge type entry
// saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
// iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
const Slice value = iter_->value();
std::string operand(value.data(), value.size());
ParsedInternalKey ikey;
for (iter_->Next(); iter_->Valid(); iter_->Next()) {
if (!ParseKey(&ikey)) {
// skip corrupted key
continue;
}
if (user_comparator_->Compare(ikey.user_key, saved_key_) != 0) {
// hit the next user key, stop right here
break;
}
if (kTypeDeletion == ikey.type) {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_->Next();
break;
}
if (kTypeValue == ikey.type) {
// hit a put, merge the put value with operand and store it in the
// final result saved_value_. We are done!
const Slice value = iter_->value();
user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand),
&saved_value_, logger_.get());
// iter_ is positioned after put
iter_->Next();
return;
}
if (kTypeMerge == ikey.type) {
// hit a merge, merge the value with operand and continue.
// saved_value_ is used as a scratch area. The result is put
// back in operand
const Slice value = iter_->value();
user_merge_operator_->Merge(ikey.user_key, &value, operand,
&saved_value_, logger_.get());
swap(saved_value_, operand);
}
}
// we either exhausted all internal keys under this user key, or hit
// a deletion marker.
// feed null as the existing value to the merge opexrator, such that
// client can differentiate this scenario and do things accordingly.
user_merge_operator_->Merge(ikey.user_key, nullptr, operand,
&saved_value_, logger_.get());
}
void DBIter::Prev() {
assert(valid_);
// TODO: support backward iteration
// Throw an exception now if merge_operator is provided
if (user_merge_operator_) {
Log(logger_, "Prev not supported yet if merge_operator is provided");
throw std::logic_error("DBIter::Prev backward iteration not supported"
" if merge_operator is provided");
}
if (direction_ == kForward) { // Switch directions?
// iter_ is pointing at the current entry. Scan backwards until
// the key changes so we can use the normal reverse scanning code.
@ -261,7 +365,7 @@ void DBIter::Seek(const Slice& target) {
&saved_key_, ParsedInternalKey(target, sequence_, kValueTypeForSeek));
iter_->Seek(saved_key_);
if (iter_->Valid()) {
FindNextUserEntry(false, &saved_key_ /* temporary storage */);
FindNextUserEntry(false /*not skipping */);
} else {
valid_ = false;
}
@ -272,13 +376,21 @@ void DBIter::SeekToFirst() {
ClearSavedValue();
iter_->SeekToFirst();
if (iter_->Valid()) {
FindNextUserEntry(false, &saved_key_ /* temporary storage */);
FindNextUserEntry(false /* not skipping */);
} else {
valid_ = false;
}
}
void DBIter::SeekToLast() {
// TODO: support backward iteration
// throw an exception for now if merge_operator is provided
if (user_merge_operator_) {
Log(logger_, "SeekToLast not supported yet if merge_operator is provided");
throw std::logic_error("DBIter::SeekToLast: backward iteration not"
" supported if merge_operator is provided");
}
direction_ = kReverse;
ClearSavedValue();
iter_->SeekToLast();
@ -290,10 +402,12 @@ void DBIter::SeekToLast() {
Iterator* NewDBIterator(
const std::string* dbname,
Env* env,
const Comparator* user_key_comparator,
const Options& options,
const Comparator *user_key_comparator,
Iterator* internal_iter,
const SequenceNumber& sequence) {
return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence);
return new DBIter(dbname, env, options, user_key_comparator,
internal_iter, sequence);
}
} // namespace leveldb

@ -17,7 +17,8 @@ namespace leveldb {
extern Iterator* NewDBIterator(
const std::string* dbname,
Env* env,
const Comparator* user_key_comparator,
const Options& options,
const Comparator *user_key_comparator,
Iterator* internal_iter,
const SequenceNumber& sequence);

@ -20,6 +20,7 @@
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/storage_options.h"
#include "utilities/merge_operators.h"
namespace leveldb {
@ -209,6 +210,7 @@ class DBTest {
// Sequence of option configurations to try
enum OptionConfig {
kDefault,
kMergePut,
kFilter,
kUncompressed,
kNumLevel_3,
@ -220,6 +222,8 @@ class DBTest {
};
int option_config_;
std::shared_ptr<MergeOperator> merge_operator_;
public:
std::string dbname_;
SpecialEnv* env_;
@ -228,6 +232,7 @@ class DBTest {
Options last_options_;
DBTest() : option_config_(kDefault),
merge_operator_(MergeOperators::CreatePutOperator()),
env_(new SpecialEnv(Env::Default())) {
filter_policy_ = NewBloomFilterPolicy(10);
dbname_ = test::TmpDir() + "/db_test";
@ -259,6 +264,9 @@ class DBTest {
Options CurrentOptions() {
Options options;
switch (option_config_) {
case kMergePut:
options.merge_operator = merge_operator_.get();
break;
case kFilter:
options.filter_policy = filter_policy_;
break;
@ -326,8 +334,12 @@ class DBTest {
return DB::Open(opts, dbname_, &db_);
}
Status Put(const std::string& k, const std::string& v) {
return db_->Put(WriteOptions(), k, v);
Status Put(const Slice& k, const Slice& v) {
if (kMergePut == option_config_ ) {
return db_->Merge(WriteOptions(), k, v);
} else {
return db_->Put(WriteOptions(), k, v);
}
}
Status Delete(const std::string& k) {
@ -400,6 +412,10 @@ class DBTest {
case kTypeValue:
result += iter->value().ToString();
break;
case kTypeMerge:
// keep it the same as kTypeValue for testing kMergePut
result += iter->value().ToString();
break;
case kTypeDeletion:
result += "DEL";
break;
@ -935,8 +951,11 @@ TEST(DBTest, IterMultiWithDelete) {
Iterator* iter = db_->NewIterator(ReadOptions());
iter->Seek("c");
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
if (!CurrentOptions().merge_operator) {
// TODO: merge operator does not support backward iteration yet
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
}
delete iter;
} while (ChangeOptions());
}
@ -2822,7 +2841,7 @@ static void MTThreadBody(void* arg) {
// We add some padding for force compactions.
snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
key, id, static_cast<int>(counter));
ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
ASSERT_OK(t->state->test->Put(Slice(keybuf), Slice(valbuf)));
} else {
// Read a value and verify that it matches the pattern written above.
Status s = db->Get(ReadOptions(), Slice(keybuf), &value);
@ -2895,6 +2914,9 @@ class ModelDB: public DB {
virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) {
return DB::Put(o, k, v);
}
virtual Status Merge(const WriteOptions& o, const Slice& k, const Slice& v) {
return DB::Merge(o, k, v);
}
virtual Status Delete(const WriteOptions& o, const Slice& key) {
return DB::Delete(o, key);
}
@ -2930,6 +2952,10 @@ class ModelDB: public DB {
virtual void Put(const Slice& key, const Slice& value) {
(*map_)[key.ToString()] = value.ToString();
}
virtual void Merge(const Slice& key, const Slice& value) {
// ignore merge for now
//(*map_)[key.ToString()] = value.ToString();
}
virtual void Delete(const Slice& key) {
map_->erase(key.ToString());
}

@ -24,7 +24,8 @@ class InternalKey;
// data structures.
enum ValueType {
kTypeDeletion = 0x0,
kTypeValue = 0x1
kTypeValue = 0x1,
kTypeMerge = 0x2
};
// kValueTypeForSeek defines the ValueType that should be passed when
// constructing a ParsedInternalKey object for seeking to a particular
@ -32,7 +33,7 @@ enum ValueType {
// and the value type is embedded as the low 8 bits in the sequence
// number in internal keys, we need to use the highest-numbered
// ValueType, not the lowest).
static const ValueType kValueTypeForSeek = kTypeValue;
static const ValueType kValueTypeForSeek = kTypeMerge;
// We leave eight bits empty at the bottom so a type and sequence#
// can be packed together into 64-bits.
@ -154,7 +155,7 @@ inline bool ParseInternalKey(const Slice& internal_key,
result->sequence = num >> 8;
result->type = static_cast<ValueType>(c);
result->user_key = Slice(internal_key.data(), n - 8);
return (c <= static_cast<unsigned char>(kTypeValue));
return (c <= static_cast<unsigned char>(kValueTypeForSeek));
}
// Update the sequence number in the internal key

@ -7,6 +7,7 @@
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "leveldb/merge_operator.h"
#include "util/coding.h"
namespace leveldb {
@ -116,11 +117,23 @@ void MemTable::Add(SequenceNumber s, ValueType type,
}
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
const Options& options) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
bool merge_in_progress = false;
std::string operand;
if (s->IsMergeInProgress()) {
swap(*value, operand);
merge_in_progress = true;
}
auto merge_operator = options.merge_operator;
auto logger = options.info_log;
for (; iter.Valid(); iter.Next()) {
// entry format is:
// klength varint32
// userkey char[klength-8]
@ -141,15 +154,44 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
if (merge_in_progress) {
merge_operator->Merge(key.user_key(), &v, operand,
value, logger.get());
} else {
value->assign(v.data(), v.size());
}
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
case kTypeMerge: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
if (merge_in_progress) {
merge_operator->Merge(key.user_key(), &v, operand,
value, logger.get());
swap(*value, operand);
} else {
assert(merge_operator);
merge_in_progress = true;
operand.assign(v.data(), v.size());
}
break;
}
case kTypeDeletion: {
if (merge_in_progress) {
merge_operator->Merge(key.user_key(), nullptr, operand,
value, logger.get());
} else {
*s = Status::NotFound(Slice());
}
return true;
}
}
}
}
if (merge_in_progress) {
swap(*value, operand);
*s = Status::MergeInProgress("");
}
return false;
}

@ -62,8 +62,13 @@ class MemTable {
// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
// If memtable contains Merge operation as the most recent entry for a key,
// and the merge process does not stop (not reaching a value or delete),
// store the current merged result in value and MergeInProgress in s.
// return false
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s);
bool Get(const LookupKey& key, std::string* value, Status* s,
const Options& options);
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }

@ -174,10 +174,11 @@ size_t MemTableList::ApproximateMemoryUsage() {
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s) {
bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s,
const Options& options ) {
for (list<MemTable*>::iterator it = memlist_.begin();
it != memlist_.end(); ++it) {
if ((*it)->Get(key, value, s)) {
if ((*it)->Get(key, value, s, options)) {
return true;
}
}

@ -71,7 +71,8 @@ class MemTableList {
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool Get(const LookupKey& key, std::string* value, Status* s);
bool Get(const LookupKey& key, std::string* value, Status* s,
const Options& options);
// Returns the list of underlying memtables.
void GetMemTables(std::vector<MemTable*>* list);

@ -0,0 +1,114 @@
#include "merge_helper.h"
#include "db/dbformat.h"
#include "leveldb/comparator.h"
#include "leveldb/db.h"
#include "leveldb/merge_operator.h"
#include <string>
#include <stdio.h>
namespace leveldb {
// PRE: iter points to the first merge type entry
// POST: iter points to the first entry beyond the merge process (or the end)
// key_, value_ are updated to reflect the merge result
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
bool at_bottom) {
// get a copy of the internal key, before it's invalidated by iter->Next()
key_.assign(iter->key().data(), iter->key().size());
// we need to parse the internal key again as the parsed key is
// backed by the internal key!
ParsedInternalKey orig_ikey;
// Assume no internal key corruption as it has been successfully parsed
// by the caller.
// TODO: determine a good alternative of assert (exception?)
ParseInternalKey(key_, &orig_ikey);
std::string operand(iter->value().data(), iter->value().size());
bool hit_the_next_user_key = false;
ParsedInternalKey ikey;
for (iter->Next(); iter->Valid(); iter->Next()) {
if (!ParseInternalKey(iter->key(), &ikey)) {
// stop at corrupted key
if (assert_valid_internal_key_) {
assert(!"corrupted internal key is not expected");
}
break;
}
if (user_comparator_->Compare(ikey.user_key, orig_ikey.user_key) != 0) {
// hit a different user key, stop right here
hit_the_next_user_key = true;
break;
}
if (stop_before && ikey.sequence <= stop_before) {
// hit an entry that's visible by the previous snapshot, can't touch that
break;
}
if (kTypeDeletion == ikey.type) {
// hit a delete
// => merge nullptr with operand
// => change the entry type to kTypeValue
// We are done!
user_merge_operator_->Merge(ikey.user_key, nullptr, operand,
&value_, logger_);
orig_ikey.type = kTypeValue;
UpdateInternalKey(&key_[0], key_.size(),
orig_ikey.sequence, orig_ikey.type);
// move iter to the next entry
iter->Next();
return;
}
if (kTypeValue == ikey.type) {
// hit a put
// => merge the put value with operand
// => change the entry type to kTypeValue
// We are done!
const Slice value = iter->value();
user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand),
&value_, logger_);
orig_ikey.type = kTypeValue;
UpdateInternalKey(&key_[0], key_.size(),
orig_ikey.sequence, orig_ikey.type);
// move iter to the next entry
iter->Next();
return;
}
if (kTypeMerge == ikey.type) {
// hit a merge
// => merge the value with operand.
// => put the result back to operand and continue
const Slice value = iter->value();
user_merge_operator_->Merge(ikey.user_key, &value, operand,
&value_, logger_);
swap(value_, operand);
continue;
}
}
// We have seen the root history of this key if we are at the
// bottem level and exhausted all internal keys of this user key
// NOTE: !iter->Valid() does not necessarily mean we hit the
// beginning of a user key, as versions of a user key might be
// split into multiple files and some files might not be included
// in the merge.
bool seen_the_beginning = hit_the_next_user_key && at_bottom;
if (seen_the_beginning) {
// do a final merge with nullptr as the existing value and say
// bye to the merge type (it's now converted to a Put)
assert(kTypeMerge == orig_ikey.type);
user_merge_operator_->Merge(orig_ikey.user_key, nullptr, operand,
&value_, logger_);
orig_ikey.type = kTypeValue;
UpdateInternalKey(&key_[0], key_.size(),
orig_ikey.sequence, orig_ikey.type);
} else {
swap(value_, operand);
}
}
} // namespace leveldb

@ -0,0 +1,64 @@
#ifndef MERGE_HELPER_H
#define MERGE_HELPER_H
#include "db/dbformat.h"
#include "leveldb/slice.h"
#include <string>
namespace leveldb {
class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class MergeHelper {
public:
MergeHelper(const Comparator* user_comparator,
const MergeOperator* user_merge_operator,
Logger* logger,
bool assert_valid_internal_key)
: user_comparator_(user_comparator),
user_merge_operator_(user_merge_operator),
logger_(logger),
assert_valid_internal_key_(assert_valid_internal_key) {}
// Merge entries until we hit
// - a corrupted key
// - a Put/Delete,
// - a different user key,
// - a specific sequence number (snapshot boundary),
// or - the end of iteration
// iter: (IN) points to the first merge type entry
// (OUT) points to the first entry not included in the merge process
// stop_before: (IN) a sequence number that merge should not cross.
// 0 means no restriction
// at_bottom: (IN) true if the iterator covers the bottem level, which means
// we could reach the start of the history of this user key.
void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
bool at_bottom = false);
// Query the merge result
// These are valid until the next MergeUtil call
// IMPORTANT: the key type could change after the MergeUntil call.
// Put/Delete + Merge + ... + Merge => Put
// Merge + ... + Merge => Merge
Slice key() { return Slice(key_); }
Slice value() { return Slice(value_); }
private:
const Comparator* user_comparator_;
const MergeOperator* user_merge_operator_;
Logger* logger_;
Iterator* iter_; // in: the internal iterator, positioned at the first merge entry
bool assert_valid_internal_key_; // enforce no internal key corruption?
// the scratch area that holds the result of MergeUntil
// valid up to the next MergeUntil call
std::string key_;
std::string value_;
};
} // namespace leveldb
#endif

@ -0,0 +1,253 @@
#include <assert.h>
#include <memory>
#include <iostream>
#include "leveldb/cache.h"
#include "leveldb/comparator.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/merge_operator.h"
#include "db/dbformat.h"
#include "utilities/merge_operators.h"
using namespace std;
using namespace leveldb;
auto mergeOperator = MergeOperators::CreateUInt64AddOperator();
std::shared_ptr<DB> OpenDb() {
DB* db;
Options options;
options.create_if_missing = true;
options.merge_operator = mergeOperator.get();
Status s = DB::Open(options, "/tmp/testdb", &db);
if (!s.ok()) {
cerr << s.ToString() << endl;
assert(false);
}
return std::shared_ptr<DB>(db);
}
// Imagine we are maintaining a set of uint64 counters.
// Each counter has a distinct name. And we would like
// to support four high level operations:
// set, add, get and remove
// This is a quick implementation without a Merge operation.
class Counters {
protected:
std::shared_ptr<DB> db_;
WriteOptions put_option_;
ReadOptions get_option_;
WriteOptions delete_option_;
uint64_t default_;
public:
Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
: db_(db),
put_option_(),
get_option_(),
delete_option_(),
default_(defaultCount) {
assert(db_);
}
virtual ~Counters() {}
// public interface of Counters.
// All four functions return false
// if the underlying level db operation failed.
// mapped to a levedb Put
bool set(const string& key, uint64_t value) {
// just treat the internal rep of int64 as the string
Slice slice((char *)&value, sizeof(value));
auto s = db_->Put(put_option_, key, slice);
if (s.ok()) {
return true;
} else {
cerr << s.ToString() << endl;
return false;
}
}
// mapped to a leveldb Delete
bool remove(const string& key) {
auto s = db_->Delete(delete_option_, key);
if (s.ok()) {
return true;
} else {
cerr << s.ToString() << std::endl;
return false;
}
}
// mapped to a leveldb Get
bool get(const string& key, uint64_t *value) {
string str;
auto s = db_->Get(get_option_, key, &str);
if (s.IsNotFound()) {
// return default value if not found;
*value = default_;
return true;
} else if (s.ok()) {
// deserialization
if (str.size() != sizeof(uint64_t)) {
cerr << "value corruption\n";
return false;
}
*value = DecodeFixed64(&str[0]);
return true;
} else {
cerr << s.ToString() << std::endl;
return false;
}
}
// 'add' is implemented as get -> modify -> set
// An alternative is a single merge operation, see MergeBasedCounters
virtual bool add(const string& key, uint64_t value) {
uint64_t base = default_;
return get(key, &base) && set(key, base + value);
}
// convenience functions for testing
void assert_set(const string& key, uint64_t value) {
assert(set(key, value));
}
void assert_remove(const string& key) {
assert(remove(key));
}
uint64_t assert_get(const string& key) {
uint64_t value = default_;
assert(get(key, &value));
return value;
}
void assert_add(const string& key, uint64_t value) {
assert(add(key, value));
}
};
// Implement 'add' directly with the new Merge operation
class MergeBasedCounters : public Counters {
private:
WriteOptions merge_option_; // for merge
public:
MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
: Counters(db, defaultCount),
merge_option_() {
}
// mapped to a leveldb Merge operation
virtual bool add(const string& key, uint64_t value) override {
char encoded[sizeof(uint64_t)];
EncodeFixed64(encoded, value);
Slice slice(encoded, sizeof(uint64_t));
auto s = db_->Merge(merge_option_, key, slice);
if (s.ok()) {
return true;
} else {
cerr << s.ToString() << endl;
return false;
}
}
};
void dumpDb(DB* db) {
auto it = unique_ptr<Iterator>(db->NewIterator(ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
uint64_t value = DecodeFixed64(it->value().data());
cout << it->key().ToString() << ": " << value << endl;
}
assert(it->status().ok()); // Check for any errors found during the scan
}
void testCounters(Counters& counters, DB* db, bool test_compaction) {
FlushOptions o;
o.wait = true;
counters.assert_set("a", 1);
if (test_compaction) db->Flush(o);
assert(counters.assert_get("a") == 1);
counters.assert_remove("b");
// defaut value is 0 if non-existent
assert(counters.assert_get("b") == 0);
counters.assert_add("a", 2);
if (test_compaction) db->Flush(o);
// 1+2 = 3
assert(counters.assert_get("a")== 3);
dumpDb(db);
std::cout << "1\n";
// 1+...+49 = ?
uint64_t sum = 0;
for (int i = 1; i < 50; i++) {
counters.assert_add("b", i);
sum += i;
}
assert(counters.assert_get("b") == sum);
std::cout << "2\n";
dumpDb(db);
std::cout << "3\n";
if (test_compaction) {
db->Flush(o);
cout << "Compaction started ...\n";
db->CompactRange(nullptr, nullptr);
cout << "Compaction ended\n";
dumpDb(db);
assert(counters.assert_get("a")== 3);
assert(counters.assert_get("b") == sum);
}
}
int main(int argc, char *argv[]) {
auto db = OpenDb();
{
cout << "Test read-modify-write counters... \n";
Counters counters(db, 0);
testCounters(counters, db.get(), true);
}
bool compact = false;
if (argc > 1) {
compact = true;
cout << "Turn on Compaction\n";
}
{
cout << "Test merge-based counters... \n";
MergeBasedCounters counters(db, 0);
testCounters(counters, db.get(), compact);
}
return 0;
}

@ -100,7 +100,7 @@ Status TableCache::Get(const ReadOptions& options,
uint64_t file_size,
const Slice& k,
void* arg,
void (*saver)(void*, const Slice&, const Slice&, bool),
bool (*saver)(void*, const Slice&, const Slice&, bool),
bool* tableIO) {
Cache::Handle* handle = nullptr;
Status s = FindTable(storage_options_, file_number, file_size,

@ -40,13 +40,14 @@ class TableCache {
Table** tableptr = nullptr);
// If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value).
// call (*handle_result)(arg, found_key, found_value) repeatedly until
// it returns false.
Status Get(const ReadOptions& options,
uint64_t file_number,
uint64_t file_size,
const Slice& k,
void* arg,
void (*handle_result)(void*, const Slice&, const Slice&, bool),
bool (*handle_result)(void*, const Slice&, const Slice&, bool),
bool* tableIO);
// Evict any entry for the specified file number

@ -12,6 +12,7 @@
#include "db/memtable.h"
#include "db/table_cache.h"
#include "leveldb/env.h"
#include "leveldb/merge_operator.h"
#include "leveldb/table_builder.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
@ -227,29 +228,78 @@ enum SaverState {
kFound,
kDeleted,
kCorrupt,
kMerge // value contains the current merge result (the operand)
};
struct Saver {
SaverState state;
const Comparator* ucmp;
Slice user_key;
std::string* value;
const MergeOperator* merge_operator;
Logger* logger;
bool didIO; // did we do any disk io?
};
}
static void SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
Saver* s = reinterpret_cast<Saver*>(arg);
ParsedInternalKey parsed_key;
// TODO: didIO and Merge?
s->didIO = didIO;
if (!ParseInternalKey(ikey, &parsed_key)) {
// TODO: what about corrupt during Merge?
s->state = kCorrupt;
} else {
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
if (s->state == kFound) {
s->value->assign(v.data(), v.size());
switch (parsed_key.type) {
case kTypeValue:
if (kNotFound == s->state) {
s->value->assign(v.data(), v.size());
} else if (kMerge == s->state) {
std::string operand;
swap(operand, *s->value);
s->merge_operator->Merge(s->user_key, &v, operand,
s->value, s->logger);
} else {
assert(false);
}
s->state = kFound;
return false;
case kTypeMerge:
if (kNotFound == s->state) {
s->state = kMerge;
s->value->assign(v.data(), v.size());
} else if (kMerge == s->state) {
std::string operand;
swap(operand, *s->value);
s->merge_operator->Merge(s->user_key, &v, operand,
s->value, s->logger);
} else {
assert(false);
}
return true;
case kTypeDeletion:
if (kNotFound == s->state) {
s->state = kDeleted;
} else if (kMerge == s->state) {
std::string operand;
swap(operand, *s->value);
s->merge_operator->Merge(s->user_key, nullptr, operand,
s->value, s->logger);
s->state = kFound;
} else {
assert(false);
}
return false;
}
}
}
// s->state could be Corrupt, merge or notfound
return false;
}
static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
@ -269,14 +319,28 @@ Version::Version(VersionSet* vset, uint64_t version_number)
files_ = new std::vector<FileMetaData*>[vset->NumberLevels()];
}
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
GetStats* stats) {
void Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
Status *status,
GetStats* stats,
const Options& db_options) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator();
Status s;
auto merge_operator = db_options.merge_operator;
auto logger = db_options.info_log;
assert(status->ok() || status->IsMergeInProgress());
Saver saver;
saver.state = status->ok()? kNotFound : kMerge;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
saver.merge_operator = merge_operator;
saver.logger = logger.get();
saver.didIO = false;
stats->seek_file = nullptr;
stats->seek_file_level = -1;
@ -325,24 +389,21 @@ Status Version::Get(const ReadOptions& options,
} else {
files = &tmp2;
num_files = 1;
// TODO, is level 1-n files all disjoint in user key space?
}
}
}
for (uint32_t i = 0; i < num_files; ++i) {
for (uint32_t i = 0; i < num_files; ++i) {
FileMetaData* f = files[i];
Saver saver;
saver.state = kNotFound;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value = value;
saver.didIO = false;
bool tableIO = false;
s = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue, &tableIO);
if (!s.ok()) {
return s;
*status = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue, &tableIO);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
}
if (last_file_read != nullptr && stats->seek_file == nullptr) {
@ -367,18 +428,33 @@ Status Version::Get(const ReadOptions& options,
case kNotFound:
break; // Keep searching in other files
case kFound:
return s;
return;
case kDeleted:
s = Status::NotFound(Slice()); // Use empty error message for speed
return s;
*status = Status::NotFound(Slice()); // Use empty error message for speed
return;
case kCorrupt:
s = Status::Corruption("corrupted key for ", user_key);
return s;
*status = Status::Corruption("corrupted key for ", user_key);
return;
case kMerge:
break;
}
}
}
return Status::NotFound(Slice()); // Use an empty error message for speed
if (kMerge == saver.state) {
// merge operand is in *value and we hit the beginning of the key history
// do a final merge of nullptr and operand;
std::string operand;
swap(operand, *value);
merge_operator->Merge(user_key, nullptr, operand,
value, logger.get());
*status = Status::OK();
return;
} else {
*status = Status::NotFound(Slice()); // Use an empty error message for speed
return;
}
}
bool Version::UpdateStats(const GetStats& stats) {

@ -73,8 +73,8 @@ class Version {
FileMetaData* seek_file;
int seek_file_level;
};
Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
GetStats* stats);
void Get(const ReadOptions&, const LookupKey& key, std::string* val,
Status* status, GetStats* stats, const Options& db_option);
// Adds "stats" into the current state. Returns true if a new
// compaction may need to be triggered, false otherwise.

@ -7,7 +7,8 @@
// count: fixed32
// data: record[count]
// record :=
// kTypeValue varstring varstring |
// kTypeValue varstring varstring
// kTypeMerge varstring varstring
// kTypeDeletion varstring
// varstring :=
// len: varint32
@ -20,6 +21,7 @@
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "util/coding.h"
#include <stdexcept>
namespace leveldb {
@ -34,6 +36,10 @@ WriteBatch::~WriteBatch() { }
WriteBatch::Handler::~Handler() { }
void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) {
throw std::runtime_error("Handler::Merge not implemented!");
}
void WriteBatch::Clear() {
rep_.clear();
rep_.resize(kHeader);
@ -68,6 +74,14 @@ Status WriteBatch::Iterate(Handler* handler) const {
return Status::Corruption("bad WriteBatch Delete");
}
break;
case kTypeMerge:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Merge(key, value);
} else {
return Status::Corruption("bad WriteBatch Merge");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
@ -108,6 +122,14 @@ void WriteBatch::Delete(const Slice& key) {
PutLengthPrefixedSlice(&rep_, key);
}
void WriteBatch::Merge(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeMerge));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
namespace {
class MemTableInserter : public WriteBatch::Handler {
public:
@ -118,6 +140,10 @@ class MemTableInserter : public WriteBatch::Handler {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
virtual void Merge(const Slice& key, const Slice& value) {
mem_->Add(sequence_, kTypeMerge, key, value);
sequence_++;
}
virtual void Delete(const Slice& key) {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;

@ -33,6 +33,14 @@ static std::string PrintContents(WriteBatch* b) {
state.append(")");
count++;
break;
case kTypeMerge:
state.append("Merge(");
state.append(ikey.user_key.ToString());
state.append(", ");
state.append(iter->value().ToString());
state.append(")");
count++;
break;
case kTypeDeletion:
state.append("Delete(");
state.append(ikey.user_key.ToString());

@ -83,6 +83,14 @@ class DB {
// Note: consider setting options.sync = true.
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
// Merge the database entry for "key" with "value". Returns OK on success,
// and a non-OK status on error. The semantics of this operation is
// determined by the user provided merge_operator when opening DB.
// Note: consider setting options.sync = true.
virtual Status Merge(const WriteOptions& options,
const Slice& key,
const Slice& value) = 0;
// Apply the specified updates to the database.
// Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = true.
@ -185,7 +193,7 @@ class DB {
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size) = 0;
// The sequence number of the most recent transaction.
// The sequence number of the most recent transaction.
virtual SequenceNumber GetLatestSequenceNumber() = 0;
// Return's an iterator for all writes since the sequence number

@ -0,0 +1,74 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_
#define STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_
#include <string>
namespace leveldb {
class Slice;
class Logger;
// The Merge Operator interface.
// Client needs to provide an object implementing this interface if Merge
// operation is accessed.
// Essentially, MergeOperator specifies the SEMANTICS of a merge, which only
// client knows. It could be numeric addition, list append, string
// concatenation, ... , anything.
// The library, on the other hand, is concerned with the exercise of this
// interface, at the right time (during get, iteration, compaction...)
// Note that, even though in principle we don't require any special property
// of the merge operator, the current rocksdb compaction order does imply that
// an associative operator could be exercised more naturally (and more
// efficiently).
//
// Refer to my_test.cc for an example of implementation
//
class MergeOperator {
public:
virtual ~MergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// Client could multiplex the merge operator based on it
// if the key space is partitioned and different subspaces
// refer to different types of data which have different
// merge operation semantics
// existing: (IN) null indicates that the key does not exist before this op
// value: (IN) The passed-in merge operand value (when Merge is issued)
// new_value:(OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Note: Merge does not return anything to indicate if a merge is successful
// or not.
// Rationale: If a merge failed due to, say de-serialization error, we still
// need to define a consistent merge result. Should we throw away
// the existing value? the merge operand? Or reset the merged value
// to sth? The rocksdb library is not in a position to make the
// right choice. On the other hand, client knows exactly what
// happened during Merge, thus is able to make the best decision.
// Just save the final decision in new_value. logger is passed in,
// in case client wants to leave a trace of what went wrong.
virtual void Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const = 0;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is
// accessed using a different MergeOperator)
// TODO: the name is currently not stored persistently and thus
// no checking is enforced. Client is responsible for providing
// consistent MergeOperator between DB opens.
virtual const char* Name() const = 0;
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_

@ -20,6 +20,7 @@ class Comparator;
class Env;
class FilterPolicy;
class Logger;
class MergeOperator;
class Snapshot;
using std::shared_ptr;
@ -63,6 +64,18 @@ struct Options {
// comparator provided to previous open calls on the same DB.
const Comparator* comparator;
// REQUIRES: The client must provide a merge operator if Merge operation
// needs to be accessed. Calling Merge on a DB without a merge operator
// would result in Status::NotSupported. The client must ensure that the
// merge operator supplied here has the same name and *exactly* the same
// semantics as the merge operator provided to previous open calls on
// the same DB. The only exception is reserved for upgrade, where a DB
// previously without a merge operator is introduced to Merge operation
// for the first time. It's necessary to specify a merge operator when
// openning the DB in this case.
// Default: nullptr
const MergeOperator* merge_operator;
// If true, the database will be created if it is missing.
// Default: false
bool create_if_missing;

@ -47,6 +47,9 @@ class Status {
static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIOError, msg, msg2);
}
static Status MergeInProgress(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kMergeInProgress, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return (state_ == nullptr); }
@ -66,6 +69,9 @@ class Status {
// Returns true iff the status indicates an IOError.
bool IsIOError() const { return code() == kIOError; }
// Returns true iff the status indicates an MergeInProgress.
bool IsMergeInProgress() const { return code() == kMergeInProgress; }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
@ -84,7 +90,8 @@ class Status {
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5
kIOError = 5,
kMergeInProgress = 6
};
Code code() const {

@ -36,6 +36,10 @@ class WriteBatch {
// Store the mapping "key->value" in the database.
void Put(const Slice& key, const Slice& value);
// Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)"
void Merge(const Slice& key, const Slice& value);
// If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(const Slice& key);
@ -47,6 +51,10 @@ class WriteBatch {
public:
virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
// Merge is not pure virtual. Otherwise, we would break existing
// clients of Handler on a source code level.
// The default implementation simply throws a runtime exception.
virtual void Merge(const Slice& key, const Slice& value);
virtual void Delete(const Slice& key) = 0;
};
Status Iterate(Handler* handler) const;

@ -285,11 +285,11 @@ Iterator* Table::NewIterator(const ReadOptions& options) const {
Status Table::InternalGet(const ReadOptions& options, const Slice& k,
void* arg,
void (*saver)(void*, const Slice&, const Slice&, bool)) {
bool (*saver)(void*, const Slice&, const Slice&, bool)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
iiter->Seek(k);
if (iiter->Valid()) {
bool done = false;
for (iiter->Seek(k); iiter->Valid() && !done; iiter->Next()) {
Slice handle_value = iiter->value();
FilterBlockReader* filter = rep_->filter;
BlockHandle handle;
@ -297,14 +297,20 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
handle.DecodeFrom(&handle_value).ok() &&
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
// TODO: think about interaction with Merge. If a user key cannot
// cross one data block, we should be fine.
RecordTick(rep_->options.statistics, BLOOM_FILTER_USEFUL);
break;
} else {
bool didIO = false;
Iterator* block_iter = BlockReader(this, options, iiter->value(),
&didIO);
block_iter->Seek(k);
if (block_iter->Valid()) {
(*saver)(arg, block_iter->key(), block_iter->value(), didIO);
for (block_iter->Seek(k); block_iter->Valid(); block_iter->Next()) {
if (!(*saver)(arg, block_iter->key(), block_iter->value(), didIO)) {
done = true;
break;
}
}
s = block_iter->status();
delete block_iter;
@ -317,8 +323,9 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
return s;
}
void SaveDidIO(void* arg, const Slice& key, const Slice& value, bool didIO) {
bool SaveDidIO(void* arg, const Slice& key, const Slice& value, bool didIO) {
*reinterpret_cast<bool*>(arg) = didIO;
return false;
}
bool Table::TEST_KeyInCache(const ReadOptions& options, const Slice& key) {
// We use InternalGet() as it has logic that checks whether we read the

@ -74,14 +74,14 @@ class Table {
static Iterator* BlockReader(void*, const ReadOptions&, const Slice&,
bool* didIO);
// Calls (*handle_result)(arg, ...) with the entry found after a call
// to Seek(key). May not make such a call if filter policy says
// that key is not present.
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.
// May not make such a call if filter policy says that key is not present.
friend class TableCache;
Status InternalGet(
const ReadOptions&, const Slice& key,
void* arg,
void (*handle_result)(void* arg, const Slice& k, const Slice& v, bool));
bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool));
void ReadMeta(const Footer& footer);

@ -6,15 +6,17 @@
#include <limits>
#include "leveldb/cache.h"
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/cache.h"
#include "leveldb/merge_operator.h"
namespace leveldb {
Options::Options()
: comparator(BytewiseComparator()),
merge_operator(nullptr),
create_if_missing(false),
error_if_exists(false),
paranoid_checks(false),
@ -72,7 +74,8 @@ void
Options::Dump(Logger* log) const
{
Log(log," Options.comparator: %s", comparator->Name());
Log(log," Options.create_if_missing: %d", create_if_missing);
Log(log," Options.merge_operator: %s",
merge_operator? merge_operator->Name() : "None");
Log(log," Options.error_if_exists: %d", error_if_exists);
Log(log," Options.paranoid_checks: %d", paranoid_checks);
Log(log," Options.env: %p", env);

@ -58,6 +58,9 @@ std::string Status::ToString() const {
case kIOError:
type = "IO error: ";
break;
case kMergeInProgress:
type = "Merge In Progress: ";
break;
default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
static_cast<int>(code()));

@ -0,0 +1,18 @@
#ifndef MERGE_OPERATORS_H
#define MERGE_OPERATORS_H
#include <memory>
#include "leveldb/merge_operator.h"
namespace leveldb {
class MergeOperators {
public:
static std::shared_ptr<leveldb::MergeOperator> CreatePutOperator();
static std::shared_ptr<leveldb::MergeOperator> CreateUInt64AddOperator();
};
}
#endif

@ -0,0 +1,35 @@
#include <memory>
#include "leveldb/slice.h"
#include "leveldb/merge_operator.h"
#include "utilities/merge_operators.h"
using namespace leveldb;
namespace { // anonymous namespace
// A merge operator that mimics Put semantics
class PutOperator : public MergeOperator {
public:
virtual void Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
// put basically only looks at the current value
new_value->assign(value.data(), value.size());
}
virtual const char* Name() const override {
return "PutOperator";
}
};
} // end of anonymous namespace
namespace leveldb {
std::shared_ptr<MergeOperator> MergeOperators::CreatePutOperator() {
return std::make_shared<PutOperator>();
}
}

@ -0,0 +1,63 @@
#include <memory>
#include "leveldb/env.h"
#include "leveldb/merge_operator.h"
#include "leveldb/slice.h"
#include "util/coding.h"
#include "utilities/merge_operators.h"
using namespace leveldb;
namespace { // anonymous namespace
// A 'model' merge operator with uint64 addition semantics
class UInt64AddOperator : public MergeOperator {
public:
virtual void Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
// assuming 0 if no existing value
uint64_t existing = 0;
if (existing_value) {
if (existing_value->size() == sizeof(uint64_t)) {
existing = DecodeFixed64(existing_value->data());
} else {
// if existing_value is corrupted, treat it as 0
Log(logger, "existing value corruption, size: %zu > %zu",
existing_value->size(), sizeof(uint64_t));
existing = 0;
}
}
uint64_t operand;
if (value.size() == sizeof(uint64_t)) {
operand = DecodeFixed64(value.data());
} else {
// if operand is corrupted, treat it as 0
Log(logger, "operand value corruption, size: %zu > %zu",
value.size(), sizeof(uint64_t));
operand = 0;
}
new_value->resize(sizeof(uint64_t));
EncodeFixed64(&(*new_value)[0], existing + operand);
return;
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
};
}
namespace leveldb {
std::shared_ptr<MergeOperator> MergeOperators::CreateUInt64AddOperator() {
return std::make_shared<UInt64AddOperator>();
}
}

@ -185,6 +185,12 @@ Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) {
return db_->Delete(wopts, key);
}
Status DBWithTTL::Merge(const WriteOptions& options,
const Slice& key,
const Slice& value) {
return Status::NotSupported("Merge operation not supported.");
}
Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
return db_->Write(opts, updates);
}

@ -29,6 +29,11 @@ class DBWithTTL : public DB {
virtual Status Delete(const WriteOptions& wopts, const Slice& key);
virtual Status Merge(const WriteOptions& options,
const Slice& key,
const Slice& value);
virtual Status Write(const WriteOptions& opts, WriteBatch* updates);
virtual Iterator* NewIterator(const ReadOptions& opts);

Loading…
Cancel
Save