From 821cff114e57efa67711c1c1c105aa02831a0d23 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 10 Jun 2015 12:57:38 -0700 Subject: [PATCH] Re-generate WriteEntry on WBWIIterator::Entry() Summary: [This is the resubmit of D39813. Tests were failing, so I reverted the diff. I found the bug and I'm now resubmitting] If we don't do this, any calls to Entry() after WBWI mutation will result in undefined behavior. We need to re-fetch the offset from the skip list and regenerate the new pointer (because string's base pointer can change while mutating). Test Plan: COMPILE_WITH_ASAN=1 make write_batch_with_index_test && ./write_batch_with_index_test Reviewers: sdong Reviewed By: sdong Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D39897 --- HISTORY.md | 1 + .../utilities/write_batch_with_index.h | 4 +- .../write_batch_with_index.cc | 111 +++++++----------- .../write_batch_with_index_test.cc | 19 ++- 4 files changed, 56 insertions(+), 79 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 290b613d9..559f942c1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,7 @@ * DB::CompactRange()'s parameter reduce_level is changed to change_level, to allow users to move levels to lower levels if allowed. It can be used to migrate a DB from options.level_compaction_dynamic_level_bytes=false to options.level_compaction_dynamic_level_bytes.true. * Change default value for options.compaction_filter_factory and options.compaction_filter_factory_v2 to nullptr instead of DefaultCompactionFilterFactory and DefaultCompactionFilterFactoryV2. * If CancelAllBackgroundWork is called without doing a flush after doing loads with WAL disabled, the changes which haven't been flushed before the call to CancelAllBackgroundWork will be lost. +* WBWIIterator::Entry() now returns WriteEntry instead of `const WriteEntry&` ## 3.11.0 (5/19/2015) ### New Features diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 7c17534aa..1da4421f3 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -55,7 +55,9 @@ class WBWIIterator { virtual void Prev() = 0; - virtual const WriteEntry& Entry() const = 0; + // the return WriteEntry is only valid until the next mutation of + // WriteBatchWithIndex + virtual WriteEntry Entry() const = 0; virtual Status status() const = 0; }; diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 0c3e02f3e..5c8292e45 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -89,7 +89,8 @@ class BaseDeltaIterator : public Iterator { AdvanceBase(); } if (DeltaValid() && BaseValid()) { - if (Compare() == 0) { + if (comparator_->Compare(delta_iterator_->Entry().key, + base_iterator_->key()) == 0) { equal_keys_ = true; } } @@ -123,7 +124,8 @@ class BaseDeltaIterator : public Iterator { AdvanceBase(); } if (DeltaValid() && BaseValid()) { - if (Compare() == 0) { + if (comparator_->Compare(delta_iterator_->Entry().key, + base_iterator_->key()) == 0) { equal_keys_ = true; } } @@ -153,23 +155,6 @@ class BaseDeltaIterator : public Iterator { } private: - // -1 -- delta less advanced than base - // 0 -- delta == base - // 1 -- delta more advanced than base - int Compare() const { - assert(delta_iterator_->Valid() && base_iterator_->Valid()); - int cmp = comparator_->Compare(delta_iterator_->Entry().key, - base_iterator_->key()); - if (forward_) { - return cmp; - } else { - return -cmp; - } - } - bool IsDeltaDelete() { - assert(DeltaValid()); - return delta_iterator_->Entry().type == kDeleteRecord; - } void AssertInvariants() { #ifndef NDEBUG if (!Valid()) { @@ -239,6 +224,10 @@ class BaseDeltaIterator : public Iterator { bool DeltaValid() const { return delta_iterator_->Valid(); } void UpdateCurrent() { while (true) { + WriteEntry delta_entry; + if (DeltaValid()) { + delta_entry = delta_iterator_->Entry(); + } equal_keys_ = false; if (!BaseValid()) { // Base has finished. @@ -246,7 +235,7 @@ class BaseDeltaIterator : public Iterator { // Finished return; } - if (IsDeltaDelete()) { + if (delta_entry.type == kDeleteRecord) { AdvanceDelta(); } else { current_at_base_ = false; @@ -257,12 +246,14 @@ class BaseDeltaIterator : public Iterator { current_at_base_ = true; return; } else { - int compare = Compare(); + int compare = + (forward_ ? 1 : -1) * + comparator_->Compare(delta_entry.key, base_iterator_->key()); if (compare <= 0) { // delta bigger or equal if (compare == 0) { equal_keys_ = true; } - if (!IsDeltaDelete()) { + if (delta_entry.type != kDeleteRecord) { current_at_base_ = false; return; } @@ -300,23 +291,26 @@ class WBWIIteratorImpl : public WBWIIterator { const ReadableWriteBatch* write_batch) : column_family_id_(column_family_id), skip_list_iter_(skip_list), - write_batch_(write_batch), - valid_(false) {} + write_batch_(write_batch) {} virtual ~WBWIIteratorImpl() {} - virtual bool Valid() const override { return valid_; } + virtual bool Valid() const override { + if (!skip_list_iter_.Valid()) { + return false; + } + const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); + return (iter_entry != nullptr && + iter_entry->column_family == column_family_id_); + } virtual void SeekToFirst() override { - valid_ = true; WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, column_family_id_); skip_list_iter_.Seek(&search_entry); - ReadEntry(); } virtual void SeekToLast() override { - valid_ = true; WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, column_family_id_ + 1); skip_list_iter_.Seek(&search_entry); @@ -325,29 +319,37 @@ class WBWIIteratorImpl : public WBWIIterator { } else { skip_list_iter_.Prev(); } - ReadEntry(); } virtual void Seek(const Slice& key) override { - valid_ = true; WriteBatchIndexEntry search_entry(&key, column_family_id_); skip_list_iter_.Seek(&search_entry); - ReadEntry(); } - virtual void Next() override { - skip_list_iter_.Next(); - ReadEntry(); - } + virtual void Next() override { skip_list_iter_.Next(); } - virtual void Prev() override { - skip_list_iter_.Prev(); - ReadEntry(); - } + virtual void Prev() override { skip_list_iter_.Prev(); } - virtual const WriteEntry& Entry() const override { return current_; } + virtual WriteEntry Entry() const override { + WriteEntry ret; + Slice blob; + const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); + // this is guaranteed with Valid() + assert(iter_entry != nullptr && + iter_entry->column_family == column_family_id_); + auto s = write_batch_->GetEntryFromDataOffset(iter_entry->offset, &ret.type, + &ret.key, &ret.value, &blob); + assert(s.ok()); + assert(ret.type == kPutRecord || ret.type == kDeleteRecord || + ret.type == kMergeRecord); + return ret; + } - virtual Status status() const override { return status_; } + virtual Status status() const override { + // this is in-memory data structure, so the only way status can be non-ok is + // through memory corruption + return Status::OK(); + } const WriteBatchIndexEntry* GetRawEntry() const { return skip_list_iter_.key(); @@ -357,33 +359,6 @@ class WBWIIteratorImpl : public WBWIIterator { uint32_t column_family_id_; WriteBatchEntrySkipList::Iterator skip_list_iter_; const ReadableWriteBatch* write_batch_; - Status status_; - bool valid_; - WriteEntry current_; - - void ReadEntry() { - if (!status_.ok() || !skip_list_iter_.Valid()) { - valid_ = false; - return; - } - const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); - if (iter_entry == nullptr || - iter_entry->column_family != column_family_id_) { - valid_ = false; - return; - } - Slice blob; - status_ = write_batch_->GetEntryFromDataOffset( - iter_entry->offset, ¤t_.type, ¤t_.key, ¤t_.value, - &blob); - if (!status_.ok()) { - valid_ = false; - } else if (current_.type != kPutRecord && current_.type != kDeleteRecord && - current_.type != kMergeRecord) { - valid_ = false; - status_ = Status::Corruption("write batch index is corrupted"); - } - } }; struct WriteBatchWithIndex::Rep { diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index d324eba85..488d1b70a 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -11,6 +11,7 @@ #include #include #include "db/column_family.h" +#include "port/stack_trace.h" #include "rocksdb/utilities/write_batch_with_index.h" #include "util/string_util.h" #include "util/testharness.h" @@ -103,7 +104,7 @@ void TestValueAsSecondaryIndexHelper(std::vector entries, std::unique_ptr iter(batch->NewIterator(&data)); iter->Seek(e.key); ASSERT_OK(iter->status()); - auto& write_entry = iter->Entry(); + auto write_entry = iter->Entry(); ASSERT_EQ(e.key, write_entry.key.ToString()); ASSERT_EQ(e.value, write_entry.value.ToString()); batch->Delete(&data, e.key); @@ -124,7 +125,7 @@ void TestValueAsSecondaryIndexHelper(std::vector entries, for (auto v : pair.second) { ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); - auto& write_entry = iter->Entry(); + auto write_entry = iter->Entry(); ASSERT_EQ(pair.first, write_entry.key.ToString()); ASSERT_EQ(v->type, write_entry.type); if (write_entry.type != kDeleteRecord) { @@ -140,7 +141,7 @@ void TestValueAsSecondaryIndexHelper(std::vector entries, for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) { ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); - auto& write_entry = iter->Entry(); + auto write_entry = iter->Entry(); ASSERT_EQ(pair->first, write_entry.key.ToString()); ASSERT_EQ((*v)->type, write_entry.type); if (write_entry.type != kDeleteRecord) { @@ -165,7 +166,7 @@ void TestValueAsSecondaryIndexHelper(std::vector entries, for (auto v : pair.second) { ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); - auto& write_entry = iter->Entry(); + auto write_entry = iter->Entry(); ASSERT_EQ(pair.first, write_entry.key.ToString()); if (v->type != kDeleteRecord) { ASSERT_EQ(v->key, write_entry.value.ToString()); @@ -182,7 +183,7 @@ void TestValueAsSecondaryIndexHelper(std::vector entries, for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) { ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); - auto& write_entry = iter->Entry(); + auto write_entry = iter->Entry(); ASSERT_EQ(pair->first, write_entry.key.ToString()); if ((*v)->type != kDeleteRecord) { ASSERT_EQ((*v)->key, write_entry.value.ToString()); @@ -204,7 +205,7 @@ void TestValueAsSecondaryIndexHelper(std::vector entries, ASSERT_OK(iter->status()); for (auto v : pair->second) { ASSERT_TRUE(iter->Valid()); - auto& write_entry = iter->Entry(); + auto write_entry = iter->Entry(); ASSERT_EQ(pair->first, write_entry.key.ToString()); ASSERT_EQ(v->type, write_entry.type); if (write_entry.type != kDeleteRecord) { @@ -226,7 +227,7 @@ void TestValueAsSecondaryIndexHelper(std::vector entries, ASSERT_OK(iter->status()); for (auto v : pair->second) { ASSERT_TRUE(iter->Valid()); - auto& write_entry = iter->Entry(); + auto write_entry = iter->Entry(); ASSERT_EQ(pair->first, write_entry.key.ToString()); ASSERT_EQ(v->value, write_entry.key.ToString()); if (v->type != kDeleteRecord) { @@ -1268,9 +1269,6 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { AssertIterKey("mm", iter.get()); AssertIterValue("kk", iter.get()); batch.Delete("mm"); - // still mm even though it's deleted - AssertIterKey("mm", iter.get()); - AssertIterValue("kk", iter.get()); iter->Next(); AssertIterKey("n", iter.get()); iter->Prev(); @@ -1368,6 +1366,7 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { } // namespace int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }