diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 41ed2fb9a..4657e9e38 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -788,6 +788,63 @@ TEST(ColumnFamilyTest, DifferentCompactionStyles) { Close(); } +namespace { +std::string IterStatus(Iterator* iter) { + std::string result; + if (iter->Valid()) { + result = iter->key().ToString() + "->" + iter->value().ToString(); + } else { + result = "(invalid)"; + } + return result; +} +} // namespace anonymous + +TEST(ColumnFamilyTest, NewIteratorsTest) { + // iter == 0 -- no tailing + // iter == 2 -- tailing + for (int iter = 0; iter < 2; ++iter) { + Open(); + CreateColumnFamiliesAndReopen({"one", "two"}); + ASSERT_OK(Put(0, "a", "b")); + ASSERT_OK(Put(1, "b", "a")); + ASSERT_OK(Put(2, "c", "m")); + ASSERT_OK(Put(2, "v", "t")); + std::vector iterators; + ReadOptions options; + options.tailing = (iter == 1); + ASSERT_OK(db_->NewIterators(options, handles_, &iterators)); + + for (auto it : iterators) { + it->SeekToFirst(); + } + ASSERT_EQ(IterStatus(iterators[0]), "a->b"); + ASSERT_EQ(IterStatus(iterators[1]), "b->a"); + ASSERT_EQ(IterStatus(iterators[2]), "c->m"); + + ASSERT_OK(Put(1, "x", "x")); + + for (auto it : iterators) { + it->Next(); + } + + ASSERT_EQ(IterStatus(iterators[0]), "(invalid)"); + if (iter == 0) { + // no tailing + ASSERT_EQ(IterStatus(iterators[1]), "(invalid)"); + } else { + // tailing + ASSERT_EQ(IterStatus(iterators[1]), "x->x"); + } + ASSERT_EQ(IterStatus(iterators[2]), "v->t"); + + for (auto it : iterators) { + delete it; + } + Destroy(); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index cc15049cf..57e54c821 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3214,12 +3214,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, SuperVersion* super_version = nullptr; auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); - mutex_.Lock(); if (!options.tailing) { + mutex_.Lock(); super_version = cfd->GetSuperVersion()->Ref(); latest_snapshot = versions_->LastSequence(); + mutex_.Unlock(); } - mutex_.Unlock(); Iterator* iter; if (options.tailing) { @@ -3227,11 +3227,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, } else { iter = NewInternalIterator(options, cfd, super_version); - iter = NewDBIterator( - &dbname_, env_, *cfd->full_options(), cfd->user_comparator(), iter, - (options.snapshot != nullptr - ? reinterpret_cast(options.snapshot)->number_ - : latest_snapshot)); + auto snapshot = + options.snapshot != nullptr + ? reinterpret_cast(options.snapshot)->number_ + : latest_snapshot; + iter = NewDBIterator(&dbname_, env_, *cfd->full_options(), + cfd->user_comparator(), iter, snapshot); } if (options.prefix) { @@ -3245,10 +3246,53 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, Status DBImpl::NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_families, std::vector* iterators) { - // TODO(icanadi) - return Status::NotSupported("Not yet!"); + + if (options.prefix) { + return Status::NotSupported( + "NewIterators doesn't support ReadOptions::prefix"); + } + + iterators->clear(); + iterators->reserve(column_families.size()); + SequenceNumber latest_snapshot = 0; + std::vector super_versions; + super_versions.reserve(column_families.size()); + + if (!options.tailing) { + mutex_.Lock(); + latest_snapshot = versions_->LastSequence(); + for (auto cfh : column_families) { + auto cfd = reinterpret_cast(cfh)->cfd(); + super_versions.push_back(cfd->GetSuperVersion()->Ref()); + } + mutex_.Unlock(); + } + + if (options.tailing) { + for (auto cfh : column_families) { + auto cfd = reinterpret_cast(cfh)->cfd(); + iterators->push_back(new TailingIterator(this, options, cfd)); + } + } else { + for (size_t i = 0; i < column_families.size(); ++i) { + auto cfh = reinterpret_cast(column_families[i]); + auto cfd = cfh->cfd(); + + auto snapshot = + options.snapshot != nullptr + ? reinterpret_cast(options.snapshot)->number_ + : latest_snapshot; + + auto iter = NewInternalIterator(options, cfd, super_versions[i]); + iter = NewDBIterator(&dbname_, env_, *cfd->full_options(), + cfd->user_comparator(), iter, snapshot); + iterators->push_back(iter); + } + } + + return Status::OK(); } const Snapshot* DBImpl::GetSnapshot() { diff --git a/db/db_impl.h b/db/db_impl.h index d36f6a41f..13cb38900 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -86,7 +86,7 @@ class DBImpl : public DB { ColumnFamilyHandle* column_family); virtual Status NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_families, std::vector* iterators); virtual const Snapshot* GetSnapshot(); virtual void ReleaseSnapshot(const Snapshot* snapshot); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 34fbb065d..1cb0b2283 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -255,7 +255,7 @@ class DB { // before the db is deleted virtual Status NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_families, std::vector* iterators) = 0; // Return a handle to the current DB state. Iterators created with diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index 4d56890f5..57f444802 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -80,9 +80,9 @@ class StackableDB : public DB { virtual Status NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_families, std::vector* iterators) { - return db_->NewIterators(options, column_family, iterators); + return db_->NewIterators(options, column_families, iterators); }