Add blob support to DBIter (#7731)

Summary:
The patch adds iterator support to the integrated BlobDB implementation.
Whenever a blob reference is encountered during iteration, the corresponding
blob is retrieved by calling `Version::GetBlob`, assuming the `expose_blob_index`
(formerly `allow_blob`) flag is *not* set. (Note: the flag is set by the old stacked
BlobDB implementation, which has its own blob file handling/blob retrieval logic.)

In addition, `DBIter` now uniformly returns `Status::NotSupported` with the error
message `"BlobDB does not support merge operator."` when encountering a
blob reference while performing a merge (instead of potentially returning a
message that implies the database should be opened using the stacked BlobDB's
`Open`.)

TODO: We can implement support for lazily retrieving the blob value (or in other
words, bypassing the retrieval of blob values based on key) by extending the `Iterator`
API with a new `PrepareValue` method (similarly to `InternalIterator`, which already
supports lazy values).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7731

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D25256293

Pulled By: ltamasi

fbshipit-source-id: c39cd782011495a526cdff99c16f5fca400c4811
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent e102de7318
commit 61932cdf1d
  1. 44
      db/arena_wrapped_db_iter.cc
  2. 21
      db/arena_wrapped_db_iter.h
  3. 13
      db/blob/db_blob_index_test.cc
  4. 12
      db/db_impl/db_impl.cc
  5. 2
      db/db_impl/db_impl.h
  6. 5
      db/db_impl/db_impl_readonly.cc
  7. 2
      db/db_impl/db_impl_secondary.cc
  8. 142
      db/db_iter.cc
  9. 34
      db/db_iter.h
  10. 2
      db/db_iter_stress_test.cc
  11. 373
      db/db_iter_test.cc
  12. 121
      db/db_iterator_test.cc
  13. 9
      db/version_set.cc
  14. 14
      db/version_set.h
  15. 7
      table/sst_file_reader.cc
  16. 2
      utilities/blob_db/blob_db_impl.cc
  17. 12
      utilities/transactions/write_prepared_txn_db.cc
  18. 10
      utilities/transactions/write_unprepared_txn_db.cc

@ -30,20 +30,19 @@ Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
return db_iter_->GetProperty(prop_name, prop);
}
void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
void ArenaWrappedDBIter::Init(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iteration,
uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob,
bool allow_refresh) {
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options,
cf_options.user_comparator, nullptr, sequence,
true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, allow_blob);
db_iter_ =
new (mem) DBIter(env, read_options, cf_options, mutable_cf_options,
cf_options.user_comparator, /* iter */ nullptr, version,
sequence, true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, expose_blob_index);
sv_number_ = version_number;
read_options_ = read_options;
allow_refresh_ = allow_refresh;
@ -72,8 +71,9 @@ Status ArenaWrappedDBIter::Refresh() {
read_callback_->Refresh(latest_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
sv->current, latest_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
allow_refresh_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
@ -90,16 +90,16 @@ Status ArenaWrappedDBIter::Refresh() {
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh) {
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, cf_options, mutable_cf_options, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, allow_blob, allow_refresh);
iter->Init(env, read_options, cf_options, mutable_cf_options, version,
sequence, max_sequential_skip_in_iterations, version_number,
read_callback, db_impl, cfd, expose_blob_index, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(db_impl, cfd, read_callback, allow_blob);
iter->StoreRefreshInfo(db_impl, cfd, read_callback, expose_blob_index);
}
return iter;

@ -23,6 +23,7 @@
namespace ROCKSDB_NAMESPACE {
class Arena;
class Version;
// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed to be allocated. This class is used as an entry point of
@ -72,20 +73,20 @@ class ArenaWrappedDBIter : public Iterator {
void Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh);
bool expose_blob_index, bool allow_refresh);
// Store some parameters so we can refresh the iterator at a later point
// with these same params
void StoreRefreshInfo(DBImpl* db_impl, ColumnFamilyData* cfd,
ReadCallback* read_callback, bool allow_blob) {
ReadCallback* read_callback, bool expose_blob_index) {
db_impl_ = db_impl;
cfd_ = cfd;
read_callback_ = read_callback;
allow_blob_ = allow_blob;
expose_blob_index_ = expose_blob_index;
}
private:
@ -96,7 +97,7 @@ class ArenaWrappedDBIter : public Iterator {
DBImpl* db_impl_ = nullptr;
ReadOptions read_options_;
ReadCallback* read_callback_;
bool allow_blob_ = false;
bool expose_blob_index_ = false;
bool allow_refresh_ = true;
};
@ -106,9 +107,9 @@ class ArenaWrappedDBIter : public Iterator {
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false,
bool allow_refresh = true);
const MutableCFOptions& mutable_cf_options, const Version* version,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
uint64_t version_number, ReadCallback* read_callback,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool expose_blob_index = false, bool allow_refresh = true);
} // namespace ROCKSDB_NAMESPACE

@ -98,7 +98,7 @@ class DBBlobIndexTest : public DBTestBase {
ArenaWrappedDBIter* GetBlobIterator() {
return dbfull()->NewIteratorImpl(
ReadOptions(), cfd(), dbfull()->GetLatestSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/);
nullptr /*read_callback*/, true /*expose_blob_index*/);
}
Options GetTestOptions() {
@ -238,8 +238,11 @@ TEST_F(DBBlobIndexTest, Updated) {
}
}
// Iterator should get blob value if allow_blob flag is set,
// otherwise return Status::NotSupported status.
// Note: the following test case pertains to the StackableDB-based BlobDB
// implementation. When a blob iterator is used, it should set the
// expose_blob_index flag for the underlying DBIter, and retrieve/return the
// corresponding blob value. If a regular DBIter is created (i.e.
// expose_blob_index is not set), it should return Status::Corruption.
TEST_F(DBBlobIndexTest, Iterate) {
const std::vector<std::vector<ValueType>> data = {
/*00*/ {kTypeValue},
@ -384,8 +387,8 @@ TEST_F(DBBlobIndexTest, Iterate) {
MoveDataTo(tier);
// Normal iterator
verify(1, Status::kNotSupported, "", "", create_normal_iterator);
verify(3, Status::kNotSupported, "", "", create_normal_iterator);
verify(1, Status::kCorruption, "", "", create_normal_iterator);
verify(3, Status::kCorruption, "", "", create_normal_iterator);
verify(5, Status::kOk, get_value(5, 0), get_value(5, 0),
create_normal_iterator);
verify(7, Status::kOk, get_value(8, 0), get_value(6, 0),

@ -2804,7 +2804,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
/* allow_unprepared_value */ true);
result = NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, kMaxSequenceNumber,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
this, cfd);
#endif
@ -2825,7 +2825,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool allow_blob,
bool expose_blob_index,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
@ -2890,9 +2890,9 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, allow_blob,
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback, this, cfd, expose_blob_index,
read_options.snapshot != nullptr ? false : allow_refresh);
InternalIterator* internal_iter = NewInternalIterator(
@ -2930,7 +2930,7 @@ Status DBImpl::NewIterators(
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
cfd->user_comparator(), iter, kMaxSequenceNumber,
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
read_callback, this, cfd));
}

@ -522,7 +522,7 @@ class DBImpl : public DB {
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback,
bool allow_blob = false,
bool expose_blob_index = false,
bool allow_refresh = true);
virtual SequenceNumber GetLastPublishedSequence() const {

@ -83,7 +83,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
ReadCallback* read_callback = nullptr; // No read callback provided.
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
read_seq,
super_version->current, read_seq,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
auto internal_iter = NewInternalIterator(
@ -115,7 +115,8 @@ Status DBImplReadOnly::NewIterators(
auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
auto* sv = cfd->GetSuperVersion()->Ref();
auto* db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
sv->current, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback);
auto* internal_iter = NewInternalIterator(

@ -421,7 +421,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
snapshot,
super_version->current, snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
auto internal_iter = NewInternalIterator(

@ -36,16 +36,18 @@ namespace ROCKSDB_NAMESPACE {
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const Comparator* cmp, InternalIterator* iter, SequenceNumber s,
bool arena_mode, uint64_t max_sequential_skip_in_iterations,
const Comparator* cmp, InternalIterator* iter,
const Version* version, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob)
ColumnFamilyData* cfd, bool expose_blob_index)
: prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
env_(_env),
logger_(cf_options.info_log),
user_comparator_(cmp),
merge_operator_(cf_options.merge_operator),
iter_(iter),
version_(version),
read_callback_(read_callback),
sequence_(s),
statistics_(cf_options.statistics),
@ -65,7 +67,9 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
read_options.total_order_seek ||
read_options.auto_prefix_mode),
allow_blob_(allow_blob),
read_tier_(read_options.read_tier),
verify_checksums_(read_options.verify_checksums),
expose_blob_index_(expose_blob_index),
is_blob_(false),
arena_mode_(arena_mode),
range_del_agg_(&cf_options.internal_comparator, s),
@ -165,6 +169,40 @@ void DBIter::Next() {
}
}
bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
const Slice& blob_index) {
assert(!is_blob_);
if (expose_blob_index_) { // Stacked BlobDB implementation
is_blob_ = true;
return true;
}
if (!version_) {
status_ = Status::Corruption("Encountered unexpected blob index.");
valid_ = false;
return false;
}
// TODO: consider moving ReadOptions from ArenaWrappedDBIter to DBIter to
// avoid having to copy options back and forth.
ReadOptions read_options;
read_options.read_tier = read_tier_;
read_options.verify_checksums = verify_checksums_;
const Status s =
version_->GetBlob(read_options, user_key, blob_index, &blob_value_);
if (!s.ok()) {
status_ = s;
valid_ = false;
return false;
}
is_blob_ = true;
return true;
}
// PRE: saved_key_ has the current user key if skipping_saved_key
// POST: saved_key_ should have the next user key if valid_,
// if the current entry is a result of merge
@ -319,8 +357,14 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
case kTypeBlobIndex:
if (start_seqnum_ > 0) {
if (ikey_.sequence >= start_seqnum_) {
assert(ikey_.type != kTypeBlobIndex);
saved_key_.SetInternalKey(ikey_);
if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
}
valid_ = true;
return true;
} else {
@ -334,6 +378,13 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
}
} else if (timestamp_lb_) {
saved_key_.SetInternalKey(ikey_);
if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
}
valid_ = true;
return true;
} else {
@ -348,20 +399,13 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
num_skipped = 0;
reseek_done = false;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
} else if (ikey_.type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
valid_ = false;
} else {
if (ikey_.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
}
is_blob_ = true;
valid_ = true;
return true;
} else {
valid_ = true;
return true;
}
@ -551,15 +595,7 @@ bool DBIter::MergeValuesNewToOld() {
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (kTypeBlobIndex == ikey.type) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
} else {
status_ =
Status::NotSupported("Blob DB does not support merge operator.");
}
status_ = Status::NotSupported("BlobDB does not support merge operator.");
valid_ = false;
return false;
} else {
@ -888,15 +924,8 @@ bool DBIter::FindValueForCurrentKey() {
merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
env_, &pinned_value_, true);
} else if (last_not_merge_type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
} else {
status_ =
Status::NotSupported("Blob DB does not support merge operator.");
}
Status::NotSupported("BlobDB does not support merge operator.");
valid_ = false;
return false;
} else {
@ -911,15 +940,10 @@ bool DBIter::FindValueForCurrentKey() {
// do nothing - we've already has value in pinned_value_
break;
case kTypeBlobIndex:
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
valid_ = false;
if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
return false;
}
is_blob_ = true;
break;
default:
valid_ = false;
@ -992,14 +1016,6 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
valid_ = false;
return true;
}
if (ikey.type == kTypeBlobIndex && !allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
valid_ = false;
return false;
}
if (!iter_.PrepareValue()) {
valid_ = false;
return false;
@ -1007,7 +1023,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
assert(iter_.iter()->IsValuePinned());
pinned_value_ = iter_.value();
is_blob_ = (ikey.type == kTypeBlobIndex);
if (ikey.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
return false;
}
}
valid_ = true;
return true;
}
@ -1063,15 +1084,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (ikey.type == kTypeBlobIndex) {
if (!allow_blob_) {
ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
status_ = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
} else {
status_ =
Status::NotSupported("Blob DB does not support merge operator.");
}
status_ = Status::NotSupported("BlobDB does not support merge operator.");
valid_ = false;
return false;
} else {
@ -1465,15 +1478,16 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const Comparator* user_key_comparator,
InternalIterator* internal_iter,
InternalIterator* internal_iter, const Version* version,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob) {
DBIter* db_iter = new DBIter(
env, read_options, cf_options, mutable_cf_options, user_key_comparator,
internal_iter, sequence, false, max_sequential_skip_in_iterations,
read_callback, db_impl, cfd, allow_blob);
ColumnFamilyData* cfd, bool expose_blob_index) {
DBIter* db_iter =
new DBIter(env, read_options, cf_options, mutable_cf_options,
user_key_comparator, internal_iter, version, sequence, false,
max_sequential_skip_in_iterations, read_callback, db_impl, cfd,
expose_blob_index);
return db_iter;
}

@ -22,6 +22,8 @@
namespace ROCKSDB_NAMESPACE {
class Version;
// This file declares the factory functions of DBIter, in its original form
// or a wrapped form with class ArenaWrappedDBIter, which is defined here.
// Class DBIter, which is declared and implemented inside db_iter.cc, is
@ -114,10 +116,10 @@ class DBIter final : public Iterator {
DBIter(Env* _env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations,
InternalIterator* iter, const Version* version, SequenceNumber s,
bool arena_mode, uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob);
bool expose_blob_index);
// No copying allowed
DBIter(const DBIter&) = delete;
@ -159,7 +161,10 @@ class DBIter final : public Iterator {
}
Slice value() const override {
assert(valid_);
if (current_entry_is_merged_) {
if (!expose_blob_index_ && is_blob_) {
return blob_value_;
} else if (current_entry_is_merged_) {
// If pinned_value_ is set then the result of merge operator is one of
// the merge operands and we should return it.
return pinned_value_.data() ? pinned_value_ : saved_value_;
@ -185,7 +190,7 @@ class DBIter final : public Iterator {
return ExtractTimestampFromUserKey(ukey_and_ts, timestamp_size_);
}
bool IsBlob() const {
assert(valid_ && (allow_blob_ || !is_blob_));
assert(valid_);
return is_blob_;
}
@ -287,12 +292,17 @@ class DBIter final : public Iterator {
: user_comparator_.CompareWithoutTimestamp(a, b);
}
// Retrieves the blob value for the specified user key using the given blob
// index when using the integrated BlobDB implementation.
bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index);
const SliceTransform* prefix_extractor_;
Env* const env_;
Logger* logger_;
UserComparatorWrapper user_comparator_;
const MergeOperator* const merge_operator_;
IteratorWrapper iter_;
const Version* version_;
ReadCallback* read_callback_;
// Max visible sequence number. It is normally the snapshot seq unless we have
// uncommitted data in db as in WriteUnCommitted.
@ -306,6 +316,7 @@ class DBIter final : public Iterator {
std::string saved_value_;
Slice pinned_value_;
// for prefix seek mode to support prev()
PinnableSlice blob_value_;
Statistics* statistics_;
uint64_t max_skip_;
uint64_t max_skippable_internal_keys_;
@ -335,7 +346,11 @@ class DBIter final : public Iterator {
// Expect the inner iterator to maintain a total order.
// prefix_extractor_ must be non-NULL if the value is false.
const bool expect_total_order_inner_iter_;
bool allow_blob_;
ReadTier read_tier_;
bool verify_checksums_;
// Whether the iterator is allowed to expose blob references. Set to true when
// the stacked BlobDB implementation is used, false otherwise.
bool expose_blob_index_;
bool is_blob_;
bool arena_mode_;
// List of operands for merge operator.
@ -367,8 +382,9 @@ extern Iterator* NewDBIterator(
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const Comparator* user_key_comparator, InternalIterator* internal_iter,
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false);
const Version* version, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, ReadCallback* read_callback,
DBImpl* db_impl = nullptr, ColumnFamilyData* cfd = nullptr,
bool expose_blob_index = false);
} // namespace ROCKSDB_NAMESPACE

@ -513,7 +513,7 @@ TEST_F(DBIteratorStressTest, StressTest) {
db_iter.reset(NewDBIterator(
env_, ropt, ImmutableCFOptions(options),
MutableCFOptions(options), BytewiseComparator(),
internal_iter, sequence,
internal_iter, nullptr /* version */, sequence,
options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
}

@ -253,8 +253,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
ReadOptions ro;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -286,8 +287,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
ReadOptions ro;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -313,8 +315,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -346,8 +349,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -382,8 +386,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(!db_iter->Valid());
@ -412,8 +417,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 7, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 7 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
SetPerfLevel(kEnableCount);
ASSERT_TRUE(GetPerfLevel() == kEnableCount);
@ -450,8 +456,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 4, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 4 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -476,8 +483,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(!db_iter->Valid());
@ -499,8 +507,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -535,8 +544,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 7, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 7 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
SetPerfLevel(kEnableCount);
ASSERT_TRUE(GetPerfLevel() == kEnableCount);
@ -565,8 +575,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
ReadOptions ro;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
@ -608,8 +619,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
ReadOptions ro;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 2, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 2 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
@ -640,8 +652,9 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) {
ReadOptions ro;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
@ -671,8 +684,9 @@ TEST_F(DBIteratorTest, DBIteratorEmpty) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 0, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 0 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(!db_iter->Valid());
}
@ -683,8 +697,9 @@ TEST_F(DBIteratorTest, DBIteratorEmpty) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 0, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 0 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(!db_iter->Valid());
}
@ -706,8 +721,9 @@ TEST_F(DBIteratorTest, DBIteratorUseSkipCountSkips) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 2,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
2 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
@ -751,8 +767,9 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) {
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, i + 2, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, i + 2 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -787,8 +804,9 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, i + 2, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, i + 2 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -816,8 +834,9 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 202, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 202 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -849,8 +868,9 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) {
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, i, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, i /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(!db_iter->Valid());
@ -866,8 +886,9 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) {
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 200, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 200 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
@ -901,8 +922,9 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, i + 2, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, i + 2 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -936,8 +958,9 @@ TEST_F(DBIteratorTest, DBIteratorUseSkip) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, i + 2, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, i + 2 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -986,8 +1009,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) {
ro.max_skippable_internal_keys = 0;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
@ -1033,8 +1057,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) {
ro.max_skippable_internal_keys = 2;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
@ -1078,8 +1103,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) {
ro.max_skippable_internal_keys = 2;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
@ -1117,8 +1143,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) {
ro.max_skippable_internal_keys = 2;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
@ -1153,8 +1180,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) {
ro.max_skippable_internal_keys = 2;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -1184,8 +1212,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) {
ro.max_skippable_internal_keys = 2;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
@ -1222,8 +1251,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) {
ro.max_skippable_internal_keys = 2;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
@ -1260,8 +1290,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) {
ro.max_skippable_internal_keys = i;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 2 * i + 1, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 2 * i + 1 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
@ -1314,8 +1345,9 @@ TEST_F(DBIteratorTest, DBIteratorSkipInternalKeys) {
ro.max_skippable_internal_keys = i;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 2 * i + 1, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 2 * i + 1 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
@ -1353,8 +1385,9 @@ TEST_F(DBIteratorTest, DBIterator1) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 1,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
1 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1381,8 +1414,9 @@ TEST_F(DBIteratorTest, DBIterator2) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 0,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
0 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1406,8 +1440,9 @@ TEST_F(DBIteratorTest, DBIterator3) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 2,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
2 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1431,8 +1466,9 @@ TEST_F(DBIteratorTest, DBIterator4) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 4,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
4 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1465,8 +1501,9 @@ TEST_F(DBIteratorTest, DBIterator5) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 0, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 0 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1488,8 +1525,9 @@ TEST_F(DBIteratorTest, DBIterator5) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 1, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 1 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1511,8 +1549,9 @@ TEST_F(DBIteratorTest, DBIterator5) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 2, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 2 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1534,8 +1573,9 @@ TEST_F(DBIteratorTest, DBIterator5) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 3, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 3 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1557,8 +1597,9 @@ TEST_F(DBIteratorTest, DBIterator5) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 4, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 4 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1580,8 +1621,9 @@ TEST_F(DBIteratorTest, DBIterator5) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 5, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 5 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1603,8 +1645,9 @@ TEST_F(DBIteratorTest, DBIterator5) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 6, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 6 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1624,8 +1667,9 @@ TEST_F(DBIteratorTest, DBIterator5) {
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 10, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->Seek("b");
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
@ -1655,8 +1699,9 @@ TEST_F(DBIteratorTest, DBIterator6) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 0, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 0 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1678,8 +1723,9 @@ TEST_F(DBIteratorTest, DBIterator6) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 1, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 1 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1701,8 +1747,9 @@ TEST_F(DBIteratorTest, DBIterator6) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 2, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 2 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1724,8 +1771,9 @@ TEST_F(DBIteratorTest, DBIterator6) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 3, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 3 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(!db_iter->Valid());
}
@ -1743,8 +1791,9 @@ TEST_F(DBIteratorTest, DBIterator6) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 4, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 4 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1766,8 +1815,9 @@ TEST_F(DBIteratorTest, DBIterator6) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 5, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 5 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1789,8 +1839,9 @@ TEST_F(DBIteratorTest, DBIterator6) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 6, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 6 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1832,8 +1883,9 @@ TEST_F(DBIteratorTest, DBIterator7) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 0, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 0 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -1867,8 +1919,9 @@ TEST_F(DBIteratorTest, DBIterator7) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 2, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 2 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -1908,8 +1961,9 @@ TEST_F(DBIteratorTest, DBIterator7) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 4, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 4 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -1949,8 +2003,9 @@ TEST_F(DBIteratorTest, DBIterator7) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 5, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 5 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -1995,8 +2050,9 @@ TEST_F(DBIteratorTest, DBIterator7) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 6, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 6 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -2042,8 +2098,9 @@ TEST_F(DBIteratorTest, DBIterator7) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 7, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 7 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -2083,8 +2140,9 @@ TEST_F(DBIteratorTest, DBIterator7) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 9, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 9 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -2130,8 +2188,9 @@ TEST_F(DBIteratorTest, DBIterator7) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 13, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 13 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -2178,8 +2237,9 @@ TEST_F(DBIteratorTest, DBIterator7) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, cf_options, mutable_cf_options, BytewiseComparator(),
internal_iter, 14, options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/));
internal_iter, nullptr /* version */, 14 /* sequence */,
options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -2209,8 +2269,9 @@ TEST_F(DBIteratorTest, DBIterator8) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 10,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
10 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
@ -2240,8 +2301,9 @@ TEST_F(DBIteratorTest, DBIterator9) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 10,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
10 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
@ -2307,8 +2369,9 @@ TEST_F(DBIteratorTest, DBIterator10) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 10,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
10 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->Seek("c");
ASSERT_TRUE(db_iter->Valid());
@ -2347,8 +2410,8 @@ TEST_F(DBIteratorTest, SeekToLastOccurrenceSeq0) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 10, 0 /* force seek */,
nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
10 /* sequence */, 0 /* force seek */, nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -2376,8 +2439,9 @@ TEST_F(DBIteratorTest, DBIterator11) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 1,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
1 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
@ -2403,7 +2467,8 @@ TEST_F(DBIteratorTest, DBIterator12) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 10, 0, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
10 /* sequence */, 0 /* force seek */, nullptr /* read_callback */));
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
@ -2440,7 +2505,9 @@ TEST_F(DBIteratorTest, DBIterator13) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 2, 3, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
2 /* sequence */, 3 /* max_sequential_skip_in_iterations */,
nullptr /* read_callback */));
db_iter->Seek("b");
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), key);
@ -2468,7 +2535,9 @@ TEST_F(DBIteratorTest, DBIterator14) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 4, 1, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
4 /* sequence */, 1 /* max_sequential_skip_in_iterations */,
nullptr /* read_callback */));
db_iter->Seek("b");
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
@ -2495,8 +2564,9 @@ TEST_F(DBIteratorTest, DBIteratorTestDifferentialSnapshots) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 13,
options.max_sequential_skip_in_iterations, nullptr));
BytewiseComparator(), internal_iter, nullptr /* version */,
13 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
// Expecting InternalKeys in [5,8] range with correct type
int seqnums[4] = {5,8,11,13};
std::string user_keys[4] = {"1","2","3","4"};
@ -2530,8 +2600,9 @@ TEST_F(DBIteratorTest, DBIteratorTestDifferentialSnapshots) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 13,
options.max_sequential_skip_in_iterations, nullptr));
BytewiseComparator(), internal_iter, nullptr /* version */,
13 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
// Expecting InternalKeys in [5,8] range with correct type
int seqnums[4] = {5,8,11,13};
EntryType key_types[4] = {EntryType::kEntryDelete,EntryType::kEntryDelete,
@ -2580,9 +2651,9 @@ class DBIterWithMergeIterTest : public testing::Test {
db_iter_.reset(NewDBIterator(
env_, ro_, ImmutableCFOptions(options_), MutableCFOptions(options_),
BytewiseComparator(), merge_iter,
BytewiseComparator(), merge_iter, nullptr /* version */,
8 /* read data earlier than seqId 8 */,
3 /* max iterators before reseek */, nullptr /*read_callback*/));
3 /* max iterators before reseek */, nullptr /* read_callback */));
}
Env* env_;
@ -3020,8 +3091,9 @@ TEST_F(DBIteratorTest, SeekPrefixTombstones) {
ro.prefix_same_as_start = true;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 10,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
BytewiseComparator(), internal_iter, nullptr /* version */,
10 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
int skipped_keys = 0;
@ -3056,8 +3128,8 @@ TEST_F(DBIteratorTest, SeekToFirstLowerBound) {
Options options;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 10 /* sequence */,
options.max_sequential_skip_in_iterations,
BytewiseComparator(), internal_iter, nullptr /* version */,
10 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToFirst();
@ -3095,8 +3167,9 @@ TEST_F(DBIteratorTest, PrevLowerBound) {
Options options;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 10 /* sequence */,
options.max_sequential_skip_in_iterations, nullptr /* read_callback */));
BytewiseComparator(), internal_iter, nullptr /* version */,
10 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
db_iter->SeekToLast();
for (int i = kNumKeys; i >= kLowerBound; --i) {
@ -3123,8 +3196,9 @@ TEST_F(DBIteratorTest, SeekLessLowerBound) {
Options options;
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ro, ImmutableCFOptions(options), MutableCFOptions(options),
BytewiseComparator(), internal_iter, 10 /* sequence */,
options.max_sequential_skip_in_iterations, nullptr /* read_callback */));
BytewiseComparator(), internal_iter, nullptr /* version */,
10 /* sequence */, options.max_sequential_skip_in_iterations,
nullptr /* read_callback */));
auto before_lower_bound_str = std::to_string(kLowerBound - 1);
Slice before_lower_bound(lower_bound_str);
@ -3148,8 +3222,9 @@ TEST_F(DBIteratorTest, ReverseToForwardWithDisappearingKeys) {
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ReadOptions(), ImmutableCFOptions(options),
MutableCFOptions(options), BytewiseComparator(), internal_iter, 10,
options.max_sequential_skip_in_iterations, nullptr /*read_callback*/));
MutableCFOptions(options), BytewiseComparator(), internal_iter,
nullptr /* version */, 10 /* sequence */,
options.max_sequential_skip_in_iterations, nullptr /* read_callback */));
db_iter->SeekForPrev("a");
ASSERT_TRUE(db_iter->Valid());

@ -2883,6 +2883,127 @@ TEST_P(DBIteratorTest, IterateWithLowerBoundAcrossFileBoundary) {
ASSERT_OK(iter->status());
}
TEST_P(DBIteratorTest, Blob) {
Options options = CurrentOptions();
options.enable_blob_files = true;
options.max_sequential_skip_in_iterations = 2;
options.statistics = CreateDBStatistics();
Reopen(options);
// Note: we have 4 KVs (3 of which are hidden) for key "b" and
// max_sequential_skip_in_iterations is set to 2. Thus, we need to do a reseek
// anytime we move from "b" to "c" or vice versa.
ASSERT_OK(Put("a", "va"));
ASSERT_OK(Flush());
ASSERT_OK(Put("b", "vb0"));
ASSERT_OK(Flush());
ASSERT_OK(Put("b", "vb1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("b", "vb2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("b", "vb3"));
ASSERT_OK(Flush());
ASSERT_OK(Put("c", "vc"));
ASSERT_OK(Flush());
std::unique_ptr<Iterator> iter_guard(NewIterator(ReadOptions()));
Iterator* const iter = iter_guard.get();
iter->SeekToFirst();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 0);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToFirst();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 1);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekToLast();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->Seek("");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("a");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("ax");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->SeekForPrev("d");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->SeekForPrev("c");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 2);
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->SeekForPrev("bx");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 3);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->Seek("b");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 3);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->Seek("z");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 3);
ASSERT_EQ(IterStatus(iter), "(invalid)");
iter->SeekForPrev("b");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 4);
ASSERT_EQ(IterStatus(iter), "b->vb3");
iter->SeekForPrev("");
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 4);
ASSERT_EQ(IterStatus(iter), "(invalid)");
// Switch from reverse to forward
iter->SeekToLast();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 4);
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 5);
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 5);
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 5);
ASSERT_EQ(IterStatus(iter), "b->vb3");
// Switch from forward to reverse
iter->SeekToFirst();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 5);
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 5);
iter->Next();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 6);
iter->Prev();
ASSERT_EQ(TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION), 7);
ASSERT_EQ(IterStatus(iter), "b->vb3");
}
INSTANTIATE_TEST_CASE_P(DBIteratorTestInstance, DBIteratorTest,
testing::Values(true, false));

@ -1791,9 +1791,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
io_tracer_(io_tracer) {}
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const Slice& blob_index_slice,
PinnableSlice* value) const {
assert(value);
if (read_options.read_tier == kBlockCacheTier) {
return Status::Incomplete("Cannot read blob: no disk I/O allowed");
}
@ -1801,7 +1800,7 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
BlobIndex blob_index;
{
Status s = blob_index.DecodeFrom(*value);
Status s = blob_index.DecodeFrom(blob_index_slice);
if (!s.ok()) {
return s;
}
@ -1813,6 +1812,8 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
const BlobIndex& blob_index,
PinnableSlice* value) const {
assert(value);
if (blob_index.HasTTL() || blob_index.IsInlined()) {
return Status::Corruption("Unexpected TTL/inlined blob index");
}
@ -1939,7 +1940,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
case GetContext::kFound:
if (is_blob_index) {
if (do_merge && value) {
*status = GetBlob(read_options, user_key, value);
*status = GetBlob(read_options, user_key, *value, value);
if (!status->ok()) {
if (status->IsIncomplete()) {
get_context.MarkKeyMayExist();

@ -684,15 +684,15 @@ class Version {
void MultiGet(const ReadOptions&, MultiGetRange* range,
ReadCallback* callback = nullptr, bool* is_blob = nullptr);
// Interprets *value as a blob reference, and (assuming the corresponding
// blob file is part of this Version) retrieves the blob and saves it in
// *value, replacing the blob reference.
// REQUIRES: *value stores an encoded blob reference
// Interprets blob_index_slice as a blob reference, and (assuming the
// corresponding blob file is part of this Version) retrieves the blob and
// saves it in *value.
// REQUIRES: blob_index_slice stores an encoded blob reference
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
PinnableSlice* value) const;
const Slice& blob_index_slice, PinnableSlice* value) const;
// Retrieves a blob using a blob reference and saves it in *value, assuming
// the corresponding blob file is part of this Version.
// Retrieves a blob using a blob reference and saves it in *value,
// assuming the corresponding blob file is part of this Version.
Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
const BlobIndex& blob_index, PinnableSlice* value) const;

@ -69,11 +69,12 @@ Iterator* SstFileReader::NewIterator(const ReadOptions& roptions) {
? roptions.snapshot->GetSequenceNumber()
: kMaxSequenceNumber;
ArenaWrappedDBIter* res = new ArenaWrappedDBIter();
res->Init(r->options.env, roptions, r->ioptions, r->moptions, sequence,
res->Init(r->options.env, roptions, r->ioptions, r->moptions,
nullptr /* version */, sequence,
r->moptions.max_sequential_skip_in_iterations,
0 /* version_number */, nullptr /* read_callback */,
nullptr /* db_impl */, nullptr /* cfd */, false /* allow_blob */,
false /* allow_refresh */);
nullptr /* db_impl */, nullptr /* cfd */,
true /* expose_blob_index */, false /* allow_refresh */);
auto internal_iter = r->table_reader->NewIterator(
res->GetReadOptions(), r->moptions.prefix_extractor.get(),
res->GetArena(), false /* skip_filters */,

@ -2045,7 +2045,7 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
}
auto* iter = db_impl_->NewIteratorImpl(
read_options, cfd, snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/);
nullptr /*read_callback*/, true /*expose_blob_index*/);
return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_);
}

@ -320,8 +320,8 @@ static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) {
constexpr bool ALLOW_BLOB = true;
constexpr bool ALLOW_REFRESH = true;
constexpr bool expose_blob_index = false;
constexpr bool allow_refresh = false;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0;
@ -346,7 +346,7 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH);
expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
return db_iter;
}
@ -355,8 +355,8 @@ Status WritePreparedTxnDB::NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
constexpr bool ALLOW_BLOB = true;
constexpr bool ALLOW_REFRESH = true;
constexpr bool expose_blob_index = false;
constexpr bool allow_refresh = false;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0;
@ -383,7 +383,7 @@ Status WritePreparedTxnDB::NewIterators(
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback,
!ALLOW_BLOB, !ALLOW_REFRESH);
expose_blob_index, allow_refresh);
db_iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, state, nullptr);
iterators->push_back(db_iter);
}

@ -384,8 +384,8 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family,
WriteUnpreparedTxn* txn) {
// TODO(lth): Refactor so that this logic is shared with WritePrepared.
constexpr bool ALLOW_BLOB = true;
constexpr bool ALLOW_REFRESH = true;
constexpr bool expose_blob_index = false;
constexpr bool allow_refresh = false;
std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
SequenceNumber snapshot_seq = kMaxSequenceNumber;
SequenceNumber min_uncommitted = 0;
@ -456,9 +456,9 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options,
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
auto* state =
new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
auto* db_iter =
db_impl_->NewIteratorImpl(options, cfd, state->MaxVisibleSeq(),
&state->callback, !ALLOW_BLOB, !ALLOW_REFRESH);
auto* db_iter = db_impl_->NewIteratorImpl(
options, cfd, state->MaxVisibleSeq(), &state->callback, expose_blob_index,
allow_refresh);
db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
return db_iter;
}

Loading…
Cancel
Save