Eliminate memcpy from ForwardIterator

Summary:
This diff update ForwardIterator to support pinning keys and values, which will allow DBIter to take advantage of that and eliminate memcpy when executing merge operators
This diff is stacked on D61305

Test Plan:
existing tests (updated them to test tailing iterator)
new test

Reviewers: andrewkr, yhchiang, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60009
main
Islam AbdelRahman 9 years ago
parent d367555027
commit d11c09d9e2
  1. 95
      db/db_test2.cc
  2. 76
      db/db_test_util.cc
  3. 3
      db/db_test_util.h
  4. 156
      db/forward_iterator.cc
  5. 13
      db/forward_iterator.h

@ -1628,37 +1628,7 @@ TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
// 3 L4 Files
ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");
// Verify Get()
for (auto kv : true_data) {
ASSERT_EQ(Get(kv.first), kv.second);
}
Iterator* iter = db_->NewIterator(ReadOptions());
// Verify Iterator::Next()
auto data_iter = true_data.begin();
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
ASSERT_EQ(iter->key().ToString(), data_iter->first);
ASSERT_EQ(iter->value().ToString(), data_iter->second);
}
ASSERT_EQ(data_iter, true_data.end());
// Verify Iterator::Prev()
auto data_rev = true_data.rbegin();
for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
ASSERT_EQ(iter->key().ToString(), data_rev->first);
ASSERT_EQ(iter->value().ToString(), data_rev->second);
}
ASSERT_EQ(data_rev, true_data.rend());
// Verify Iterator::Seek()
for (auto kv : true_data) {
iter->Seek(kv.first);
ASSERT_EQ(kv.first, iter->key().ToString());
ASSERT_EQ(kv.second, iter->value().ToString());
}
delete iter;
VerifyDBFromMap(true_data);
}
TEST_P(MergeOperatorPinningTest, Randomized) {
@ -1807,12 +1777,69 @@ TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
}
};
VerifyDBFromMap(true_data);
ASSERT_EQ(merge_cnt, kNumKeys * 4 /* get + next + prev + seek */);
size_t total_reads;
VerifyDBFromMap(true_data, &total_reads);
ASSERT_EQ(merge_cnt, total_reads);
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
VerifyDBFromMap(true_data);
VerifyDBFromMap(true_data, &total_reads);
}
TEST_P(MergeOperatorPinningTest, TailingIterator) {
Options options = CurrentOptions();
options.merge_operator = MergeOperators::CreateMaxOperator();
BlockBasedTableOptions bbto;
bbto.no_block_cache = disable_block_cache_;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
const int kNumOperands = 100;
const int kNumWrites = 100000;
std::function<void()> writer_func = [&]() {
int k = 0;
for (int i = 0; i < kNumWrites; i++) {
db_->Merge(WriteOptions(), Key(k), Key(k));
if (i && i % kNumOperands == 0) {
k++;
}
if (i && i % 127 == 0) {
ASSERT_OK(Flush());
}
if (i && i % 317 == 0) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
}
};
std::function<void()> reader_func = [&]() {
ReadOptions ro;
ro.tailing = true;
Iterator* iter = db_->NewIterator(ro);
iter->SeekToFirst();
for (int i = 0; i < (kNumWrites / kNumOperands); i++) {
while (!iter->Valid()) {
// wait for the key to be written
env_->SleepForMicroseconds(100);
iter->Seek(Key(i));
}
ASSERT_EQ(iter->key(), Key(i));
ASSERT_EQ(iter->value(), Key(i));
iter->Next();
}
delete iter;
};
std::thread writer_thread(writer_func);
std::thread reader_thread(reader_func);
writer_thread.join();
reader_thread.join();
}
#endif // ROCKSDB_LITE

@ -1076,38 +1076,90 @@ std::vector<std::uint64_t> DBTestBase::ListTableFiles(Env* env,
return file_numbers;
}
void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data) {
void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
size_t* total_reads_res) {
size_t total_reads = 0;
for (auto& kv : true_data) {
ASSERT_EQ(Get(kv.first), kv.second);
total_reads++;
}
// Normal Iterator
{
int iter_cnt = 0;
ReadOptions ro;
ro.total_order_seek = true;
Iterator* iter = db_->NewIterator(ro);
// Verify Iterator::Next()
iter_cnt = 0;
auto data_iter = true_data.begin();
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
ASSERT_EQ(iter->key().ToString(), data_iter->first);
ASSERT_EQ(iter->value().ToString(), data_iter->second);
iter_cnt++;
total_reads++;
}
ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
<< true_data.size();
// Verify Iterator::Prev()
iter_cnt = 0;
auto data_rev = true_data.rbegin();
for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
ASSERT_EQ(iter->key().ToString(), data_rev->first);
ASSERT_EQ(iter->value().ToString(), data_rev->second);
iter_cnt++;
total_reads++;
}
ASSERT_EQ(data_rev, true_data.rend()) << iter_cnt << " / "
<< true_data.size();
// Verify Iterator::Seek()
for (auto kv : true_data) {
iter->Seek(kv.first);
ASSERT_EQ(kv.first, iter->key().ToString());
ASSERT_EQ(kv.second, iter->value().ToString());
total_reads++;
}
delete iter;
}
#ifndef ROCKSDB_LITE
// Tailing iterator
int iter_cnt = 0;
ReadOptions ro;
ro.tailing = true;
ro.total_order_seek = true;
Iterator* iter = db_->NewIterator(ro);
// Verify Iterator::Next()
// Verify ForwardIterator::Next()
iter_cnt = 0;
auto data_iter = true_data.begin();
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
ASSERT_EQ(iter->key().ToString(), data_iter->first);
ASSERT_EQ(iter->value().ToString(), data_iter->second);
iter_cnt++;
total_reads++;
}
ASSERT_EQ(data_iter, true_data.end());
// Verify Iterator::Prev()
auto data_rev = true_data.rbegin();
for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
ASSERT_EQ(iter->key().ToString(), data_rev->first);
ASSERT_EQ(iter->value().ToString(), data_rev->second);
}
ASSERT_EQ(data_rev, true_data.rend());
ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
<< true_data.size();
// Verify Iterator::Seek()
// Verify ForwardIterator::Seek()
for (auto kv : true_data) {
iter->Seek(kv.first);
ASSERT_EQ(kv.first, iter->key().ToString());
ASSERT_EQ(kv.second, iter->value().ToString());
total_reads++;
}
delete iter;
#endif // ROCKSDB_LITE
if (total_reads_res) {
*total_reads_res = total_reads;
}
}
#ifndef ROCKSDB_LITE

@ -810,7 +810,8 @@ class DBTestBase : public testing::Test {
std::vector<std::uint64_t> ListTableFiles(Env* env, const std::string& path);
void VerifyDBFromMap(std::map<std::string, std::string> true_data);
void VerifyDBFromMap(std::map<std::string, std::string> true_data,
size_t* total_reads_res = nullptr);
#ifndef ROCKSDB_LITE
Status GenerateAndAddExternalFile(const Options options,

@ -32,10 +32,15 @@ namespace rocksdb {
class LevelIterator : public InternalIterator {
public:
LevelIterator(const ColumnFamilyData* const cfd,
const ReadOptions& read_options,
const std::vector<FileMetaData*>& files)
: cfd_(cfd), read_options_(read_options), files_(files), valid_(false),
file_index_(std::numeric_limits<uint32_t>::max()) {}
const ReadOptions& read_options,
const std::vector<FileMetaData*>& files)
: cfd_(cfd),
read_options_(read_options),
files_(files),
valid_(false),
file_index_(std::numeric_limits<uint32_t>::max()),
file_iter_(nullptr),
pinned_iters_mgr_(nullptr) {}
void SetFileIndex(uint32_t file_index) {
assert(file_index < files_.size());
@ -47,10 +52,20 @@ class LevelIterator : public InternalIterator {
}
void Reset() {
assert(file_index_ < files_.size());
file_iter_.reset(cfd_->table_cache()->NewIterator(
// Reset current pointer
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinIterator(file_iter_);
} else {
delete file_iter_;
}
file_iter_ = cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
files_[file_index_]->fd, nullptr /* table_reader_ptr */, nullptr,
false));
false);
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
}
void SeekToLast() override {
status_ = Status::NotSupported("LevelIterator::SeekToLast()");
@ -105,6 +120,20 @@ class LevelIterator : public InternalIterator {
}
return Status::OK();
}
bool IsKeyPinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
file_iter_->IsKeyPinned();
}
bool IsValuePinned() const override {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
file_iter_->IsValuePinned();
}
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
if (file_iter_) {
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
}
}
private:
const ColumnFamilyData* const cfd_;
@ -114,7 +143,8 @@ class LevelIterator : public InternalIterator {
bool valid_;
uint32_t file_index_;
Status status_;
std::unique_ptr<InternalIterator> file_iter_;
InternalIterator* file_iter_;
PinnedIteratorsManager* pinned_iters_mgr_;
};
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
@ -135,7 +165,8 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
has_iter_trimmed_for_upper_bound_(false),
current_over_upper_bound_(false),
is_prev_set_(false),
is_prev_inclusive_(false) {
is_prev_inclusive_(false),
pinned_iters_mgr_(nullptr) {
if (sv_) {
RebuildIterators(false);
}
@ -145,6 +176,13 @@ ForwardIterator::~ForwardIterator() {
Cleanup(true);
}
namespace {
// Used in PinnedIteratorsManager to release pinned SuperVersion
static void ReleaseSuperVersionFunc(void* sv) {
delete reinterpret_cast<SuperVersion*>(sv);
}
} // namespace
void ForwardIterator::SVCleanup() {
if (sv_ != nullptr && sv_->Unref()) {
// Job id == 0 means that this is not our background process, but rather
@ -157,7 +195,11 @@ void ForwardIterator::SVCleanup() {
db_->ScheduleBgLogWriterClose(&job_context);
}
db_->mutex_.Unlock();
delete sv_;
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinPtr(sv_, &ReleaseSuperVersionFunc);
} else {
delete sv_;
}
if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(
job_context, read_options_.background_purge_on_iterator_cleanup);
@ -168,18 +210,21 @@ void ForwardIterator::SVCleanup() {
void ForwardIterator::Cleanup(bool release_sv) {
if (mutable_iter_ != nullptr) {
mutable_iter_->~InternalIterator();
DeleteIterator(mutable_iter_, true /* is_arena */);
}
for (auto* m : imm_iters_) {
m->~InternalIterator();
DeleteIterator(m, true /* is_arena */);
}
imm_iters_.clear();
for (auto* f : l0_iters_) {
delete f;
DeleteIterator(f);
}
l0_iters_.clear();
for (auto* l : level_iters_) {
delete l;
DeleteIterator(l);
}
level_iters_.clear();
@ -280,7 +325,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
l0[i]->largest.user_key()) > 0) {
if (read_options_.iterate_upper_bound != nullptr) {
has_iter_trimmed_for_upper_bound_ = true;
delete l0_iters_[i];
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = nullptr;
}
continue;
@ -295,7 +340,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
immutable_min_heap_.push(l0_iters_[i]);
} else {
has_iter_trimmed_for_upper_bound_ = true;
delete l0_iters_[i];
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = nullptr;
}
}
@ -372,7 +417,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
} else {
// Nothing in this level is interesting. Remove.
has_iter_trimmed_for_upper_bound_ = true;
delete level_iters_[level - 1];
DeleteIterator(level_iters_[level - 1]);
level_iters_[level - 1] = nullptr;
}
}
@ -485,6 +530,50 @@ Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
return Status::InvalidArgument();
}
void ForwardIterator::SetPinnedItersMgr(
PinnedIteratorsManager* pinned_iters_mgr) {
pinned_iters_mgr_ = pinned_iters_mgr;
UpdateChildrenPinnedItersMgr();
}
void ForwardIterator::UpdateChildrenPinnedItersMgr() {
// Set PinnedIteratorsManager for mutable memtable iterator.
if (mutable_iter_) {
mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
}
// Set PinnedIteratorsManager for immutable memtable iterators.
for (InternalIterator* child_iter : imm_iters_) {
if (child_iter) {
child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
}
// Set PinnedIteratorsManager for L0 files iterators.
for (InternalIterator* child_iter : l0_iters_) {
if (child_iter) {
child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
}
// Set PinnedIteratorsManager for L1+ levels iterators.
for (LevelIterator* child_iter : level_iters_) {
if (child_iter) {
child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
}
}
bool ForwardIterator::IsKeyPinned() const {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
current_->IsKeyPinned();
}
bool ForwardIterator::IsValuePinned() const {
return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
current_->IsValuePinned();
}
void ForwardIterator::RebuildIterators(bool refresh_sv) {
// Clean up
Cleanup(refresh_sv);
@ -513,6 +602,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
BuildLevelIterators(vstorage);
current_ = nullptr;
is_prev_set_ = false;
UpdateChildrenPinnedItersMgr();
}
void ForwardIterator::RenewIterators() {
@ -521,10 +612,10 @@ void ForwardIterator::RenewIterators() {
svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
if (mutable_iter_ != nullptr) {
mutable_iter_->~InternalIterator();
DeleteIterator(mutable_iter_, true /* is_arena */);
}
for (auto* m : imm_iters_) {
m->~InternalIterator();
DeleteIterator(m, true /* is_arena */);
}
imm_iters_.clear();
@ -565,13 +656,13 @@ void ForwardIterator::RenewIterators() {
}
for (auto* f : l0_iters_) {
delete f;
DeleteIterator(f);
}
l0_iters_.clear();
l0_iters_ = l0_iters_new;
for (auto* l : level_iters_) {
delete l;
DeleteIterator(l);
}
level_iters_.clear();
BuildLevelIterators(vstorage_new);
@ -579,6 +670,8 @@ void ForwardIterator::RenewIterators() {
is_prev_set_ = false;
SVCleanup();
sv_ = svnew;
UpdateChildrenPinnedItersMgr();
}
void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
@ -608,10 +701,11 @@ void ForwardIterator::ResetIncompleteIterators() {
if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
continue;
}
delete l0_iters_[i];
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
l0_files[i]->fd);
l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
}
for (auto* level_iter : level_iters_) {
@ -700,7 +794,7 @@ void ForwardIterator::DeleteCurrentIter() {
}
if (l0_iters_[i] == current_) {
has_iter_trimmed_for_upper_bound_ = true;
delete l0_iters_[i];
DeleteIterator(l0_iters_[i]);
l0_iters_[i] = nullptr;
return;
}
@ -712,7 +806,7 @@ void ForwardIterator::DeleteCurrentIter() {
}
if (level_iters_[level - 1] == current_) {
has_iter_trimmed_for_upper_bound_ = true;
delete level_iters_[level - 1];
DeleteIterator(level_iters_[level - 1]);
level_iters_[level - 1] = nullptr;
}
}
@ -776,6 +870,22 @@ uint32_t ForwardIterator::FindFileInRange(
return right;
}
void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
if (iter == nullptr) {
return;
}
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinIterator(iter, is_arena);
} else {
if (is_arena) {
iter->~InternalIterator();
} else {
delete iter;
}
}
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -72,6 +72,10 @@ class ForwardIterator : public InternalIterator {
virtual Slice value() const override;
virtual Status status() const override;
virtual Status GetProperty(std::string prop_name, std::string* prop) override;
virtual void SetPinnedItersMgr(
PinnedIteratorsManager* pinned_iters_mgr) override;
virtual bool IsKeyPinned() const override;
virtual bool IsValuePinned() const override;
bool TEST_CheckDeletedIters(int* deleted_iters, int* num_iters);
@ -92,6 +96,14 @@ class ForwardIterator : public InternalIterator {
bool IsOverUpperBound(const Slice& internal_key) const;
// Set PinnedIteratorsManager for all children Iterators, this function should
// be called whenever we update children Iterators or pinned_iters_mgr_.
void UpdateChildrenPinnedItersMgr();
// A helper function that will release iter in the proper manner, or pass it
// to pinned_iters_mgr_ to release it later if pinning is enabled.
void DeleteIterator(InternalIterator* iter, bool is_arena = false);
DBImpl* const db_;
const ReadOptions read_options_;
ColumnFamilyData* const cfd_;
@ -129,6 +141,7 @@ class ForwardIterator : public InternalIterator {
bool is_prev_set_;
bool is_prev_inclusive_;
PinnedIteratorsManager* pinned_iters_mgr_;
Arena arena_;
};

Loading…
Cancel
Save