diff --git a/db/column_family.cc b/db/column_family.cc index 8bc989179..a8ffe8cf6 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -270,6 +270,10 @@ ColumnFamilyData::~ColumnFamilyData() { } } +const EnvOptions* ColumnFamilyData::soptions() const { + return &(column_family_set_->storage_options_); +} + void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; need_slowdown_for_num_level0_files_ = diff --git a/db/column_family.h b/db/column_family.h index 9843e4125..d306f4e66 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -168,6 +168,7 @@ class ColumnFamilyData { // thread-safe const Options* options() const { return &options_; } + const EnvOptions* soptions() const; InternalStats* internal_stats() { return internal_stats_.get(); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 440685508..e02df44b5 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3238,45 +3238,6 @@ Iterator* DBImpl::TEST_NewInternalIterator(ColumnFamilyHandle* column_family) { return NewInternalIterator(roptions, cfd, super_version); } -std::pair DBImpl::GetTailingIteratorPair( - const ReadOptions& options, ColumnFamilyData* cfd, - uint64_t* superversion_number) { - - mutex_.Lock(); - SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); - if (superversion_number != nullptr) { - *superversion_number = cfd->GetSuperVersionNumber(); - } - mutex_.Unlock(); - - Iterator* mutable_iter = super_version->mem->NewIterator(options); - // create a DBIter that only uses memtable content; see NewIterator() - mutable_iter = - NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), - mutable_iter, kMaxSequenceNumber); - - std::vector list; - super_version->imm->AddIterators(options, &list); - super_version->current->AddIterators(options, storage_options_, &list); - Iterator* immutable_iter = - NewMergingIterator(&cfd->internal_comparator(), &list[0], list.size()); - - // create a DBIter that only uses memtable content; see NewIterator() - immutable_iter = - NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), - immutable_iter, kMaxSequenceNumber); - - // register cleanups - mutable_iter->RegisterCleanup(CleanupIteratorState, - new IterState(this, &mutex_, super_version), nullptr); - - // bump the ref one more time since it will be Unref'ed twice - immutable_iter->RegisterCleanup(CleanupIteratorState, - new IterState(this, &mutex_, super_version->Ref()), nullptr); - - return std::make_pair(mutable_iter, immutable_iter); -} - int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; @@ -3628,7 +3589,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, Iterator* iter; if (options.tailing) { - iter = new TailingIterator(this, options, cfd); + iter = new TailingIterator(env_, this, options, cfd); } else { SequenceNumber latest_snapshot = versions_->LastSequence(); SuperVersion* sv = nullptr; @@ -3640,7 +3601,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot; - iter = NewDBIterator(&dbname_, env_, *cfd->options(), + iter = NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter, snapshot); } @@ -3682,7 +3643,7 @@ Status DBImpl::NewIterators( if (options.tailing) { for (auto cfh : column_families) { auto cfd = reinterpret_cast(cfh)->cfd(); - iterators->push_back(new TailingIterator(this, options, cfd)); + iterators->push_back(new TailingIterator(env_, this, options, cfd)); } } else { for (size_t i = 0; i < column_families.size(); ++i) { @@ -3695,7 +3656,7 @@ Status DBImpl::NewIterators( : latest_snapshot; auto iter = NewInternalIterator(options, cfd, super_versions[i]); - iter = NewDBIterator(&dbname_, env_, *cfd->options(), + iter = NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter, snapshot); iterators->push_back(iter); } diff --git a/db/db_impl.h b/db/db_impl.h index e16bf3bb4..c4efb919e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -409,14 +409,6 @@ class DBImpl : public DB { // hold the data set. Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1); - // Returns a pair of iterators (mutable-only and immutable-only) used - // internally by TailingIterator and stores cfd->GetSuperVersionNumber() in - // *superversion_number. These iterators are always up-to-date, i.e. can - // be used to read new data. - std::pair GetTailingIteratorPair( - const ReadOptions& options, ColumnFamilyData* cfd, - uint64_t* superversion_number); - // table_cache_ provides its own synchronization std::shared_ptr table_cache_; diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index d016931e8..435384bd3 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -80,7 +80,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options, SequenceNumber latest_snapshot = versions_->LastSequence(); Iterator* internal_iter = NewInternalIterator(options, cfd, super_version); return NewDBIterator( - &dbname_, env_, *cfd->options(), cfd->user_comparator(), internal_iter, + env_, *cfd->options(), cfd->user_comparator(), internal_iter, (options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot)); diff --git a/db/db_iter.cc b/db/db_iter.cc index 47c07bfd9..a6d765db5 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -57,10 +57,9 @@ class DBIter: public Iterator { kReverse }; - DBIter(const std::string* dbname, Env* env, const Options& options, + DBIter(Env* env, const Options& options, const Comparator* cmp, Iterator* iter, SequenceNumber s) - : dbname_(dbname), - env_(env), + : env_(env), logger_(options.info_log.get()), user_comparator_(cmp), user_merge_operator_(options.merge_operator.get()), @@ -117,7 +116,6 @@ class DBIter: public Iterator { } } - const std::string* const dbname_; Env* const env_; Logger* logger_; const Comparator* const user_comparator_; @@ -467,13 +465,12 @@ void DBIter::SeekToLast() { } // anonymous namespace Iterator* NewDBIterator( - const std::string* dbname, Env* env, const Options& options, const Comparator *user_key_comparator, Iterator* internal_iter, const SequenceNumber& sequence) { - return new DBIter(dbname, env, options, user_key_comparator, + return new DBIter(env, options, user_key_comparator, internal_iter, sequence); } diff --git a/db/db_iter.h b/db/db_iter.h index b44e67454..d8a3bad51 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -18,7 +18,6 @@ namespace rocksdb { // "*internal_iter") that were live at the specified "sequence" number // into appropriate user keys. extern Iterator* NewDBIterator( - const std::string* dbname, Env* env, const Options& options, const Comparator *user_key_comparator, diff --git a/db/tailing_iter.cc b/db/tailing_iter.cc index cd0335fef..f22dcc3f8 100644 --- a/db/tailing_iter.cc +++ b/db/tailing_iter.cc @@ -7,22 +7,31 @@ #include #include +#include #include "db/db_impl.h" +#include "db/db_iter.h" #include "db/column_family.h" +#include "rocksdb/env.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" +#include "table/merger.h" namespace rocksdb { -TailingIterator::TailingIterator(DBImpl* db, const ReadOptions& options, - ColumnFamilyData* cfd) - : db_(db), - options_(options), +TailingIterator::TailingIterator(Env* const env, DBImpl* db, + const ReadOptions& read_options, ColumnFamilyData* cfd) + : env_(env), + db_(db), + read_options_(read_options), cfd_(cfd), - version_number_(0), + super_version_(nullptr), current_(nullptr), status_(Status::InvalidArgument("Seek() not called on this iterator")) {} +TailingIterator::~TailingIterator() { + Cleanup(); +} + bool TailingIterator::Valid() const { return current_ != nullptr; } @@ -60,7 +69,7 @@ void TailingIterator::Seek(const Slice& target) { const Comparator* cmp = cfd_->user_comparator(); if (!is_prev_set_ || cmp->Compare(prev_key_, target) >= !is_prev_inclusive_ || (immutable_->Valid() && cmp->Compare(target, immutable_->key()) > 0) || - (options_.prefix_seek && !IsSamePrefix(target))) { + (read_options_.prefix_seek && !IsSamePrefix(target))) { SeekImmutable(target); } @@ -122,14 +131,45 @@ void TailingIterator::SeekToLast() { status_ = Status::NotSupported("This iterator doesn't support SeekToLast()"); } -void TailingIterator::CreateIterators() { - std::pair iters = - db_->GetTailingIteratorPair(options_, cfd_, &version_number_); +void TailingIterator::Cleanup() { + // Release old super version if necessary + mutable_.reset(); + immutable_.reset(); + if (super_version_ != nullptr && super_version_->Unref()) { + DBImpl::DeletionState deletion_state; + db_->mutex_.Lock(); + super_version_->Cleanup(); + db_->FindObsoleteFiles(deletion_state, false, true); + db_->mutex_.Unlock(); + delete super_version_; + if (deletion_state.HaveSomethingToDelete()) { + db_->PurgeObsoleteFiles(deletion_state); + } + } +} - assert(iters.first && iters.second); +void TailingIterator::CreateIterators() { + Cleanup(); + super_version_= cfd_->GetReferencedSuperVersion(&(db_->mutex_)); + + Iterator* mutable_iter = super_version_->mem->NewIterator(read_options_); + // create a DBIter that only uses memtable content; see NewIterator() + mutable_.reset( + NewDBIterator(env_, *cfd_->options(), cfd_->user_comparator(), + mutable_iter, kMaxSequenceNumber)); + + std::vector list; + super_version_->imm->AddIterators(read_options_, &list); + super_version_->current->AddIterators( + read_options_, *cfd_->soptions(), &list); + Iterator* immutable_iter = + NewMergingIterator(&cfd_->internal_comparator(), &list[0], list.size()); + + // create a DBIter that only uses memtable content; see NewIterator() + immutable_.reset( + NewDBIterator(env_, *cfd_->options(), cfd_->user_comparator(), + immutable_iter, kMaxSequenceNumber)); - mutable_.reset(iters.first); - immutable_.reset(iters.second); current_ = nullptr; is_prev_set_ = false; } @@ -154,8 +194,8 @@ void TailingIterator::UpdateCurrent() { } bool TailingIterator::IsCurrentVersion() const { - return mutable_ != nullptr && immutable_ != nullptr && - version_number_ == cfd_->GetSuperVersionNumber(); + return super_version_ != nullptr && + super_version_->version_number == cfd_->GetSuperVersionNumber(); } bool TailingIterator::IsSamePrefix(const Slice& target) const { diff --git a/db/tailing_iter.h b/db/tailing_iter.h index 2a5a02e24..138738775 100644 --- a/db/tailing_iter.h +++ b/db/tailing_iter.h @@ -13,6 +13,8 @@ namespace rocksdb { class DBImpl; +class Env; +class SuperVersion; class ColumnFamilyData; /** @@ -25,9 +27,9 @@ class ColumnFamilyData; */ class TailingIterator : public Iterator { public: - TailingIterator(DBImpl* db, const ReadOptions& options, + TailingIterator(Env* const env, DBImpl* db, const ReadOptions& read_options, ColumnFamilyData* cfd); - virtual ~TailingIterator() {} + virtual ~TailingIterator(); virtual bool Valid() const override; virtual void SeekToFirst() override; @@ -40,10 +42,13 @@ class TailingIterator : public Iterator { virtual Status status() const override; private: + void Cleanup(); + + Env* const env_; DBImpl* const db_; - const ReadOptions options_; + const ReadOptions read_options_; ColumnFamilyData* const cfd_; - uint64_t version_number_; + SuperVersion* super_version_; // TailingIterator merges the contents of the two iterators below (one using // mutable memtable contents only, other over SSTs and immutable memtables).