Make iterator invalid on Merge error

Summary:
Since #1665, on merge error, iterator will be set to corrupted status, but it doesn't invalidate the iterator. Fixing it.
Closes https://github.com/facebook/rocksdb/pull/3226

Differential Revision: D6499094

Pulled By: yiwu-arbug

fbshipit-source-id: 80222930f949e31f90a6feaa37ddc3529b510d2c
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 36911f55dd
commit 20995c5729
  1. 1
      HISTORY.md
  2. 20
      db/db_iter.cc
  3. 25
      db/db_merge_operator_test.cc

@ -4,6 +4,7 @@
### New Features ### New Features
### Bug Fixes ### Bug Fixes
* Fix IOError on WAL write doesn't propagate to write group follower * Fix IOError on WAL write doesn't propagate to write group follower
* Make iterator invalid on merge error.
## 5.9.0 (11/1/2017) ## 5.9.0 (11/1/2017)
### Public API Change ### Public API Change

@ -640,6 +640,7 @@ void DBIter::MergeValuesNewToOld() {
merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(), merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_, &pinned_value_, true); &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
if (!s.ok()) { if (!s.ok()) {
valid_ = false;
status_ = s; status_ = s;
} }
// iter_ is positioned after put // iter_ is positioned after put
@ -677,6 +678,7 @@ void DBIter::MergeValuesNewToOld() {
&saved_value_, logger_, statistics_, env_, &saved_value_, logger_, statistics_, env_,
&pinned_value_, true); &pinned_value_, true);
if (!s.ok()) { if (!s.ok()) {
valid_ = false;
status_ = s; status_ = s;
} }
} }
@ -946,8 +948,10 @@ bool DBIter::FindValueForCurrentKey() {
assert(false); assert(false);
break; break;
} }
valid_ = true; if (s.ok()) {
if (!s.ok()) { valid_ = true;
} else {
valid_ = false;
status_ = s; status_ = s;
} }
return true; return true;
@ -1023,8 +1027,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
iter_->Seek(last_key); iter_->Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
} }
valid_ = true; if (s.ok()) {
if (!s.ok()) { valid_ = true;
} else {
valid_ = false;
status_ = s; status_ = s;
} }
return true; return true;
@ -1035,8 +1041,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
&val, merge_context_.GetOperands(), &val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_, &saved_value_, logger_, statistics_, env_,
&pinned_value_, true); &pinned_value_, true);
valid_ = true; if (s.ok()) {
if (!s.ok()) { valid_ = true;
} else {
valid_ = false;
status_ = s; status_ = s;
} }
return true; return true;

@ -133,16 +133,33 @@ TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
ASSERT_OK(Merge("k1", "v1")); ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Merge("k1", "corrupted")); ASSERT_OK(Merge("k1", "corrupted"));
ASSERT_OK(Put("k2", "v2")); ASSERT_OK(Put("k2", "v2"));
VerifyDBFromMap({{"k1", ""}, {"k2", "v2"}}, nullptr, false, auto* iter = db_->NewIterator(ReadOptions());
{{"k1", Status::Corruption()}}); iter->Seek("k1");
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
delete iter;
iter = db_->NewIterator(ReadOptions());
iter->Seek("k2");
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
iter->Prev();
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
delete iter;
VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}}); VerifyDBInternal({{"k1", "corrupted"}, {"k1", "v1"}, {"k2", "v2"}});
DestroyAndReopen(options); DestroyAndReopen(options);
ASSERT_OK(Merge("k1", "v1")); ASSERT_OK(Merge("k1", "v1"));
ASSERT_OK(Put("k2", "v2")); ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Merge("k2", "corrupted")); ASSERT_OK(Merge("k2", "corrupted"));
VerifyDBFromMap({{"k1", "v1"}, {"k2", ""}}, nullptr, false, iter = db_->NewIterator(ReadOptions());
{{"k2", Status::Corruption()}}); iter->Seek("k1");
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
delete iter;
VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}}); VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
} }

Loading…
Cancel
Save