Fixed the crash when merge_operator is not properly set after reopen.

Summary:
Fixed the crash when merge_operator is not properly set after reopen
and added two test cases for this.

Test Plan:
make merge_test
./merge_test

Reviewers: igor, ljin, sdong

Reviewed By: sdong

Subscribers: benj, mvikjord, leveldb

Differential Revision: https://reviews.facebook.net/D20793
main
Yueh-Hsuan Chiang 11 years ago
parent 8f09d53fd1
commit 49ee5a4ac4
  1. 4
      db/builder.cc
  2. 7
      db/db_impl.cc
  3. 10
      db/memtable.cc
  4. 2
      db/merge_helper.cc
  5. 13
      db/merge_helper.h
  6. 33
      db/merge_test.cc
  7. 5
      db/version_set.cc

@ -113,6 +113,10 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
is_first_key = false; is_first_key = false;
if (this_ikey.type == kTypeMerge) { if (this_ikey.type == kTypeMerge) {
if (!merge.HasOperator()) {
return Status::InvalidArgument(
"merge_operator is not properly initialized.");
}
// Handle merge-type keys using the MergeHelper // Handle merge-type keys using the MergeHelper
// TODO: pass statistics to MergeUntil // TODO: pass statistics to MergeUntil
merge.MergeUntil(iter, 0 /* don't worry about snapshot */); merge.MergeUntil(iter, 0 /* don't worry about snapshot */);

@ -1139,7 +1139,6 @@ Status DBImpl::Recover(
if (!env_->FileExists(CurrentFileName(dbname_))) { if (!env_->FileExists(CurrentFileName(dbname_))) {
if (options_.create_if_missing) { if (options_.create_if_missing) {
// TODO: add merge_operator name check
s = NewDB(); s = NewDB();
is_new_db = true; is_new_db = true;
if (!s.ok()) { if (!s.ok()) {
@ -2699,6 +2698,12 @@ Status DBImpl::ProcessKeyValueCompaction(
drop = true; drop = true;
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE); RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE);
} else if (ikey.type == kTypeMerge) { } else if (ikey.type == kTypeMerge) {
if (!merge.HasOperator()) {
LogToBuffer(log_buffer, "Options::merge_operator is null.");
status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
break;
}
// We know the merge type entry is not hidden, otherwise we would // We know the merge type entry is not hidden, otherwise we would
// have hit (A) // have hit (A)
// We encapsulate the merge related state machine in a different // We encapsulate the merge related state machine in a different

@ -390,6 +390,16 @@ static bool SaveValue(void* arg, const char* entry) {
return false; return false;
} }
case kTypeMerge: { case kTypeMerge: {
if (!merge_operator) {
*(s->status) = Status::InvalidArgument(
"merge_operator is not properly initialized.");
// Normally we continue the loop (return true) when we see a merge
// operator. But in case of an error, we should stop the loop
// immediately and pretend we have found the value to stop further
// seek. Otherwise, the later call will override this error status.
*(s->found_final_value) = true;
return false;
}
std::string merge_result; // temporary area for merge results later std::string merge_result; // temporary area for merge results later
Slice v = GetLengthPrefixedSlice(key_ptr + key_length); Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->merge_in_progress) = true; *(s->merge_in_progress) = true;

@ -24,10 +24,12 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
bool at_bottom, Statistics* stats, int* steps) { bool at_bottom, Statistics* stats, int* steps) {
// Get a copy of the internal key, before it's invalidated by iter->Next() // Get a copy of the internal key, before it's invalidated by iter->Next()
// Also maintain the list of merge operands seen. // Also maintain the list of merge operands seen.
assert(HasOperator());
keys_.clear(); keys_.clear();
operands_.clear(); operands_.clear();
keys_.push_front(iter->key().ToString()); keys_.push_front(iter->key().ToString());
operands_.push_front(iter->value().ToString()); operands_.push_front(iter->value().ToString());
assert(user_merge_operator_);
success_ = false; // Will become true if we hit Put/Delete or bottom success_ = false; // Will become true if we hit Put/Delete or bottom

@ -78,13 +78,16 @@ class MergeHelper {
// IMPORTANT 2: The entries were traversed in order from BACK to FRONT. // IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
// So keys().back() was the first key seen by iterator. // So keys().back() was the first key seen by iterator.
// TODO: Re-style this comment to be like the first one // TODO: Re-style this comment to be like the first one
bool IsSuccess() { return success_; } bool IsSuccess() const { return success_; }
Slice key() { assert(success_); return Slice(keys_.back()); } Slice key() const { assert(success_); return Slice(keys_.back()); }
Slice value() { assert(success_); return Slice(operands_.back()); } Slice value() const { assert(success_); return Slice(operands_.back()); }
const std::deque<std::string>& keys() { assert(!success_); return keys_; } const std::deque<std::string>& keys() const {
const std::deque<std::string>& values() { assert(!success_); return keys_;
}
const std::deque<std::string>& values() const {
assert(!success_); return operands_; assert(!success_); return operands_;
} }
bool HasOperator() const { return user_merge_operator_ != nullptr; }
private: private:
const Comparator* user_comparator_; const Comparator* user_comparator_;

@ -460,6 +460,39 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
} }
} }
} }
{
cout << "Test merge-operator not set after reopen\n";
{
auto db = OpenDb(dbname);
MergeBasedCounters counters(db, 0);
counters.add("test-key", 1);
counters.add("test-key", 1);
counters.add("test-key", 1);
db->CompactRange(nullptr, nullptr);
}
DB* reopen_db;
ASSERT_OK(DB::Open(Options(), dbname, &reopen_db));
std::string value;
ASSERT_TRUE(!(reopen_db->Get(ReadOptions(), "test-key", &value).ok()));
delete reopen_db;
DestroyDB(dbname, Options());
}
{
cout << "Test merge-operator not set after reopen (recovery case)\n";
{
auto db = OpenDb(dbname);
MergeBasedCounters counters(db, 0);
counters.add("test-key", 1);
counters.add("test-key", 1);
counters.add("test-key", 1);
}
DB* reopen_db;
ASSERT_TRUE(DB::Open(Options(), dbname, &reopen_db).IsInvalidArgument());
}
} }
} // namespace } // namespace

@ -845,6 +845,11 @@ void Version::Get(const ReadOptions& options,
} }
if (kMerge == saver.state) { if (kMerge == saver.state) {
if (!merge_operator_) {
*status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
return;
}
// merge_operands are in saver and we hit the beginning of the key history // merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands; // do a final merge of nullptr and operands;
if (merge_operator_->FullMerge(user_key, nullptr, if (merge_operator_->FullMerge(user_key, nullptr,

Loading…
Cancel
Save