thread local for tailing iterator

Summary:
replace the super version acquisision in tailing itrator with thread
local

Test Plan: will post results

Reviewers: igor, haobo, sdong, yhchiang, dhruba

Reviewed By: igor

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17757
main
Lei Jin 11 years ago
parent 539dd207df
commit 82b37a18bd
  1. 4
      db/column_family.cc
  2. 1
      db/column_family.h
  3. 47
      db/db_impl.cc
  4. 8
      db/db_impl.h
  5. 2
      db/db_impl_readonly.cc
  6. 9
      db/db_iter.cc
  7. 1
      db/db_iter.h
  8. 68
      db/tailing_iter.cc
  9. 13
      db/tailing_iter.h

@ -270,6 +270,10 @@ ColumnFamilyData::~ColumnFamilyData() {
} }
} }
const EnvOptions* ColumnFamilyData::soptions() const {
return &(column_family_set_->storage_options_);
}
void ColumnFamilyData::SetCurrent(Version* current) { void ColumnFamilyData::SetCurrent(Version* current) {
current_ = current; current_ = current;
need_slowdown_for_num_level0_files_ = need_slowdown_for_num_level0_files_ =

@ -168,6 +168,7 @@ class ColumnFamilyData {
// thread-safe // thread-safe
const Options* options() const { return &options_; } const Options* options() const { return &options_; }
const EnvOptions* soptions() const;
InternalStats* internal_stats() { return internal_stats_.get(); } InternalStats* internal_stats() { return internal_stats_.get(); }

@ -3238,45 +3238,6 @@ Iterator* DBImpl::TEST_NewInternalIterator(ColumnFamilyHandle* column_family) {
return NewInternalIterator(roptions, cfd, super_version); return NewInternalIterator(roptions, cfd, super_version);
} }
std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
const ReadOptions& options, ColumnFamilyData* cfd,
uint64_t* superversion_number) {
mutex_.Lock();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
if (superversion_number != nullptr) {
*superversion_number = cfd->GetSuperVersionNumber();
}
mutex_.Unlock();
Iterator* mutable_iter = super_version->mem->NewIterator(options);
// create a DBIter that only uses memtable content; see NewIterator()
mutable_iter =
NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(),
mutable_iter, kMaxSequenceNumber);
std::vector<Iterator*> list;
super_version->imm->AddIterators(options, &list);
super_version->current->AddIterators(options, storage_options_, &list);
Iterator* immutable_iter =
NewMergingIterator(&cfd->internal_comparator(), &list[0], list.size());
// create a DBIter that only uses memtable content; see NewIterator()
immutable_iter =
NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(),
immutable_iter, kMaxSequenceNumber);
// register cleanups
mutable_iter->RegisterCleanup(CleanupIteratorState,
new IterState(this, &mutex_, super_version), nullptr);
// bump the ref one more time since it will be Unref'ed twice
immutable_iter->RegisterCleanup(CleanupIteratorState,
new IterState(this, &mutex_, super_version->Ref()), nullptr);
return std::make_pair(mutable_iter, immutable_iter);
}
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
ColumnFamilyHandle* column_family) { ColumnFamilyHandle* column_family) {
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
@ -3628,7 +3589,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
Iterator* iter; Iterator* iter;
if (options.tailing) { if (options.tailing) {
iter = new TailingIterator(this, options, cfd); iter = new TailingIterator(env_, this, options, cfd);
} else { } else {
SequenceNumber latest_snapshot = versions_->LastSequence(); SequenceNumber latest_snapshot = versions_->LastSequence();
SuperVersion* sv = nullptr; SuperVersion* sv = nullptr;
@ -3640,7 +3601,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
options.snapshot != nullptr options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_ ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot; : latest_snapshot;
iter = NewDBIterator(&dbname_, env_, *cfd->options(), iter = NewDBIterator(env_, *cfd->options(),
cfd->user_comparator(), iter, snapshot); cfd->user_comparator(), iter, snapshot);
} }
@ -3682,7 +3643,7 @@ Status DBImpl::NewIterators(
if (options.tailing) { if (options.tailing) {
for (auto cfh : column_families) { for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
iterators->push_back(new TailingIterator(this, options, cfd)); iterators->push_back(new TailingIterator(env_, this, options, cfd));
} }
} else { } else {
for (size_t i = 0; i < column_families.size(); ++i) { for (size_t i = 0; i < column_families.size(); ++i) {
@ -3695,7 +3656,7 @@ Status DBImpl::NewIterators(
: latest_snapshot; : latest_snapshot;
auto iter = NewInternalIterator(options, cfd, super_versions[i]); auto iter = NewInternalIterator(options, cfd, super_versions[i]);
iter = NewDBIterator(&dbname_, env_, *cfd->options(), iter = NewDBIterator(env_, *cfd->options(),
cfd->user_comparator(), iter, snapshot); cfd->user_comparator(), iter, snapshot);
iterators->push_back(iter); iterators->push_back(iter);
} }

@ -409,14 +409,6 @@ class DBImpl : public DB {
// hold the data set. // hold the data set.
Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1); Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1);
// Returns a pair of iterators (mutable-only and immutable-only) used
// internally by TailingIterator and stores cfd->GetSuperVersionNumber() in
// *superversion_number. These iterators are always up-to-date, i.e. can
// be used to read new data.
std::pair<Iterator*, Iterator*> GetTailingIteratorPair(
const ReadOptions& options, ColumnFamilyData* cfd,
uint64_t* superversion_number);
// table_cache_ provides its own synchronization // table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_; std::shared_ptr<Cache> table_cache_;

@ -80,7 +80,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options,
SequenceNumber latest_snapshot = versions_->LastSequence(); SequenceNumber latest_snapshot = versions_->LastSequence();
Iterator* internal_iter = NewInternalIterator(options, cfd, super_version); Iterator* internal_iter = NewInternalIterator(options, cfd, super_version);
return NewDBIterator( return NewDBIterator(
&dbname_, env_, *cfd->options(), cfd->user_comparator(), internal_iter, env_, *cfd->options(), cfd->user_comparator(), internal_iter,
(options.snapshot != nullptr (options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_ ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot)); : latest_snapshot));

@ -57,10 +57,9 @@ class DBIter: public Iterator {
kReverse kReverse
}; };
DBIter(const std::string* dbname, Env* env, const Options& options, DBIter(Env* env, const Options& options,
const Comparator* cmp, Iterator* iter, SequenceNumber s) const Comparator* cmp, Iterator* iter, SequenceNumber s)
: dbname_(dbname), : env_(env),
env_(env),
logger_(options.info_log.get()), logger_(options.info_log.get()),
user_comparator_(cmp), user_comparator_(cmp),
user_merge_operator_(options.merge_operator.get()), user_merge_operator_(options.merge_operator.get()),
@ -117,7 +116,6 @@ class DBIter: public Iterator {
} }
} }
const std::string* const dbname_;
Env* const env_; Env* const env_;
Logger* logger_; Logger* logger_;
const Comparator* const user_comparator_; const Comparator* const user_comparator_;
@ -467,13 +465,12 @@ void DBIter::SeekToLast() {
} // anonymous namespace } // anonymous namespace
Iterator* NewDBIterator( Iterator* NewDBIterator(
const std::string* dbname,
Env* env, Env* env,
const Options& options, const Options& options,
const Comparator *user_key_comparator, const Comparator *user_key_comparator,
Iterator* internal_iter, Iterator* internal_iter,
const SequenceNumber& sequence) { const SequenceNumber& sequence) {
return new DBIter(dbname, env, options, user_key_comparator, return new DBIter(env, options, user_key_comparator,
internal_iter, sequence); internal_iter, sequence);
} }

@ -18,7 +18,6 @@ namespace rocksdb {
// "*internal_iter") that were live at the specified "sequence" number // "*internal_iter") that were live at the specified "sequence" number
// into appropriate user keys. // into appropriate user keys.
extern Iterator* NewDBIterator( extern Iterator* NewDBIterator(
const std::string* dbname,
Env* env, Env* env,
const Options& options, const Options& options,
const Comparator *user_key_comparator, const Comparator *user_key_comparator,

@ -7,22 +7,31 @@
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/column_family.h" #include "db/column_family.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "table/merger.h"
namespace rocksdb { namespace rocksdb {
TailingIterator::TailingIterator(DBImpl* db, const ReadOptions& options, TailingIterator::TailingIterator(Env* const env, DBImpl* db,
ColumnFamilyData* cfd) const ReadOptions& read_options, ColumnFamilyData* cfd)
: db_(db), : env_(env),
options_(options), db_(db),
read_options_(read_options),
cfd_(cfd), cfd_(cfd),
version_number_(0), super_version_(nullptr),
current_(nullptr), current_(nullptr),
status_(Status::InvalidArgument("Seek() not called on this iterator")) {} status_(Status::InvalidArgument("Seek() not called on this iterator")) {}
TailingIterator::~TailingIterator() {
Cleanup();
}
bool TailingIterator::Valid() const { bool TailingIterator::Valid() const {
return current_ != nullptr; return current_ != nullptr;
} }
@ -60,7 +69,7 @@ void TailingIterator::Seek(const Slice& target) {
const Comparator* cmp = cfd_->user_comparator(); const Comparator* cmp = cfd_->user_comparator();
if (!is_prev_set_ || cmp->Compare(prev_key_, target) >= !is_prev_inclusive_ || if (!is_prev_set_ || cmp->Compare(prev_key_, target) >= !is_prev_inclusive_ ||
(immutable_->Valid() && cmp->Compare(target, immutable_->key()) > 0) || (immutable_->Valid() && cmp->Compare(target, immutable_->key()) > 0) ||
(options_.prefix_seek && !IsSamePrefix(target))) { (read_options_.prefix_seek && !IsSamePrefix(target))) {
SeekImmutable(target); SeekImmutable(target);
} }
@ -122,14 +131,45 @@ void TailingIterator::SeekToLast() {
status_ = Status::NotSupported("This iterator doesn't support SeekToLast()"); status_ = Status::NotSupported("This iterator doesn't support SeekToLast()");
} }
void TailingIterator::CreateIterators() { void TailingIterator::Cleanup() {
std::pair<Iterator*, Iterator*> iters = // Release old super version if necessary
db_->GetTailingIteratorPair(options_, cfd_, &version_number_); mutable_.reset();
immutable_.reset();
if (super_version_ != nullptr && super_version_->Unref()) {
DBImpl::DeletionState deletion_state;
db_->mutex_.Lock();
super_version_->Cleanup();
db_->FindObsoleteFiles(deletion_state, false, true);
db_->mutex_.Unlock();
delete super_version_;
if (deletion_state.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(deletion_state);
}
}
}
assert(iters.first && iters.second); void TailingIterator::CreateIterators() {
Cleanup();
super_version_= cfd_->GetReferencedSuperVersion(&(db_->mutex_));
Iterator* mutable_iter = super_version_->mem->NewIterator(read_options_);
// create a DBIter that only uses memtable content; see NewIterator()
mutable_.reset(
NewDBIterator(env_, *cfd_->options(), cfd_->user_comparator(),
mutable_iter, kMaxSequenceNumber));
std::vector<Iterator*> list;
super_version_->imm->AddIterators(read_options_, &list);
super_version_->current->AddIterators(
read_options_, *cfd_->soptions(), &list);
Iterator* immutable_iter =
NewMergingIterator(&cfd_->internal_comparator(), &list[0], list.size());
// create a DBIter that only uses memtable content; see NewIterator()
immutable_.reset(
NewDBIterator(env_, *cfd_->options(), cfd_->user_comparator(),
immutable_iter, kMaxSequenceNumber));
mutable_.reset(iters.first);
immutable_.reset(iters.second);
current_ = nullptr; current_ = nullptr;
is_prev_set_ = false; is_prev_set_ = false;
} }
@ -154,8 +194,8 @@ void TailingIterator::UpdateCurrent() {
} }
bool TailingIterator::IsCurrentVersion() const { bool TailingIterator::IsCurrentVersion() const {
return mutable_ != nullptr && immutable_ != nullptr && return super_version_ != nullptr &&
version_number_ == cfd_->GetSuperVersionNumber(); super_version_->version_number == cfd_->GetSuperVersionNumber();
} }
bool TailingIterator::IsSamePrefix(const Slice& target) const { bool TailingIterator::IsSamePrefix(const Slice& target) const {

@ -13,6 +13,8 @@
namespace rocksdb { namespace rocksdb {
class DBImpl; class DBImpl;
class Env;
class SuperVersion;
class ColumnFamilyData; class ColumnFamilyData;
/** /**
@ -25,9 +27,9 @@ class ColumnFamilyData;
*/ */
class TailingIterator : public Iterator { class TailingIterator : public Iterator {
public: public:
TailingIterator(DBImpl* db, const ReadOptions& options, TailingIterator(Env* const env, DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd); ColumnFamilyData* cfd);
virtual ~TailingIterator() {} virtual ~TailingIterator();
virtual bool Valid() const override; virtual bool Valid() const override;
virtual void SeekToFirst() override; virtual void SeekToFirst() override;
@ -40,10 +42,13 @@ class TailingIterator : public Iterator {
virtual Status status() const override; virtual Status status() const override;
private: private:
void Cleanup();
Env* const env_;
DBImpl* const db_; DBImpl* const db_;
const ReadOptions options_; const ReadOptions read_options_;
ColumnFamilyData* const cfd_; ColumnFamilyData* const cfd_;
uint64_t version_number_; SuperVersion* super_version_;
// TailingIterator merges the contents of the two iterators below (one using // TailingIterator merges the contents of the two iterators below (one using
// mutable memtable contents only, other over SSTs and immutable memtables). // mutable memtable contents only, other over SSTs and immutable memtables).

Loading…
Cancel
Save