Revert "BaseDeltaIterator: always check valid() before accessing key(… (#4744)

Summary:
…) (#4702)"

This reverts commit 3a18bb3e15.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4744

Differential Revision: D13311869

Pulled By: miasantreble

fbshipit-source-id: 6300b12cc34828d8b9274e907a3aef1506d5d553
main
Zhongyi Xie 6 years ago committed by Facebook Github Bot
parent 55479eb572
commit 2f1ca4e838
  1. 28
      db/c.cc
  2. 7
      db/c_test.c
  3. 9
      include/rocksdb/c.h
  4. 2
      include/rocksdb/iterator.h
  5. 6
      include/rocksdb/utilities/write_batch_with_index.h
  6. 7
      java/rocksjni/write_batch_with_index.cc
  7. 69
      java/src/main/java/org/rocksdb/WriteBatchWithIndex.java
  8. 5
      java/src/test/java/org/rocksdb/WriteBatchWithIndexTest.java
  9. 45
      utilities/transactions/optimistic_transaction_test.cc
  10. 8
      utilities/transactions/transaction_base.cc
  11. 4
      utilities/transactions/write_prepared_txn.cc
  12. 2
      utilities/transactions/write_unprepared_txn.cc
  13. 64
      utilities/write_batch_with_index/write_batch_with_index.cc
  14. 70
      utilities/write_batch_with_index/write_batch_with_index_test.cc

@ -1777,38 +1777,18 @@ void rocksdb_writebatch_wi_rollback_to_save_point(rocksdb_writebatch_wi_t* b,
rocksdb_iterator_t* rocksdb_writebatch_wi_create_iterator_with_base( rocksdb_iterator_t* rocksdb_writebatch_wi_create_iterator_with_base(
rocksdb_writebatch_wi_t* wbwi, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator) { rocksdb_iterator_t* base_iterator) {
rocksdb_readoptions_t options;
return rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
&options, wbwi, base_iterator);
}
rocksdb_iterator_t* rocksdb_writebatch_wi_create_iterator_with_base_cf(
rocksdb_writebatch_wi_t* wbwi, rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* column_family) {
rocksdb_readoptions_t options;
return rocksdb_writebatch_wi_create_iterator_with_base_cf_and_readoptions(
&options, wbwi, base_iterator, column_family);
}
rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator) {
rocksdb_iterator_t* result = new rocksdb_iterator_t; rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep = result->rep = wbwi->rep->NewIteratorWithBase(base_iterator->rep);
wbwi->rep->NewIteratorWithBase(options->rep, base_iterator->rep);
delete base_iterator; delete base_iterator;
return result; return result;
} }
rocksdb_iterator_t* rocksdb_iterator_t* rocksdb_writebatch_wi_create_iterator_with_base_cf(
rocksdb_writebatch_wi_create_iterator_with_base_cf_and_readoptions( rocksdb_writebatch_wi_t* wbwi,
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator, rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* column_family) { rocksdb_column_family_handle_t* column_family) {
rocksdb_iterator_t* result = new rocksdb_iterator_t; rocksdb_iterator_t* result = new rocksdb_iterator_t;
result->rep = wbwi->rep->NewIteratorWithBase(options->rep, column_family->rep, result->rep = wbwi->rep->NewIteratorWithBase(column_family->rep, base_iterator->rep);
base_iterator->rep);
delete base_iterator; delete base_iterator;
return result; return result;
} }

@ -917,9 +917,7 @@ int main(int argc, char** argv) {
rocksdb_writebatch_wi_t* wbi = rocksdb_writebatch_wi_create(0, 1); rocksdb_writebatch_wi_t* wbi = rocksdb_writebatch_wi_create(0, 1);
rocksdb_writebatch_wi_put(wbi, "bar", 3, "b", 1); rocksdb_writebatch_wi_put(wbi, "bar", 3, "b", 1);
rocksdb_writebatch_wi_delete(wbi, "foo", 3); rocksdb_writebatch_wi_delete(wbi, "foo", 3);
rocksdb_iterator_t* iter = rocksdb_iterator_t* iter = rocksdb_writebatch_wi_create_iterator_with_base(wbi, base_iter);
rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
roptions, wbi, base_iter);
CheckCondition(!rocksdb_iter_valid(iter)); CheckCondition(!rocksdb_iter_valid(iter));
rocksdb_iter_seek_to_first(iter); rocksdb_iter_seek_to_first(iter);
CheckCondition(rocksdb_iter_valid(iter)); CheckCondition(rocksdb_iter_valid(iter));
@ -1529,7 +1527,7 @@ int main(int argc, char** argv) {
const rocksdb_snapshot_t* snapshot; const rocksdb_snapshot_t* snapshot;
snapshot = rocksdb_transactiondb_create_snapshot(txn_db); snapshot = rocksdb_transactiondb_create_snapshot(txn_db);
rocksdb_readoptions_set_snapshot(roptions, snapshot); rocksdb_readoptions_set_snapshot(roptions, snapshot);
rocksdb_transactiondb_put(txn_db, woptions, "foo", 3, "hey", 3, &err); rocksdb_transactiondb_put(txn_db, woptions, "foo", 3, "hey", 3, &err);
CheckNoError(err); CheckNoError(err);
@ -1645,7 +1643,6 @@ int main(int argc, char** argv) {
// Check iterator with column family // Check iterator with column family
rocksdb_transaction_put_cf(txn, cfh1, "key1_cf", 7, "val1_cf", 7, &err); rocksdb_transaction_put_cf(txn, cfh1, "key1_cf", 7, "val1_cf", 7, &err);
CheckNoError(err); CheckNoError(err);
rocksdb_readoptions_set_iterate_upper_bound(roptions, "key2", 4);
rocksdb_iterator_t* iter = rocksdb_iterator_t* iter =
rocksdb_transaction_create_iterator_cf(txn, roptions, cfh1); rocksdb_transaction_create_iterator_cf(txn, roptions, cfh1);
CheckCondition(!rocksdb_iter_valid(iter)); CheckCondition(!rocksdb_iter_valid(iter));

@ -636,14 +636,7 @@ extern ROCKSDB_LIBRARY_API rocksdb_iterator_t* rocksdb_writebatch_wi_create_iter
rocksdb_writebatch_wi_t* wbwi, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator, rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* cf); rocksdb_column_family_handle_t* cf);
extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator);
extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_cf_and_readoptions(
const rocksdb_readoptions_t* options, rocksdb_writebatch_wi_t* wbwi,
rocksdb_iterator_t* base_iterator, rocksdb_column_family_handle_t* cf);
/* Block based table options */ /* Block based table options */

@ -54,8 +54,6 @@ class Iterator : public Cleanable {
// Position at the last key in the source that at or before target. // Position at the last key in the source that at or before target.
// The iterator is Valid() after this call iff the source contains // The iterator is Valid() after this call iff the source contains
// an entry that comes at or before target. // an entry that comes at or before target.
// Note: If iterate_upper_bound is set and SeekForPrev is called with target
// greater or equal to iterate_upper_bound, the behavior is undefined
virtual void SeekForPrev(const Slice& target) = 0; virtual void SeekForPrev(const Slice& target) = 0;
// Moves to the next entry in the source. After this call, Valid() is // Moves to the next entry in the source. After this call, Valid() is

@ -161,15 +161,9 @@ class WriteBatchWithIndex : public WriteBatchBase {
// key() and value() of the iterator. This invalidation happens even before // key() and value() of the iterator. This invalidation happens even before
// the write batch update finishes. The state may recover after Next() is // the write batch update finishes. The state may recover after Next() is
// called. // called.
Iterator* NewIteratorWithBase(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
Iterator* base_iterator);
Iterator* NewIteratorWithBase(ColumnFamilyHandle* column_family, Iterator* NewIteratorWithBase(ColumnFamilyHandle* column_family,
Iterator* base_iterator); Iterator* base_iterator);
// default column family // default column family
Iterator* NewIteratorWithBase(const ReadOptions& read_options,
Iterator* base_iterator);
Iterator* NewIteratorWithBase(Iterator* base_iterator); Iterator* NewIteratorWithBase(Iterator* base_iterator);
// Similar to DB::Get() but will only read the key from this batch. // Similar to DB::Get() but will only read the key from this batch.

@ -457,14 +457,11 @@ jlong Java_org_rocksdb_WriteBatchWithIndex_iteratorWithBase(JNIEnv* /*env*/,
jobject /*jobj*/, jobject /*jobj*/,
jlong jwbwi_handle, jlong jwbwi_handle,
jlong jcf_handle, jlong jcf_handle,
jlong jbi_handle, jlong jbi_handle) {
jlong jreadopt_handle) {
auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle); auto* wbwi = reinterpret_cast<rocksdb::WriteBatchWithIndex*>(jwbwi_handle);
auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle); auto* cf_handle = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcf_handle);
auto* base_iterator = reinterpret_cast<rocksdb::Iterator*>(jbi_handle); auto* base_iterator = reinterpret_cast<rocksdb::Iterator*>(jbi_handle);
auto* readopt = reinterpret_cast<rocksdb::ReadOptions*>(jreadopt_handle); auto* iterator = wbwi->NewIteratorWithBase(cf_handle, base_iterator);
auto* iterator =
wbwi->NewIteratorWithBase(*readopt, cf_handle, base_iterator);
return reinterpret_cast<jlong>(iterator); return reinterpret_cast<jlong>(iterator);
} }

@ -14,9 +14,8 @@ package org.rocksdb;
* *
* A user can call {@link org.rocksdb.WriteBatchWithIndex#newIterator()} to * A user can call {@link org.rocksdb.WriteBatchWithIndex#newIterator()} to
* create an iterator over the write batch or * create an iterator over the write batch or
* {@link org.rocksdb.WriteBatchWithIndex#newIteratorWithBase(org.rocksdb.ReadOptions, org.rocksdb.RocksIterator)} * {@link org.rocksdb.WriteBatchWithIndex#newIteratorWithBase(org.rocksdb.RocksIterator)}
* to get an iterator for the database with Read-Your-Own-Writes like * to get an iterator for the database with Read-Your-Own-Writes like capability
* capability
*/ */
public class WriteBatchWithIndex extends AbstractWriteBatch { public class WriteBatchWithIndex extends AbstractWriteBatch {
/** /**
@ -110,34 +109,6 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
return new WBWIRocksIterator(this, iterator0(nativeHandle_)); return new WBWIRocksIterator(this, iterator0(nativeHandle_));
} }
/**
* Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
* as a delta and baseIterator as a base
*
* Updating write batch with the current key of the iterator is not safe.
* We strongly recommand users not to do it. It will invalidate the current
* key() and value() of the iterator. This invalidation happens even before
* the write batch update finishes. The state may recover after Next() is
* called.
*
* @param read_opts The read options to use
* @param columnFamilyHandle The column family to iterate over
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @return An iterator which shows a view comprised of both the database
* point-in-time from baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(final ReadOptions read_opts,
final ColumnFamilyHandle columnFamilyHandle, final RocksIterator baseIterator) {
RocksIterator iterator = new RocksIterator(baseIterator.parent_,
iteratorWithBase(nativeHandle_, columnFamilyHandle.nativeHandle_,
baseIterator.nativeHandle_, read_opts.nativeHandle_));
//when the iterator is deleted it will also delete the baseIterator
baseIterator.disOwnNativeHandle();
return iterator;
}
/** /**
* Provides Read-Your-Own-Writes like functionality by * Provides Read-Your-Own-Writes like functionality by
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator} * creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator}
@ -158,26 +129,14 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
public RocksIterator newIteratorWithBase( public RocksIterator newIteratorWithBase(
final ColumnFamilyHandle columnFamilyHandle, final ColumnFamilyHandle columnFamilyHandle,
final RocksIterator baseIterator) { final RocksIterator baseIterator) {
ReadOptions read_opts = new ReadOptions(); RocksIterator iterator = new RocksIterator(
return newIteratorWithBase(read_opts, columnFamilyHandle, baseIterator); baseIterator.parent_,
} iteratorWithBase(nativeHandle_,
columnFamilyHandle.nativeHandle_,
/** baseIterator.nativeHandle_));
* Provides Read-Your-Own-Writes like functionality by //when the iterator is deleted it will also delete the baseIterator
* creating a new Iterator that will use {@link org.rocksdb.WBWIRocksIterator} baseIterator.disOwnNativeHandle();
* as a delta and baseIterator as a base. Operates on the default column return iterator;
* family.
*
* @param read_opts The read options to use
* @param baseIterator The base iterator,
* e.g. {@link org.rocksdb.RocksDB#newIterator()}
* @return An iterator which shows a view comprised of both the database
* point-in-timefrom baseIterator and modifications made in this write batch.
*/
public RocksIterator newIteratorWithBase(
final ReadOptions read_opts, final RocksIterator baseIterator) {
return newIteratorWithBase(
read_opts, baseIterator.parent_.getDefaultColumnFamily(), baseIterator);
} }
/** /**
@ -192,8 +151,8 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
* point-in-timefrom baseIterator and modifications made in this write batch. * point-in-timefrom baseIterator and modifications made in this write batch.
*/ */
public RocksIterator newIteratorWithBase(final RocksIterator baseIterator) { public RocksIterator newIteratorWithBase(final RocksIterator baseIterator) {
ReadOptions read_opts = new ReadOptions(); return newIteratorWithBase(baseIterator.parent_.getDefaultColumnFamily(),
return newIteratorWithBase(read_opts, baseIterator); baseIterator);
} }
/** /**
@ -336,8 +295,8 @@ public class WriteBatchWithIndex extends AbstractWriteBatch {
final boolean overwriteKey); final boolean overwriteKey);
private native long iterator0(final long handle); private native long iterator0(final long handle);
private native long iterator1(final long handle, final long cfHandle); private native long iterator1(final long handle, final long cfHandle);
private native long iteratorWithBase(final long handle, final long baseIteratorHandle, private native long iteratorWithBase(final long handle,
final long cfHandle, final long jreadopt_handle); final long baseIteratorHandle, final long cfHandle);
private native byte[] getFromBatch(final long handle, final long optHandle, private native byte[] getFromBatch(final long handle, final long optHandle,
final byte[] key, final int keyLen); final byte[] key, final int keyLen);
private native byte[] getFromBatch(final long handle, final long optHandle, private native byte[] getFromBatch(final long handle, final long optHandle,

@ -47,6 +47,7 @@ public class WriteBatchWithIndexTest {
try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true); try (final WriteBatchWithIndex wbwi = new WriteBatchWithIndex(true);
final RocksIterator base = db.newIterator(); final RocksIterator base = db.newIterator();
final RocksIterator it = wbwi.newIteratorWithBase(base)) { final RocksIterator it = wbwi.newIteratorWithBase(base)) {
it.seek(k1); it.seek(k1);
assertThat(it.isValid()).isTrue(); assertThat(it.isValid()).isTrue();
assertThat(it.key()).isEqualTo(k1); assertThat(it.key()).isEqualTo(k1);
@ -420,8 +421,8 @@ public class WriteBatchWithIndexTest {
final ReadOptions readOptions, final WriteBatchWithIndex wbwi, final ReadOptions readOptions, final WriteBatchWithIndex wbwi,
final String skey) { final String skey) {
final byte[] key = skey.getBytes(); final byte[] key = skey.getBytes();
try (final RocksIterator baseIterator = db.newIterator(readOptions); try(final RocksIterator baseIterator = db.newIterator(readOptions);
final RocksIterator iterator = wbwi.newIteratorWithBase(baseIterator)) { final RocksIterator iterator = wbwi.newIteratorWithBase(baseIterator)) {
iterator.seek(key); iterator.seek(key);
// Arrays.equals(key, iterator.key()) ensures an exact match in Rocks, // Arrays.equals(key, iterator.key()) ensures an exact match in Rocks,

@ -852,51 +852,6 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) {
delete txn; delete txn;
} }
TEST_F(OptimisticTransactionTest, IteratorUpperBoundTest) {
WriteOptions write_options;
auto txn = unique_ptr<Transaction>(txn_db->BeginTransaction(write_options));
string key1 = "a1";
string key2 = "a3";
string key3 = "b1";
string val = "123";
txn->Put(key1, val);
txn->Put(key2, val);
Status s = txn->Commit();
ASSERT_OK(s);
txn = unique_ptr<Transaction>(txn_db->BeginTransaction(write_options));
txn->Put(key3, val);
string ubKey("a2");
Slice upperbound(ubKey);
ReadOptions read_options;
read_options.iterate_upper_bound = &upperbound;
auto it = unique_ptr<Iterator>(txn->GetIterator(read_options));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
EXPECT_LT(it->key().ToString(), ubKey);
}
EXPECT_GE(it->key().ToString(), ubKey);
int key_count = 0;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
EXPECT_LT(it->key().ToString(), ubKey);
key_count++;
}
ASSERT_EQ(key_count, 1);
// Test Seek to a key equal or over upper bound
it->Seek("a2");
ASSERT_FALSE(it->Valid());
it->Seek("a3");
ASSERT_FALSE(it->Valid());
it->Seek("a1");
ASSERT_TRUE(it->Valid());
it.reset();
s = txn->Commit();
ASSERT_OK(s);
txn.reset();
}
TEST_F(OptimisticTransactionTest, IteratorTest) { TEST_F(OptimisticTransactionTest, IteratorTest) {
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options, snapshot_read_options; ReadOptions read_options, snapshot_read_options;

@ -178,7 +178,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
return Status::NotFound(); return Status::NotFound();
} }
} }
Status TransactionBaseImpl::PopSavePoint() { Status TransactionBaseImpl::PopSavePoint() {
if (save_points_ == nullptr || if (save_points_ == nullptr ||
save_points_->empty()) { save_points_->empty()) {
@ -187,7 +187,7 @@ Status TransactionBaseImpl::PopSavePoint() {
return Status::NotFound(); return Status::NotFound();
} }
assert(!save_points_->empty()); assert(!save_points_->empty());
save_points_->pop(); save_points_->pop();
return write_batch_.PopSavePoint(); return write_batch_.PopSavePoint();
} }
@ -291,7 +291,7 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
Iterator* db_iter = db_->NewIterator(read_options); Iterator* db_iter = db_->NewIterator(read_options);
assert(db_iter); assert(db_iter);
return write_batch_.NewIteratorWithBase(read_options, db_iter); return write_batch_.NewIteratorWithBase(db_iter);
} }
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
@ -299,7 +299,7 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
Iterator* db_iter = db_->NewIterator(read_options, column_family); Iterator* db_iter = db_->NewIterator(read_options, column_family);
assert(db_iter); assert(db_iter);
return write_batch_.NewIteratorWithBase(read_options, column_family, db_iter); return write_batch_.NewIteratorWithBase(column_family, db_iter);
} }
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,

@ -62,7 +62,7 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
Iterator* db_iter = wpt_db_->NewIterator(options); Iterator* db_iter = wpt_db_->NewIterator(options);
assert(db_iter); assert(db_iter);
return write_batch_.NewIteratorWithBase(options, db_iter); return write_batch_.NewIteratorWithBase(db_iter);
} }
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
@ -71,7 +71,7 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wpt_db_->NewIterator(options, column_family); Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
assert(db_iter); assert(db_iter);
return write_batch_.NewIteratorWithBase(options, column_family, db_iter); return write_batch_.NewIteratorWithBase(column_family, db_iter);
} }
Status WritePreparedTxn::PrepareInternal() { Status WritePreparedTxn::PrepareInternal() {

@ -506,7 +506,7 @@ Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this); Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
assert(db_iter); assert(db_iter);
return write_batch_.NewIteratorWithBase(options, column_family, db_iter); return write_batch_.NewIteratorWithBase(column_family, db_iter);
} }
const std::map<SequenceNumber, size_t>& const std::map<SequenceNumber, size_t>&

@ -32,13 +32,11 @@ namespace rocksdb {
// * equal_keys_ <=> base_iterator == delta_iterator // * equal_keys_ <=> base_iterator == delta_iterator
class BaseDeltaIterator : public Iterator { class BaseDeltaIterator : public Iterator {
public: public:
BaseDeltaIterator(const ReadOptions& read_options, Iterator* base_iterator, BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator,
WBWIIterator* delta_iterator, const Comparator* comparator) const Comparator* comparator)
: read_options_(read_options), : forward_(true),
forward_(true),
current_at_base_(true), current_at_base_(true),
equal_keys_(false), equal_keys_(false),
current_over_upper_bound_(false),
status_(Status::OK()), status_(Status::OK()),
base_iterator_(base_iterator), base_iterator_(base_iterator),
delta_iterator_(delta_iterator), delta_iterator_(delta_iterator),
@ -47,7 +45,7 @@ class BaseDeltaIterator : public Iterator {
virtual ~BaseDeltaIterator() {} virtual ~BaseDeltaIterator() {}
bool Valid() const override { bool Valid() const override {
return BaseDeltaValid() ? !current_over_upper_bound_ : false; return current_at_base_ ? BaseValid() : DeltaValid();
} }
void SeekToFirst() override { void SeekToFirst() override {
@ -218,15 +216,9 @@ class BaseDeltaIterator : public Iterator {
} }
// equal_keys_ <=> compare == 0 // equal_keys_ <=> compare == 0
assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0)); assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0));
#endif #endif
} }
bool IsOverUpperBound() {
return read_options_.iterate_upper_bound != nullptr &&
comparator_->Compare(key(), *read_options_.iterate_upper_bound) >= 0;
}
void Advance() { void Advance() {
if (equal_keys_) { if (equal_keys_) {
assert(BaseValid() && DeltaValid()); assert(BaseValid() && DeltaValid());
@ -260,9 +252,6 @@ class BaseDeltaIterator : public Iterator {
} }
bool BaseValid() const { return base_iterator_->Valid(); } bool BaseValid() const { return base_iterator_->Valid(); }
bool DeltaValid() const { return delta_iterator_->Valid(); } bool DeltaValid() const { return delta_iterator_->Valid(); }
bool BaseDeltaValid() const {
return (current_at_base_ ? BaseValid() : DeltaValid());
}
void UpdateCurrent() { void UpdateCurrent() {
// Suppress false positive clang analyzer warnings. // Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__ #ifndef __clang_analyzer__
@ -275,37 +264,32 @@ class BaseDeltaIterator : public Iterator {
} else if (!delta_iterator_->status().ok()) { } else if (!delta_iterator_->status().ok()) {
// Expose the error status and stop. // Expose the error status and stop.
current_at_base_ = false; current_at_base_ = false;
break; return;
} }
equal_keys_ = false; equal_keys_ = false;
if (!BaseValid()) { if (!BaseValid()) {
if (!base_iterator_->status().ok()) { if (!base_iterator_->status().ok()) {
// Expose the error status and stop. // Expose the error status and stop.
current_at_base_ = true; current_at_base_ = true;
break; return;
} }
// Base has finished. // Base has finished.
if (!DeltaValid()) { if (!DeltaValid()) {
// Finished // Finished
break; return;
} }
if (delta_entry.type == kDeleteRecord || if (delta_entry.type == kDeleteRecord ||
delta_entry.type == kSingleDeleteRecord) { delta_entry.type == kSingleDeleteRecord) {
AdvanceDelta(); AdvanceDelta();
// If the new Delta is valid and >= iterate_upper_bound, stop
current_over_upper_bound_ = DeltaValid() && IsOverUpperBound();
if (current_over_upper_bound_) {
return;
}
} else { } else {
current_at_base_ = false; current_at_base_ = false;
break; return;
} }
} else if (!DeltaValid()) { } else if (!DeltaValid()) {
// Delta has finished. // Delta has finished.
current_at_base_ = true; current_at_base_ = true;
break; return;
} else { } else {
int compare = int compare =
(forward_ ? 1 : -1) * (forward_ ? 1 : -1) *
@ -317,7 +301,7 @@ class BaseDeltaIterator : public Iterator {
if (delta_entry.type != kDeleteRecord && if (delta_entry.type != kDeleteRecord &&
delta_entry.type != kSingleDeleteRecord) { delta_entry.type != kSingleDeleteRecord) {
current_at_base_ = false; current_at_base_ = false;
break; return;
} }
// Delta is less advanced and is delete. // Delta is less advanced and is delete.
AdvanceDelta(); AdvanceDelta();
@ -326,20 +310,18 @@ class BaseDeltaIterator : public Iterator {
} }
} else { } else {
current_at_base_ = true; current_at_base_ = true;
break; return;
} }
} }
} }
current_over_upper_bound_ = BaseDeltaValid() && IsOverUpperBound(); AssertInvariants();
#endif // __clang_analyzer__ #endif // __clang_analyzer__
} }
ReadOptions read_options_;
bool forward_; bool forward_;
bool current_at_base_; bool current_at_base_;
bool equal_keys_; bool equal_keys_;
bool current_over_upper_bound_;
Status status_; Status status_;
std::unique_ptr<Iterator> base_iterator_; std::unique_ptr<Iterator> base_iterator_;
std::unique_ptr<WBWIIterator> delta_iterator_; std::unique_ptr<WBWIIterator> delta_iterator_;
@ -660,39 +642,25 @@ WBWIIterator* WriteBatchWithIndex::NewIterator(
} }
Iterator* WriteBatchWithIndex::NewIteratorWithBase( Iterator* WriteBatchWithIndex::NewIteratorWithBase(
const ReadOptions& read_options, ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family, Iterator* base_iterator) {
Iterator* base_iterator) {
if (rep->overwrite_key == false) { if (rep->overwrite_key == false) {
assert(false); assert(false);
return nullptr; return nullptr;
} }
return new BaseDeltaIterator(read_options, base_iterator, return new BaseDeltaIterator(base_iterator, NewIterator(column_family),
NewIterator(column_family),
GetColumnFamilyUserComparator(column_family)); GetColumnFamilyUserComparator(column_family));
} }
Iterator* WriteBatchWithIndex::NewIteratorWithBase( Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
ColumnFamilyHandle* column_family, Iterator* base_iterator) {
ReadOptions read_options;
return NewIteratorWithBase(read_options, column_family, base_iterator);
}
Iterator* WriteBatchWithIndex::NewIteratorWithBase(
const ReadOptions& read_options, Iterator* base_iterator) {
if (rep->overwrite_key == false) { if (rep->overwrite_key == false) {
assert(false); assert(false);
return nullptr; return nullptr;
} }
// default column family's comparator // default column family's comparator
return new BaseDeltaIterator(read_options, base_iterator, NewIterator(), return new BaseDeltaIterator(base_iterator, NewIterator(),
rep->comparator.default_comparator()); rep->comparator.default_comparator());
} }
Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
ReadOptions read_options;
return NewIteratorWithBase(read_options, base_iterator);
}
Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) { const Slice& key, const Slice& value) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();

@ -9,12 +9,11 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "rocksdb/utilities/write_batch_with_index.h"
#include <map>
#include <memory> #include <memory>
#include <map>
#include "db/column_family.h" #include "db/column_family.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/comparator.h" #include "rocksdb/utilities/write_batch_with_index.h"
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/testharness.h" #include "util/testharness.h"
@ -507,19 +506,7 @@ typedef std::map<std::string, std::string> KVMap;
class KVIter : public Iterator { class KVIter : public Iterator {
public: public:
explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {} explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {}
explicit KVIter(const KVMap* map, const Slice* iterate_upper_bound) virtual bool Valid() const { return iter_ != map_->end(); }
: map_(map),
iter_(map_->end()),
iterate_upper_bound_(iterate_upper_bound) {}
virtual bool Valid() const {
if (iterate_upper_bound_ == nullptr) {
return iter_ != map_->end();
} else {
if (iter_ == map_->end()) return false;
const Comparator* cmp = BytewiseComparator();
return cmp->Compare(key(), *iterate_upper_bound_) < 0;
}
}
virtual void SeekToFirst() { iter_ = map_->begin(); } virtual void SeekToFirst() { iter_ = map_->begin(); }
virtual void SeekToLast() { virtual void SeekToLast() {
if (map_->empty()) { if (map_->empty()) {
@ -549,7 +536,6 @@ class KVIter : public Iterator {
private: private:
const KVMap* const map_; const KVMap* const map_;
KVMap::const_iterator iter_; KVMap::const_iterator iter_;
const Slice* iterate_upper_bound_ = nullptr;
}; };
void AssertIter(Iterator* iter, const std::string& key, void AssertIter(Iterator* iter, const std::string& key,
@ -567,7 +553,6 @@ void AssertItersEqual(Iterator* iter1, Iterator* iter2) {
ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString()); ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString());
} }
} }
} // namespace } // namespace
TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
@ -628,14 +613,9 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) {
} }
} }
auto rnd_key_idx = rnd.Uniform(static_cast<int>(source_strings.size()));
Slice random_upper_bound(source_strings[rnd_key_idx]);
ReadOptions read_options;
read_options.iterate_upper_bound = &random_upper_bound;
std::unique_ptr<Iterator> iter( std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(read_options, &cf1, new KVIter(&map))); batch.NewIteratorWithBase(&cf1, new KVIter(&map)));
std::unique_ptr<Iterator> result_iter( std::unique_ptr<Iterator> result_iter(new KVIter(&merged_map));
new KVIter(&merged_map, &random_upper_bound));
bool is_valid = false; bool is_valid = false;
for (int i = 0; i < 128; i++) { for (int i = 0; i < 128; i++) {
@ -857,46 +837,6 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) {
iter->Prev(); iter->Prev();
AssertIter(iter.get(), "c", "cc"); AssertIter(iter.get(), "c", "cc");
} }
// Test iterate_upper_bound
{
KVMap empty_map;
Slice upper_bound("cd");
ReadOptions read_options;
read_options.iterate_upper_bound = &upper_bound;
std::unique_ptr<Iterator> iter(
batch.NewIteratorWithBase(read_options, &cf1, new KVIter(&empty_map)));
iter->SeekToFirst();
AssertIter(iter.get(), "a", "aa");
iter->Next();
AssertIter(iter.get(), "c", "cc");
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->SeekToLast();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("aa");
AssertIter(iter.get(), "c", "cc");
iter->Prev();
AssertIter(iter.get(), "a", "aa");
iter->Next();
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter->Seek("ca");
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
// Seek to outside of upper bound, should not crash
iter->Seek("zz");
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
}
} }
TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) {

Loading…
Cancel
Save