BaseDeltaIterator: always check valid() before accessing key() (#4702)

Summary:
Current implementation of `current_over_upper_bound_` fails to take into consideration that keys might be invalid in either base iterator or delta iterator. Calling key() in such scenario will lead to assertion failure and runtime errors.
This PR addresses the bug by adding check for valid keys before calling `IsOverUpperBound()`, also added test coverage for iterate_upper_bound usage in BaseDeltaIterator
Also recommit https://github.com/facebook/rocksdb/pull/4656 (It was reverted earlier due to bugs)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4702

Differential Revision: D13146643

Pulled By: miasantreble

fbshipit-source-id: 6d136929da12d0f2e2a5cea474a8038ec5cdf1d0
main
Zhongyi Xie 6 years ago committed by Facebook Github Bot
parent 6e938c904f
commit 3a18bb3e15
  1. 28
      db/c.cc
  2. 7
      db/c_test.c
  3. 9
      include/rocksdb/c.h
  4. 2
      include/rocksdb/iterator.h
  5. 6
      include/rocksdb/utilities/write_batch_with_index.h
  6. 7
      java/rocksjni/write_batch_with_index.cc
  7. 69
      java/src/main/java/org/rocksdb/WriteBatchWithIndex.java
  8. 5
      java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java
  9. 45
      utilities/transactions/optimistic_transaction_test.cc
  10. 8
      utilities/transactions/transaction_base.cc
  11. 4
      utilities/transactions/write_prepared_txn.cc
  12. 2
      utilities/transactions/write_unprepared_txn.cc
  13. 64
      utilities/write_batch_with_index/write_batch_with_index.cc
  14. 70
      utilities/write_batch_with_index/write_batch_with_index_test.cc

@ -1777,18 +1777,38 @@ void rocksdb_writebatch_wi_rollback_to_save_point(rocksdb_writebatch_wi_t* b,
rocksdb_iterator_t* rocksdb_writebatch_wi_create_iterator_with_base(
rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator) {
rocksdb_readoptions_t options;
return rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
&options, wbwi, base_iterator);
}
rocksdb_iterator_t* rocksdb_writebatch_wi_create_iterator_with_base_cf(
rocksdb_writebatch_wi_t* wbwi, rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* column_family) {
rocksdb_readoptions_t options;
return rocksdb_writebatch_wi_create_iterator_with_base_cf_and_readoptions(
&options, wbwi, base_iterator, column_family);
}
rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator) {
rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep = wbwi->rep->NewIteratorWithBase(base_iterator->rep);
result->rep =
wbwi->rep->NewIteratorWithBase(options->rep, base_iterator->rep);
delete base_iterator;
return result;
}
rocksdb_iterator_t* rocksdb_writebatch_wi_create_iterator_with_base_cf(
rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_cf_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* column_family) {
rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep = wbwi->rep->NewIteratorWithBase(column_family->rep, base_iterator->rep);
result->rep = wbwi->rep->NewIteratorWithBase(options->rep, column_family->rep,
base_iterator->rep);
delete base_iterator;
return result;
}

@ -917,7 +917,9 @@ int main(int argc, char** argv) {
rocksdb_writebatch_wi_t* wbi = rocksdb_writebatch_wi_create(0, 1);
rocksdb_writebatch_wi_put(wbi, "bar", 3, "b", 1);
rocksdb_writebatch_wi_delete(wbi, "foo", 3);
rocksdb_iterator_t* iter = rocksdb_writebatch_wi_create_iterator_with_base(wbi, base_iter);
rocksdb_iterator_t* iter =
rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
roptions, wbi, base_iter);
CheckCondition(!rocksdb_iter_valid(iter));
rocksdb_iter_seek_to_first(iter);
CheckCondition(rocksdb_iter_valid(iter));
@ -1527,7 +1529,7 @@ int main(int argc, char** argv) {
const rocksdb_snapshot_t* snapshot;
snapshot = rocksdb_transactiondb_create_snapshot(txn_db);
rocksdb_readoptions_set_snapshot(roptions, snapshot);
rocksdb_transactiondb_put(txn_db, woptions, "foo", 3, "hey", 3, &err);
CheckNoError(err);
@ -1643,6 +1645,7 @@ int main(int argc, char** argv) {
// Check iterator with column family
rocksdb_transaction_put_cf(txn, cfh1, "key1_cf", 7, "val1_cf", 7, &err);
CheckNoError(err);
rocksdb_readoptions_set_iterate_upper_bound(roptions, "key2", 4);
rocksdb_iterator_t* iter =
rocksdb_transaction_create_iterator_cf(txn, roptions, cfh1);
CheckCondition(!rocksdb_iter_valid(iter));

@ -636,7 +636,14 @@ extern ROCKSDB_LIBRARY_API rocksdb_iterator_t* rocksdb_writebatch_wi_create_iter
rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* cf);
extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator);
extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_cf_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator, rocksdb_column_family_handle_t* cf);
/* Block based table options */

@ -54,6 +54,8 @@ class Iterator : public Cleanable {
// Position at the last key in the source that at or before target.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or before target.
// Note: If iterate_upper_bound is set and SeekForPrev is called with target
// greater or equal to iterate_upper_bound, the behavior is undefined
virtual void SeekForPrev(const Slice& target) = 0;
// Moves to the next entry in the source. After this call, Valid() is

@ -161,9 +161,15 @@ class WriteBatchWithIndex : public WriteBatchBase {
// key() and value() of the iterator. This invalidation happens even before
// the write batch update finishes. The state may recover after Next() is
// called.
Iterator* NewIteratorWithBase(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
Iterator* base_iterator);
Iterator* NewIteratorWithBase(ColumnFamilyHandle* column_family,
Iterator* base_iterator);
// default column family
Iterator* NewIteratorWithBase(const ReadOptions& read_options,
Iterator* base_iterator);
Iterator* NewIteratorWithBase(Iterator* base_iterator);
// Similar to DB::Get() but will only read the key from this batch.

@ -457,11 +457,14 @@ jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(JNIEnv* /*env*/,
jobject /*jobj*/,
jlong jwbwi_handle,
jlong jcf_handle,
jlong jbi_handle) {
jlong jbi_handle,
jlong jreadopt_handle) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
auto* base_iterator = reinterpret_cast<rocksdb::Iterator*>(jbi_handle);
auto* iterator = wbwi->NewIteratorWithBase(cf_handle, base_iterator);
auto* readopt = reinterpret_cast<rocksdb::ReadOptions*>(jreadopt_handle);
auto* iterator =
wbwi->NewIteratorWithBase(*readopt, cf_handle, base_iterator);
return reinterpret_cast<jlong>(iterator);
}

@ -14,8 +14,9 @@ package org.rocksdb;
*
* A user can call {@link org.rocksdb.WriteBatchWithIndex#newIterator()} to
* create an iterator over the write batch or
* {@link org.rocksdb.WriteBatchWithIndex#newIteratorWithBase(org.rocksdb.RocksIterator)}
* to get an iterator for the database with Read-Your-Own-Writes like capability
* {@link org.rocksdb.WriteBatchWithIndex#newIteratorWithBase(org.rocksdb.ReadOptions, org.rocksdb.RocksIterator)}
* to get an iterator for the database with Read-Your-Own-Writes like
* capability
*/
public class WriteBatchWithIndex extends AbstractWriteBatch {
/**
@ -120,25 +121,65 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* the write batch update finishes. The state may recover after Next() is
* called.
*
* @param read_opts The read options to use
* @param columnFamilyHandle The column family to iterate over
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @return An iterator which shows a view comprised of both the database
* point-in-time from baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(
final ColumnFamilyHandle columnFamilyHandle,
final RocksIterator baseIterator) {
RocksIterator iterator = new RocksIterator(
baseIterator.parent_,
iteratorWithBase(nativeHandle_,
columnFamilyHandle.nativeHandle_,
baseIterator.nativeHandle_));
public RocksIterator newIteratorWithBase(final ReadOptions read_opts,
final ColumnFamilyHandle columnFamilyHandle, final RocksIterator baseIterator) {
RocksIterator iterator = new RocksIterator(baseIterator.parent_,
iteratorWithBase(nativeHandle_, columnFamilyHandle.nativeHandle_,
baseIterator.nativeHandle_, read_opts.nativeHandle_));
//when the iterator is deleted it will also delete the baseIterator
baseIterator.disOwnNativeHandle();
return iterator;
}
/**
* Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
* as a delta and baseIterator as a base
*
* Updating write batch with the current key of the iterator is not safe.
* We strongly recommand users not to do it. It will invalidate the current
* key() and value() of the iterator. This invalidation happens even before
* the write batch update finishes. The state may recover after Next() is
* called.
*
* @param columnFamilyHandle The column family to iterate over
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @return An iterator which shows a view comprised of both the database
* point-in-time from baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(
final ColumnFamilyHandle columnFamilyHandle,
final RocksIterator baseIterator) {
ReadOptions read_opts = new ReadOptions();
return newIteratorWithBase(read_opts, columnFamilyHandle, baseIterator);
}
/**
* Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
* as a delta and baseIterator as a base. Operates on the default column
* family.
*
* @param read_opts The read options to use
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @return An iterator which shows a view comprised of both the database
* point-in-timefrom baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(
final ReadOptions read_opts, final RocksIterator baseIterator) {
return newIteratorWithBase(
read_opts, baseIterator.parent_.getDefaultColumnFamily(), baseIterator);
}
/**
* Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
@ -151,8 +192,8 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* point-in-timefrom baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(final RocksIterator baseIterator) {
return newIteratorWithBase(baseIterator.parent_.getDefaultColumnFamily(),
baseIterator);
ReadOptions read_opts = new ReadOptions();
return newIteratorWithBase(read_opts, baseIterator);
}
/**
@ -295,8 +336,8 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
final boolean overwriteKey);
private native long iterator0(final long handle);
private native long iterator1(final long handle, final long cfHandle);
private native long iteratorWithBase(final long handle,
final long baseIteratorHandle, final long cfHandle);
private native long iteratorWithBase(final long handle, final long baseIteratorHandle,
final long cfHandle, final long jreadopt_handle);
private native byte[] getFromBatch(final long handle, final long optHandle,
final byte[] key, final int keyLen);
private native byte[] getFromBatch(final long handle, final long optHandle,

@ -47,7 +47,6 @@ public class WriteBatchWithIndexTest {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
final RocksIterator base = db.newIterator();
final RocksIterator it = wbwi.newIteratorWithBase(base)) {
it.seek(k1);
assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(k1);
@ -421,8 +420,8 @@ public class WriteBatchWithIndexTest {
final ReadOptions readOptions, final WriteBatchWithIndex wbwi,
final String skey) {
final byte[] key = skey.getBytes();
try(final RocksIterator baseIterator = db.newIterator(readOptions);
final RocksIterator iterator = wbwi.newIteratorWithBase(baseIterator)) {
try (final RocksIterator baseIterator = db.newIterator(readOptions);
final RocksIterator iterator = wbwi.newIteratorWithBase(baseIterator)) {
iterator.seek(key);
// Arrays.equals(key, iterator.key()) ensures an exact match in Rocks,

@ -852,6 +852,51 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) {
delete txn;
}
TEST_F(OptimisticTransactionTest, IteratorUpperBoundTest) {
WriteOptions write_options;
auto txn = unique_ptr<Transaction>(txn_db->BeginTransaction(write_options));
string key1 = "a1";
string key2 = "a3";
string key3 = "b1";
string val = "123";
txn->Put(key1, val);
txn->Put(key2, val);
Status s = txn->Commit();
ASSERT_OK(s);
txn = unique_ptr<Transaction>(txn_db->BeginTransaction(write_options));
txn->Put(key3, val);
string ubKey("a2");
Slice upperbound(ubKey);
ReadOptions read_options;
read_options.iterate_upper_bound = &upperbound;
auto it = unique_ptr<Iterator>(txn->GetIterator(read_options));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
EXPECT_LT(it->key().ToString(), ubKey);
}
EXPECT_GE(it->key().ToString(), ubKey);
int key_count = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
EXPECT_LT(it->key().ToString(), ubKey);
key_count++;
}
ASSERT_EQ(key_count, 1);
// Test Seek to a key equal or over upper bound
it->Seek("a2");
ASSERT_FALSE(it->Valid());
it->Seek("a3");
ASSERT_FALSE(it->Valid());
it->Seek("a1");
ASSERT_TRUE(it->Valid());
it.reset();
s = txn->Commit();
ASSERT_OK(s);
txn.reset();
}
TEST_F(OptimisticTransactionTest, IteratorTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;

@ -178,7 +178,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
return Status::NotFound();
}
}
Status TransactionBaseImpl::PopSavePoint() {
if (save_points_ == nullptr ||
save_points_->empty()) {
@ -187,7 +187,7 @@ Status TransactionBaseImpl::PopSavePoint() {
return Status::NotFound();
}
assert(!save_points_->empty());
assert(!save_points_->empty());
save_points_->pop();
return write_batch_.PopSavePoint();
}
@ -291,7 +291,7 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter);
return write_batch_.NewIteratorWithBase(db_iter);
return write_batch_.NewIteratorWithBase(read_options, db_iter);
}
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
@ -299,7 +299,7 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
Iterator* db_iter = db_->NewIterator(read_options, column_family);
assert(db_iter);
return write_batch_.NewIteratorWithBase(column_family, db_iter);
return write_batch_.NewIteratorWithBase(read_options, column_family, db_iter);
}
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,

@ -62,7 +62,7 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
Iterator* db_iter = wpt_db_->NewIterator(options);
assert(db_iter);
return write_batch_.NewIteratorWithBase(db_iter);
return write_batch_.NewIteratorWithBase(options, db_iter);
}
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
@ -71,7 +71,7 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
assert(db_iter);
return write_batch_.NewIteratorWithBase(column_family, db_iter);
return write_batch_.NewIteratorWithBase(options, column_family, db_iter);
}
Status WritePreparedTxn::PrepareInternal() {

@ -506,7 +506,7 @@ Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
assert(db_iter);
return write_batch_.NewIteratorWithBase(column_family, db_iter);
return write_batch_.NewIteratorWithBase(options, column_family, db_iter);
}
const std::map<SequenceNumber, size_t>&

@ -32,11 +32,13 @@ namespace rocksdb {
// * equal_keys_ <=> base_iterator == delta_iterator
class BaseDeltaIterator : public Iterator {
public:
BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
const Comparator* comparator)
: forward_(true),
BaseDeltaIterator(const ReadOptions& read_options, Iterator* base_iterator,
WBWIIterator* delta_iterator, const Comparator* comparator)
: read_options_(read_options),
forward_(true),
current_at_base_(true),
equal_keys_(false),
current_over_upper_bound_(false),
status_(Status::OK()),
base_iterator_(base_iterator),
delta_iterator_(delta_iterator),
@ -45,7 +47,7 @@ class BaseDeltaIterator : public Iterator {
virtual ~BaseDeltaIterator() {}
bool Valid() const override {
return current_at_base_ ? BaseValid() : DeltaValid();
return BaseDeltaValid() ? !current_over_upper_bound_ : false;
}
void SeekToFirst() override {
@ -216,9 +218,15 @@ class BaseDeltaIterator : public Iterator {
}
// equal_keys_ <=> compare == 0
assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
#endif
}
bool IsOverUpperBound() {
return read_options_.iterate_upper_bound != nullptr &&
comparator_->Compare(key(), *read_options_.iterate_upper_bound) >= 0;
}
void Advance() {
if (equal_keys_) {
assert(BaseValid() && DeltaValid());
@ -252,6 +260,9 @@ class BaseDeltaIterator : public Iterator {
}
bool BaseValid() const { return base_iterator_->Valid(); }
bool DeltaValid() const { return delta_iterator_->Valid(); }
bool BaseDeltaValid() const {
return (current_at_base_ ? BaseValid() : DeltaValid());
}
void UpdateCurrent() {
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
@ -264,32 +275,37 @@ class BaseDeltaIterator : public Iterator {
} else if (!delta_iterator_->status().ok()) {
// Expose the error status and stop.
current_at_base_ = false;
return;
break;
}
equal_keys_ = false;
if (!BaseValid()) {
if (!base_iterator_->status().ok()) {
// Expose the error status and stop.
current_at_base_ = true;
return;
break;
}
// Base has finished.
if (!DeltaValid()) {
// Finished
return;
break;
}
if (delta_entry.type == kDeleteRecord ||
delta_entry.type == kSingleDeleteRecord) {
AdvanceDelta();
// If the new Delta is valid and >= iterate_upper_bound, stop
current_over_upper_bound_ = DeltaValid() && IsOverUpperBound();
if (current_over_upper_bound_) {
return;
}
} else {
current_at_base_ = false;
return;
break;
}
} else if (!DeltaValid()) {
// Delta has finished.
current_at_base_ = true;
return;
break;
} else {
int compare =
(forward_ ? 1 : -1) *
@ -301,7 +317,7 @@ class BaseDeltaIterator : public Iterator {
if (delta_entry.type != kDeleteRecord &&
delta_entry.type != kSingleDeleteRecord) {
current_at_base_ = false;
return;
break;
}
// Delta is less advanced and is delete.
AdvanceDelta();
@ -310,18 +326,20 @@ class BaseDeltaIterator : public Iterator {
}
} else {
current_at_base_ = true;
return;
break;
}
}
}
AssertInvariants();
current_over_upper_bound_ = BaseDeltaValid() && IsOverUpperBound();
#endif // __clang_analyzer__
}
ReadOptions read_options_;
bool forward_;
bool current_at_base_;
bool equal_keys_;
bool current_over_upper_bound_;
Status status_;
std::unique_ptr<Iterator> base_iterator_;
std::unique_ptr<WBWIIterator> delta_iterator_;
@ -642,25 +660,39 @@ WBWIIterator* WriteBatchWithIndex::NewIterator(
}
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
ColumnFamilyHandle* column_family, Iterator* base_iterator) {
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
Iterator* base_iterator) {
if (rep->overwrite_key == false) {
assert(false);
return nullptr;
}
return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
return new BaseDeltaIterator(read_options, base_iterator,
NewIterator(column_family),
GetColumnFamilyUserComparator(column_family));
}
Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
ColumnFamilyHandle* column_family, Iterator* base_iterator) {
ReadOptions read_options;
return NewIteratorWithBase(read_options, column_family, base_iterator);
}
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
const ReadOptions& read_options, Iterator* base_iterator) {
if (rep->overwrite_key == false) {
assert(false);
return nullptr;
}
// default column family's comparator
return new BaseDeltaIterator(base_iterator, NewIterator(),
return new BaseDeltaIterator(read_options, base_iterator, NewIterator(),
rep->comparator.default_comparator());
}
Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
ReadOptions read_options;
return NewIteratorWithBase(read_options, base_iterator);
}
Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
rep->SetLastEntryOffset();

@ -9,11 +9,12 @@
#ifndef ROCKSDB_LITE
#include <memory>
#include "rocksdb/utilities/write_batch_with_index.h"
#include <map>
#include <memory>
#include "db/column_family.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/comparator.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/testharness.h"
@ -506,7 +507,19 @@ typedef std::map<std::string, std::string> KVMap;
class KVIter : public Iterator {
public:
explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {}
virtual bool Valid() const { return iter_ != map_->end(); }
explicit KVIter(const KVMap* map, const Slice* iterate_upper_bound)
: map_(map),
iter_(map_->end()),
iterate_upper_bound_(iterate_upper_bound) {}
virtual bool Valid() const {
if (iterate_upper_bound_ == nullptr) {
return iter_ != map_->end();
} else {
if (iter_ == map_->end()) return false;
const Comparator* cmp = BytewiseComparator();
return cmp->Compare(key(), *iterate_upper_bound_) < 0;
}
}
virtual void SeekToFirst() { iter_ = map_->begin(); }
virtual void SeekToLast() {
if (map_->empty()) {
@ -536,6 +549,7 @@ class KVIter : public Iterator {
private:
const KVMap* const map_;
KVMap::const_iterator iter_;
const Slice* iterate_upper_bound_ = nullptr;
};
void AssertIter(Iterator* iter, const std::string& key,
@ -553,6 +567,7 @@ void AssertItersEqual(Iterator* iter1, Iterator* iter2) {
ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString());
}
}
} // namespace
TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
@ -613,9 +628,14 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
}
}
auto rnd_key_idx = rnd.Uniform(static_cast<int>(source_strings.size()));
Slice random_upper_bound(source_strings[rnd_key_idx]);
ReadOptions read_options;
read_options.iterate_upper_bound = &random_upper_bound;
std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(&cf1, new KVIter(&map)));
std::unique_ptr<Iterator> result_iter(new KVIter(&merged_map));
batch.NewIteratorWithBase(read_options, &cf1, new KVIter(&map)));
std::unique_ptr<Iterator> result_iter(
new KVIter(&merged_map, &random_upper_bound));
bool is_valid = false;
for (int i = 0; i < 128; i++) {
@ -837,6 +857,46 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) {
iter->Prev();
AssertIter(iter.get(), "c", "cc");
}
// Test iterate_upper_bound
{
KVMap empty_map;
Slice upper_bound("cd");
ReadOptions read_options;
read_options.iterate_upper_bound = &upper_bound;
std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(read_options, &cf1, new KVIter(&empty_map)));
iter->SeekToFirst();
AssertIter(iter.get(), "a", "aa");
iter->Next();
AssertIter(iter.get(), "c", "cc");
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->SeekToLast();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("aa");
AssertIter(iter.get(), "c", "cc");
iter->Prev();
AssertIter(iter.get(), "a", "aa");
iter->Next();
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("ca");
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
// Seek to outside of upper bound, should not crash
iter->Seek("zz");
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
}
TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) {

Loading…
Cancel
Save