WriteBatchWithIndex's Iterator bug of SeekToFirst() and SeekToLast()

Summary: WriteBatchWithIndex's iterator's SeekToFirst() and SeekToLast() use offset=0 to indicate it is smaller than all the keys, which is wrong. offset=0 will decode a key "" (the header decodes like that). It could be larger than other keys in non-default comparators. Fix it by using a special flag of offset to indicate searching to the beginning of the CF.

Test Plan: Add a unit test that used to fail. Also, add some more tests to related cases, though they don't fail for now.

Reviewers: igor

Reviewed By: igor

Subscribers: rven, yhchiang, ljin, leveldb

Differential Revision: https://reviews.facebook.net/D24873
main
sdong 10 years ago
parent 7658bcc1e5
commit 5cc9adf5ba
  1. 16
      utilities/write_batch_with_index/write_batch_with_index.cc
  2. 81
      utilities/write_batch_with_index/write_batch_with_index_test.cc

@ -304,6 +304,10 @@ struct WriteBatchIndexEntry {
WriteBatchIndexEntry(const Slice* sk, uint32_t c) WriteBatchIndexEntry(const Slice* sk, uint32_t c)
: offset(0), column_family(c), search_key(sk) {} : offset(0), column_family(c), search_key(sk) {}
// If this flag appears in the offset, it indicates a key that is smaller
// than any other entry for the same column family
static const size_t kFlagMin = std::numeric_limits<size_t>::max();
size_t offset; // offset of an entry in write batch's string buffer. size_t offset; // offset of an entry in write batch's string buffer.
uint32_t column_family; // column family of the entry uint32_t column_family; // column family of the entry
const Slice* search_key; // if not null, instead of reading keys from const Slice* search_key; // if not null, instead of reading keys from
@ -354,14 +358,16 @@ class WBWIIteratorImpl : public WBWIIterator {
virtual void SeekToFirst() { virtual void SeekToFirst() {
valid_ = true; valid_ = true;
WriteBatchIndexEntry search_entry(nullptr, column_family_id_); WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_);
skip_list_iter_.Seek(&search_entry); skip_list_iter_.Seek(&search_entry);
ReadEntry(); ReadEntry();
} }
virtual void SeekToLast() { virtual void SeekToLast() {
valid_ = true; valid_ = true;
WriteBatchIndexEntry search_entry(nullptr, column_family_id_ + 1); WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
column_family_id_ + 1);
skip_list_iter_.Seek(&search_entry); skip_list_iter_.Seek(&search_entry);
if (!skip_list_iter_.Valid()) { if (!skip_list_iter_.Valid()) {
skip_list_iter_.SeekToLast(); skip_list_iter_.SeekToLast();
@ -636,6 +642,12 @@ int WriteBatchEntryComparator::operator()(
return -1; return -1;
} }
if (entry1->offset == WriteBatchIndexEntry::kFlagMin) {
return -1;
} else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) {
return 1;
}
Status s; Status s;
Slice key1, key2; Slice key1, key2;
if (entry1->search_key == nullptr) { if (entry1->search_key == nullptr) {

@ -522,7 +522,18 @@ TEST(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
Random rnd(rand_seed); Random rnd(rand_seed);
ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator());
ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator());
ColumnFamilyHandleImplDummy cf3(8, BytewiseComparator());
WriteBatchWithIndex batch(BytewiseComparator(), 20, true); WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
if (rand_seed % 2 == 0) {
batch.Put(&cf2, "zoo", "bar");
}
if (rand_seed % 4 == 1) {
batch.Put(&cf3, "zoo", "bar");
}
KVMap map; KVMap map;
KVMap merged_map; KVMap merged_map;
for (auto key : source_strings) { for (auto key : source_strings) {
@ -619,6 +630,7 @@ TEST(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) { TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) {
ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator());
ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator());
WriteBatchWithIndex batch(BytewiseComparator(), 20, true); WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
{ {
@ -659,7 +671,21 @@ TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) {
AssertIter(iter.get(), "a", "aa"); AssertIter(iter.get(), "a", "aa");
} }
// Test the case that there is one element in the write batch
batch.Put(&cf2, "zoo", "bar");
batch.Put(&cf1, "a", "aa"); batch.Put(&cf1, "a", "aa");
{
KVMap empty_map;
std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
iter->SeekToFirst();
AssertIter(iter.get(), "a", "aa");
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
batch.Delete(&cf1, "b"); batch.Delete(&cf1, "b");
batch.Put(&cf1, "c", "cc"); batch.Put(&cf1, "c", "cc");
batch.Put(&cf1, "d", "dd"); batch.Put(&cf1, "d", "dd");
@ -725,6 +751,7 @@ TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) {
iter->Next(); iter->Next();
AssertIter(iter.get(), "f", "ff"); AssertIter(iter.get(), "f", "ff");
} }
{ {
KVMap empty_map; KVMap empty_map;
std::unique_ptr<Iterator> iter( std::unique_ptr<Iterator> iter(
@ -763,6 +790,60 @@ TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) {
AssertIter(iter.get(), "c", "cc"); AssertIter(iter.get(), "c", "cc");
} }
} }
TEST(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) {
ColumnFamilyHandleImplDummy cf1(6, ReverseBytewiseComparator());
ColumnFamilyHandleImplDummy cf2(2, ReverseBytewiseComparator());
WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
// Test the case that there is one element in the write batch
batch.Put(&cf2, "zoo", "bar");
batch.Put(&cf1, "a", "aa");
{
KVMap empty_map;
std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map)));
iter->SeekToFirst();
AssertIter(iter.get(), "a", "aa");
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
batch.Put(&cf1, "c", "cc");
{
KVMap map;
std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(&cf1, new KVIter(&map)));
iter->SeekToFirst();
AssertIter(iter.get(), "c", "cc");
iter->Next();
AssertIter(iter.get(), "a", "aa");
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->SeekToLast();
AssertIter(iter.get(), "a", "aa");
iter->Prev();
AssertIter(iter.get(), "c", "cc");
iter->Prev();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("b");
AssertIter(iter.get(), "a", "aa");
iter->Prev();
AssertIter(iter.get(), "c", "cc");
iter->Seek("a");
AssertIter(iter.get(), "a", "aa");
}
}
} // namespace } // namespace
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); } int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }

Loading…
Cancel
Save