From c2d7826cedfff32f1307e6c4dff59f11e9c89a79 Mon Sep 17 00:00:00 2001
From: Deon Nicholas <dnicholas@fb.com>
Date: Mon, 5 Aug 2013 20:14:32 -0700
Subject: [PATCH] [RocksDB] [MergeOperator] The new Merge Interface! Uses merge sequences.

Summary:
Here are the major changes to the Merge Interface. It has been expanded to handle cases where the MergeOperator is not associative. It does so by stacking up merge operations while scanning through the key history (i.e.: during Get() or Compaction), until a valid Put/Delete/end-of-history is encountered; it then applies all of the merge operations in the correct sequence starting with the base/sentinel value.

I have also introduced an "AssociativeMerge" function which allows the user to take advantage of associative merge operations (such as in the case of counters). The implementation will always attempt to merge the operations/operands themselves together when they are encountered, and will resort to the "stacking" method if and only if the "associative-merge" fails.

This implementation is conjectured to allow MergeOperator to handle the general case, while still providing the user with the ability to take advantage of certain efficiencies in their own merge-operator / data-structure.

NOTE: This is a preliminary diff. This must still go through a lot of review, revision, and testing. Feedback welcome!

Test Plan:
-This is a preliminary diff. I have only just begun testing/debugging it.
-I will be testing this with the existing MergeOperator use-cases and unit-tests (counters, string-append, and redis-lists)
-I will be "desk-checking" and walking through the code with the help of gdb.
-I will find a way of stress-testing the new interface / implementation using db_bench, db_test, merge_test, and/or db_stress.
-I will ensure that my tests cover all cases: Get-Memtable, Get-Immutable-Memtable, Get-from-Disk, Iterator-Range-Scan, Flush-Memtable-to-L0, Compaction-L0-L1, Compaction-Ln-L(n+1), Put/Delete found, Put/Delete not-found, end-of-history, end-of-file, etc.
-A lot of feedback from the reviewers.
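NOTE (illustrative sketch, not part of this diff): to make the associative case above concrete, a counter operator written against the new interface could look roughly as follows. The class name is invented; the signatures mirror AssociativeMergeOperator as declared in include/leveldb/merge_operator.h later in this patch, and the Name() method is assumed to carry over unchanged from the existing MergeOperator interface. Returning false is how an operator reports an undecodable operand under the new error semantics.

    #include <cstdint>
    #include <cstring>
    #include "leveldb/merge_operator.h"

    namespace leveldb {

    // Illustrative only: a uint64 counter, in the spirit of
    // utilities/merge_operators/uint64add.cc touched by this diff.
    class CounterMergeOperator : public AssociativeMergeOperator {
     public:
      virtual bool Merge(const Slice& key, const Slice* existing_value,
                         const Slice& value, std::string* new_value,
                         Logger* logger) const {
        uint64_t base = 0;  // a missing existing value acts as 0
        if (existing_value != nullptr) {
          if (existing_value->size() != sizeof(base)) return false;  // corrupt
          memcpy(&base, existing_value->data(), sizeof(base));
        }
        if (value.size() != sizeof(uint64_t)) return false;  // corrupt operand
        uint64_t operand;
        memcpy(&operand, value.data(), sizeof(operand));
        base += operand;
        new_value->assign(reinterpret_cast<const char*>(&base), sizeof(base));
        return true;
      }

      virtual const char* Name() const { return "CounterMergeOperator"; }
    };

    }  // namespace leveldb

Because addition is associative, the PartialMerge provided by AssociativeMergeOperator lets the library fold operands together as it scans; a non-associative operator would instead have its operands stacked and replayed once a Put/Delete or end-of-history is found, as described above.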
Reviewers: haobo, dhruba, zshao, emayanke Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D11499 --- db/builder.cc | 101 +++--- db/db_impl.cc | 175 +++++++--- db/db_impl_readonly.cc | 5 +- db/db_impl_readonly.h | 3 + db/db_iter.cc | 49 ++- db/memtable.cc | 65 ++-- db/memtable.h | 7 +- db/memtablelist.cc | 6 +- db/memtablelist.h | 3 +- db/merge_helper.cc | 163 ++++++--- db/merge_helper.h | 51 ++- db/merge_operator.cc | 51 +++ db/version_set.cc | 315 +++++++++++++----- db/version_set.h | 16 +- include/leveldb/merge_operator.h | 120 +++++-- table/table.cc | 2 + utilities/merge_operators/put.cc | 27 +- .../string_append/stringappend.cc | 5 +- .../string_append/stringappend.h | 6 +- .../string_append/stringappend2.cc | 90 +++++ .../string_append/stringappend2.h | 42 +++ .../string_append/stringappend_test.cc | 5 +- utilities/merge_operators/uint64add.cc | 56 ++-- utilities/ttl/db_ttl.h | 75 ++++- 24 files changed, 1076 insertions(+), 362 deletions(-) create mode 100644 db/merge_operator.cc create mode 100644 utilities/merge_operators/string_append/stringappend2.cc create mode 100644 utilities/merge_operators/string_append/stringappend2.h diff --git a/db/builder.cc b/db/builder.cc index 2b7c59283..76c087346 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -48,81 +48,92 @@ Status BuildTable(const std::string& dbname, TableBuilder* builder = new TableBuilder(options, file.get(), 0); // the first key is the smallest key - Slice key = iter->key(); - meta->smallest.DecodeFrom(key); + meta->smallest.DecodeFrom(iter->key()); MergeHelper merge(user_comparator, options.merge_operator, options.info_log.get(), true /* internal key corruption is not ok */); if (purge) { - ParsedInternalKey ikey; // Ugly walkaround to avoid compiler error for release build - // TODO: find a clean way to treat in memory key corruption - ikey.type = kTypeValue; + bool ok __attribute__((unused)) = true; + + // Will write to builder if current key != prev key ParsedInternalKey prev_ikey; - std::string prev_value; std::string prev_key; - - // Ugly walkaround to avoid compiler error for release build - // TODO: find a clean way to treat in memory key corruption - auto ok __attribute__((unused)) = ParseInternalKey(key, &ikey); - // in-memory key corruption is not ok; - assert(ok); - - if (ikey.type == kTypeMerge) { - // merge values if the first entry is of merge type - merge.MergeUntil(iter, 0 /* don't worry about snapshot */); - prev_key.assign(merge.key().data(), merge.key().size()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); - prev_value.assign(merge.value().data(), merge.value().size()); - } else { - // store first key-value - prev_key.assign(key.data(), key.size()); - prev_value.assign(iter->value().data(), iter->value().size()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); - assert(prev_ikey.sequence >= earliest_seqno_in_memtable); - iter->Next(); - } + bool is_first_key = true; // Also write if this is the very first key while (iter->Valid()) { bool iterator_at_next = false; + + // Get current key ParsedInternalKey this_ikey; Slice key = iter->key(); + Slice value = iter->value(); + + // In-memory key corruption is not ok; + // TODO: find a clean way to treat in memory key corruption ok = ParseInternalKey(key, &this_ikey); assert(ok); assert(this_ikey.sequence >= earliest_seqno_in_memtable); - if (user_comparator->Compare(prev_ikey.user_key, this_ikey.user_key)) { - // This key is different from previous key. 
- // Output prev key and remember current key builder->Add(Slice(prev_key), Slice(prev_value)); + // If the key is the same as the previous key (and it is not the + // first key), then we skip it, since it is an older version. + // Otherwise we output the key and mark it as the "new" previous key. + if (!is_first_key && !user_comparator->Compare(prev_ikey.user_key, + this_ikey.user_key)) { + // seqno within the same key are in decreasing order + assert(this_ikey.sequence < prev_ikey.sequence); + } else { + is_first_key = false; + if (this_ikey.type == kTypeMerge) { + // Handle merge-type keys using the MergeHelper merge.MergeUntil(iter, 0 /* don't worry about snapshot */); iterator_at_next = true; - prev_key.assign(merge.key().data(), merge.key().size()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); - prev_value.assign(merge.value().data(), merge.value().size()); + if (merge.IsSuccess()) { + // Merge completed correctly. + // Add the resulting merge key/value and continue to next + builder->Add(merge.key(), merge.value()); + prev_key.assign(merge.key().data(), merge.key().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + } else { + // Merge did not find a Put/Delete. + // Can not compact these merges into a kValueType. + // Write them out one-by-one. (Proceeding from back() to front()) + const std::deque<std::string>& keys = merge.keys(); + const std::deque<std::string>& values = merge.values(); + assert(keys.size() == values.size() && keys.size() >= 1); + std::deque<std::string>::const_reverse_iterator key_iter; + std::deque<std::string>::const_reverse_iterator value_iter; + for (key_iter = keys.rbegin(), value_iter = values.rbegin(); + key_iter != keys.rend() && value_iter != values.rend(); + ++key_iter, ++value_iter) { + + builder->Add(Slice(*key_iter), Slice(*value_iter)); + } + + // Sanity check. Both iterators should end at the same time + assert(key_iter == keys.rend() && value_iter == values.rend()); + + prev_key.assign(keys.front()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + } } else { + // Handle Put/Delete-type keys by simply writing them + builder->Add(key, value); prev_key.assign(key.data(), key.size()); - prev_value.assign(iter->value().data(), iter->value().size()); ok = ParseInternalKey(Slice(prev_key), &prev_ikey); assert(ok); } - } else { - // seqno within the same key are in decreasing order - assert(this_ikey.sequence < prev_ikey.sequence); - // This key is an earlier version of the same key in prev_key. - // Skip current key. } if (!iterator_at_next) iter->Next(); } - // output last key - builder->Add(Slice(prev_key), Slice(prev_value)); + + // The last key is the largest key meta->largest.DecodeFrom(Slice(prev_key)); } else { diff --git a/db/db_impl.cc b/db/db_impl.cc index 5b104cd2e..3fc387e17 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -228,7 +228,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) char name[100]; Status st = env_->GetHostName(name, 100L); - if(st.ok()) { + if (st.ok()) { host_name_ = name; } else { Log(options_.info_log, "Can't get hostname, use localhost as host name."); @@ -469,7 +469,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { } } else { Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]); - if(!st.ok()) { + if (!st.ok()) { Log(options_.info_log, "Delete type=%d #%lld FAILED\n", int(type), static_cast<unsigned long long>(number)); @@ -1110,7 +1110,7 @@ Status DBImpl::FindProbableWALFiles(std::vector<LogFile>* const allLogs, } } size_t startIndex = std::max(0l, end); // end could be -ve.
- for( size_t i = startIndex; i < allLogs->size(); ++i) { + for (size_t i = startIndex; i < allLogs->size(); ++i) { result->push_back(allLogs->at(i)); } if (result->empty()) { @@ -1703,7 +1703,6 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( Status DBImpl::DoCompactionWork(CompactionState* compact) { int64_t imm_micros = 0; // Micros spent doing imm_ compactions - Log(options_.info_log, "Compacting %d@%d + %d@%d files, score %.2f slots available %d", compact->compaction->num_input_files(0), @@ -1793,7 +1792,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Handle key/value, add to state, etc. bool drop = false; - bool current_entry_is_merged = false; + bool current_entry_is_merging = false; if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys // TODO: error key stays in db forever? Figure out the intention/rationale @@ -1887,11 +1886,24 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // logic could also be nicely re-used for memtable flush purge // optimization in BuildTable. merge.MergeUntil(input.get(), prev_snapshot, bottommost_level); - current_entry_is_merged = true; - // get the merge result - key = merge.key(); - ParseInternalKey(key, &ikey); - value = merge.value(); + current_entry_is_merging = true; + if (merge.IsSuccess()) { + // Successfully found Put/Delete/(end-of-key-range) while merging + // Get the merge result + key = merge.key(); + ParseInternalKey(key, &ikey); + value = merge.value(); + } else { + // Did not find a Put/Delete/(end-of-key-range) while merging + // We now have some stack of merge operands to write out. + // NOTE: key,value, and ikey are now referring to old entries. + // These will be correctly set below. + assert(!merge.keys().empty()); + assert(merge.keys().size() == merge.values().size()); + + // Hack to make sure last_sequence_for_key is correct + ParseInternalKey(merge.keys().front(), &ikey); + } } last_sequence_for_key = ikey.sequence; @@ -1909,52 +1921,97 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { #endif if (!drop) { - - char* kptr = (char*)key.data(); - std::string kstr; - - // Zeroing out the sequence number leads to better compression. - // If this is the bottommost level (no files in lower levels) - // and the earliest snapshot is larger than this seqno - // then we can squash the seqno to zero. - if (bottommost_level && ikey.sequence < earliest_snapshot && - ikey.type != kTypeMerge) { - assert(ikey.type != kTypeDeletion); - // make a copy because updating in place would cause problems - // with the priority queue that is managing the input key iterator - kstr.assign(key.data(), key.size()); - kptr = (char *)kstr.c_str(); - UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type); + // We may write a single key (e.g.: for Put/Delete or successful merge). + // Or we may instead have to write a sequence/list of keys. 
+ // We have to write a sequence iff we have an unsuccessful merge + bool has_merge_list = current_entry_is_merging && !merge.IsSuccess(); + const std::deque<std::string>* keys = nullptr; + const std::deque<std::string>* values = nullptr; + std::deque<std::string>::const_reverse_iterator key_iter; + std::deque<std::string>::const_reverse_iterator value_iter; + if (has_merge_list) { + keys = &merge.keys(); + values = &merge.values(); + key_iter = keys->rbegin(); // The back (*rbegin()) is the first key + value_iter = values->rbegin(); + + key = Slice(*key_iter); + value = Slice(*value_iter); } - Slice newkey(kptr, key.size()); - assert((key.clear(), 1)); // we do not need 'key' anymore + // If we have a list of keys to write, traverse the list. + // If we have a single key to write, simply write that key. + while (true) { + // Invariant: key,value,ikey will always be the next entry to write + char* kptr = (char*)key.data(); + std::string kstr; + + // Zeroing out the sequence number leads to better compression. + // If this is the bottommost level (no files in lower levels) + // and the earliest snapshot is larger than this seqno + // then we can squash the seqno to zero. + if (bottommost_level && ikey.sequence < earliest_snapshot && + ikey.type != kTypeMerge) { + assert(ikey.type != kTypeDeletion); + // make a copy because updating in place would cause problems + // with the priority queue that is managing the input key iterator + kstr.assign(key.data(), key.size()); + kptr = (char *)kstr.c_str(); + UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type); + } + + Slice newkey(kptr, key.size()); + assert((key.clear(), 1)); // we do not need 'key' anymore - // Open output file if necessary - if (compact->builder == nullptr) { - status = OpenCompactionOutputFile(compact); - if (!status.ok()) { - break; + // Open output file if necessary + if (compact->builder == nullptr) { + status = OpenCompactionOutputFile(compact); + if (!status.ok()) { + break; + } + } + if (compact->builder->NumEntries() == 0) { + compact->current_output()->smallest.DecodeFrom(newkey); + } + compact->current_output()->largest.DecodeFrom(newkey); + compact->builder->Add(newkey, value); + + // Close output file if it is big enough + if (compact->builder->FileSize() >= + compact->compaction->MaxOutputFileSize()) { + status = FinishCompactionOutputFile(compact, input.get()); + if (!status.ok()) { + break; + } } - } - if (compact->builder->NumEntries() == 0) { - compact->current_output()->smallest.DecodeFrom(newkey); - } - compact->current_output()->largest.DecodeFrom(newkey); - compact->builder->Add(newkey, value); - // Close output file if it is big enough - if (compact->builder->FileSize() >= - compact->compaction->MaxOutputFileSize()) { - status = FinishCompactionOutputFile(compact, input.get()); - if (!status.ok()) { + // If we have a list of entries, move to next element + // If we only had one entry, then break the loop. + if (has_merge_list) { + ++key_iter; + ++value_iter; + + // If at end of list + if (key_iter == keys->rend() || value_iter == values->rend()) { + // Sanity Check: if one ends, then both end + assert(key_iter == keys->rend() && value_iter == values->rend()); + break; + } + + // Otherwise not at end of list. Update key, value, and ikey.
+ key = Slice(*key_iter); + value = Slice(*value_iter); + ParseInternalKey(key, &ikey); + + } else { + // Only had one item to begin with (Put/Delete) break; } } } // MergeUntil has moved input to the next entry - if (!current_entry_is_merged) { + if (!current_entry_is_merging) { input->Next(); } } @@ -2120,21 +2177,25 @@ Status DBImpl::GetImpl(const ReadOptions& options, current->Ref(); // Unlock while reading from files and memtables - mutex_.Unlock(); bool have_stat_update = false; Version::GetStats stats; + + // Prepare to store a list of merge operations if merge occurs. + std::deque<std::string> merge_operands; + // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. - // value will contain the current merge operand in the latter case. + // merge_operands will contain the sequence of merges in the latter case. LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, options_)) { + if (mem->Get(lkey, value, &s, &merge_operands, options_)) { // Done - } else if (imm.Get(lkey, value, &s, options_)) { + } else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { // Done } else { - current->Get(options, lkey, value, &s, &stats, options_, no_io,value_found); + current->Get(options, lkey, value, &s, &merge_operands, &stats, + options_, value_found); have_stat_update = true; } mutex_.Lock(); @@ -2177,6 +2238,9 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, bool have_stat_update = false; Version::GetStats stats; + // Prepare to store a list of merge operations if merge occurs. + std::deque<std::string> merge_operands; + // Note: this always resizes the values array int numKeys = keys.size(); std::vector<Status> statList(numKeys); @@ -2188,18 +2252,19 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, // For each of the given keys, apply the entire "get" process as follows: // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. - // value will contain the current merge operand in the latter case. - for(int i=0; i<numKeys; ++i) { + for (int i = 0; i < numKeys; ++i) { + merge_operands.clear(); Status& s = statList[i]; std::string* value = &valueList[i]; LookupKey lkey(keys[i], snapshot); - if (mem->Get(lkey, value, &s, options_)) { + if (mem->Get(lkey, value, &s, &merge_operands, options_)) { // Done - } else if (imm.Get(lkey, value, &s, options_)) { + } else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { // Done } else { - current->Get(options, lkey, value, &s, &stats, options_); + current->Get(options, lkey, value, &s, &merge_operands, &stats, options_); have_stat_update = true; } diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index d4170d30f..834161860 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -53,11 +53,12 @@ Status DBImplReadOnly::Get(const ReadOptions& options, MemTable* mem = GetMemTable(); Version* current = versions_->current(); SequenceNumber snapshot = versions_->LastSequence(); + std::deque<std::string> merge_operands; LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, options_)) { + if (mem->Get(lkey, value, &s, &merge_operands, options_)) { } else { Version::GetStats stats; - current->Get(options, lkey, value, &s, &stats, options_); + current->Get(options, lkey, value, &s, &merge_operands, &stats, options_); } return s; } diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 6199b5e7b..e04495e58 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -32,6 +32,9 @@ public: virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value); + + // TODO: Implement ReadOnly MultiGet?
+ virtual Iterator* NewIterator(const ReadOptions&); virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value) { diff --git a/db/db_iter.cc b/db/db_iter.cc index a8eb727e2..0d6a2c846 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -4,6 +4,7 @@ #include "db/db_iter.h" #include <stdexcept> +#include <deque> #include "db/filename.h" #include "db/dbformat.h" @@ -231,10 +232,13 @@ void DBIter::FindNextUserEntry(bool skipping) { // POST: saved_value_ has the merged value for the user key // iter_ points to the next entry (or invalid) void DBIter::MergeValuesNewToOld() { + // TODO: Is there a way to unite with MergeHelper or other similar code? - const Slice value = iter_->value(); - std::string operand(value.data(), value.size()); + // Start the merge process by pushing the first operand + std::deque<std::string> operands; + operands.push_front(iter_->value().ToString()); + std::string merge_result; // Temporary string to hold merge result later ParsedInternalKey ikey; for (iter_->Next(); iter_->Valid(); iter_->Next()) { if (!ParseKey(&ikey)) { @@ -255,10 +259,11 @@ void DBIter::MergeValuesNewToOld() { } if (kTypeValue == ikey.type) { - // hit a put, merge the put value with operand and store it in the - // final result saved_value_. We are done! + // hit a put, merge the put value with operands and store the + // final result in saved_value_. We are done! + // ignore corruption if there is any. const Slice value = iter_->value(); - user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand), + user_merge_operator_->Merge(ikey.user_key, &value, operands, &saved_value_, logger_.get()); // iter_ is positioned after put iter_->Next(); @@ -266,29 +271,41 @@ void DBIter::MergeValuesNewToOld() { } if (kTypeMerge == ikey.type) { - // hit a merge, merge the value with operand and continue. - // saved_value_ is used as a scratch area. The result is put - // back in operand - const Slice value = iter_->value(); - user_merge_operator_->Merge(ikey.user_key, &value, operand, - &saved_value_, logger_.get()); - swap(saved_value_, operand); + // hit a merge, add the value as an operand and run associative merge. + // when complete, add result to operands and continue. + const Slice& value = iter_->value(); + operands.push_front(value.ToString()); + while (operands.size() >= 2) { + // Call user associative-merge until it returns false + if (user_merge_operator_->PartialMerge(ikey.user_key, + Slice(operands[0]), + Slice(operands[1]), + &merge_result, + logger_.get())) { + operands.pop_front(); + swap(operands.front(), merge_result); + } else { + // Associative merge returns false ==> stack the operands + break; + } + } + } } // we either exhausted all internal keys under this user key, or hit // a deletion marker. - // feed null as the existing value to the merge opexrator, such that + // feed null as the existing value to the merge operator, such that // client can differentiate this scenario and do things accordingly.
- user_merge_operator_->Merge(ikey.user_key, nullptr, operand, + user_merge_operator_->Merge(ikey.user_key, nullptr, operands, &saved_value_, logger_.get()); } void DBIter::Prev() { assert(valid_); - // TODO: support backward iteration // Throw an exception now if merge_operator is provided + // TODO: support backward iteration if (user_merge_operator_) { Log(logger_, "Prev not supported yet if merge_operator is provided"); throw std::logic_error("DBIter::Prev backward iteration not supported" @@ -387,8 +404,8 @@ void DBIter::SeekToFirst() { } void DBIter::SeekToLast() { + // Throw an exception for now if merge_operator is provided // TODO: support backward iteration - // throw an exception for now if merge_operator is provided if (user_merge_operator_) { Log(logger_, "SeekToLast not supported yet if merge_operator is provided"); throw std::logic_error("DBIter::SeekToLast: backward iteration not" diff --git a/db/memtable.cc b/db/memtable.cc index 622058804..a9a684967 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -130,20 +130,19 @@ void MemTable::Add(SequenceNumber s, ValueType type, } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options) { + std::deque<std::string>* operands, const Options& options) { Slice memkey = key.memtable_key(); std::shared_ptr<MemTableRep::Iterator> iter(table_.get()->GetIterator()); iter->Seek(memkey.data()); - bool merge_in_progress = false; - std::string operand; - if (s->IsMergeInProgress()) { - swap(*value, operand); - merge_in_progress = true; - } + // It is the caller's responsibility to allocate/delete operands list + assert(operands != nullptr); + bool merge_in_progress = s->IsMergeInProgress(); auto merge_operator = options.merge_operator; auto logger = options.info_log; + std::string merge_result; + for (; iter->Valid(); iter->Next()) { // entry format is: // klength varint32 @@ -165,36 +164,51 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, switch (static_cast<ValueType>(tag & 0xff)) { case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + *s = Status::OK(); if (merge_in_progress) { - merge_operator->Merge(key.user_key(), &v, operand, - value, logger.get()); + assert(merge_operator); + if (!merge_operator->Merge(key.user_key(), &v, *operands, + value, logger.get())) { + *s = Status::Corruption("Error: Could not perform merge."); + } } else { value->assign(v.data(), v.size()); } return true; } - case kTypeMerge: { - Slice v = GetLengthPrefixedSlice(key_ptr + key_length); - if (merge_in_progress) { - merge_operator->Merge(key.user_key(), &v, operand, - value, logger.get()); - swap(*value, operand); - } else { - assert(merge_operator); - merge_in_progress = true; - operand.assign(v.data(), v.size()); - } - break; - } case kTypeDeletion: { if (merge_in_progress) { - merge_operator->Merge(key.user_key(), nullptr, operand, - value, logger.get()); + assert(merge_operator); + *s = Status::OK(); + if (!merge_operator->Merge(key.user_key(), nullptr, *operands, + value, logger.get())) { + *s = Status::Corruption("Error: Could not perform merge."); + } } else { *s = Status::NotFound(Slice()); } return true; } + case kTypeMerge: { + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + merge_in_progress = true; + operands->push_front(v.ToString()); + while (operands->size() >= 2) { + // Attempt an associative merge.
(Returns true if successful) + if (merge_operator->PartialMerge(key.user_key(), + Slice((*operands)[0]), + Slice((*operands)[1]), + &merge_result, + logger.get())) { + operands->pop_front(); + swap(operands->front(), merge_result); + } else { + // Stack them because the user can't associatively merge + break; + } + } + break; + } } } else { // exit loop if user key does not match } } + // No change to value, since we have not yet found a Put/Delete + if (merge_in_progress) { - swap(*value, operand); *s = Status::MergeInProgress(""); } return false; diff --git a/db/memtable.h b/db/memtable.h index 73d32fc4c..fe8cec86b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -7,6 +7,7 @@ #include <string> #include <memory> +#include <deque> #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" @@ -74,11 +75,11 @@ class MemTable { // in *status and return true. // If memtable contains Merge operation as the most recent entry for a key, // and the merge process does not stop (not reaching a value or delete), - // store the current merged result in value and MergeInProgress in s. - // return false + // prepend the current merge operand to *operands. + // store MergeInProgress in s, and return false. // Else, return false. bool Get(const LookupKey& key, std::string* value, Status* s, - const Options& options); + std::deque<std::string>* operands, const Options& options); // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } diff --git a/db/memtablelist.cc b/db/memtablelist.cc index 9d8b675bf..6f66faab8 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -193,11 +193,13 @@ size_t MemTableList::ApproximateMemoryUsage() { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. +// Operands stores the list of merge operations to apply, so far. bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options) { + std::deque<std::string>* operands, + const Options& options) { for (list<MemTable*>::iterator it = memlist_.begin(); it != memlist_.end(); ++it) { - if ((*it)->Get(key, value, s, options)) { + if ((*it)->Get(key, value, s, operands, options)) { return true; } } diff --git a/db/memtablelist.h b/db/memtablelist.h index b30089cf6..05a60d248 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -5,6 +5,7 @@ #include <string> #include <list> +#include <deque> #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" @@ -70,7 +71,7 @@ class MemTableList { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. bool Get(const LookupKey& key, std::string* value, Status* s, - const Options& options); + std::deque<std::string>* operands, const Options& options); // Returns the list of underlying memtables. void GetMemTables(std::vector<MemTable*>* list); diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 3520db15a..4f363fec3 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -10,23 +10,36 @@ namespace leveldb { // PRE: iter points to the first merge type entry // POST: iter points to the first entry beyond the merge process (or the end) -// key_, value_ are updated to reflect the merge result +// keys_, operands_ are updated to reflect the merge result. +// keys_ stores the list of keys encountered while merging. +// operands_ stores the list of merge operands encountered while merging. +// keys_[i] corresponds to operands_[i] for each i.
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, bool at_bottom) { - // get a copy of the internal key, before it's invalidated by iter->Next() - key_.assign(iter->key().data(), iter->key().size()); - // we need to parse the internal key again as the parsed key is + // Get a copy of the internal key, before it's invalidated by iter->Next() + // Also maintain the list of merge operands seen. + keys_.clear(); + operands_.clear(); + keys_.push_front(iter->key().ToString()); + operands_.push_front(iter->value().ToString()); + + success_ = false; // Will become true if we hit Put/Delete or bottom + + // We need to parse the internal key again as the parsed key is // backed by the internal key! - ParsedInternalKey orig_ikey; // Assume no internal key corruption as it has been successfully parsed // by the caller. - // TODO: determine a good alternative of assert (exception?) - ParseInternalKey(key_, &orig_ikey); - std::string operand(iter->value().data(), iter->value().size()); + // Invariant: keys_.back() will not change. Hence, orig_ikey is always valid. + ParsedInternalKey orig_ikey; + ParseInternalKey(keys_.back(), &orig_ikey); bool hit_the_next_user_key = false; ParsedInternalKey ikey; + std::string merge_result; // Temporary value for merge results for (iter->Next(); iter->Valid(); iter->Next()) { + assert(operands_.size() >= 1); // Should be invariants! + assert(keys_.size() == operands_.size()); + if (!ParseInternalKey(iter->key(), &ikey)) { // stop at corrupted key if (assert_valid_internal_key_) { @@ -46,32 +59,54 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, break; } + // At this point we are guaranteed that we need to process this key. + if (kTypeDeletion == ikey.type) { // hit a delete - // => merge nullptr with operand - // => change the entry type to kTypeValue - // We are done! - user_merge_operator_->Merge(ikey.user_key, nullptr, operand, - &value_, logger_); - orig_ikey.type = kTypeValue; - UpdateInternalKey(&key_[0], key_.size(), - orig_ikey.sequence, orig_ikey.type); - // move iter to the next entry + // => merge nullptr with operands_ + // => store result in operands_.back() (and update keys_.back()) + // => change the entry type to kTypeValue for keys_.back() + // We are done! Return a success if the merge passes. + success_ = user_merge_operator_->Merge(ikey.user_key, nullptr, + operands_, &merge_result, + logger_); + + // We store the result in keys_.back() and operands_.back() + // if nothing went wrong (i.e.: no operand corruption on disk) + if (success_) { + std::string& key = keys_.back(); // The original key encountered + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key[0], key.size(), + orig_ikey.sequence, orig_ikey.type); + swap(operands_.back(), merge_result); + } + + // move iter to the next entry (before doing anything else) iter->Next(); return; } if (kTypeValue == ikey.type) { // hit a put - // => merge the put value with operand - // => change the entry type to kTypeValue - // We are done! + // => merge the put value with operands_ + // => store result in operands_.back() (and update keys_.back()) + // => change the entry type to kTypeValue for keys_.back() + // We are done! Success! 
const Slice value = iter->value(); - user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand), - &value_, logger_); - orig_ikey.type = kTypeValue; - UpdateInternalKey(&key_[0], key_.size(), - orig_ikey.sequence, orig_ikey.type); + success_ = user_merge_operator_->Merge(ikey.user_key, &value, + operands_, &merge_result, + logger_); + + // We store the result in keys_.back() and operands_.back() + // if nothing went wrong (i.e.: no operand corruption on disk) + if (success_) { + std::string& key = keys_.back(); // The original key encountered + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key[0], key.size(), + orig_ikey.sequence, orig_ikey.type); + swap(operands_.back(), merge_result); + } + // move iter to the next entry iter->Next(); return; @@ -79,35 +114,73 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, if (kTypeMerge == ikey.type) { // hit a merge - // => merge the value with operand. - // => put the result back to operand and continue - const Slice value = iter->value(); - user_merge_operator_->Merge(ikey.user_key, &value, operand, - &value_, logger_); - swap(value_, operand); + // => merge the operand into the front of the operands_ list + // => use the user's associative merge function to determine how. + // => then continue because we haven't yet seen a Put/Delete. + assert(!operands_.empty()); // Should have at least one element in it + + keys_.push_front(iter->key().ToString()); + operands_.push_front(iter->value().ToString()); + while (operands_.size() >= 2) { + // Returns false when the merge_operator can no longer process it + if (user_merge_operator_->PartialMerge(ikey.user_key, + Slice(operands_[0]), + Slice(operands_[1]), + &merge_result, + logger_)) { + // Merging of operands (associative merge) was successful. + // Replace these frontmost two operands with the merge result + keys_.pop_front(); + operands_.pop_front(); + swap(operands_.front(), merge_result); + } else { + // Merging of operands (associative merge) returned false. + // The user merge_operator does not know how to merge these operands. + // So we just stack them up until we find a Put/Delete or end of key. + break; + } + } continue; } } - // We have seen the root history of this key if we are at the - // bottem level and exhausted all internal keys of this user key + // We are sure we have seen this key's entire history if we are at the + // last level and exhausted all internal keys of this user key. // NOTE: !iter->Valid() does not necessarily mean we hit the // beginning of a user key, as versions of a user key might be - // split into multiple files and some files might not be included - // in the merge. - bool seen_the_beginning = hit_the_next_user_key && at_bottom; - - if (seen_the_beginning) { + // split into multiple files (even files on the same level) + // and some files might not be included in the compaction/merge. + // + // There are also cases where we have seen the root of history of this + // key without being sure of it. Then, we simply miss the opportunity + // to combine the keys. Since VersionSet::SetupOtherInputs() always makes + // sure that all merge-operands on the same level get compacted together, + // this will simply lead to these merge operands moving to the next level. + // + // So, we only perform the following logic (to merge all operands together + // without a Put/Delete) if we are certain that we have seen the end of key. 
+ bool surely_seen_the_beginning = hit_the_next_user_key && at_bottom; + if (surely_seen_the_beginning) { // do a final merge with nullptr as the existing value and say // bye to the merge type (it's now converted to a Put) assert(kTypeMerge == orig_ikey.type); - user_merge_operator_->Merge(orig_ikey.user_key, nullptr, operand, - &value_, logger_); - orig_ikey.type = kTypeValue; - UpdateInternalKey(&key_[0], key_.size(), - orig_ikey.sequence, orig_ikey.type); - } else { - swap(value_, operand); + assert(operands_.size() >= 1); + assert(operands_.size() == keys_.size()); + success_ = user_merge_operator_->Merge(ikey.user_key, nullptr, + operands_, &merge_result, + logger_); + + if (success_) { + std::string& key = keys_.back(); // The original key encountered + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key[0], key.size(), + orig_ikey.sequence, orig_ikey.type); + + // The final value() is always stored in operands_.back() + swap(operands_.back(), merge_result); + } else { + // Do nothing if not success_. Leave keys() and operands() as they are. + } } } diff --git a/db/merge_helper.h b/db/merge_helper.h index 206d7a53a..9cad8ad04 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -4,6 +4,7 @@ #include "db/dbformat.h" #include "leveldb/slice.h" #include <string> +#include <deque> namespace leveldb { @@ -21,7 +22,10 @@ class MergeHelper { : user_comparator_(user_comparator), user_merge_operator_(user_merge_operator), logger_(logger), - assert_valid_internal_key_(assert_valid_internal_key) {} + assert_valid_internal_key_(assert_valid_internal_key), + keys_(), + operands_(), + success_(false) {} // Merge entries until we hit // - a corrupted key - a new user key -end of file - a put/delete, - a merge operand of a different or "unmergeable" type // REQUIRES: Call this function only once for each merge sequence void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0, bool at_bottom = false); // Query the merge result - // These are valid until the next MergeUtil call - // IMPORTANT: the key type could change after the MergeUntil call. - // Put/Delete + Merge + ... + Merge => Put - // Merge + ... + Merge => Merge - Slice key() { return Slice(key_); } - Slice value() { return Slice(value_); } + // These are valid until the next MergeUntil call + // If the merging was successful: + // - IsSuccess() will be true + // - key() will have the latest sequence number of the merges. + // The type will be Put or Merge. See IMPORTANT 1 note, below. + // - value() will be the result of merging all the operands together + // - The user should ignore keys() and values(). + // + // IMPORTANT 1: the key type could change after the MergeUntil call. + // Put/Delete + Merge + ... + Merge => Put + // Merge + ... + Merge => Merge + // + // If the merge operator is not associative, and if a Put/Delete is not found + // then the merging will be unsuccessful. In this case: + // - IsSuccess() will be false + // - keys() contains the list of internal keys seen in order of iteration. + // - values() contains the list of values (merges) seen in the same order. + // values() is parallel to keys() so that the first entry in + // keys() is the key associated with the first entry in values() + // and so on. These lists will be the same length. + // All of these pairs will be merges over the same user key. + // See IMPORTANT 2 note below. + // - The user should ignore key() and value(). + // + // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. + // So keys().back() was the first key seen by iterator.
+ // TODO: Re-style this comment to be like the first one + bool IsSuccess() { return success_; } + Slice key() { assert(success_); return Slice(keys_.back()); } + Slice value() { assert(success_); return Slice(operands_.back()); } + const std::deque<std::string>& keys() { assert(!success_); return keys_; } + const std::deque<std::string>& values() { + assert(!success_); return operands_; + } private: const Comparator* user_comparator_; @@ -55,8 +87,9 @@ class MergeHelper { // the scratch area that holds the result of MergeUntil // valid up to the next MergeUntil call - std::string key_; - std::string value_; + std::deque<std::string> keys_; // Keeps track of the sequence of keys seen + std::deque<std::string> operands_; // Parallel with keys_; stores the values + bool success_; }; } // namespace leveldb diff --git a/db/merge_operator.cc b/db/merge_operator.cc new file mode 100644 index 000000000..89c64b5d9 --- /dev/null +++ b/db/merge_operator.cc @@ -0,0 +1,51 @@ +/** + * Back-end implementation details specific to the Merge Operator. + * + * @author Deon Nicholas (dnicholas@fb.com) + * Copyright 2013 Facebook + */ + +#include "leveldb/merge_operator.h" + +namespace leveldb { + +// Given a "real" merge from the library, call the user's +// associative merge function one-by-one on each of the operands. +// NOTE: It is assumed that the client's merge-operator will handle any errors. +bool AssociativeMergeOperator::Merge( + const Slice& key, + const Slice* existing_value, + const std::deque<std::string>& operand_list, + std::string* new_value, + Logger* logger) const { + + // Simply loop through the operands + Slice temp_existing; + std::string temp_value; + for (const auto& operand : operand_list) { + Slice value(operand); + if (!Merge(key, existing_value, value, &temp_value, logger)) { + return false; + } + swap(temp_value, *new_value); + temp_existing = Slice(*new_value); + existing_value = &temp_existing; + } + + // The result will be in *new_value. All merges succeeded. + return true; +} + +// Call the user defined simple merge on the operands; +// NOTE: It is assumed that the client's merge-operator will handle any errors. +bool AssociativeMergeOperator::PartialMerge( + const Slice& key, + const Slice& left_operand, + const Slice& right_operand, + std::string* new_value, + Logger* logger) const { + + return Merge(key, &left_operand, right_operand, new_value, logger); +} + +} // namespace leveldb diff --git a/db/version_set.cc b/db/version_set.cc index 48588c712..9ea9ad809 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -232,7 +232,7 @@ enum SaverState { kFound, kDeleted, kCorrupt, - kMerge // value contains the current merge result (the operand) + kMerge // saver contains the current merge result (the operands) }; struct Saver { SaverState state; const Comparator* ucmp; Slice user_key; bool* value_found; // Is value set correctly? Used by KeyMayExist std::string* value; const MergeOperator* merge_operator; + std::deque<std::string>* merge_operands; // the merge operations encountered Logger* logger; bool didIO; // did we do any disk io? }; @@ -241,6 +241,7 @@ static void MarkKeyMayExist(void* arg) { static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ Saver* s = reinterpret_cast<Saver*>(arg); + std::deque<std::string>* const ops = s->merge_operands; // shorter alias + std::string merge_result; // temporary area for merge results later + + assert(s != nullptr && ops != nullptr); + ParsedInternalKey parsed_key; // TODO: didIO and Merge?
s->didIO = didIO; @@ -269,49 +275,57 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ s->state = kCorrupt; } else { if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { + // Key matches. Process it switch (parsed_key.type) { case kTypeValue: if (kNotFound == s->state) { + s->state = kFound; s->value->assign(v.data(), v.size()); } else if (kMerge == s->state) { - std::string operand; - swap(operand, *s->value); - s->merge_operator->Merge(s->user_key, &v, operand, - s->value, s->logger); + assert(s->merge_operator != nullptr); + s->state = kFound; + if (!s->merge_operator->Merge(s->user_key, &v, *ops, + s->value, s->logger)) { + s->state = kCorrupt; + } } else { assert(false); } - s->state = kFound; return false; - case kTypeMerge: - if (kNotFound == s->state) { - s->state = kMerge; - s->value->assign(v.data(), v.size()); - } else if (kMerge == s->state) { - std::string operand; - swap(operand, *s->value); - s->merge_operator->Merge(s->user_key, &v, operand, - s->value, s->logger); - } else { - assert(false); - } - return true; - case kTypeDeletion: if (kNotFound == s->state) { s->state = kDeleted; } else if (kMerge == s->state) { - std::string operand; - swap(operand, *s->value); - s->merge_operator->Merge(s->user_key, nullptr, operand, - s->value, s->logger); s->state = kFound; + if (!s->merge_operator->Merge(s->user_key, nullptr, *ops, + s->value, s->logger)) { + s->state = kCorrupt; + } } else { assert(false); } return false; + case kTypeMerge: + assert(s->state == kNotFound || s->state == kMerge); + s->state = kMerge; + ops->push_front(v.ToString()); + while (ops->size() >= 2) { + // Attempt to merge operands together via user associative merge + if (s->merge_operator->PartialMerge(s->user_key, + Slice((*ops)[0]), + Slice((*ops)[1]), + &merge_result, + s->logger)) { + ops->pop_front(); + swap(ops->front(), merge_result); + } else { + // Associative merge returns false ==> stack the operands + break; + } + } + return true; } } } @@ -341,7 +355,8 @@ Version::Version(VersionSet* vset, uint64_t version_number) void Version::Get(const ReadOptions& options, const LookupKey& k, std::string* value, - Status *status, + Status* status, + std::deque<std::string>* operands, GetStats* stats, const Options& db_options, const bool no_io, @@ -364,6 +379,7 @@ void Version::Get(const ReadOptions& options, saver.value_found = value_found; saver.value = value; saver.merge_operator = merge_operator; + saver.merge_operands = operands; saver.logger = logger.get(); saver.didIO = false; @@ -374,55 +390,68 @@ void Version::Get(const ReadOptions& options, // We can search level-by-level since entries never hop across // levels. Therefore we are guaranteed that if we find data - // in an smaller level, later levels are irrelevant. - std::vector<FileMetaData*> tmp; - FileMetaData* tmp2; + // in a smaller level, later levels are irrelevant (unless we + // are MergeInProgress). + std::vector<FileMetaData*> important_files; for (int level = 0; level < vset_->NumberLevels(); level++) { size_t num_files = files_[level].size(); if (num_files == 0) continue; // Get the list of files to search in this level FileMetaData* const* files = &files_[level][0]; + important_files.clear(); + important_files.reserve(num_files); + + // Some files may overlap each other. We find + // all files that overlap user_key and process them in order from + // newest to oldest. In the context of merge-operator, + // this can occur at any level.
Otherwise, it only occurs + // at Level-0 (since Put/Deletes are always compacted into a single entry). + uint32_t start_index; if (level == 0) { - // Level-0 files may overlap each other. Find all files that - // overlap user_key and process them in order from newest to oldest. - tmp.reserve(num_files); - for (uint32_t i = 0; i < num_files; i++) { - FileMetaData* f = files[i]; - if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && - ucmp->Compare(user_key, f->largest.user_key()) <= 0) { - tmp.push_back(f); - } - } - if (tmp.empty()) continue; - - std::sort(tmp.begin(), tmp.end(), NewestFirst); - files = &tmp[0]; - num_files = tmp.size(); + // On Level-0, we read through all files to check for overlap. + start_index = 0; } else { + // On Level-n (n>=1), files are sorted. // Binary search to find earliest index whose largest key >= ikey. - uint32_t index = FindFile(vset_->icmp_, files_[level], ikey); - if (index >= num_files) { - files = nullptr; - num_files = 0; - } else { - tmp2 = files[index]; - if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) { - // All of "tmp2" is past any data for user_key - files = nullptr; - num_files = 0; - } else { - files = &tmp2; - num_files = 1; - // TODO, is level 1-n files all disjoint in user key space? - } + // We will also stop when the file no longer overlaps ikey + start_index = FindFile(vset_->icmp_, files_[level], ikey); + } + + // Traverse the list, finding all overlapping files. + for (uint32_t i = start_index; i < num_files; i++) { + FileMetaData* f = files[i]; + if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && + ucmp->Compare(user_key, f->largest.user_key()) <= 0) { + important_files.push_back(f); + } else if (level > 0) { + // If on Level-n (n>=1) then the files are sorted. + // So we can stop looking when we are past the ikey. 
+ break; } } + if (important_files.empty()) continue; + if (level == 0) { + std::sort(important_files.begin(), important_files.end(), NewestFirst); + } else { + // Sanity check to make sure that the files are correctly sorted +#ifndef NDEBUG + num_files = important_files.size(); + for (uint32_t i = 1; i < num_files; ++i) { + FileMetaData* a = important_files[i-1]; + FileMetaData* b = important_files[i]; + int comp_sign = vset_->icmp_.Compare(a->largest, b->smallest); + assert(comp_sign < 0); + } +#endif + } + // Traverse each relevant file to find the desired key + num_files = important_files.size(); for (uint32_t i = 0; i < num_files; ++i) { - FileMetaData* f = files[i]; + FileMetaData* f = important_files[i]; bool tableIO = false; *status = vset_->table_cache_->Get(options, f->number, f->file_size, ikey, &saver, SaveValue, &tableIO, @@ -469,17 +498,17 @@ void Version::Get(const ReadOptions& options, if (kMerge == saver.state) { - // merge operand is in *value and we hit the beginning of the key history - // do a final merge of nullptr and operand; - std::string operand; - swap(operand, *value); - merge_operator->Merge(user_key, nullptr, operand, - value, logger.get()); - *status = Status::OK(); - return; + // merge_operands are in saver and we hit the beginning of the key history + // do a final merge of nullptr and operands; + if (merge_operator->Merge(user_key, nullptr, *saver.merge_operands, + value, logger.get())) { + *status = Status::OK(); + } else { + *status = Status::Corruption("could not perform end-of-key merge for ", + user_key); + } } else { *status = Status::NotFound(Slice()); // Use an empty error message for speed - return; } } @@ -717,6 +746,53 @@ void Version::ExtendOverlappingInputs( } } +// Returns true iff the first or last file in inputs contains +// an overlapping user key to the file "just outside" of it (i.e. +// just after the last file, or just before the first file) +// REQUIRES: "*inputs" is a sorted list of non-overlapping files +bool Version::HasOverlappingUserKey( + const std::vector<FileMetaData*>* inputs, + int level) { + + // If inputs empty, there is no overlap. + // If level == 0, it is assumed that all needed files were already included. + if (inputs->empty() || level == 0) { + return false; + } + + const Comparator* user_cmp = vset_->icmp_.user_comparator(); + const std::vector<FileMetaData*>& files = files_[level]; + const size_t kNumFiles = files.size(); + + // Check the last file in inputs against the file after it + size_t last_file = FindFile(vset_->icmp_, files, + inputs->back()->largest.Encode()); + assert(0 <= last_file && last_file < kNumFiles); // File should exist! + if (last_file < kNumFiles-1) { // If not the last file + const Slice last_key_in_input = files[last_file]->largest.user_key(); + const Slice first_key_after = files[last_file+1]->smallest.user_key(); + if (user_cmp->Compare(last_key_in_input, first_key_after) == 0) { + // The last user key in input overlaps with the next file's first key + return true; + } + } + + // Check the first file in inputs against the file just before it + size_t first_file = FindFile(vset_->icmp_, files, + inputs->front()->smallest.Encode()); + assert(0 <= first_file && first_file <= last_file); // File should exist!
+ if (first_file > 0) { // If not first file + const Slice& first_key_in_input = files[first_file]->smallest.user_key(); + const Slice& last_key_before = files[first_file-1]->largest.user_key(); + if (user_cmp->Compare(first_key_in_input, last_key_before) == 0) { + // The first user key in input overlaps with the previous file's last key + return true; + } + } + + return false; +} + std::string Version::DebugString(bool hex) const { std::string r; for (int level = 0; level < vset_->NumberLevels(); level++) { @@ -1566,7 +1642,7 @@ void VersionSet::Finalize(Version* v, // sort all the levels based on their score. Higher scores get listed // first. Use bubble sort because the number of entries are small. - for(int i = 0; i < NumberLevels()-2; i++) { + for (int i = 0; i < NumberLevels()-2; i++) { for (int j = i+1; j < NumberLevels()-1; j++) { if (v->compaction_score_[i] < v->compaction_score_[j]) { double score = v->compaction_score_[i]; @@ -2060,7 +2136,7 @@ Compaction* VersionSet::PickCompaction() { Compaction* c = nullptr; int level = -1; - // compute the compactions needed. It is better to do it here + // Compute the compactions needed. It is better to do it here // and also in LogAndApply(), otherwise the values could be stale. std::vector<uint64_t> size_being_compacted(NumberLevels()-1); current_->vset_->SizeBeingCompacted(size_being_compacted); @@ -2076,6 +2152,7 @@ Compaction* VersionSet::PickCompaction() { level = current_->compaction_level_[i]; if ((current_->compaction_score_[i] >= 1)) { c = PickCompactionBySize(level, current_->compaction_score_[i]); + ExpandWhileOverlapping(c); if (c != nullptr) { break; } @@ -2092,13 +2169,14 @@ Compaction* VersionSet::PickCompaction() { // Only allow one level 0 compaction at a time. // Do not pick this file if its parents at level+1 are being compacted. if (level != 0 || compactions_in_progress_[0].empty()) { - if(!ParentRangeInCompaction(&f->smallest, &f->largest, level, - &parent_index)) { + if (!ParentRangeInCompaction(&f->smallest, &f->largest, level, + &parent_index)) { c = new Compaction(level, MaxFileSizeForLevel(level+1), MaxGrandParentOverlapBytes(level), NumberLevels(), true); c->inputs_[0].push_back(f); c->parent_index_ = parent_index; current_->file_to_compact_ = nullptr; + ExpandWhileOverlapping(c); } } } @@ -2110,7 +2188,6 @@ Compaction* VersionSet::PickCompaction() { c->input_version_ = current_; c->input_version_->Ref(); - // Files in level 0 may overlap each other, so pick up all overlapping ones // Two level 0 compaction won't run at the same time, so don't need to worry // about files on level 0 being compacted. if (level == 0) { @@ -2135,6 +2212,8 @@ Compaction* VersionSet::PickCompaction() { assert(!c->inputs_[0].empty()); } + + // Setup "level+1" files (inputs_[1]) SetupOtherInputs(c); // mark all the files that are being compacted @@ -2166,11 +2245,76 @@ bool VersionSet::FilesInCompaction(std::vector<FileMetaData*>& files) { return false; } +// Add more files to the inputs on "level" to make sure that +// no newer version of a key is compacted to "level+1" while leaving an older +// version in a "level". Otherwise, any Get() will search "level" first, +// and will likely return an old/stale value for the key, since it always +// searches in increasing order of level to find the value. This could +// also scramble the order of merge operands. This function should be +// called any time a new Compaction is created, and its inputs_[0] are +// populated. +// +// Will set c to nullptr if it is impossible to apply this compaction.
+void VersionSet::ExpandWhileOverlapping(Compaction* c) { + // If inputs are empty then there is nothing to expand. + if (!c || c->inputs_[0].empty()) { + return; + } + + // GetOverlappingInputs will always do the right thing for level-0. + // So we don't need to do any expansion if level == 0. + if (c->level() == 0) { + return; + } + + const int level = c->level(); + InternalKey smallest, largest; + + // Keep expanding c->inputs_[0] until we are sure that there is a + // "clean cut" boundary between the files in input and the surrounding files. + // This will ensure that no parts of a key are lost during compaction. + int hint_index = -1; + size_t old_size; + do { + old_size = c->inputs_[0].size(); + GetRange(c->inputs_[0], &smallest, &largest); + c->inputs_[0].clear(); + current_->GetOverlappingInputs(level, &smallest, &largest, &c->inputs_[0], + hint_index, &hint_index); + } while (c->inputs_[0].size() > old_size); + + // Get the new range + GetRange(c->inputs_[0], &smallest, &largest); + + // If, after the expansion, there are files that are already under + // compaction, then we must drop/cancel this compaction. + int parent_index = -1; + if (FilesInCompaction(c->inputs_[0]) || + ParentRangeInCompaction(&smallest, &largest, level, &parent_index)) { + c->inputs_[0].clear(); + c->inputs_[1].clear(); + delete c; + c = nullptr; + } +} + +// Populates the set of inputs from "level+1" that overlap with "level". +// Will also attempt to expand "level" if that doesn't expand "level+1" +// or cause "level" to include a file for compaction that has an overlapping +// user-key with another file. void VersionSet::SetupOtherInputs(Compaction* c) { + // If inputs are empty, then there is nothing to expand. + if (c->inputs_[0].empty()) { + return; + } + const int level = c->level(); InternalKey smallest, largest; + + // Get the range one last time. GetRange(c->inputs_[0], &smallest, &largest); + // Populate the set of next-level files (inputs_[1]) to include in compaction current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1], c->parent_index_, &c->parent_index_); @@ -2178,8 +2322,11 @@ void VersionSet::SetupOtherInputs(Compaction* c) { InternalKey all_start, all_limit; GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit); - // See if we can grow the number of inputs in "level" without - // changing the number of "level+1" files we pick up. + // See if we can further grow the number of inputs in "level" without + // changing the number of "level+1" files we pick up. We also choose NOT + // to expand if this would cause "level" to include some entries for some + // user key, while excluding other entries for the same user key. This + // can happen when one user key spans multiple files. if (!c->inputs_[1].empty()) { std::vector<FileMetaData*> expanded0; current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0, @@ -2190,7 +2337,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) { int64_t limit = ExpandedCompactionByteSizeLimit(level); if (expanded0.size() > c->inputs_[0].size() && inputs1_size + expanded0_size < limit && - !FilesInCompaction(expanded0)) { + !FilesInCompaction(expanded0) && + !current_->HasOverlappingUserKey(&expanded0, level)) { InternalKey new_start, new_limit; GetRange(expanded0, &new_start, &new_limit); std::vector<FileMetaData*> expanded1; @@ -2249,13 +2397,11 @@ Compaction* VersionSet::CompactRange( return nullptr; } - // Avoid compacting too much in one shot in case the range is large.
   // But we cannot do this for level-0 since level-0 files can overlap
   // and we must not pick one file and drop another older file if the
   // two files overlap.
   if (level > 0) {
-
     const uint64_t limit = MaxFileSizeForLevel(level) *
                            options_->source_compaction_factor;
     uint64_t total = 0;
@@ -2268,12 +2414,18 @@ Compaction* VersionSet::CompactRange(
       }
     }
   }
-
   Compaction* c = new Compaction(level, MaxFileSizeForLevel(level+1),
                                  MaxGrandParentOverlapBytes(level),
                                  NumberLevels());
+
+  c->inputs_[0] = inputs;
+  ExpandWhileOverlapping(&c);
+  if (c == nullptr) {
+    Log(options_->info_log, "Could not compact due to expansion failure.\n");
+    return nullptr;
+  }
+
   c->input_version_ = current_;
   c->input_version_->Ref();
-  c->inputs_[0] = inputs;
   SetupOtherInputs(c);
 
   // These files that are to be manually compacted do not trample
@@ -2418,8 +2570,9 @@ void Compaction::Summary(char* output, int len) {
   int write = snprintf(output, len,
       "Base version %ld Base level %d, seek compaction:%d, inputs:",
       input_version_->GetVersionNumber(), level_, seek_compaction_);
-  if(write < 0 || write > len)
+  if (write < 0 || write > len) {
     return;
+  }
 
   char level_low_summary[100];
   InputSummary(inputs_[0], level_low_summary, sizeof(level_low_summary));
diff --git a/db/version_set.h b/db/version_set.h
index 320ff8e68..a8403868b 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -68,14 +68,16 @@ class Version {
 
   // Lookup the value for key. If found, store it in *val and
   // return OK. Else return a non-OK status. Fills *stats.
+  // Uses *operands to store merge_operator operations to apply later
   // REQUIRES: lock is not held
   struct GetStats {
     FileMetaData* seek_file;
     int seek_file_level;
   };
   void Get(const ReadOptions&, const LookupKey& key, std::string* val,
-           Status* status, GetStats* stats, const Options& db_option,
-           const bool no_io = false, bool* value_found = nullptr);
+           Status* status, std::deque<std::string>* operands, GetStats* stats,
+           const Options& db_option, const bool no_io = false,
+           bool* value_found = nullptr);
 
   // Adds "stats" into the current state. Returns true if a new
   // compaction may need to be triggered, false otherwise.
@@ -118,6 +120,14 @@ class Version {
                              const Slice* smallest_user_key,
                              const Slice* largest_user_key);
 
+  // Returns true iff the first or last file in *inputs shares a user key
+  // with the file "just outside" of it (i.e. the file just after the last
+  // file, or just before the first file)
+  // REQUIRES: "*inputs" is a sorted list of non-overlapping files
+  bool HasOverlappingUserKey(const std::vector<FileMetaData*>* inputs,
+                             int level);
+
+
   // Return the level at which we should place a new memtable compaction
   // result that covers the range [smallest_user_key,largest_user_key].
   int PickLevelForMemTableOutput(const Slice& smallest_user_key,
@@ -405,6 +415,8 @@ class VersionSet {
                 InternalKey* smallest,
                 InternalKey* largest);
 
+  void ExpandWhileOverlapping(Compaction** c);
+
   void SetupOtherInputs(Compaction* c);
 
   // Save current contents to *log
diff --git a/include/leveldb/merge_operator.h b/include/leveldb/merge_operator.h
index d948d3c6e..44c4db66b 100644
--- a/include/leveldb/merge_operator.h
+++ b/include/leveldb/merge_operator.h
@@ -6,26 +6,38 @@
 #define STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_
 
 #include <string>
+#include <deque>
+#include "leveldb/slice.h"  // TODO: Remove this when migration is done;
 
 namespace leveldb {
 
 class Slice;
 class Logger;
 
-// The Merge Operator interface.
-// Client needs to provide an object implementing this interface if Merge
-// operation is accessed.
-// Essentially, MergeOperator specifies the SEMANTICS of a merge, which only
+// The Merge Operator
+//
+// Essentially, a MergeOperator specifies the SEMANTICS of a merge, which only
 // the client knows. It could be numeric addition, list append, string
-// concatenation, ... , anything.
+// concatenation, edit data structure, ... , anything.
 // The library, on the other hand, is concerned with the exercise of this
 // interface, at the right time (during get, iteration, compaction...)
-// Note that, even though in principle we don't require any special property
-// of the merge operator, the current rocksdb compaction order does imply that
-// an associative operator could be exercised more naturally (and more
-// efficiently).
 //
-// Refer to my_test.cc for an example of implementation
+// To use merge, the client needs to provide an object implementing one of
+// the following interfaces:
+//  a) AssociativeMergeOperator - for most simple semantics (always take
+//     two values, and merge them into one value, which is then put back
+//     into rocksdb); numeric addition and string concatenation are examples;
+//
+//  b) MergeOperator - the generic class for all the more abstract / complex
+//     operations; one method to merge a Put/Delete value with a merge operand;
+//     and another method (PartialMerge) that merges two operands together.
+//     This is especially useful if your key values have a complex structure,
+//     but you would still like to support client-specific incremental updates.
+//
+// AssociativeMergeOperator is simpler to implement; MergeOperator is
+// more powerful.
+//
+// Refer to the rocksdb-merge wiki for more details and example
+// implementations.
 //
 class MergeOperator {
  public:
@@ -38,35 +50,95 @@ class MergeOperator {
   // Gives the client a way to express the read -> modify -> write semantics
   // key:      (IN) The key that's associated with this merge operation.
   //                Client could multiplex the merge operator based on it
   //                if the key space is partitioned and different subspaces
   //                refer to different types of data which have different
   //                merge operation semantics
   // existing: (IN) null indicates that the key does not exist before this op
-  // value:    (IN) The passed-in merge operand value (when Merge is issued)
+  // operand_list:(IN) the sequence of merge operations to apply, front() first.
   // new_value:(OUT) Client is responsible for filling the merge result here
   // logger:   (IN) Client could use this to log errors during merge.
   //
-  // Note: Merge does not return anything to indicate if a merge is successful
-  //       or not.
-  // Rationale: If a merge failed due to, say de-serialization error, we still
-  //            need to define a consistent merge result. Should we throw away
-  //            the existing value? the merge operand? Or reset the merged value
-  //            to sth? The rocksdb library is not in a position to make the
-  //            right choice. On the other hand, client knows exactly what
-  //            happened during Merge, thus is able to make the best decision.
-  //            Just save the final decision in new_value. logger is passed in,
-  //            in case client wants to leave a trace of what went wrong.
-  virtual void Merge(const Slice& key,
+  // Return true on success.
+  // All values passed in will be client-specific values. So if this method
+  // returns false, it is because client specified bad data or there was
+  // internal corruption. This will be treated as an error by the library.
+  //
+  // Also make use of the *logger for error messages.
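+  //
+  // Illustrative example (not part of the original header): for a
+  // string-append operator with delimiter ',', a call with
+  // *existing_value == "a" and operand_list == {"b", "c"} should set
+  // *new_value to "a,b,c" and return true.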
+  virtual bool Merge(const Slice& key,
                      const Slice* existing_value,
-                     const Slice& value,
+                     const std::deque<std::string>& operand_list,
                      std::string* new_value,
                      Logger* logger) const = 0;
 
+  // This function performs merge(left_op, right_op)
+  // when both operands are themselves merge operation types
+  // that you would have passed to a DB::Merge() call in the same order
+  // (i.e.: DB::Merge(key, left_op), followed by DB::Merge(key, right_op)).
+  //
+  // PartialMerge should combine them into a single merge operation that is
+  // saved into *new_value, and then it should return true.
+  // *new_value should be constructed such that a call to
+  // DB::Merge(key, *new_value) would yield the same result as a call
+  // to DB::Merge(key, left_op) followed by DB::Merge(key, right_op).
+  //
+  // If it is impossible or infeasible to combine the two operations,
+  // leave new_value unchanged and return false. The library will
+  // internally keep track of the operations, and apply them in the
+  // correct order once a base-value (a Put/Delete/End-of-Database) is seen.
+  //
+  // TODO: Presently there is no way to differentiate between error/corruption
+  //       and simply "return false". For now, the client should simply return
+  //       false in any case it cannot perform partial-merge, regardless of
+  //       reason. If there is corruption in the data, handle it in the
+  //       Merge() function above, and return false there.
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const = 0;
 
-  // The name of the MergeOperator. Used to check for MergeOperator
+  // The name of the MergeOperator.  Used to check for MergeOperator
   // mismatches (i.e., a DB created with one MergeOperator is
   // accessed using a different MergeOperator)
   // TODO: the name is currently not stored persistently and thus
   //       no checking is enforced. Client is responsible for providing
   //       consistent MergeOperator between DB opens.
   virtual const char* Name() const = 0;
+};
+
+// The simpler, associative merge operator.
+class AssociativeMergeOperator : public MergeOperator {
+ public:
+  virtual ~AssociativeMergeOperator() {}
+
+  // Gives the client a way to express the read -> modify -> write semantics
+  // key:           (IN) The key that's associated with this merge operation.
+  // existing_value:(IN) null indicates the key does not exist before this op
+  // value:         (IN) the value to update/merge the existing_value with
+  // new_value:    (OUT) Client is responsible for filling the merge result here
+  // logger:        (IN) Client could use this to log errors during merge.
+  //
+  // Return true on success.
+  // All values passed in will be client-specific values. So if this method
+  // returns false, it is because client specified bad data or there was
+  // internal corruption. The client should assume that this will be treated
+  // as an error by the library.
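+  //
+  // Illustrative example (not part of the original header): a uint64
+  // addition operator would decode existing_value and value as fixed64
+  // integers (treating a null existing_value as 0), add them, encode the
+  // sum into *new_value, and return true.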
+  virtual bool Merge(const Slice& key,
+                     const Slice* existing_value,
+                     const Slice& value,
+                     std::string* new_value,
+                     Logger* logger) const = 0;
+
+
+ private:
+  // Default implementations of MergeOperator::Merge and
+  // MergeOperator::PartialMerge, expressed in terms of the associative
+  // Merge() above.
+  virtual bool Merge(const Slice& key,
+                     const Slice* existing_value,
+                     const std::deque<std::string>& operand_list,
+                     std::string* new_value,
+                     Logger* logger) const override;
+
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const override;
 };
 
 }  // namespace leveldb
diff --git a/table/table.cc b/table/table.cc
index 80c5ef491..be0f43c90 100644
--- a/table/table.cc
+++ b/table/table.cc
@@ -356,6 +356,8 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
         (*mark_key_may_exist)(arg);
         break;
       }
+
+      // Call the *saver function on each entry/block until it returns false
       for (block_iter->Seek(k); block_iter->Valid(); block_iter->Next()) {
         if (!(*saver)(arg, block_iter->key(), block_iter->value(), didIO)) {
           done = true;
diff --git a/utilities/merge_operators/put.cc b/utilities/merge_operators/put.cc
index 9a610c4ce..123dce66a 100644
--- a/utilities/merge_operators/put.cc
+++ b/utilities/merge_operators/put.cc
@@ -8,15 +8,34 @@ using namespace leveldb;
 namespace { // anonymous namespace
 
 // A merge operator that mimics Put semantics
+// Since this merge-operator will not be used in production,
+// it is implemented as a non-associative merge operator to illustrate the
+// new interface and for testing purposes. (That is, we inherit from
+// the MergeOperator class rather than the AssociativeMergeOperator,
+// which would be simpler in this case.)
+//
+// From the client perspective, the semantics are the same.
 class PutOperator : public MergeOperator {
  public:
-  virtual void Merge(const Slice& key,
+  virtual bool Merge(const Slice& key,
                      const Slice* existing_value,
-                     const Slice& value,
+                     const std::deque<std::string>& operand_sequence,
                      std::string* new_value,
                      Logger* logger) const override {
-    // put basically only looks at the current value
-    new_value->assign(value.data(), value.size());
+    // Put basically only looks at the current/latest value
+    assert(!operand_sequence.empty());
+    assert(new_value != nullptr);
+    new_value->assign(operand_sequence.back());
+    return true;
+  }
+
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const override {
+    // Keep only the latest operand; the left operand is superseded
+    new_value->assign(right_operand.data(), right_operand.size());
+    return true;
   }
 
   virtual const char* Name() const override {
diff --git a/utilities/merge_operators/string_append/stringappend.cc b/utilities/merge_operators/string_append/stringappend.cc
index 06056c429..9bded8a98 100644
--- a/utilities/merge_operators/string_append/stringappend.cc
+++ b/utilities/merge_operators/string_append/stringappend.cc
@@ -15,14 +15,13 @@
 
 namespace leveldb {
 
-  // Constructor: also specify the delimiter character.
 StringAppendOperator::StringAppendOperator(char delim_char)
     : delim_(delim_char) {
 }
 
 // Implementation for the merge operation (concatenates two strings)
-void StringAppendOperator::Merge(const Slice& key,
+bool StringAppendOperator::Merge(const Slice& key,
                                  const Slice* existing_value,
                                  const Slice& value,
                                  std::string* new_value,
@@ -43,6 +42,8 @@ void StringAppendOperator::Merge(const Slice& key,
     new_value->append(1,delim_);
     new_value->append(value.data(), value.size());
   }
+
+  return true;
 }
 
 const char* StringAppendOperator::Name() const {
diff --git a/utilities/merge_operators/string_append/stringappend.h b/utilities/merge_operators/string_append/stringappend.h
index 1cf11d008..34cca6944 100644
--- a/utilities/merge_operators/string_append/stringappend.h
+++ b/utilities/merge_operators/string_append/stringappend.h
@@ -9,12 +9,12 @@
 
 namespace leveldb {
 
-class StringAppendOperator : public MergeOperator {
+class StringAppendOperator : public AssociativeMergeOperator {
  public:
   StringAppendOperator(char delim_char);  /// Constructor: specify delimiter
 
-  virtual void Merge(const Slice& key,
+  virtual bool Merge(const Slice& key,
                      const Slice* existing_value,
                      const Slice& value,
                      std::string* new_value,
@@ -23,7 +23,7 @@ class StringAppendOperator : public MergeOperator {
   virtual const char* Name() const override;
 
  private:
-  char delim_;         // The delimiter is inserted between elements
+  char delim_;  // The delimiter is inserted between elements
 
 };
diff --git a/utilities/merge_operators/string_append/stringappend2.cc b/utilities/merge_operators/string_append/stringappend2.cc
new file mode 100644
index 000000000..c0930d653
--- /dev/null
+++ b/utilities/merge_operators/string_append/stringappend2.cc
@@ -0,0 +1,90 @@
+/**
+ * @author Deon Nicholas (dnicholas@fb.com)
+ * Copyright 2013 Facebook
+ */
+
+#include "stringappend2.h"
+
+#include <string>
+#include <assert.h>
+
+#include "leveldb/slice.h"
+#include "leveldb/merge_operator.h"
+#include "utilities/merge_operators.h"
+
+namespace leveldb {
+
+// Constructor: also specify the delimiter character.
+StringAppendTESTOperator::StringAppendTESTOperator(char delim_char)
+    : delim_(delim_char) {
+}
+
+// Implementation for the merge operation (concatenates the operands)
+bool StringAppendTESTOperator::Merge(const Slice& key,
+                                     const Slice* existing_value,
+                                     const std::deque<std::string>& operands,
+                                     std::string* new_value,
+                                     Logger* logger) const {
+
+  // Clear the *new_value for writing.
+  assert(new_value);
+  new_value->clear();
+
+  // Compute the space needed for the final result.
+  int numBytes = 0;
+  for (auto it = operands.begin(); it != operands.end(); ++it) {
+    numBytes += it->size() + 1;   // Plus 1 for the delimiter
+  }
+
+  // Only print the delimiter after the first entry has been printed
+  bool printDelim = false;
+
+  // Prepend the *existing_value if one exists.
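+  // (Illustrative walk-through, not part of the original patch: with
+  // existing value "a", operands {"b", "c"}, and delimiter ',', the result
+  // is "a,b,c"; with no existing value it is "b,c", which needs one fewer
+  // delimiter, hence the numBytes-1 reservation below.)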
+  if (existing_value) {
+    new_value->reserve(numBytes + existing_value->size());
+    new_value->append(existing_value->data(), existing_value->size());
+    printDelim = true;
+  } else if (numBytes) {
+    new_value->reserve(numBytes-1);  // Minus 1 since we have one less delimiter
+  }
+
+  // Concatenate the sequence of strings (and add a delimiter between each)
+  for (auto it = operands.begin(); it != operands.end(); ++it) {
+    if (printDelim) {
+      new_value->append(1, delim_);
+    }
+    new_value->append(*it);
+    printDelim = true;
+  }
+
+  return true;
+}
+
+bool StringAppendTESTOperator::PartialMerge(const Slice& key,
+                                            const Slice& left_operand,
+                                            const Slice& right_operand,
+                                            std::string* new_value,
+                                            Logger* logger) const {
+  // Partial-merge is intentionally disabled here so that tests exercise
+  // the stacking (non-associative) code path.
+  return false;
+
+//  // Clear the *new_value for writing.
+//  assert(new_value);
+//  new_value->clear();
+//
+//  // Generic append
+//  // Reserve correct size for *new_value, and apply concatenation.
+//  new_value->reserve(left_operand.size() + 1 + right_operand.size());
+//  new_value->assign(left_operand.data(), left_operand.size());
+//  new_value->append(1, delim_);
+//  new_value->append(right_operand.data(), right_operand.size());
+//
+//  return true;
+
+}
+
+const char* StringAppendTESTOperator::Name() const {
+  return "StringAppendTESTOperator";
+}
+
+}  // namespace leveldb
+
diff --git a/utilities/merge_operators/string_append/stringappend2.h b/utilities/merge_operators/string_append/stringappend2.h
new file mode 100644
index 000000000..71135c826
--- /dev/null
+++ b/utilities/merge_operators/string_append/stringappend2.h
@@ -0,0 +1,42 @@
+/**
+ * A TEST MergeOperator for rocksdb/leveldb that implements string append.
+ * It is built using the MergeOperator interface rather than the simpler
+ * AssociativeMergeOperator interface, which is useful for testing and
+ * benchmarking. While the two operators are semantically the same, all
+ * production code should use the StringAppendOperator defined in
+ * stringappend.{h,cc}; the operator defined in this file is primarily
+ * for testing.
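+ *
+ * Illustrative usage (a sketch, not part of the original header; assumes
+ * the operator was registered through Options::merge_operator with
+ * delimiter ',' before opening the database):
+ *   db->Merge(WriteOptions(), "key", "abc");
+ *   db->Merge(WriteOptions(), "key", "def");
+ *   // A subsequent Get("key") yields "abc,def"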
+ *
+ * @author Deon Nicholas (dnicholas@fb.com)
+ * Copyright 2013 Facebook
+ */
+
+#include "leveldb/merge_operator.h"
+#include "leveldb/slice.h"
+
+namespace leveldb {
+
+class StringAppendTESTOperator : public MergeOperator {
+ public:
+
+  StringAppendTESTOperator(char delim_char);  /// Constructor with delimiter
+
+  virtual bool Merge(const Slice& key,
+                     const Slice* existing_value,
+                     const std::deque<std::string>& operand_sequence,
+                     std::string* new_value,
+                     Logger* logger) const override;
+
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const override;
+
+  virtual const char* Name() const override;
+
+ private:
+  char delim_;  // The delimiter is inserted between elements
+
+};
+
+}  // namespace leveldb
diff --git a/utilities/merge_operators/string_append/stringappend_test.cc b/utilities/merge_operators/string_append/stringappend_test.cc
index 7fff3e5d9..b852d30d9 100644
--- a/utilities/merge_operators/string_append/stringappend_test.cc
+++ b/utilities/merge_operators/string_append/stringappend_test.cc
@@ -20,7 +20,8 @@ using namespace leveldb;
 
 namespace leveldb {
 
-const std::string kDbName = "/tmp/mergetestdb";  // Path to the database on file system
+// Path to the database on file system
+const std::string kDbName = "/tmp/mergetestdb";
 
 // OpenDb opens a (possibly new) rocksdb database with a StringAppendOperator
 std::shared_ptr<DB> OpenDb(StringAppendOperator* append_op) {
@@ -251,7 +252,7 @@ TEST(StringAppendOperatorTest,BIGRandomMixGetAppend)
 
   // Generate a bunch of random queries (Append and Get)!
   enum query_t { APPEND_OP, GET_OP, NUM_OPS };
-  Random randomGen(9138204);   //deterministic seed; always get same results!
+  Random randomGen(9138204);   // deterministic seed
 
   const int kNumQueries = 1000;
   for (int q = 0; q < kNumQueries; ++q) {
diff --git a/utilities/merge_operators/uint64add.cc b/utilities/merge_operators/uint64add.cc
--- a/utilities/merge_operators/uint64add.cc
+++ b/utilities/merge_operators/uint64add.cc
-    if (existing_value->size() == sizeof(uint64_t)) {
-      existing = DecodeFixed64(existing_value->data());
-    } else {
-      // if existing_value is corrupted, treat it as 0
-      Log(logger, "existing value corruption, size: %zu > %zu",
-          existing_value->size(), sizeof(uint64_t));
-      existing = 0;
-    }
-  }
-
-  uint64_t operand;
-  if (value.size() == sizeof(uint64_t)) {
-    operand = DecodeFixed64(value.data());
-  } else {
-    // if operand is corrupted, treat it as 0
-    Log(logger, "operand value corruption, size: %zu > %zu",
-        value.size(), sizeof(uint64_t));
-    operand = 0;
+    uint64_t orig_value = 0;
+    if (existing_value) {
+      orig_value = DecodeInteger(*existing_value, logger);
     }
+    uint64_t operand = DecodeInteger(value, logger);
 
-    new_value->resize(sizeof(uint64_t));
-    EncodeFixed64(&(*new_value)[0], existing + operand);
+    assert(new_value);
+    new_value->clear();
+    PutFixed64(new_value, orig_value + operand);
 
-    return;
+    return true;  // Return true always since corruption will be treated as 0
   }
 
   virtual const char* Name() const override {
     return "UInt64AddOperator";
   }
+
+ private:
+  // Takes the string and decodes it into a uint64_t
+  // On error, prints a message and returns 0
+  uint64_t DecodeInteger(const Slice& value, Logger* logger) const {
+    uint64_t result = 0;
+
+    if (value.size() == sizeof(uint64_t)) {
+      result = DecodeFixed64(value.data());
+    } else if (logger != nullptr) {
+      // If value is corrupted, treat it as 0
+      Log(logger, "uint64 value corruption, size: %zu != %zu",
+          value.size(), sizeof(uint64_t));
+    }
+
+    return result;
+  }
+
 };
 
 }
diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h
index 48aee4a10..79f75bb6c 100644
--- a/utilities/ttl/db_ttl.h
+++ b/utilities/ttl/db_ttl.h
@@ -225,39 +225,86 @@ class TtlMergeOperator : public MergeOperator {
     assert(merge_op);
   }
 
-  virtual void Merge(const Slice& key,
+  virtual bool Merge(const Slice& key,
                      const Slice* existing_value,
-                     const Slice& value,
+                     const std::deque<std::string>& operands,
                      std::string* new_value,
                      Logger* logger) const override {
     const uint32_t ts_len = DBWithTTL::kTSLength;
-    if ((existing_value && existing_value->size() < ts_len) ||
-        value.size() < ts_len) {
-      Log(logger, "Error: Could not remove timestamp correctly from value.");
-      assert(false);
-      //TODO: Remove assert and make this function return false.
-      //TODO: Change Merge semantics and add a counter here
+    if (existing_value && existing_value->size() < ts_len) {
+      Log(logger, "Error: Could not remove timestamp from existing value.");
+      return false;
+      // TODO: Change Merge semantics and add a counter here
+    }
+
+    // Strip the time-stamp from each operand before passing it on
+    // to user_merge_op_
+    std::deque<std::string> operands_without_ts;
+    for (auto it = operands.begin(); it != operands.end(); ++it) {
+      if (it->size() < ts_len) {
+        Log(logger, "Error: Could not remove timestamp from operand value.");
+        return false;
+      }
+      operands_without_ts.push_back(it->substr(0, it->size() - ts_len));
     }
-    Slice value_without_ts(value.data(), value.size() - ts_len);
+
+    // Apply the user merge operator (store result in *new_value)
+    bool good;
     if (existing_value) {
       Slice existing_value_without_ts(existing_value->data(),
                                       existing_value->size() - ts_len);
-      user_merge_op_->Merge(key, &existing_value_without_ts, value_without_ts,
-                            new_value, logger);
+      good = user_merge_op_->Merge(key, &existing_value_without_ts,
+                                   operands_without_ts, new_value, logger);
+    } else {
+      good = user_merge_op_->Merge(key, nullptr, operands_without_ts,
+                                   new_value, logger);
+    }
+    if (!good) {
+      // Propagate the user operator's failure rather than appending a
+      // timestamp to an undefined *new_value.
+      return false;
+    }
+
+    // Augment the *new_value with the ttl time-stamp
+    int32_t curtime;
+    if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
+      Log(logger, "Error: Could not get current time to be attached internally "
+                  "to the new value.");
+      return false;
     } else {
-      user_merge_op_->Merge(key, nullptr, value_without_ts, new_value, logger);
+      char ts_string[ts_len];
+      EncodeFixed32(ts_string, curtime);
+      new_value->append(ts_string, ts_len);
+      return true;
+    }
+  }
+
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const override {
+    const uint32_t ts_len = DBWithTTL::kTSLength;
+
+    if (left_operand.size() < ts_len || right_operand.size() < ts_len) {
+      Log(logger, "Error: Could not remove timestamp from value.");
+      return false;
+      // TODO: Change Merge semantics and add a counter here
     }
+
+    // Apply the user partial-merge operator (store result in *new_value)
+    assert(new_value);
+    Slice left_without_ts(left_operand.data(), left_operand.size() - ts_len);
+    Slice right_without_ts(right_operand.data(),
+                           right_operand.size() - ts_len);
+    if (!user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts,
+                                      new_value, logger)) {
+      // Propagate the user operator's failure; the library will stack the
+      // operands and merge them later, once a base value is found.
+      return false;
+    }
+
+    // Augment the *new_value with the ttl time-stamp
     int32_t curtime;
     if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
       Log(logger, "Error: Could not get current time to be attached internally "
                   "to the new value.");
-      assert(false);
-      //TODO: Remove assert and make this function return false.
+      return false;
     } else {
       char ts_string[ts_len];
       EncodeFixed32(ts_string, curtime);
       new_value->append(ts_string, ts_len);
+      return true;
+    }
+  }
 
   virtual const char* Name() const override {