Re-generate WriteEntry on WBWIIterator::Entry()

Summary: If we don't do this, any calls to Entry() after WBWI mutation will result in undefined behavior. We need to re-fetch the offset from the skip list and regenerate the new pointer (because string's base pointer can change while mutating).

Test Plan: COMPILE_WITH_ASAN=1 make write_batch_with_index_test && ./write_batch_with_index_test

Reviewers: sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D39813
main
Igor Canadi 10 years ago
parent e409d3d745
commit 4949ef08db
  1. 1
      HISTORY.md
  2. 4
      include/rocksdb/utilities/write_batch_with_index.h
  3. 111
      utilities/write_batch_with_index/write_batch_with_index.cc
  4. 17
      utilities/write_batch_with_index/write_batch_with_index_test.cc

@ -11,6 +11,7 @@
* DB::CompactRange()'s parameter reduce_level is changed to change_level, to allow users to move levels to lower levels if allowed. It can be used to migrate a DB from options.level_compaction_dynamic_level_bytes=false to options.level_compaction_dynamic_level_bytes.true. * DB::CompactRange()'s parameter reduce_level is changed to change_level, to allow users to move levels to lower levels if allowed. It can be used to migrate a DB from options.level_compaction_dynamic_level_bytes=false to options.level_compaction_dynamic_level_bytes.true.
* Change default value for options.compaction_filter_factory and options.compaction_filter_factory_v2 to nullptr instead of DefaultCompactionFilterFactory and DefaultCompactionFilterFactoryV2. * Change default value for options.compaction_filter_factory and options.compaction_filter_factory_v2 to nullptr instead of DefaultCompactionFilterFactory and DefaultCompactionFilterFactoryV2.
* If CancelAllBackgroundWork is called without doing a flush after doing loads with WAL disabled, the changes which haven't been flushed before the call to CancelAllBackgroundWork will be lost. * If CancelAllBackgroundWork is called without doing a flush after doing loads with WAL disabled, the changes which haven't been flushed before the call to CancelAllBackgroundWork will be lost.
* WBWIIterator::Entry() now returns WriteEntry instead of `const WriteEntry&`
## 3.11.0 (5/19/2015) ## 3.11.0 (5/19/2015)
### New Features ### New Features

@ -55,7 +55,9 @@ class WBWIIterator {
virtual void Prev() = 0; virtual void Prev() = 0;
virtual const WriteEntry& Entry() const = 0; // the return WriteEntry is only valid until the next mutation of
// WriteBatchWithIndex
virtual WriteEntry Entry() const = 0;
virtual Status status() const = 0; virtual Status status() const = 0;
}; };

@ -89,7 +89,8 @@ class BaseDeltaIterator : public Iterator {
AdvanceBase(); AdvanceBase();
} }
if (DeltaValid() && BaseValid()) { if (DeltaValid() && BaseValid()) {
if (Compare() == 0) { if (comparator_->Compare(delta_iterator_->Entry().key,
base_iterator_->key()) == 0) {
equal_keys_ = true; equal_keys_ = true;
} }
} }
@ -123,7 +124,8 @@ class BaseDeltaIterator : public Iterator {
AdvanceBase(); AdvanceBase();
} }
if (DeltaValid() && BaseValid()) { if (DeltaValid() && BaseValid()) {
if (Compare() == 0) { if (comparator_->Compare(delta_iterator_->Entry().key,
base_iterator_->key()) == 0) {
equal_keys_ = true; equal_keys_ = true;
} }
} }
@ -153,23 +155,6 @@ class BaseDeltaIterator : public Iterator {
} }
private: private:
// -1 -- delta less advanced than base
// 0 -- delta == base
// 1 -- delta more advanced than base
int Compare() const {
assert(delta_iterator_->Valid() && base_iterator_->Valid());
int cmp = comparator_->Compare(delta_iterator_->Entry().key,
base_iterator_->key());
if (forward_) {
return cmp;
} else {
return -cmp;
}
}
bool IsDeltaDelete() {
assert(DeltaValid());
return delta_iterator_->Entry().type == kDeleteRecord;
}
void AssertInvariants() { void AssertInvariants() {
#ifndef NDEBUG #ifndef NDEBUG
if (!Valid()) { if (!Valid()) {
@ -239,6 +224,10 @@ class BaseDeltaIterator : public Iterator {
bool DeltaValid() const { return delta_iterator_->Valid(); } bool DeltaValid() const { return delta_iterator_->Valid(); }
void UpdateCurrent() { void UpdateCurrent() {
while (true) { while (true) {
WriteEntry delta_entry;
if (DeltaValid()) {
delta_entry = delta_iterator_->Entry();
}
equal_keys_ = false; equal_keys_ = false;
if (!BaseValid()) { if (!BaseValid()) {
// Base has finished. // Base has finished.
@ -246,7 +235,7 @@ class BaseDeltaIterator : public Iterator {
// Finished // Finished
return; return;
} }
if (IsDeltaDelete()) { if (delta_entry.type == kDeleteRecord()) {
AdvanceDelta(); AdvanceDelta();
} else { } else {
current_at_base_ = false; current_at_base_ = false;
@ -257,12 +246,14 @@ class BaseDeltaIterator : public Iterator {
current_at_base_ = true; current_at_base_ = true;
return; return;
} else { } else {
int compare = Compare(); int compare =
(forward_ ? 1 : -1) *
comparator_->Compare(delta_entry.key, base_iterator_->key());
if (compare <= 0) { // delta bigger or equal if (compare <= 0) { // delta bigger or equal
if (compare == 0) { if (compare == 0) {
equal_keys_ = true; equal_keys_ = true;
} }
if (!IsDeltaDelete()) { if (delta_entry.type == kDeleteRecord()) {
current_at_base_ = false; current_at_base_ = false;
return; return;
} }
@ -300,23 +291,26 @@ class WBWIIteratorImpl : public WBWIIterator {
const ReadableWriteBatch* write_batch) const ReadableWriteBatch* write_batch)
: column_family_id_(column_family_id), : column_family_id_(column_family_id),
skip_list_iter_(skip_list), skip_list_iter_(skip_list),
write_batch_(write_batch), write_batch_(write_batch) {}
valid_(false) {}
virtual ~WBWIIteratorImpl() {} virtual ~WBWIIteratorImpl() {}
virtual bool Valid() const override { return valid_; } virtual bool Valid() const override {
if (!skip_list_iter_.Valid()) {
return false;
}
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
return (iter_entry != nullptr &&
iter_entry->column_family == column_family_id_);
}
virtual void SeekToFirst() override { virtual void SeekToFirst() override {
valid_ = true;
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_); column_family_id_);
skip_list_iter_.Seek(&search_entry); skip_list_iter_.Seek(&search_entry);
ReadEntry();
} }
virtual void SeekToLast() override { virtual void SeekToLast() override {
valid_ = true;
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_ + 1); column_family_id_ + 1);
skip_list_iter_.Seek(&search_entry); skip_list_iter_.Seek(&search_entry);
@ -325,29 +319,37 @@ class WBWIIteratorImpl : public WBWIIterator {
} else { } else {
skip_list_iter_.Prev(); skip_list_iter_.Prev();
} }
ReadEntry();
} }
virtual void Seek(const Slice& key) override { virtual void Seek(const Slice& key) override {
valid_ = true;
WriteBatchIndexEntry search_entry(&key, column_family_id_); WriteBatchIndexEntry search_entry(&key, column_family_id_);
skip_list_iter_.Seek(&search_entry); skip_list_iter_.Seek(&search_entry);
ReadEntry();
} }
virtual void Next() override { virtual void Next() override { skip_list_iter_.Next(); }
skip_list_iter_.Next();
ReadEntry();
}
virtual void Prev() override { virtual void Prev() override { skip_list_iter_.Prev(); }
skip_list_iter_.Prev();
ReadEntry();
}
virtual const WriteEntry& Entry() const override { return current_; } virtual WriteEntry Entry() const override {
WriteEntry ret;
Slice blob;
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
// this is guaranteed with Valid()
assert(iter_entry != nullptr &&
iter_entry->column_family == column_family_id_);
auto s = write_batch_->GetEntryFromDataOffset(iter_entry->offset, &ret.type,
&ret.key, &ret.value, &blob);
assert(s.ok());
assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
ret.type == kMergeRecord);
return ret;
}
virtual Status status() const override { return status_; } virtual Status status() const override {
// this is in-memory data structure, so the only way status can be non-ok is
// through memory corruption
return Status::OK();
}
const WriteBatchIndexEntry* GetRawEntry() const { const WriteBatchIndexEntry* GetRawEntry() const {
return skip_list_iter_.key(); return skip_list_iter_.key();
@ -357,33 +359,6 @@ class WBWIIteratorImpl : public WBWIIterator {
uint32_t column_family_id_; uint32_t column_family_id_;
WriteBatchEntrySkipList::Iterator skip_list_iter_; WriteBatchEntrySkipList::Iterator skip_list_iter_;
const ReadableWriteBatch* write_batch_; const ReadableWriteBatch* write_batch_;
Status status_;
bool valid_;
WriteEntry current_;
void ReadEntry() {
if (!status_.ok() || !skip_list_iter_.Valid()) {
valid_ = false;
return;
}
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
if (iter_entry == nullptr ||
iter_entry->column_family != column_family_id_) {
valid_ = false;
return;
}
Slice blob;
status_ = write_batch_->GetEntryFromDataOffset(
iter_entry->offset, &current_.type, &current_.key, &current_.value,
&blob);
if (!status_.ok()) {
valid_ = false;
} else if (current_.type != kPutRecord && current_.type != kDeleteRecord &&
current_.type != kMergeRecord) {
valid_ = false;
status_ = Status::Corruption("write batch index is corrupted");
}
}
}; };
struct WriteBatchWithIndex::Rep { struct WriteBatchWithIndex::Rep {

@ -103,7 +103,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data)); std::unique_ptr<WBWIIterator> iter(batch->NewIterator(&data));
iter->Seek(e.key); iter->Seek(e.key);
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
auto& write_entry = iter->Entry(); auto write_entry = iter->Entry();
ASSERT_EQ(e.key, write_entry.key.ToString()); ASSERT_EQ(e.key, write_entry.key.ToString());
ASSERT_EQ(e.value, write_entry.value.ToString()); ASSERT_EQ(e.value, write_entry.value.ToString());
batch->Delete(&data, e.key); batch->Delete(&data, e.key);
@ -124,7 +124,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
for (auto v : pair.second) { for (auto v : pair.second) {
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry(); auto write_entry = iter->Entry();
ASSERT_EQ(pair.first, write_entry.key.ToString()); ASSERT_EQ(pair.first, write_entry.key.ToString());
ASSERT_EQ(v->type, write_entry.type); ASSERT_EQ(v->type, write_entry.type);
if (write_entry.type != kDeleteRecord) { if (write_entry.type != kDeleteRecord) {
@ -140,7 +140,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) { for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) {
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry(); auto write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString()); ASSERT_EQ(pair->first, write_entry.key.ToString());
ASSERT_EQ((*v)->type, write_entry.type); ASSERT_EQ((*v)->type, write_entry.type);
if (write_entry.type != kDeleteRecord) { if (write_entry.type != kDeleteRecord) {
@ -165,7 +165,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
for (auto v : pair.second) { for (auto v : pair.second) {
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry(); auto write_entry = iter->Entry();
ASSERT_EQ(pair.first, write_entry.key.ToString()); ASSERT_EQ(pair.first, write_entry.key.ToString());
if (v->type != kDeleteRecord) { if (v->type != kDeleteRecord) {
ASSERT_EQ(v->key, write_entry.value.ToString()); ASSERT_EQ(v->key, write_entry.value.ToString());
@ -182,7 +182,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) { for (auto v = pair->second.rbegin(); v != pair->second.rend(); v++) {
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry(); auto write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString()); ASSERT_EQ(pair->first, write_entry.key.ToString());
if ((*v)->type != kDeleteRecord) { if ((*v)->type != kDeleteRecord) {
ASSERT_EQ((*v)->key, write_entry.value.ToString()); ASSERT_EQ((*v)->key, write_entry.value.ToString());
@ -204,7 +204,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
for (auto v : pair->second) { for (auto v : pair->second) {
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry(); auto write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString()); ASSERT_EQ(pair->first, write_entry.key.ToString());
ASSERT_EQ(v->type, write_entry.type); ASSERT_EQ(v->type, write_entry.type);
if (write_entry.type != kDeleteRecord) { if (write_entry.type != kDeleteRecord) {
@ -226,7 +226,7 @@ void TestValueAsSecondaryIndexHelper(std::vector<Entry> entries,
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
for (auto v : pair->second) { for (auto v : pair->second) {
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
auto& write_entry = iter->Entry(); auto write_entry = iter->Entry();
ASSERT_EQ(pair->first, write_entry.key.ToString()); ASSERT_EQ(pair->first, write_entry.key.ToString());
ASSERT_EQ(v->value, write_entry.key.ToString()); ASSERT_EQ(v->value, write_entry.key.ToString());
if (v->type != kDeleteRecord) { if (v->type != kDeleteRecord) {
@ -1268,9 +1268,6 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) {
AssertIterKey("mm", iter.get()); AssertIterKey("mm", iter.get());
AssertIterValue("kk", iter.get()); AssertIterValue("kk", iter.get());
batch.Delete("mm"); batch.Delete("mm");
// still mm even though it's deleted
AssertIterKey("mm", iter.get());
AssertIterValue("kk", iter.get());
iter->Next(); iter->Next();
AssertIterKey("n", iter.get()); AssertIterKey("n", iter.get());
iter->Prev(); iter->Prev();

Loading…
Cancel
Save