diff --git a/db/builder.cc b/db/builder.cc index 2b7c59283..76c087346 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -48,81 +48,92 @@ Status BuildTable(const std::string& dbname, TableBuilder* builder = new TableBuilder(options, file.get(), 0); // the first key is the smallest key - Slice key = iter->key(); - meta->smallest.DecodeFrom(key); + meta->smallest.DecodeFrom(iter->key()); MergeHelper merge(user_comparator, options.merge_operator, options.info_log.get(), true /* internal key corruption is not ok */); if (purge) { - ParsedInternalKey ikey; // Ugly walkaround to avoid compiler error for release build - // TODO: find a clean way to treat in memory key corruption - ikey.type = kTypeValue; + bool ok __attribute__((unused)) = true; + + // Will write to builder if current key != prev key ParsedInternalKey prev_ikey; - std::string prev_value; std::string prev_key; - - // Ugly walkaround to avoid compiler error for release build - // TODO: find a clean way to treat in memory key corruption - auto ok __attribute__((unused)) = ParseInternalKey(key, &ikey); - // in-memory key corruption is not ok; - assert(ok); - - if (ikey.type == kTypeMerge) { - // merge values if the first entry is of merge type - merge.MergeUntil(iter, 0 /* don't worry about snapshot */); - prev_key.assign(merge.key().data(), merge.key().size()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); - prev_value.assign(merge.value().data(), merge.value().size()); - } else { - // store first key-value - prev_key.assign(key.data(), key.size()); - prev_value.assign(iter->value().data(), iter->value().size()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); - assert(prev_ikey.sequence >= earliest_seqno_in_memtable); - iter->Next(); - } + bool is_first_key = true; // Also write if this is the very first key while (iter->Valid()) { bool iterator_at_next = false; + + // Get current key ParsedInternalKey this_ikey; Slice key = iter->key(); + Slice value = iter->value(); + + // In-memory key corruption is not ok; + // TODO: find a clean way to treat in memory key corruption ok = ParseInternalKey(key, &this_ikey); assert(ok); assert(this_ikey.sequence >= earliest_seqno_in_memtable); - if (user_comparator->Compare(prev_ikey.user_key, this_ikey.user_key)) { - // This key is different from previous key. - // Output prev key and remember current key - builder->Add(Slice(prev_key), Slice(prev_value)); + // If the key is the same as the previous key (and it is not the + // first key), then we skip it, since it is an older version. + // Otherwise we output the key and mark it as the "new" previous key. + if (!is_first_key && !user_comparator->Compare(prev_ikey.user_key, + this_ikey.user_key)) { + // seqno within the same key are in decreasing order + assert(this_ikey.sequence < prev_ikey.sequence); + } else { + is_first_key = false; + if (this_ikey.type == kTypeMerge) { + // Handle merge-type keys using the MergeHelper merge.MergeUntil(iter, 0 /* don't worry about snapshot */); iterator_at_next = true; - prev_key.assign(merge.key().data(), merge.key().size()); - ok = ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(ok); - prev_value.assign(merge.value().data(), merge.value().size()); + if (merge.IsSuccess()) { + // Merge completed correctly. 
+ // Add the resulting merge key/value and continue to next + builder->Add(merge.key(), merge.value()); + prev_key.assign(merge.key().data(), merge.key().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + } else { + // Merge did not find a Put/Delete. + // Can not compact these merges into a kValueType. + // Write them out one-by-one. (Proceed back() to front()) + const std::deque& keys = merge.keys(); + const std::deque& values = merge.values(); + assert(keys.size() == values.size() && keys.size() >= 1); + std::deque::const_reverse_iterator key_iter; + std::deque::const_reverse_iterator value_iter; + for (key_iter=keys.rbegin(), value_iter = values.rbegin(); + key_iter != keys.rend() && value_iter != values.rend(); + ++key_iter, ++value_iter) { + + builder->Add(Slice(*key_iter), Slice(*value_iter)); + } + + // Sanity check. Both iterators should end at the same time + assert(key_iter == keys.rend() && value_iter == values.rend()); + + prev_key.assign(keys.front()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + } } else { + // Handle Put/Delete-type keys by simply writing them + builder->Add(key, value); prev_key.assign(key.data(), key.size()); - prev_value.assign(iter->value().data(), iter->value().size()); ok = ParseInternalKey(Slice(prev_key), &prev_ikey); assert(ok); } - } else { - // seqno within the same key are in decreasing order - assert(this_ikey.sequence < prev_ikey.sequence); - // This key is an earlier version of the same key in prev_key. - // Skip current key. } if (!iterator_at_next) iter->Next(); } - // output last key - builder->Add(Slice(prev_key), Slice(prev_value)); + + // The last key is the largest key meta->largest.DecodeFrom(Slice(prev_key)); } else { diff --git a/db/db_impl.cc b/db/db_impl.cc index 5b104cd2e..3fc387e17 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -228,7 +228,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) char name[100]; Status st = env_->GetHostName(name, 100L); - if(st.ok()) { + if (st.ok()) { host_name_ = name; } else { Log(options_.info_log, "Can't get hostname, use localhost as host name."); @@ -469,7 +469,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { } } else { Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]); - if(!st.ok()) { + if (!st.ok()) { Log(options_.info_log, "Delete type=%d #%lld FAILED\n", int(type), static_cast(number)); @@ -1110,7 +1110,7 @@ Status DBImpl::FindProbableWALFiles(std::vector* const allLogs, } } size_t startIndex = std::max(0l, end); // end could be -ve. - for( size_t i = startIndex; i < allLogs->size(); ++i) { + for (size_t i = startIndex; i < allLogs->size(); ++i) { result->push_back(allLogs->at(i)); } if (result->empty()) { @@ -1703,7 +1703,6 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( Status DBImpl::DoCompactionWork(CompactionState* compact) { int64_t imm_micros = 0; // Micros spent doing imm_ compactions - Log(options_.info_log, "Compacting %d@%d + %d@%d files, score %.2f slots available %d", compact->compaction->num_input_files(0), @@ -1793,7 +1792,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Handle key/value, add to state, etc. bool drop = false; - bool current_entry_is_merged = false; + bool current_entry_is_merging = false; if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys // TODO: error key stays in db forever? 
Figure out the intention/rationale @@ -1887,11 +1886,24 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // logic could also be nicely re-used for memtable flush purge // optimization in BuildTable. merge.MergeUntil(input.get(), prev_snapshot, bottommost_level); - current_entry_is_merged = true; - // get the merge result - key = merge.key(); - ParseInternalKey(key, &ikey); - value = merge.value(); + current_entry_is_merging = true; + if (merge.IsSuccess()) { + // Successfully found Put/Delete/(end-of-key-range) while merging + // Get the merge result + key = merge.key(); + ParseInternalKey(key, &ikey); + value = merge.value(); + } else { + // Did not find a Put/Delete/(end-of-key-range) while merging + // We now have some stack of merge operands to write out. + // NOTE: key,value, and ikey are now referring to old entries. + // These will be correctly set below. + assert(!merge.keys().empty()); + assert(merge.keys().size() == merge.values().size()); + + // Hack to make sure last_sequence_for_key is correct + ParseInternalKey(merge.keys().front(), &ikey); + } } last_sequence_for_key = ikey.sequence; @@ -1909,52 +1921,97 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { #endif if (!drop) { - - char* kptr = (char*)key.data(); - std::string kstr; - - // Zeroing out the sequence number leads to better compression. - // If this is the bottommost level (no files in lower levels) - // and the earliest snapshot is larger than this seqno - // then we can squash the seqno to zero. - if (bottommost_level && ikey.sequence < earliest_snapshot && - ikey.type != kTypeMerge) { - assert(ikey.type != kTypeDeletion); - // make a copy because updating in place would cause problems - // with the priority queue that is managing the input key iterator - kstr.assign(key.data(), key.size()); - kptr = (char *)kstr.c_str(); - UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type); + // We may write a single key (e.g.: for Put/Delete or successful merge). + // Or we may instead have to write a sequence/list of keys. + // We have to write a sequence iff we have an unsuccessful merge + bool has_merge_list = current_entry_is_merging && !merge.IsSuccess(); + const std::deque* keys = nullptr; + const std::deque* values = nullptr; + std::deque::const_reverse_iterator key_iter; + std::deque::const_reverse_iterator value_iter; + if (has_merge_list) { + keys = &merge.keys(); + values = &merge.values(); + key_iter = keys->rbegin(); // The back (*rbegin()) is the first key + value_iter = values->rbegin(); + + key = Slice(*key_iter); + value = Slice(*value_iter); } - Slice newkey(kptr, key.size()); - assert((key.clear(), 1)); // we do not need 'key' anymore + // If we have a list of keys to write, traverse the list. + // If we have a single key to write, simply write that key. + while (true) { + // Invariant: key,value,ikey will always be the next entry to write + char* kptr = (char*)key.data(); + std::string kstr; + + // Zeroing out the sequence number leads to better compression. + // If this is the bottommost level (no files in lower levels) + // and the earliest snapshot is larger than this seqno + // then we can squash the seqno to zero. 
+ if (bottommost_level && ikey.sequence < earliest_snapshot && + ikey.type != kTypeMerge) { + assert(ikey.type != kTypeDeletion); + // make a copy because updating in place would cause problems + // with the priority queue that is managing the input key iterator + kstr.assign(key.data(), key.size()); + kptr = (char *)kstr.c_str(); + UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type); + } + + Slice newkey(kptr, key.size()); + assert((key.clear(), 1)); // we do not need 'key' anymore - // Open output file if necessary - if (compact->builder == nullptr) { - status = OpenCompactionOutputFile(compact); - if (!status.ok()) { - break; + // Open output file if necessary + if (compact->builder == nullptr) { + status = OpenCompactionOutputFile(compact); + if (!status.ok()) { + break; + } + } + if (compact->builder->NumEntries() == 0) { + compact->current_output()->smallest.DecodeFrom(newkey); + } + compact->current_output()->largest.DecodeFrom(newkey); + compact->builder->Add(newkey, value); + + // Close output file if it is big enough + if (compact->builder->FileSize() >= + compact->compaction->MaxOutputFileSize()) { + status = FinishCompactionOutputFile(compact, input.get()); + if (!status.ok()) { + break; + } } - } - if (compact->builder->NumEntries() == 0) { - compact->current_output()->smallest.DecodeFrom(newkey); - } - compact->current_output()->largest.DecodeFrom(newkey); - compact->builder->Add(newkey, value); - // Close output file if it is big enough - if (compact->builder->FileSize() >= - compact->compaction->MaxOutputFileSize()) { - status = FinishCompactionOutputFile(compact, input.get()); - if (!status.ok()) { + // If we have a list of entries, move to next element + // If we only had one entry, then break the loop. + if (has_merge_list) { + ++key_iter; + ++value_iter; + + // If at end of list + if (key_iter == keys->rend() || value_iter == values->rend()) { + // Sanity Check: if one ends, then both end + assert(key_iter == keys->rend() && value_iter == values->rend()); + break; + } + + // Otherwise not at end of list. Update key, value, and ikey. + key = Slice(*key_iter); + value = Slice(*value_iter); + ParseInternalKey(key, &ikey); + + } else{ + // Only had one item to begin with (Put/Delete) break; } } } // MergeUntil has moved input to the next entry - if (!current_entry_is_merged) { + if (!current_entry_is_merging) { input->Next(); } } @@ -2120,21 +2177,25 @@ Status DBImpl::GetImpl(const ReadOptions& options, current->Ref(); // Unlock while reading from files and memtables - mutex_.Unlock(); bool have_stat_update = false; Version::GetStats stats; + + // Prepare to store a list of merge operations if merge occurs. + std::deque merge_operands; + // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. - // value will contain the current merge operand in the latter case. + // merge_operands will contain the sequence of merges in the latter case. 
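[Reviewer sketch — not part of the patch] The read path now threads one `std::deque<std::string>` of merge operands through memtable, immutable memtable, and the SST versions. A minimal, self-contained model of that contract (all names here — `Entry`, `GetWithMerge`, `FullMergeFn` — are illustrative): layers are scanned newest-first; operands are `push_front`'d so that `front()` ends up being the oldest operand, which the full merge applies first.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <vector>

struct Entry {
  enum Type { kPut, kDelete, kMergeOp } type;
  std::string value;  // base value for kPut, merge operand for kMergeOp
};

// Stand-in for MergeOperator::Merge: returns false on corruption.
typedef std::function<bool(const std::string* existing,
                           const std::deque<std::string>& operands,
                           std::string* result)> FullMergeFn;

bool GetWithMerge(const std::vector<Entry>& newest_first,
                  const FullMergeFn& full_merge, std::string* result) {
  std::deque<std::string> operands;  // mirrors the new merge_operands deque
  for (size_t i = 0; i < newest_first.size(); i++) {
    const Entry& e = newest_first[i];
    if (e.type == Entry::kMergeOp) {
      // Newest-to-oldest scan + push_front => front() is the OLDEST operand.
      operands.push_front(e.value);
    } else if (e.type == Entry::kPut) {
      if (operands.empty()) { *result = e.value; return true; }
      return full_merge(&e.value, operands, result);   // Put + Merge... => value
    } else {  // kDelete
      if (operands.empty()) return false;              // NotFound
      return full_merge(nullptr, operands, result);    // Delete acts as no base
    }
  }
  // History exhausted with a merge still in progress: final merge with nullptr.
  if (!operands.empty()) return full_merge(nullptr, operands, result);
  return false;  // NotFound
}
```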
LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, options_)) { + if (mem->Get(lkey, value, &s, &merge_operands, options_)) { // Done - } else if (imm.Get(lkey, value, &s, options_)) { + } else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { // Done } else { - current->Get(options, lkey, value, &s, &stats, options_, no_io,value_found); + current->Get(options, lkey, value, &s, &merge_operands, &stats, + options_, value_found); have_stat_update = true; } mutex_.Lock(); @@ -2177,6 +2238,9 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, bool have_stat_update = false; Version::GetStats stats; + // Prepare to store a list of merge operations if merge occurs. + std::deque merge_operands; + // Note: this always resizes the values array int numKeys = keys.size(); std::vector statList(numKeys); @@ -2188,18 +2252,19 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, // For each of the given keys, apply the entire "get" process as follows: // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. - // value will contain the current merge operand in the latter case. - for(int i=0; iGet(lkey, value, &s, options_)) { + if (mem->Get(lkey, value, &s, &merge_operands, options_)) { // Done - } else if (imm.Get(lkey, value, &s, options_)) { + } else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { // Done } else { - current->Get(options, lkey, value, &s, &stats, options_); + current->Get(options, lkey, value, &s, &merge_operands, &stats, options_); have_stat_update = true; } diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index d4170d30f..834161860 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -53,11 +53,12 @@ Status DBImplReadOnly::Get(const ReadOptions& options, MemTable* mem = GetMemTable(); Version* current = versions_->current(); SequenceNumber snapshot = versions_->LastSequence(); + std::deque merge_operands; LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, options_)) { + if (mem->Get(lkey, value, &s, &merge_operands, options_)) { } else { Version::GetStats stats; - current->Get(options, lkey, value, &s, &stats, options_); + current->Get(options, lkey, value, &s, &merge_operands, &stats, options_); } return s; } diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 6199b5e7b..e04495e58 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -32,6 +32,9 @@ public: virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value); + + // TODO: Implement ReadOnly MultiGet? + virtual Iterator* NewIterator(const ReadOptions&); virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value) { diff --git a/db/db_iter.cc b/db/db_iter.cc index a8eb727e2..0d6a2c846 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -4,6 +4,7 @@ #include "db/db_iter.h" #include +#include #include "db/filename.h" #include "db/dbformat.h" @@ -231,10 +232,13 @@ void DBIter::FindNextUserEntry(bool skipping) { // POST: saved_value_ has the merged value for the user key // iter_ points to the next entry (or invalid) void DBIter::MergeValuesNewToOld() { + // TODO: Is there a way to unite with MergeHelper or other similar code? 
- const Slice value = iter_->value(); - std::string operand(value.data(), value.size()); + // Start the merge process by pushing the first operand + std::deque operands; + operands.push_front(iter_->value().ToString()); + std::string merge_result; // Temporary string to hold merge result later ParsedInternalKey ikey; for (iter_->Next(); iter_->Valid(); iter_->Next()) { if (!ParseKey(&ikey)) { @@ -255,10 +259,11 @@ void DBIter::MergeValuesNewToOld() { } if (kTypeValue == ikey.type) { - // hit a put, merge the put value with operand and store it in the - // final result saved_value_. We are done! + // hit a put, merge the put value with operands and store the + // final result in saved_value_. We are done! + // ignore corruption if there is any. const Slice value = iter_->value(); - user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand), + user_merge_operator_->Merge(ikey.user_key, &value, operands, &saved_value_, logger_.get()); // iter_ is positioned after put iter_->Next(); @@ -266,29 +271,41 @@ void DBIter::MergeValuesNewToOld() { } if (kTypeMerge == ikey.type) { - // hit a merge, merge the value with operand and continue. - // saved_value_ is used as a scratch area. The result is put - // back in operand - const Slice value = iter_->value(); - user_merge_operator_->Merge(ikey.user_key, &value, operand, - &saved_value_, logger_.get()); - swap(saved_value_, operand); + // hit a merge, add the value as an operand and run associative merge. + // when complete, add result to operands and continue. + const Slice& value = iter_->value(); + operands.push_front(value.ToString()); + while(operands.size() >= 2) { + // Call user associative-merge until it returns false + if (user_merge_operator_->PartialMerge(ikey.user_key, + Slice(operands[0]), + Slice(operands[1]), + &merge_result, + logger_.get())) { + operands.pop_front(); + swap(operands.front(), merge_result); + } else { + // Associative merge returns false ==> stack the operands + break; + } + } + } } // we either exhausted all internal keys under this user key, or hit // a deletion marker. - // feed null as the existing value to the merge opexrator, such that + // feed null as the existing value to the merge operator, such that // client can differentiate this scenario and do things accordingly. 
- user_merge_operator_->Merge(ikey.user_key, nullptr, operand, + user_merge_operator_->Merge(ikey.user_key, nullptr, operands, &saved_value_, logger_.get()); } void DBIter::Prev() { assert(valid_); - // TODO: support backward iteration // Throw an exception now if merge_operator is provided + // TODO: support backward iteration if (user_merge_operator_) { Log(logger_, "Prev not supported yet if merge_operator is provided"); throw std::logic_error("DBIter::Prev backward iteration not supported" @@ -387,8 +404,8 @@ void DBIter::SeekToFirst() { } void DBIter::SeekToLast() { + // Throw an exception for now if merge_operator is provided // TODO: support backward iteration - // throw an exception for now if merge_operator is provided if (user_merge_operator_) { Log(logger_, "SeekToLast not supported yet if merge_operator is provided"); throw std::logic_error("DBIter::SeekToLast: backward iteration not" diff --git a/db/memtable.cc b/db/memtable.cc index 622058804..a9a684967 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -130,20 +130,19 @@ void MemTable::Add(SequenceNumber s, ValueType type, } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options) { + std::deque* operands, const Options& options) { Slice memkey = key.memtable_key(); std::shared_ptr iter(table_.get()->GetIterator()); iter->Seek(memkey.data()); - bool merge_in_progress = false; - std::string operand; - if (s->IsMergeInProgress()) { - swap(*value, operand); - merge_in_progress = true; - } + // It is the caller's responsibility to allocate/delete operands list + assert(operands != nullptr); + bool merge_in_progress = s->IsMergeInProgress(); auto merge_operator = options.merge_operator; auto logger = options.info_log; + std::string merge_result; + for (; iter->Valid(); iter->Next()) { // entry format is: // klength varint32 @@ -165,36 +164,51 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, switch (static_cast(tag & 0xff)) { case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + *s = Status::OK(); if (merge_in_progress) { - merge_operator->Merge(key.user_key(), &v, operand, - value, logger.get()); + assert(merge_operator); + if (!merge_operator->Merge(key.user_key(), &v, *operands, + value, logger.get())) { + *s = Status::Corruption("Error: Could not perform merge."); + } } else { value->assign(v.data(), v.size()); } return true; } - case kTypeMerge: { - Slice v = GetLengthPrefixedSlice(key_ptr + key_length); - if (merge_in_progress) { - merge_operator->Merge(key.user_key(), &v, operand, - value, logger.get()); - swap(*value, operand); - } else { - assert(merge_operator); - merge_in_progress = true; - operand.assign(v.data(), v.size()); - } - break; - } case kTypeDeletion: { if (merge_in_progress) { - merge_operator->Merge(key.user_key(), nullptr, operand, - value, logger.get()); + assert(merge_operator); + *s = Status::OK(); + if (!merge_operator->Merge(key.user_key(), nullptr, *operands, + value, logger.get())) { + *s = Status::Corruption("Error: Could not perform merge."); + } } else { *s = Status::NotFound(Slice()); } return true; } + case kTypeMerge: { + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + merge_in_progress = true; + operands->push_front(v.ToString()); + while(operands->size() >= 2) { + // Attempt to associative merge. 
(Returns true if successful) + if (merge_operator->PartialMerge(key.user_key(), + Slice((*operands)[0]), + Slice((*operands)[1]), + &merge_result, + logger.get())) { + operands->pop_front(); + swap(operands->front(), merge_result); + } else { + // Stack them because user can't associative merge + break; + } + } + break; + } } } else { // exit loop if user key does not match @@ -202,8 +216,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, } } + // No change to value, since we have not yet found a Put/Delete + if (merge_in_progress) { - swap(*value, operand); *s = Status::MergeInProgress(""); } return false; diff --git a/db/memtable.h b/db/memtable.h index 73d32fc4c..fe8cec86b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -7,6 +7,7 @@ #include #include +#include #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" @@ -74,11 +75,11 @@ class MemTable { // in *status and return true. // If memtable contains Merge operation as the most recent entry for a key, // and the merge process does not stop (not reaching a value or delete), - // store the current merged result in value and MergeInProgress in s. - // return false + // prepend the current merge operand to *operands. + // store MergeInProgress in s, and return false. // Else, return false. bool Get(const LookupKey& key, std::string* value, Status* s, - const Options& options); + std::deque* operands, const Options& options); // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } diff --git a/db/memtablelist.cc b/db/memtablelist.cc index 9d8b675bf..6f66faab8 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -193,11 +193,13 @@ size_t MemTableList::ApproximateMemoryUsage() { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. +// Operands stores the list of merge operations to apply, so far. bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options) { + std::deque* operands, + const Options& options) { for (list::iterator it = memlist_.begin(); it != memlist_.end(); ++it) { - if ((*it)->Get(key, value, s, options)) { + if ((*it)->Get(key, value, s, operands, options)) { return true; } } diff --git a/db/memtablelist.h b/db/memtablelist.h index b30089cf6..05a60d248 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -5,6 +5,7 @@ #include #include +#include #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" @@ -70,7 +71,7 @@ class MemTableList { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. bool Get(const LookupKey& key, std::string* value, Status* s, - const Options& options); + std::deque* operands, const Options& options); // Returns the list of underlying memtables. void GetMemTables(std::vector* list); diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 3520db15a..4f363fec3 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -10,23 +10,36 @@ namespace leveldb { // PRE: iter points to the first merge type entry // POST: iter points to the first entry beyond the merge process (or the end) -// key_, value_ are updated to reflect the merge result +// keys_, operands_ are updated to reflect the merge result. +// keys_ stores the list of keys encountered while merging. +// operands_ stores the list of merge operands encountered while merging. +// keys_[i] corresponds to operands_[i] for each i. 
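[Reviewer sketch — not part of the patch] `MergeUntil` (below) leans on the `PartialMerge` contract: folding two adjacent operands and then full-merging must give the same answer as full-merging the original sequence, with `front()` applied first. A tiny self-checking harness, using a hypothetical string-append operator:

```cpp
#include <cassert>
#include <deque>
#include <string>

// Stand-in for a full MergeOperator::Merge: append operands, front() first.
static std::string FullMergeAppend(const std::string* existing,
                                   const std::deque<std::string>& ops) {
  std::string result = existing ? *existing : "";
  for (size_t i = 0; i < ops.size(); i++) result += ops[i];
  return result;
}

// Stand-in for PartialMerge(left, right): left was DB::Merge'd before right.
static std::string PartialMergeAppend(const std::string& left,
                                      const std::string& right) {
  return left + right;
}

int main() {
  std::string base = "x";
  std::deque<std::string> ops = {"a", "b", "c"};  // front()="a" is oldest
  std::deque<std::string> folded = {PartialMergeAppend(ops[0], ops[1]), ops[2]};
  // Folding two operands must not change the final merge result.
  assert(FullMergeAppend(&base, ops) == FullMergeAppend(&base, folded));
  return 0;
}
```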
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, bool at_bottom) { - // get a copy of the internal key, before it's invalidated by iter->Next() - key_.assign(iter->key().data(), iter->key().size()); - // we need to parse the internal key again as the parsed key is + // Get a copy of the internal key, before it's invalidated by iter->Next() + // Also maintain the list of merge operands seen. + keys_.clear(); + operands_.clear(); + keys_.push_front(iter->key().ToString()); + operands_.push_front(iter->value().ToString()); + + success_ = false; // Will become true if we hit Put/Delete or bottom + + // We need to parse the internal key again as the parsed key is // backed by the internal key! - ParsedInternalKey orig_ikey; // Assume no internal key corruption as it has been successfully parsed // by the caller. - // TODO: determine a good alternative of assert (exception?) - ParseInternalKey(key_, &orig_ikey); - std::string operand(iter->value().data(), iter->value().size()); + // Invariant: keys_.back() will not change. Hence, orig_ikey is always valid. + ParsedInternalKey orig_ikey; + ParseInternalKey(keys_.back(), &orig_ikey); bool hit_the_next_user_key = false; ParsedInternalKey ikey; + std::string merge_result; // Temporary value for merge results for (iter->Next(); iter->Valid(); iter->Next()) { + assert(operands_.size() >= 1); // Should be invariants! + assert(keys_.size() == operands_.size()); + if (!ParseInternalKey(iter->key(), &ikey)) { // stop at corrupted key if (assert_valid_internal_key_) { @@ -46,32 +59,54 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, break; } + // At this point we are guaranteed that we need to process this key. + if (kTypeDeletion == ikey.type) { // hit a delete - // => merge nullptr with operand - // => change the entry type to kTypeValue - // We are done! - user_merge_operator_->Merge(ikey.user_key, nullptr, operand, - &value_, logger_); - orig_ikey.type = kTypeValue; - UpdateInternalKey(&key_[0], key_.size(), - orig_ikey.sequence, orig_ikey.type); - // move iter to the next entry + // => merge nullptr with operands_ + // => store result in operands_.back() (and update keys_.back()) + // => change the entry type to kTypeValue for keys_.back() + // We are done! Return a success if the merge passes. + success_ = user_merge_operator_->Merge(ikey.user_key, nullptr, + operands_, &merge_result, + logger_); + + // We store the result in keys_.back() and operands_.back() + // if nothing went wrong (i.e.: no operand corruption on disk) + if (success_) { + std::string& key = keys_.back(); // The original key encountered + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key[0], key.size(), + orig_ikey.sequence, orig_ikey.type); + swap(operands_.back(), merge_result); + } + + // move iter to the next entry (before doing anything else) iter->Next(); return; } if (kTypeValue == ikey.type) { // hit a put - // => merge the put value with operand - // => change the entry type to kTypeValue - // We are done! + // => merge the put value with operands_ + // => store result in operands_.back() (and update keys_.back()) + // => change the entry type to kTypeValue for keys_.back() + // We are done! Success! 
const Slice value = iter->value(); - user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand), - &value_, logger_); - orig_ikey.type = kTypeValue; - UpdateInternalKey(&key_[0], key_.size(), - orig_ikey.sequence, orig_ikey.type); + success_ = user_merge_operator_->Merge(ikey.user_key, &value, + operands_, &merge_result, + logger_); + + // We store the result in keys_.back() and operands_.back() + // if nothing went wrong (i.e.: no operand corruption on disk) + if (success_) { + std::string& key = keys_.back(); // The original key encountered + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key[0], key.size(), + orig_ikey.sequence, orig_ikey.type); + swap(operands_.back(), merge_result); + } + // move iter to the next entry iter->Next(); return; @@ -79,35 +114,73 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, if (kTypeMerge == ikey.type) { // hit a merge - // => merge the value with operand. - // => put the result back to operand and continue - const Slice value = iter->value(); - user_merge_operator_->Merge(ikey.user_key, &value, operand, - &value_, logger_); - swap(value_, operand); + // => merge the operand into the front of the operands_ list + // => use the user's associative merge function to determine how. + // => then continue because we haven't yet seen a Put/Delete. + assert(!operands_.empty()); // Should have at least one element in it + + keys_.push_front(iter->key().ToString()); + operands_.push_front(iter->value().ToString()); + while (operands_.size() >= 2) { + // Returns false when the merge_operator can no longer process it + if (user_merge_operator_->PartialMerge(ikey.user_key, + Slice(operands_[0]), + Slice(operands_[1]), + &merge_result, + logger_)) { + // Merging of operands (associative merge) was successful. + // Replace these frontmost two operands with the merge result + keys_.pop_front(); + operands_.pop_front(); + swap(operands_.front(), merge_result); + } else { + // Merging of operands (associative merge) returned false. + // The user merge_operator does not know how to merge these operands. + // So we just stack them up until we find a Put/Delete or end of key. + break; + } + } continue; } } - // We have seen the root history of this key if we are at the - // bottem level and exhausted all internal keys of this user key + // We are sure we have seen this key's entire history if we are at the + // last level and exhausted all internal keys of this user key. // NOTE: !iter->Valid() does not necessarily mean we hit the // beginning of a user key, as versions of a user key might be - // split into multiple files and some files might not be included - // in the merge. - bool seen_the_beginning = hit_the_next_user_key && at_bottom; - - if (seen_the_beginning) { + // split into multiple files (even files on the same level) + // and some files might not be included in the compaction/merge. + // + // There are also cases where we have seen the root of history of this + // key without being sure of it. Then, we simply miss the opportunity + // to combine the keys. Since VersionSet::SetupOtherInputs() always makes + // sure that all merge-operands on the same level get compacted together, + // this will simply lead to these merge operands moving to the next level. + // + // So, we only perform the following logic (to merge all operands together + // without a Put/Delete) if we are certain that we have seen the end of key. 
+ bool surely_seen_the_beginning = hit_the_next_user_key && at_bottom; + if (surely_seen_the_beginning) { // do a final merge with nullptr as the existing value and say // bye to the merge type (it's now converted to a Put) assert(kTypeMerge == orig_ikey.type); - user_merge_operator_->Merge(orig_ikey.user_key, nullptr, operand, - &value_, logger_); - orig_ikey.type = kTypeValue; - UpdateInternalKey(&key_[0], key_.size(), - orig_ikey.sequence, orig_ikey.type); - } else { - swap(value_, operand); + assert(operands_.size() >= 1); + assert(operands_.size() == keys_.size()); + success_ = user_merge_operator_->Merge(ikey.user_key, nullptr, + operands_, &merge_result, + logger_); + + if (success_) { + std::string& key = keys_.back(); // The original key encountered + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key[0], key.size(), + orig_ikey.sequence, orig_ikey.type); + + // The final value() is always stored in operands_.back() + swap(operands_.back(),merge_result); + } else { + // Do nothing if not success_. Leave keys() and operands() as they are. + } } } diff --git a/db/merge_helper.h b/db/merge_helper.h index 206d7a53a..9cad8ad04 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -4,6 +4,7 @@ #include "db/dbformat.h" #include "leveldb/slice.h" #include +#include namespace leveldb { @@ -21,7 +22,10 @@ class MergeHelper { : user_comparator_(user_comparator), user_merge_operator_(user_merge_operator), logger_(logger), - assert_valid_internal_key_(assert_valid_internal_key) {} + assert_valid_internal_key_(assert_valid_internal_key), + keys_(), + operands_(), + success_(false) {} // Merge entries until we hit // - a corrupted key @@ -39,12 +43,40 @@ class MergeHelper { bool at_bottom = false); // Query the merge result - // These are valid until the next MergeUtil call - // IMPORTANT: the key type could change after the MergeUntil call. - // Put/Delete + Merge + ... + Merge => Put - // Merge + ... + Merge => Merge - Slice key() { return Slice(key_); } - Slice value() { return Slice(value_); } + // These are valid until the next MergeUntil call + // If the merging was successful: + // - IsSuccess() will be true + // - key() will have the latest sequence number of the merges. + // The type will be Put or Merge. See IMPORTANT 1 note, below. + // - value() will be the result of merging all the operands together + // - The user should ignore keys() and values(). + // + // IMPORTANT 1: the key type could change after the MergeUntil call. + // Put/Delete + Merge + ... + Merge => Put + // Merge + ... + Merge => Merge + // + // If the merge operator is not associative, and if a Put/Delete is not found + // then the merging will be unsuccessful. In this case: + // - IsSuccess() will be false + // - keys() contains the list of internal keys seen in order of iteration. + // - values() contains the list of values (merges) seen in the same order. + // values() is parallel to keys() so that the first entry in + // keys() is the key associated with the first entry in values() + // and so on. These lists will be the same length. + // All of these pairs will be merges over the same user key. + // See IMPORTANT 2 note below. + // - The user should ignore key() and value(). + // + // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. + // So keys().back() was the first key seen by iterator. 
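[Reviewer sketch — not part of the patch] How a caller consumes this two-sided result API, mirroring the BuildTable changes earlier in this diff: on success, emit the single collapsed entry from `key()`/`value()`; otherwise re-emit every stacked operand newest-first by walking both deques back-to-front. `MergeResult` and `Emit` are illustrative stand-ins (`Emit` plays the role of `TableBuilder::Add`):

```cpp
#include <deque>
#include <string>
#include <utility>
#include <vector>

struct MergeResult {
  bool success;
  std::deque<std::string> keys;      // keys.back() was seen first (newest)
  std::deque<std::string> operands;  // parallel to keys
};

void EmitMerge(const MergeResult& m,
               std::vector<std::pair<std::string, std::string>>* out) {
  if (m.success) {
    // Collapsed to a single Put-like entry: key()/value() live at the back.
    out->push_back(std::make_pair(m.keys.back(), m.operands.back()));
    return;
  }
  // Unsuccessful merge: write the stacked operands out one by one,
  // newest first, using reverse iterators over both parallel deques.
  std::deque<std::string>::const_reverse_iterator k = m.keys.rbegin();
  std::deque<std::string>::const_reverse_iterator v = m.operands.rbegin();
  for (; k != m.keys.rend() && v != m.operands.rend(); ++k, ++v) {
    out->push_back(std::make_pair(*k, *v));
  }
}
```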
+ // TODO: Re-style this comment to be like the first one + bool IsSuccess() { return success_; } + Slice key() { assert(success_); return Slice(keys_.back()); } + Slice value() { assert(success_); return Slice(operands_.back()); } + const std::deque& keys() { assert(!success_); return keys_; } + const std::deque& values() { + assert(!success_); return operands_; + } private: const Comparator* user_comparator_; @@ -55,8 +87,9 @@ class MergeHelper { // the scratch area that holds the result of MergeUntil // valid up to the next MergeUntil call - std::string key_; - std::string value_; + std::deque keys_; // Keeps track of the sequence of keys seen + std::deque operands_; // Parallel with keys_; stores the values + bool success_; }; } // namespace leveldb diff --git a/db/merge_operator.cc b/db/merge_operator.cc new file mode 100644 index 000000000..89c64b5d9 --- /dev/null +++ b/db/merge_operator.cc @@ -0,0 +1,51 @@ +/** + * Back-end implementation details specific to the Merge Operator. + * + * @author Deon Nicholas (dnicholas@fb.com) + * Copyright 2013 Facebook + */ + +#include "leveldb/merge_operator.h" + +namespace leveldb { + +// Given a "real" merge from the library, call the user's +// associative merge function one-by-one on each of the operands. +// NOTE: It is assumed that the client's merge-operator will handle any errors. +bool AssociativeMergeOperator::Merge( + const Slice& key, + const Slice* existing_value, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const { + + // Simply loop through the operands + Slice temp_existing; + std::string temp_value; + for (const auto& operand : operand_list) { + Slice value(operand); + if (!Merge(key, existing_value, value, &temp_value, logger)) { + return false; + } + swap(temp_value, *new_value); + temp_existing = Slice(*new_value); + existing_value = &temp_existing; + } + + // The result will be in *new_value. All merges succeeded. + return true; +} + +// Call the user defined simple merge on the operands; +// NOTE: It is assumed that the client's merge-operator will handle any errors. +bool AssociativeMergeOperator::PartialMerge( + const Slice& key, + const Slice& left_operand, + const Slice& right_operand, + std::string* new_value, + Logger* logger) const { + + return Merge(key, &left_operand, right_operand, new_value, logger); +} + +} // namespace leveldb diff --git a/db/version_set.cc b/db/version_set.cc index 48588c712..9ea9ad809 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -232,7 +232,7 @@ enum SaverState { kFound, kDeleted, kCorrupt, - kMerge // value contains the current merge result (the operand) + kMerge // saver contains the current merge result (the operands) }; struct Saver { SaverState state; @@ -241,6 +241,7 @@ struct Saver { bool* value_found; // Is value set correctly? Used by KeyMayExist std::string* value; const MergeOperator* merge_operator; + std::deque* merge_operands; // the merge operations encountered Logger* logger; bool didIO; // did we do any disk io? }; @@ -261,6 +262,11 @@ static void MarkKeyMayExist(void* arg) { static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ Saver* s = reinterpret_cast(arg); + std::deque* const ops = s->merge_operands; // shorter alias + std::string merge_result; // temporary area for merge results later + + assert(s != nullptr && ops != nullptr); + ParsedInternalKey parsed_key; // TODO: didIO and Merge? 
s->didIO = didIO; @@ -269,49 +275,57 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ s->state = kCorrupt; } else { if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { + // Key matches. Process it switch (parsed_key.type) { case kTypeValue: if (kNotFound == s->state) { + s->state = kFound; s->value->assign(v.data(), v.size()); } else if (kMerge == s->state) { - std::string operand; - swap(operand, *s->value); - s->merge_operator->Merge(s->user_key, &v, operand, - s->value, s->logger); + assert(s->merge_operator != nullptr); + s->state = kFound; + if (!s->merge_operator->Merge(s->user_key, &v, *ops, + s->value, s->logger)) { + s->state = kCorrupt; + } } else { assert(false); } - s->state = kFound; return false; - case kTypeMerge: - if (kNotFound == s->state) { - s->state = kMerge; - s->value->assign(v.data(), v.size()); - } else if (kMerge == s->state) { - std::string operand; - swap(operand, *s->value); - s->merge_operator->Merge(s->user_key, &v, operand, - s->value, s->logger); - } else { - assert(false); - } - return true; - case kTypeDeletion: if (kNotFound == s->state) { s->state = kDeleted; } else if (kMerge == s->state) { - std::string operand; - swap(operand, *s->value); - s->merge_operator->Merge(s->user_key, nullptr, operand, - s->value, s->logger); s->state = kFound; + if (!s->merge_operator->Merge(s->user_key, nullptr, *ops, + s->value, s->logger)) { + s->state = kCorrupt; + } } else { assert(false); } return false; + case kTypeMerge: + assert(s->state == kNotFound || s->state == kMerge); + s->state = kMerge; + ops->push_front(v.ToString()); + while (ops->size() >= 2) { + // Attempt to merge operands together via user associateive merge + if (s->merge_operator->PartialMerge(s->user_key, + Slice((*ops)[0]), + Slice((*ops)[1]), + &merge_result, + s->logger)) { + ops->pop_front(); + swap(ops->front(), merge_result); + } else { + // Associative merge returns false ==> stack the operands + break; + } + } + return true; } } } @@ -341,7 +355,8 @@ Version::Version(VersionSet* vset, uint64_t version_number) void Version::Get(const ReadOptions& options, const LookupKey& k, std::string* value, - Status *status, + Status* status, + std::deque* operands, GetStats* stats, const Options& db_options, const bool no_io, @@ -364,6 +379,7 @@ void Version::Get(const ReadOptions& options, saver.value_found = value_found; saver.value = value; saver.merge_operator = merge_operator; + saver.merge_operands = operands; saver.logger = logger.get(); saver.didIO = false; @@ -374,55 +390,68 @@ void Version::Get(const ReadOptions& options, // We can search level-by-level since entries never hop across // levels. Therefore we are guaranteed that if we find data - // in an smaller level, later levels are irrelevant. - std::vector tmp; - FileMetaData* tmp2; + // in an smaller level, later levels are irrelevant (unless we + // are MergeInProgress). + std::vector important_files; for (int level = 0; level < vset_->NumberLevels(); level++) { size_t num_files = files_[level].size(); if (num_files == 0) continue; // Get the list of files to search in this level FileMetaData* const* files = &files_[level][0]; + important_files.clear(); + important_files.reserve(num_files); + + // Some files may overlap each other. We find + // all files that overlap user_key and process them in order from + // newest to oldest. In the context of merge-operator, + // this can occur at any level. 
Otherwise, it only occurs + // at Level-0 (since Put/Deletes are always compacted into a single entry). + uint32_t start_index; if (level == 0) { - // Level-0 files may overlap each other. Find all files that - // overlap user_key and process them in order from newest to oldest. - tmp.reserve(num_files); - for (uint32_t i = 0; i < num_files; i++) { - FileMetaData* f = files[i]; - if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && - ucmp->Compare(user_key, f->largest.user_key()) <= 0) { - tmp.push_back(f); - } - } - if (tmp.empty()) continue; - - std::sort(tmp.begin(), tmp.end(), NewestFirst); - files = &tmp[0]; - num_files = tmp.size(); + // On Level-0, we read through all files to check for overlap. + start_index = 0; } else { + // On Level-n (n>=1), files are sorted. // Binary search to find earliest index whose largest key >= ikey. - uint32_t index = FindFile(vset_->icmp_, files_[level], ikey); - if (index >= num_files) { - files = nullptr; - num_files = 0; - } else { - tmp2 = files[index]; - if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) { - // All of "tmp2" is past any data for user_key - files = nullptr; - num_files = 0; - } else { - files = &tmp2; - num_files = 1; - // TODO, is level 1-n files all disjoint in user key space? - } + // We will also stop when the file no longer overlaps ikey + start_index = FindFile(vset_->icmp_, files_[level], ikey); + } + + // Traverse the list, finding all overlapping files. + for (uint32_t i = start_index; i < num_files; i++) { + FileMetaData* f = files[i]; + if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && + ucmp->Compare(user_key, f->largest.user_key()) <= 0) { + important_files.push_back(f); + } else if (level > 0) { + // If on Level-n (n>=1) then the files are sorted. + // So we can stop looking when we are past the ikey. 
+ break; } } + if (important_files.empty()) continue; + if (level == 0) { + std::sort(important_files.begin(), important_files.end(), NewestFirst); + } else { + // Sanity check to make sure that the files are correctly sorted +#ifndef NDEBUG + num_files = important_files.size(); + for (uint32_t i = 1; i < num_files; ++i) { + FileMetaData* a = important_files[i-1]; + FileMetaData* b = important_files[i]; + int comp_sign = vset_->icmp_.Compare(a->largest, b->smallest); + assert(comp_sign < 0); + } +#endif + } + // Traverse each relevant file to find the desired key + num_files = important_files.size(); for (uint32_t i = 0; i < num_files; ++i) { - FileMetaData* f = files[i]; + FileMetaData* f = important_files[i]; bool tableIO = false; *status = vset_->table_cache_->Get(options, f->number, f->file_size, ikey, &saver, SaveValue, &tableIO, @@ -469,17 +498,17 @@ void Version::Get(const ReadOptions& options, if (kMerge == saver.state) { - // merge operand is in *value and we hit the beginning of the key history - // do a final merge of nullptr and operand; - std::string operand; - swap(operand, *value); - merge_operator->Merge(user_key, nullptr, operand, - value, logger.get()); - *status = Status::OK(); - return; + // merge_operands are in saver and we hit the beginning of the key history + // do a final merge of nullptr and operands; + if (merge_operator->Merge(user_key, nullptr, *saver.merge_operands, + value, logger.get())) { + *status = Status::OK(); + } else { + *status = Status::Corruption("could not perform end-of-key merge for ", + user_key); + } } else { *status = Status::NotFound(Slice()); // Use an empty error message for speed - return; } } @@ -717,6 +746,53 @@ void Version::ExtendOverlappingInputs( } } +// Returns true iff the first or last file in inputs contains +// an overlapping user key to the file "just outside" of it (i.e. +// just after the last file, or just before the first file) +// REQUIRES: "*inputs" is a sorted list of non-overlapping files +bool Version::HasOverlappingUserKey( + const std::vector* inputs, + int level) { + + // If inputs empty, there is no overlap. + // If level == 0, it is assumed that all needed files were already included. + if (inputs->empty() || level == 0){ + return false; + } + + const Comparator* user_cmp = vset_->icmp_.user_comparator(); + const std::vector& files = files_[level]; + const size_t kNumFiles = files.size(); + + // Check the last file in inputs against the file after it + size_t last_file = FindFile(vset_->icmp_, files, + inputs->back()->largest.Encode()); + assert(0 <= last_file && last_file < kNumFiles); // File should exist! + if (last_file < kNumFiles-1) { // If not the last file + const Slice last_key_in_input = files[last_file]->largest.user_key(); + const Slice first_key_after = files[last_file+1]->smallest.user_key(); + if (user_cmp->Compare(last_key_in_input, first_key_after) == 0) { + // The last user key in input overlaps with the next file's first key + return true; + } + } + + // Check the first file in inputs against the file just before it + size_t first_file = FindFile(vset_->icmp_, files, + inputs->front()->smallest.Encode()); + assert(0 <= first_file && first_file <= last_file); // File should exist! 
+ if (first_file > 0) { // If not first file + const Slice& first_key_in_input = files[first_file]->smallest.user_key(); + const Slice& last_key_before = files[first_file-1]->largest.user_key(); + if (user_cmp->Compare(first_key_in_input, last_key_before) == 0) { + // The first user key in input overlaps with the previous file's last key + return true; + } + } + + return false; +} + std::string Version::DebugString(bool hex) const { std::string r; for (int level = 0; level < vset_->NumberLevels(); level++) { @@ -1566,7 +1642,7 @@ void VersionSet::Finalize(Version* v, // sort all the levels based on their score. Higher scores get listed // first. Use bubble sort because the number of entries are small. - for(int i = 0; i < NumberLevels()-2; i++) { + for (int i = 0; i < NumberLevels()-2; i++) { for (int j = i+1; j < NumberLevels()-1; j++) { if (v->compaction_score_[i] < v->compaction_score_[j]) { double score = v->compaction_score_[i]; @@ -2060,7 +2136,7 @@ Compaction* VersionSet::PickCompaction() { Compaction* c = nullptr; int level = -1; - // compute the compactions needed. It is better to do it here + // Compute the compactions needed. It is better to do it here // and also in LogAndApply(), otherwise the values could be stale. std::vector size_being_compacted(NumberLevels()-1); current_->vset_->SizeBeingCompacted(size_being_compacted); @@ -2076,6 +2152,7 @@ Compaction* VersionSet::PickCompaction() { level = current_->compaction_level_[i]; if ((current_->compaction_score_[i] >= 1)) { c = PickCompactionBySize(level, current_->compaction_score_[i]); + ExpandWhileOverlapping(c); if (c != nullptr) { break; } @@ -2092,13 +2169,14 @@ Compaction* VersionSet::PickCompaction() { // Only allow one level 0 compaction at a time. // Do not pick this file if its parents at level+1 are being compacted. if (level != 0 || compactions_in_progress_[0].empty()) { - if(!ParentRangeInCompaction(&f->smallest, &f->largest, level, - &parent_index)) { + if (!ParentRangeInCompaction(&f->smallest, &f->largest, level, + &parent_index)) { c = new Compaction(level, MaxFileSizeForLevel(level+1), MaxGrandParentOverlapBytes(level), NumberLevels(), true); c->inputs_[0].push_back(f); c->parent_index_ = parent_index; current_->file_to_compact_ = nullptr; + ExpandWhileOverlapping(c); } } } @@ -2110,7 +2188,6 @@ Compaction* VersionSet::PickCompaction() { c->input_version_ = current_; c->input_version_->Ref(); - // Files in level 0 may overlap each other, so pick up all overlapping ones // Two level 0 compaction won't run at the same time, so don't need to worry // about files on level 0 being compacted. if (level == 0) { @@ -2135,6 +2212,8 @@ Compaction* VersionSet::PickCompaction() { assert(!c->inputs_[0].empty()); } + + // Setup "level+1" files (inputs_[1]) SetupOtherInputs(c); // mark all the files that are being compacted @@ -2166,11 +2245,76 @@ bool VersionSet::FilesInCompaction(std::vector& files) { return false; } +// Add more files to the inputs on "level" to make sure that +// no newer version of a key is compacted to "level+1" while leaving an older +// version in a "level". Otherwise, any Get() will search "level" first, +// and will likely return an old/stale value for the key, since it always +// searches in increasing order of level to find the value. This could +// also scramble the order of merge operands. This function should be +// called any time a new Compaction is created, and its inputs_[0] are +// populated. +// +// Will set c to nullptr if it is impossible to apply this compaction. 
+void VersionSet::ExpandWhileOverlapping(Compaction* c) { + // If inputs are empty then there is nothing to expand. + if (!c || c->inputs_[0].empty()) { + return; + } + + // GetOverlappingInputs will always do the right thing for level-0. + // So we don't need to do any expansion if level == 0. + if (c->level() == 0) { + return; + } + + const int level = c->level(); + InternalKey smallest, largest; + + // Keep expanding c->inputs_[0] until we are sure that there is a + // "clean cut" boundary between the files in input and the surrounding files. + // This will ensure that no parts of a key are lost during compaction. + int hint_index = -1; + size_t old_size; + do { + old_size = c->inputs_[0].size(); + GetRange(c->inputs_[0], &smallest, &largest); + c->inputs_[0].clear(); + current_->GetOverlappingInputs(level, &smallest, &largest, &c->inputs_[0], + hint_index, &hint_index); + } while(c->inputs_[0].size() > old_size); + + // Get the new range + GetRange(c->inputs_[0], &smallest, &largest); + + // If, after the expansion, there are files that are already under + // compaction, then we must drop/cancel this compaction. + int parent_index = -1; + if (FilesInCompaction(c->inputs_[0]) || + ParentRangeInCompaction(&smallest, &largest, level, &parent_index)) { + c->inputs_[0].clear(); + c->inputs_[1].clear(); + delete c; + c = nullptr; + } +} + +// Populates the set of inputs from "level+1" that overlap with "level". +// Will also attempt to expand "level" if that doesn't expand "level+1" +// or cause "level" to include a file for compaction that has an overlapping +// user-key with another file. void VersionSet::SetupOtherInputs(Compaction* c) { + // If inputs are empty, then there is nothing to expand. + if (c->inputs_[0].empty()) { + return; + } + const int level = c->level(); InternalKey smallest, largest; + + // Get the range one last time. GetRange(c->inputs_[0], &smallest, &largest); + // Populate the set of next-level files (inputs_[1]) to include in compaction current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1], c->parent_index_, &c->parent_index_); @@ -2178,8 +2322,11 @@ void VersionSet::SetupOtherInputs(Compaction* c) { InternalKey all_start, all_limit; GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit); - // See if we can grow the number of inputs in "level" without - // changing the number of "level+1" files we pick up. + // See if we can further grow the number of inputs in "level" without + // changing the number of "level+1" files we pick up. We also choose NOT + // to expand if this would cause "level" to include some entries for some + // user key, while excluding other entries for the same user key. This + // can happen when one user key spans multiple files. if (!c->inputs_[1].empty()) { std::vector expanded0; current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0, @@ -2190,7 +2337,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) { int64_t limit = ExpandedCompactionByteSizeLimit(level); if (expanded0.size() > c->inputs_[0].size() && inputs1_size + expanded0_size < limit && - !FilesInCompaction(expanded0)) { + !FilesInCompaction(expanded0) && + !current_->HasOverlappingUserKey(&expanded0, level)) { InternalKey new_start, new_limit; GetRange(expanded0, &new_start, &new_limit); std::vector expanded1; @@ -2249,13 +2397,11 @@ Compaction* VersionSet::CompactRange( return nullptr; } - // Avoid compacting too much in one shot in case the range is large. 
// But we cannot do this for level-0 since level-0 files can overlap // and we must not pick one file and drop another older file if the // two files overlap. if (level > 0) { - const uint64_t limit = MaxFileSizeForLevel(level) * options_->source_compaction_factor; uint64_t total = 0; @@ -2268,12 +2414,18 @@ Compaction* VersionSet::CompactRange( } } } - Compaction* c = new Compaction(level, MaxFileSizeForLevel(level+1), MaxGrandParentOverlapBytes(level), NumberLevels()); + + c->inputs_[0] = inputs; + ExpandWhileOverlapping(c); + if (c == nullptr) { + Log(options_->info_log, "Could not compact due to expansion failure.\n"); + return nullptr; + } + c->input_version_ = current_; c->input_version_->Ref(); - c->inputs_[0] = inputs; SetupOtherInputs(c); // These files that are to be manaully compacted do not trample @@ -2418,8 +2570,9 @@ void Compaction::Summary(char* output, int len) { int write = snprintf(output, len, "Base version %ld Base level %d, seek compaction:%d, inputs:", input_version_->GetVersionNumber(), level_, seek_compaction_); - if(write < 0 || write > len) + if (write < 0 || write > len) { return; + } char level_low_summary[100]; InputSummary(inputs_[0], level_low_summary, sizeof(level_low_summary)); diff --git a/db/version_set.h b/db/version_set.h index 320ff8e68..a8403868b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -68,14 +68,16 @@ class Version { // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. Fills *stats. + // Uses *operands to store merge_operator operations to apply later // REQUIRES: lock is not held struct GetStats { FileMetaData* seek_file; int seek_file_level; }; void Get(const ReadOptions&, const LookupKey& key, std::string* val, - Status* status, GetStats* stats, const Options& db_option, - const bool no_io = false, bool* value_found = nullptr); + Status* status, std::deque* operands, GetStats* stats, + const Options& db_option, const bool no_io = false, + bool* value_found = nullptr); // Adds "stats" into the current state. Returns true if a new // compaction may need to be triggered, false otherwise. @@ -118,6 +120,14 @@ class Version { const Slice* smallest_user_key, const Slice* largest_user_key); + // Returns true iff the first or last file in inputs contains + // an overlapping user key to the file "just outside" of it (i.e. + // just after the last file, or just before the first file) + // REQUIRES: "*inputs" is a sorted list of non-overlapping files + bool HasOverlappingUserKey(const std::vector* inputs, + int level); + + // Return the level at which we should place a new memtable compaction // result that covers the range [smallest_user_key,largest_user_key]. int PickLevelForMemTableOutput(const Slice& smallest_user_key, @@ -405,6 +415,8 @@ class VersionSet { InternalKey* smallest, InternalKey* largest); + void ExpandWhileOverlapping(Compaction* c); + void SetupOtherInputs(Compaction* c); // Save current contents to *log diff --git a/include/leveldb/merge_operator.h b/include/leveldb/merge_operator.h index d948d3c6e..44c4db66b 100644 --- a/include/leveldb/merge_operator.h +++ b/include/leveldb/merge_operator.h @@ -6,26 +6,38 @@ #define STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_ #include +#include +#include "leveldb/slice.h" // TODO: Remove this when migration is done; namespace leveldb { class Slice; class Logger; -// The Merge Operator interface. -// Client needs to provide an object implementing this interface if Merge -// operation is accessed. 
diff --git a/include/leveldb/merge_operator.h b/include/leveldb/merge_operator.h
index d948d3c6e..44c4db66b 100644
--- a/include/leveldb/merge_operator.h
+++ b/include/leveldb/merge_operator.h
@@ -6,26 +6,38 @@
 #define STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_
 
 #include <string>
+#include <deque>
+#include "leveldb/slice.h"  // TODO: Remove this when migration is done
 
 namespace leveldb {
 
 class Slice;
 class Logger;
 
-// The Merge Operator interface.
-// Client needs to provide an object implementing this interface if Merge
-// operation is accessed.
-// Essentially, MergeOperator specifies the SEMANTICS of a merge, which only
+// The Merge Operator
+//
+// Essentially, a MergeOperator specifies the SEMANTICS of a merge, which only
 // client knows. It could be numeric addition, list append, string
-// concatenation, ... , anything.
+// concatenation, editing a data structure, ... , anything.
 // The library, on the other hand, is concerned with the exercise of this
 // interface, at the right time (during get, iteration, compaction...)
-// Note that, even though in principle we don't require any special property
-// of the merge operator, the current rocksdb compaction order does imply that
-// an associative operator could be exercised more naturally (and more
-// efficiently).
 //
-// Refer to my_test.cc for an example of implementation
+// To use merge, the client needs to provide an object implementing one of
+// the following interfaces:
+//  a) AssociativeMergeOperator - for most simple semantics (always take
+//     two values, and merge them into one value, which is then put back
+//     into rocksdb); numeric addition and string concatenation are examples;
+//
+//  b) MergeOperator - the generic class for all the more abstract / complex
+//     operations; one method (Merge) to merge a Put/Delete value with a
+//     sequence of merge operands, and another method (PartialMerge) that
+//     merges two operands together. This is especially useful if your key
+//     values have a complex structure but you would still like to support
+//     client-specific incremental updates.
+//
+// AssociativeMergeOperator is simpler to implement; MergeOperator is more
+// powerful.
+//
+// Refer to the rocksdb-merge wiki for more details and example implementations.
+//
 class MergeOperator {
  public:
@@ -38,35 +50,95 @@ class MergeOperator {
   //                 refer to different types of data which have different
   //                 merge operation semantics
   // existing: (IN)  null indicates that the key does not exist before this op
-  // value:    (IN)  The passed-in merge operand value (when Merge is issued)
+  // operand_list: (IN) the sequence of merge operands to apply, front() first.
   // new_value:(OUT) Client is responsible for filling the merge result here
   // logger:   (IN)  Client could use this to log errors during merge.
   //
-  // Note: Merge does not return anything to indicate if a merge is successful
-  //       or not.
-  // Rationale: If a merge failed due to, say de-serialization error, we still
-  //            need to define a consistent merge result. Should we throw away
-  //            the existing value? the merge operand? Or reset the merged value
-  //            to sth? The rocksdb library is not in a position to make the
-  //            right choice. On the other hand, client knows exactly what
-  //            happened during Merge, thus is able to make the best decision.
-  //            Just save the final decision in new_value. logger is passed in,
-  //            in case client wants to leave a trace of what went wrong.
-  virtual void Merge(const Slice& key,
+  // Return true on success.
+  // All values passed in will be client-specific values. So if this method
+  // returns false, it is because the client specified bad data or there was
+  // internal corruption. This will be treated as an error by the library.
+  //
+  // Also make use of the *logger for error messages.
+  virtual bool Merge(const Slice& key,
                     const Slice* existing_value,
-                    const Slice& value,
+                    const std::deque<std::string>& operand_list,
                     std::string* new_value,
                     Logger* logger) const = 0;
 
+  // This function performs merge(left_op, right_op)
+  // when both the operands are themselves merge operation types
+  // that you would have passed to a DB::Merge() call in the same order
+  // (i.e.: DB::Merge(key, left_op), followed by DB::Merge(key, right_op)).
+  //
+  // PartialMerge should combine them into a single merge operation that is
+  // saved into *new_value, and then it should return true.
+  // *new_value should be constructed such that a call to
+  // DB::Merge(key, *new_value) would yield the same result as a call
+  // to DB::Merge(key, left_op) followed by DB::Merge(key, right_op).
+  //
+  // If it is impossible or infeasible to combine the two operations,
+  // leave new_value unchanged and return false. The library will
+  // internally keep track of the operations, and apply them in the
+  // correct order once a base-value (a Put/Delete/End-of-Database) is seen.
+  //
+  // TODO: Presently there is no way to differentiate between error/corruption
+  //       and simply "return false". For now, the client should simply return
+  //       false in any case it cannot perform partial-merge, regardless of
+  //       reason. If there is corruption in the data, handle it in the
+  //       Merge() function above, and return false there.
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const = 0;
 
-  // The name of the MergeOperator.  Used to check for MergeOperator
+  // The name of the MergeOperator. Used to check for MergeOperator
   // mismatches (i.e., a DB created with one MergeOperator is
   // accessed using a different MergeOperator)
   // TODO: the name is currently not stored persistently and thus
   //       no checking is enforced. Client is responsible for providing
   //       consistent MergeOperator between DB opens.
   virtual const char* Name() const = 0;
+};
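A minimal sketch of an implementation of this generic interface — a hypothetical MaxOperator that keeps the lexicographically largest value written to a key. It is not part of this patch, but it shows how Merge() folds the whole operand_list and how PartialMerge() can always collapse two operands when the operation is associative:

#include <deque>
#include <string>

#include "leveldb/merge_operator.h"
#include "leveldb/slice.h"

using namespace leveldb;

class MaxOperator : public MergeOperator {
 public:
  virtual bool Merge(const Slice& key,
                     const Slice* existing_value,
                     const std::deque<std::string>& operand_list,
                     std::string* new_value,
                     Logger* logger) const override {
    // Start from the existing value (if any), then scan every operand.
    Slice best = existing_value ? *existing_value : Slice();
    for (std::deque<std::string>::const_iterator it = operand_list.begin();
         it != operand_list.end(); ++it) {
      if (best.compare(Slice(*it)) < 0) {
        best = Slice(*it);
      }
    }
    new_value->assign(best.data(), best.size());
    return true;
  }

  virtual bool PartialMerge(const Slice& key,
                            const Slice& left_operand,
                            const Slice& right_operand,
                            std::string* new_value,
                            Logger* logger) const override {
    // max is associative, so two operands always collapse into one.
    const Slice& winner =
        left_operand.compare(right_operand) < 0 ? right_operand : left_operand;
    new_value->assign(winner.data(), winner.size());
    return true;
  }

  virtual const char* Name() const override { return "MaxOperator"; }
};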
+
+// The simpler, associative merge operator.
+class AssociativeMergeOperator : public MergeOperator {
+ public:
+  virtual ~AssociativeMergeOperator() {}
+
+  // Gives the client a way to express the read -> modify -> write semantics
+  // key:           (IN) The key that's associated with this merge operation.
+  // existing_value:(IN) null indicates the key does not exist before this op
+  // value:         (IN) the value to update/merge the existing_value with
+  // new_value:    (OUT) Client is responsible for filling the merge result here
+  // logger:        (IN) Client could use this to log errors during merge.
+  //
+  // Return true on success.
+  // All values passed in will be client-specific values. So if this method
+  // returns false, it is because the client specified bad data or there was
+  // internal corruption. The client should assume that this will be treated
+  // as an error by the library.
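For comparison, a minimal sketch of the associative interface — a hypothetical BitwiseOrOperator (not part of this patch) that ORs fixed-width flag bitmaps and returns false on malformed input, as the new contract requires:

#include <string>

#include "leveldb/merge_operator.h"
#include "leveldb/slice.h"

using namespace leveldb;

class BitwiseOrOperator : public AssociativeMergeOperator {
 public:
  virtual bool Merge(const Slice& key,
                     const Slice* existing_value,
                     const Slice& value,
                     std::string* new_value,
                     Logger* logger) const override {
    // Start from the operand, then OR in the existing value (if any).
    new_value->assign(value.data(), value.size());
    if (existing_value) {
      if (existing_value->size() != value.size()) {
        // Client data is malformed; report an error to the library.
        return false;
      }
      for (size_t i = 0; i < new_value->size(); i++) {
        (*new_value)[i] |= existing_value->data()[i];
      }
    }
    return true;
  }

  virtual const char* Name() const override { return "BitwiseOrOperator"; }
};

// Usage sketch, assuming the raw-pointer Options::merge_operator field of
// this vintage:
//   Options options;
//   options.create_if_missing = true;
//   options.merge_operator = new BitwiseOrOperator();
//   DB* db;
//   DB::Open(options, "/tmp/ordb", &db);
//   db->Merge(WriteOptions(), "flags", Slice("\x01", 1));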
+  virtual bool Merge(const Slice& key,
+                     const Slice* existing_value,
+                     const Slice& value,
+                     std::string* new_value,
+                     Logger* logger) const = 0;
+
+ private:
+  // Default implementations of the MergeOperator functions
+  virtual bool Merge(const Slice& key,
+                     const Slice* existing_value,
+                     const std::deque<std::string>& operand_list,
+                     std::string* new_value,
+                     Logger* logger) const override;
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const override;
+};
 
 }  // namespace leveldb
diff --git a/table/table.cc b/table/table.cc
index 80c5ef491..be0f43c90 100644
--- a/table/table.cc
+++ b/table/table.cc
@@ -356,6 +356,8 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
           (*mark_key_may_exist)(arg);
           break;
         }
+
+        // Call the *saver function on each entry until it returns false
         for (block_iter->Seek(k); block_iter->Valid(); block_iter->Next()) {
           if (!(*saver)(arg, block_iter->key(), block_iter->value(), didIO)) {
             done = true;
diff --git a/utilities/merge_operators/put.cc b/utilities/merge_operators/put.cc
index 9a610c4ce..123dce66a 100644
--- a/utilities/merge_operators/put.cc
+++ b/utilities/merge_operators/put.cc
@@ -8,15 +8,34 @@ using namespace leveldb;
 namespace { // anonymous namespace
 
 // A merge operator that mimics Put semantics
+// Since this merge-operator will not be used in production,
+// it is implemented as a non-associative merge operator to illustrate the
+// new interface and for testing purposes. (That is, we inherit from
+// the MergeOperator class rather than the AssociativeMergeOperator,
+// which would be simpler in this case.)
+//
+// From the client's perspective, the semantics are the same.
 class PutOperator : public MergeOperator {
  public:
-  virtual void Merge(const Slice& key,
+  virtual bool Merge(const Slice& key,
                     const Slice* existing_value,
-                    const Slice& value,
+                    const std::deque<std::string>& operand_sequence,
                     std::string* new_value,
                     Logger* logger) const override {
-    // put basically only looks at the current value
-    new_value->assign(value.data(), value.size());
+    // Put basically only looks at the current/latest value
+    assert(!operand_sequence.empty());
+    assert(new_value != nullptr);
+    new_value->assign(operand_sequence.back());
+    return true;
+  }
+
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const override {
+    // Keep only the most recent operand: a later Put overwrites an earlier one
+    new_value->assign(right_operand.data(), right_operand.size());
+    return true;
   }
 
   virtual const char* Name() const override {
diff --git a/utilities/merge_operators/string_append/stringappend.cc b/utilities/merge_operators/string_append/stringappend.cc
index 06056c429..9bded8a98 100644
--- a/utilities/merge_operators/string_append/stringappend.cc
+++ b/utilities/merge_operators/string_append/stringappend.cc
@@ -15,14 +15,13 @@ namespace leveldb {
-
 // Constructor: also specify the delimiter character.
 StringAppendOperator::StringAppendOperator(char delim_char)
     : delim_(delim_char) {
 }
 
 // Implementation for the merge operation (concatenates two strings)
-void StringAppendOperator::Merge(const Slice& key,
+bool StringAppendOperator::Merge(const Slice& key,
                                  const Slice* existing_value,
                                  const Slice& value,
                                  std::string* new_value,
@@ -43,6 +42,8 @@ void StringAppendOperator::Merge(const Slice& key,
     new_value->append(1,delim_);
     new_value->append(value.data(), value.size());
   }
+
+  return true;
 }
 
 const char* StringAppendOperator::Name() const {
diff --git a/utilities/merge_operators/string_append/stringappend.h b/utilities/merge_operators/string_append/stringappend.h
index 1cf11d008..34cca6944 100644
--- a/utilities/merge_operators/string_append/stringappend.h
+++ b/utilities/merge_operators/string_append/stringappend.h
@@ -9,12 +9,12 @@ namespace leveldb {
 
-class StringAppendOperator : public MergeOperator {
+class StringAppendOperator : public AssociativeMergeOperator {
  public:
   StringAppendOperator(char delim_char);  /// Constructor: specify delimiter
 
-  virtual void Merge(const Slice& key,
+  virtual bool Merge(const Slice& key,
                      const Slice* existing_value,
                      const Slice& value,
                      std::string* new_value,
@@ -23,7 +23,7 @@ class StringAppendOperator : public MergeOperator {
   virtual const char* Name() const override;
 
  private:
-  char delim_;  // The delimiter is inserted between elements
+  char delim_;   // The delimiter is inserted between elements
 };
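A hedged usage sketch for the operator above. The /tmp path and the raw-pointer Options::merge_operator field of this vintage are assumptions:

#include <string>

#include "leveldb/db.h"
#include "utilities/merge_operators/string_append/stringappend.h"

using namespace leveldb;

int main() {
  StringAppendOperator append_op(',');
  Options options;
  options.create_if_missing = true;
  options.merge_operator = &append_op;

  DB* db;
  Status s = DB::Open(options, "/tmp/appenddb", &db);
  if (!s.ok()) return 1;

  // Three merges build up a comma-separated list: "red,green,blue".
  db->Merge(WriteOptions(), "colors", "red");
  db->Merge(WriteOptions(), "colors", "green");
  db->Merge(WriteOptions(), "colors", "blue");

  std::string result;
  db->Get(ReadOptions(), "colors", &result);  // result == "red,green,blue"

  delete db;
  return 0;
}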
diff --git a/utilities/merge_operators/string_append/stringappend2.cc b/utilities/merge_operators/string_append/stringappend2.cc
new file mode 100644
index 000000000..c0930d653
--- /dev/null
+++ b/utilities/merge_operators/string_append/stringappend2.cc
@@ -0,0 +1,90 @@
+/**
+ * @author Deon Nicholas (dnicholas@fb.com)
+ * Copyright 2013 Facebook
+ */
+
+#include "stringappend2.h"
+
+#include <string>
+#include <assert.h>
+
+#include "leveldb/slice.h"
+#include "leveldb/merge_operator.h"
+#include "utilities/merge_operators.h"
+
+namespace leveldb {
+
+// Constructor: also specify the delimiter character.
+StringAppendTESTOperator::StringAppendTESTOperator(char delim_char)
+    : delim_(delim_char) {
+}
+
+// Implementation for the merge operation (concatenates the operand sequence)
+bool StringAppendTESTOperator::Merge(const Slice& key,
+                                     const Slice* existing_value,
+                                     const std::deque<std::string>& operands,
+                                     std::string* new_value,
+                                     Logger* logger) const {
+
+  // Clear the *new_value for writing.
+  assert(new_value);
+  new_value->clear();
+
+  // Compute the space needed for the final result.
+  size_t numBytes = 0;
+  for (auto it = operands.begin(); it != operands.end(); ++it) {
+    numBytes += it->size() + 1;   // Plus 1 for the delimiter
+  }
+
+  // Only print the delimiter after the first entry has been printed
+  bool printDelim = false;
+
+  // Prepend the *existing_value if one exists.
+  if (existing_value) {
+    new_value->reserve(numBytes + existing_value->size());
+    new_value->append(existing_value->data(), existing_value->size());
+    printDelim = true;
+  } else if (numBytes) {
+    new_value->reserve(numBytes-1);  // Minus 1 since we have one less delimiter
+  }
+
+  // Concatenate the sequence of strings (and add a delimiter between each)
+  for (auto it = operands.begin(); it != operands.end(); ++it) {
+    if (printDelim) {
+      new_value->append(1, delim_);
+    }
+    new_value->append(*it);
+    printDelim = true;
+  }
+
+  return true;
+}
+
+bool StringAppendTESTOperator::PartialMerge(const Slice& key,
+                                            const Slice& left_operand,
+                                            const Slice& right_operand,
+                                            std::string* new_value,
+                                            Logger* logger) const {
+  // Return false to exercise the stacking of unmerged operands;
+  // see the commented-out implementation below for a real partial merge.
+  return false;
+
+//  // Clear the *new_value for writing.
+//  assert(new_value);
+//  new_value->clear();
+//
+//  // Generic append
+//  // Reserve correct size for *new_value, and apply concatenation.
+//  new_value->reserve(left_operand.size() + 1 + right_operand.size());
+//  new_value->assign(left_operand.data(), left_operand.size());
+//  new_value->append(1, delim_);
+//  new_value->append(right_operand.data(), right_operand.size());
+//
+//  return true;
+
+}
+
+const char* StringAppendTESTOperator::Name() const {
+  return "StringAppendTESTOperator";
+}
+
+}  // namespace leveldb
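When PartialMerge() returns false, the library keeps the two operands separate and applies them later in the correct order, as the merge_operator.h contract above describes. A simplified model of that bookkeeping — PushOperand is a hypothetical name, the real logic lives in MergeHelper, and operands arrive oldest-first in this sketch:

#include <deque>
#include <string>

#include "leveldb/merge_operator.h"
#include "leveldb/slice.h"

using namespace leveldb;

// Push a newly-seen operand onto a stack of pending operands, collapsing
// it into the previous one only when the operator's PartialMerge succeeds.
void PushOperand(const MergeOperator& op, const Slice& key,
                 const Slice& operand, std::deque<std::string>* pending,
                 Logger* logger) {
  std::string combined;
  if (!pending->empty() &&
      op.PartialMerge(key, Slice(pending->back()), operand, &combined,
                      logger)) {
    pending->back().swap(combined);   // two operands became one
  } else {
    pending->push_back(operand.ToString());  // keep them separate
  }
}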
diff --git a/utilities/merge_operators/string_append/stringappend2.h b/utilities/merge_operators/string_append/stringappend2.h
new file mode 100644
index 000000000..71135c826
--- /dev/null
+++ b/utilities/merge_operators/string_append/stringappend2.h
@@ -0,0 +1,42 @@
+/**
+ * A TEST MergeOperator for rocksdb/leveldb that implements string append.
+ * It is built using the MergeOperator interface rather than the simpler
+ * AssociativeMergeOperator interface. This is useful for testing/benchmarking;
+ * the two operators are semantically the same, but all production code
+ * should use the StringAppendOperator defined in stringappend.{h,cc}.
+ *
+ * @author Deon Nicholas (dnicholas@fb.com)
+ * Copyright 2013 Facebook
+ */
+
+#pragma once
+
+#include <deque>
+#include <string>
+
+#include "leveldb/merge_operator.h"
+#include "leveldb/slice.h"
+
+namespace leveldb {
+
+class StringAppendTESTOperator : public MergeOperator {
+ public:
+  StringAppendTESTOperator(char delim_char);  /// Constructor with delimiter
+
+  virtual bool Merge(const Slice& key,
+                     const Slice* existing_value,
+                     const std::deque<std::string>& operand_sequence,
+                     std::string* new_value,
+                     Logger* logger) const override;
+
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const override;
+
+  virtual const char* Name() const override;
+
+ private:
+  char delim_;  // The delimiter is inserted between elements
+};
+
+}  // namespace leveldb
diff --git a/utilities/merge_operators/string_append/stringappend_test.cc b/utilities/merge_operators/string_append/stringappend_test.cc
index 7fff3e5d9..b852d30d9 100644
--- a/utilities/merge_operators/string_append/stringappend_test.cc
+++ b/utilities/merge_operators/string_append/stringappend_test.cc
@@ -20,7 +20,8 @@ using namespace leveldb;
 
 namespace leveldb {
 
-const std::string kDbName = "/tmp/mergetestdb";  // Path to the database on file system
+// Path to the database on file system
+const std::string kDbName = "/tmp/mergetestdb";
 
 // OpenDb opens a (possibly new) rocksdb database with a StringAppendOperator
 std::shared_ptr<DB> OpenDb(StringAppendOperator* append_op) {
@@ -251,7 +252,7 @@ TEST(StringAppendOperatorTest,BIGRandomMixGetAppend) {
   // Generate a bunch of random queries (Append and Get)!
   enum query_t { APPEND_OP, GET_OP, NUM_OPS };
 
-  Random randomGen(9138204);  //deterministic seed; always get same results!
+  Random randomGen(9138204);  // deterministic seed
 
   const int kNumQueries = 1000;
   for (int q=0; q<kNumQueries; ++q) {
diff --git a/utilities/merge_operators/uint64add.cc b/utilities/merge_operators/uint64add.cc
--- a/utilities/merge_operators/uint64add.cc
+++ b/utilities/merge_operators/uint64add.cc
-    uint64_t existing = 0;
-    if (existing_value) {
-      if (existing_value->size() == sizeof(uint64_t)) {
-        existing = DecodeFixed64(existing_value->data());
-      } else {
-        // if existing_value is corrupted, treat it as 0
-        Log(logger, "existing value corruption, size: %zu > %zu",
-            existing_value->size(), sizeof(uint64_t));
-        existing = 0;
-      }
-    }
-
-    uint64_t operand;
-    if (value.size() == sizeof(uint64_t)) {
-      operand = DecodeFixed64(value.data());
-    } else {
-      // if operand is corrupted, treat it as 0
-      Log(logger, "operand value corruption, size: %zu > %zu",
-          value.size(), sizeof(uint64_t));
-      operand = 0;
+    uint64_t orig_value = 0;
+    if (existing_value) {
+      orig_value = DecodeInteger(*existing_value, logger);
     }
+    uint64_t operand = DecodeInteger(value, logger);
 
-    new_value->resize(sizeof(uint64_t));
-    EncodeFixed64(&(*new_value)[0], existing + operand);
+    assert(new_value);
+    new_value->clear();
+    PutFixed64(new_value, orig_value + operand);
 
-    return;
+    return true;  // Return true always since corruption will be treated as 0
   }
 
   virtual const char* Name() const override {
     return "UInt64AddOperator";
   }
+
+ private:
+  // Takes the string and decodes it into a uint64_t
+  // On error, prints a message and returns 0
+  uint64_t DecodeInteger(const Slice& value, Logger* logger) const {
+    uint64_t result = 0;
+
+    if (value.size() == sizeof(uint64_t)) {
+      result = DecodeFixed64(value.data());
+    } else if (logger != nullptr) {
+      // If value is corrupted, treat it as 0
+      Log(logger, "uint64 value corruption, size: %zu != %zu",
+          value.size(), sizeof(uint64_t));
+    }
+
+    return result;
+  }
+
 };
 
 }
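A small self-contained sketch of the fixed-width arithmetic UInt64AddOperator relies on (PutFixed64/DecodeFixed64 from util/coding.h); a counter increment is just a Merge of an encoded delta:

#include <stdint.h>
#include <string>

#include "util/coding.h"  // PutFixed64 / DecodeFixed64

using namespace leveldb;

// Round-trip the encoding the add operator uses internally.
int main() {
  std::string a, b;
  PutFixed64(&a, 40);  // existing value
  PutFixed64(&b, 2);   // merge operand

  uint64_t sum = DecodeFixed64(a.data()) + DecodeFixed64(b.data());

  std::string merged;
  PutFixed64(&merged, sum);  // what UInt64AddOperator::Merge would produce
  return DecodeFixed64(merged.data()) == 42 ? 0 : 1;
}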
diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h
index 48aee4a10..79f75bb6c 100644
--- a/utilities/ttl/db_ttl.h
+++ b/utilities/ttl/db_ttl.h
@@ -225,39 +225,86 @@ class TtlMergeOperator : public MergeOperator {
     assert(merge_op);
   }
 
-  virtual void Merge(const Slice& key,
+  virtual bool Merge(const Slice& key,
                     const Slice* existing_value,
-                    const Slice& value,
+                    const std::deque<std::string>& operands,
                     std::string* new_value,
                     Logger* logger) const override {
     const uint32_t ts_len = DBWithTTL::kTSLength;
-    if ((existing_value && existing_value->size() < ts_len) ||
-        value.size() < ts_len) {
-      Log(logger, "Error: Could not remove timestamp correctly from value.");
-      assert(false);
-      //TODO: Remove assert and make this function return false.
-      //TODO: Change Merge semantics and add a counter here
+    // TODO: Change Merge semantics and add a counter here
+    if (existing_value && existing_value->size() < ts_len) {
+      Log(logger, "Error: Could not remove timestamp from existing value.");
+      return false;
+    }
+
+    // Strip the time-stamp from each operand before passing it to
+    // user_merge_op_
+    std::deque<std::string> operands_without_ts;
+    for (auto it = operands.begin(); it != operands.end(); ++it) {
+      if (it->size() < ts_len) {
+        Log(logger, "Error: Could not remove timestamp from operand value.");
+        return false;
+      }
+      operands_without_ts.push_back(it->substr(0, it->size() - ts_len));
     }
-    Slice value_without_ts(value.data(), value.size() - ts_len);
+
+    // Apply the user merge operator (store result in *new_value)
+    bool good = true;
     if (existing_value) {
       Slice existing_value_without_ts(existing_value->data(),
                                       existing_value->size() - ts_len);
-      user_merge_op_->Merge(key, &existing_value_without_ts, value_without_ts,
-                            new_value, logger);
+      good = user_merge_op_->Merge(key, &existing_value_without_ts,
+                                   operands_without_ts, new_value, logger);
+    } else {
+      good = user_merge_op_->Merge(key, nullptr, operands_without_ts,
+                                   new_value, logger);
+    }
+    if (!good) {
+      // Propagate the user operator's failure instead of stamping a bad value
+      return false;
+    }
+
+    // Augment the *new_value with the ttl time-stamp
+    int32_t curtime;
+    if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
+      Log(logger, "Error: Could not get current time to be attached internally "
+                  "to the new value.");
+      return false;
     } else {
-      user_merge_op_->Merge(key, nullptr, value_without_ts, new_value, logger);
+      char ts_string[ts_len];
+      EncodeFixed32(ts_string, curtime);
+      new_value->append(ts_string, ts_len);
+      return true;
+    }
+  }
+
+  virtual bool PartialMerge(const Slice& key,
+                            const Slice& left_operand,
+                            const Slice& right_operand,
+                            std::string* new_value,
+                            Logger* logger) const override {
+    const uint32_t ts_len = DBWithTTL::kTSLength;
+
+    // TODO: Change Merge semantics and add a counter here
+    if (left_operand.size() < ts_len || right_operand.size() < ts_len) {
+      Log(logger, "Error: Could not remove timestamp from value.");
+      return false;
     }
+
+    // Apply the user partial-merge operator (store result in *new_value)
+    assert(new_value);
+    Slice left_without_ts(left_operand.data(), left_operand.size() - ts_len);
+    Slice right_without_ts(right_operand.data(), right_operand.size() - ts_len);
+    if (!user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts,
+                                      new_value, logger)) {
+      return false;
+    }
+
+    // Augment the *new_value with the ttl time-stamp
     int32_t curtime;
     if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
       Log(logger, "Error: Could not get current time to be attached internally "
                   "to the new value.");
-      assert(false);
-      //TODO: Remove assert and make this function return false.
+      return false;
     } else {
       char ts_string[ts_len];
       EncodeFixed32(ts_string, curtime);
       new_value->append(ts_string, ts_len);
+      return true;
     }
+  }
 
   virtual const char* Name() const override {