diff --git a/db/builder.cc b/db/builder.cc index c84e92ec0..51c5ff093 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -113,6 +113,10 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, is_first_key = false; if (this_ikey.type == kTypeMerge) { + if (!merge.HasOperator()) { + return Status::InvalidArgument( + "merge_operator is not properly initialized."); + } // Handle merge-type keys using the MergeHelper // TODO: pass statistics to MergeUntil merge.MergeUntil(iter, 0 /* don't worry about snapshot */); diff --git a/db/db_impl.cc b/db/db_impl.cc index f57e8c687..986fda3e3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1139,7 +1139,6 @@ Status DBImpl::Recover( if (!env_->FileExists(CurrentFileName(dbname_))) { if (options_.create_if_missing) { - // TODO: add merge_operator name check s = NewDB(); is_new_db = true; if (!s.ok()) { @@ -2699,6 +2698,12 @@ Status DBImpl::ProcessKeyValueCompaction( drop = true; RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE); } else if (ikey.type == kTypeMerge) { + if (!merge.HasOperator()) { + LogToBuffer(log_buffer, "Options::merge_operator is null."); + status = Status::InvalidArgument( + "merge_operator is not properly initialized."); + break; + } // We know the merge type entry is not hidden, otherwise we would // have hit (A) // We encapsulate the merge related state machine in a different diff --git a/db/memtable.cc b/db/memtable.cc index 6023edde9..4dabfb782 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -390,6 +390,16 @@ static bool SaveValue(void* arg, const char* entry) { return false; } case kTypeMerge: { + if (!merge_operator) { + *(s->status) = Status::InvalidArgument( + "merge_operator is not properly initialized."); + // Normally we continue the loop (return true) when we see a merge + // operator. But in case of an error, we should stop the loop + // immediately and pretend we have found the value to stop further + // seek. Otherwise, the later call will override this error status. + *(s->found_final_value) = true; + return false; + } std::string merge_result; // temporary area for merge results later Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *(s->merge_in_progress) = true; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 0e36f6ae0..7bde824ab 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -24,10 +24,12 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, bool at_bottom, Statistics* stats, int* steps) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. + assert(HasOperator()); keys_.clear(); operands_.clear(); keys_.push_front(iter->key().ToString()); operands_.push_front(iter->value().ToString()); + assert(user_merge_operator_); success_ = false; // Will become true if we hit Put/Delete or bottom diff --git a/db/merge_helper.h b/db/merge_helper.h index fef153eb0..7188d02a4 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -78,13 +78,16 @@ class MergeHelper { // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. // So keys().back() was the first key seen by iterator. // TODO: Re-style this comment to be like the first one - bool IsSuccess() { return success_; } - Slice key() { assert(success_); return Slice(keys_.back()); } - Slice value() { assert(success_); return Slice(operands_.back()); } - const std::deque& keys() { assert(!success_); return keys_; } - const std::deque& values() { + bool IsSuccess() const { return success_; } + Slice key() const { assert(success_); return Slice(keys_.back()); } + Slice value() const { assert(success_); return Slice(operands_.back()); } + const std::deque& keys() const { + assert(!success_); return keys_; + } + const std::deque& values() const { assert(!success_); return operands_; } + bool HasOperator() const { return user_merge_operator_ != nullptr; } private: const Comparator* user_comparator_; diff --git a/db/merge_test.cc b/db/merge_test.cc index bd8da5a87..a4cca5c51 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -460,6 +460,39 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) { } } } + + { + cout << "Test merge-operator not set after reopen\n"; + { + auto db = OpenDb(dbname); + MergeBasedCounters counters(db, 0); + counters.add("test-key", 1); + counters.add("test-key", 1); + counters.add("test-key", 1); + db->CompactRange(nullptr, nullptr); + } + + DB* reopen_db; + ASSERT_OK(DB::Open(Options(), dbname, &reopen_db)); + std::string value; + ASSERT_TRUE(!(reopen_db->Get(ReadOptions(), "test-key", &value).ok())); + delete reopen_db; + DestroyDB(dbname, Options()); + } + + { + cout << "Test merge-operator not set after reopen (recovery case)\n"; + { + auto db = OpenDb(dbname); + MergeBasedCounters counters(db, 0); + counters.add("test-key", 1); + counters.add("test-key", 1); + counters.add("test-key", 1); + } + + DB* reopen_db; + ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument()); + } } } // namespace diff --git a/db/version_set.cc b/db/version_set.cc index a8df5f860..ff44aeed4 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -845,6 +845,11 @@ void Version::Get(const ReadOptions& options, } if (kMerge == saver.state) { + if (!merge_operator_) { + *status = Status::InvalidArgument( + "merge_operator is not properly initialized."); + return; + } // merge_operands are in saver and we hit the beginning of the key history // do a final merge of nullptr and operands; if (merge_operator_->FullMerge(user_key, nullptr,