Remove Arena in RangeDelAggregator

Summary:
The Arena construction/destruction introduced significant overhead to read-heavy workload just by creating empty vectors for its blocks, so avoid it in RangeDelAggregator.
Closes https://github.com/facebook/rocksdb/pull/1547

Differential Revision: D4207781

Pulled By: ajkr

fbshipit-source-id: 9d1c130
main
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent e63350e726
commit fe349db57b
  1. 2
      db/builder.cc
  2. 2
      db/builder.h
  3. 8
      db/db_impl.cc
  4. 6
      db/flush_job.cc
  5. 12
      db/memtable.cc
  6. 3
      db/memtable.h
  7. 4
      db/memtable_list.cc
  8. 15
      db/range_del_aggregator.cc
  9. 6
      db/range_del_aggregator.h
  10. 2
      db/repair.cc
  11. 14
      db/write_batch_test.cc
  12. 15
      table/table_test.cc

@ -62,7 +62,7 @@ Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
TableCache* table_cache, InternalIterator* iter,
ScopedArenaIterator&& range_del_iter, FileMetaData* meta,
std::unique_ptr<InternalIterator> range_del_iter, FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,

@ -65,7 +65,7 @@ extern Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& options,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
TableCache* table_cache, InternalIterator* iter,
ScopedArenaIterator&& range_del_iter, FileMetaData* meta,
std::unique_ptr<InternalIterator> range_del_iter, FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,

@ -1779,7 +1779,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
s = BuildTable(
dbname_, env_, *cfd->ioptions(), mutable_cf_options, env_options_,
cfd->table_cache(), iter.get(),
ScopedArenaIterator(mem->NewRangeTombstoneIterator(ro, &arena)),
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
snapshot_seqs, earliest_write_conflict_snapshot,
@ -3858,11 +3858,11 @@ InternalIterator* DBImpl::NewInternalIterator(
// Collect iterator for mutable mem
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(read_options, arena));
ScopedArenaIterator range_del_iter;
std::unique_ptr<InternalIterator> range_del_iter;
Status s;
if (!read_options.ignore_range_deletions) {
range_del_iter.set(
super_version->mem->NewRangeTombstoneIterator(read_options, arena));
range_del_iter.reset(
super_version->mem->NewRangeTombstoneIterator(read_options));
s = range_del_agg->AddTombstones(std::move(range_del_iter));
}
// Collect all needed child iterators for immutable memtables

@ -256,7 +256,7 @@ Status FlushJob::WriteLevel0Table() {
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ro, &arena));
range_del_iters.push_back(m->NewRangeTombstoneIterator(ro, &arena));
range_del_iters.push_back(m->NewRangeTombstoneIterator(ro));
total_num_entries += m->num_entries();
total_num_deletes += m->num_deletes();
total_memory_usage += m->ApproximateMemoryUsage();
@ -274,9 +274,9 @@ Status FlushJob::WriteLevel0Table() {
ScopedArenaIterator iter(
NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
static_cast<int>(memtables.size()), &arena));
ScopedArenaIterator range_del_iter(NewMergingIterator(
std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
&cfd_->internal_comparator(), &range_del_iters[0],
static_cast<int>(range_del_iters.size()), &arena));
static_cast<int>(range_del_iters.size())));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber());

@ -374,13 +374,11 @@ InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
}
InternalIterator* MemTable::NewRangeTombstoneIterator(
const ReadOptions& read_options, Arena* arena) {
assert(arena != nullptr);
const ReadOptions& read_options) {
if (read_options.ignore_range_deletions) {
return NewEmptyInternalIterator(arena);
return NewEmptyInternalIterator();
}
auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
return new (mem) MemTableIterator(*this, read_options, arena,
return new MemTableIterator(*this, read_options, nullptr /* arena */,
true /* use_range_del_table */);
}
@ -652,8 +650,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
if (prefix_bloom_) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
ScopedArenaIterator range_del_iter(
NewRangeTombstoneIterator(read_opts, range_del_agg->GetArena()));
std::unique_ptr<InternalIterator> range_del_iter(
NewRangeTombstoneIterator(read_opts));
Status status = range_del_agg->AddTombstones(std::move(range_del_iter));
if (!status.ok()) {
*s = status;

@ -161,8 +161,7 @@ class MemTable {
// those allocated in arena.
InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);
InternalIterator* NewRangeTombstoneIterator(const ReadOptions& read_options,
Arena* arena);
InternalIterator* NewRangeTombstoneIterator(const ReadOptions& read_options);
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.

@ -156,8 +156,8 @@ Status MemTableListVersion::AddRangeTombstoneIterators(
RangeDelAggregator* range_del_agg) {
assert(range_del_agg != nullptr);
for (auto& m : memlist_) {
ScopedArenaIterator range_del_iter(
m->NewRangeTombstoneIterator(read_opts, arena));
std::unique_ptr<InternalIterator> range_del_iter(
m->NewRangeTombstoneIterator(read_opts));
Status s = range_del_agg->AddTombstones(std::move(range_del_iter));
if (!s.ok()) {
return s;

@ -86,16 +86,11 @@ bool RangeDelAggregator::ShouldAddTombstones(
return false;
}
Status RangeDelAggregator::AddTombstones(ScopedArenaIterator input) {
return AddTombstones(input.release(), true /* arena */);
}
Status RangeDelAggregator::AddTombstones(
std::unique_ptr<InternalIterator> input) {
return AddTombstones(input.release(), false /* arena */);
if (input == nullptr) {
return Status::OK();
}
Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
input->SeekToFirst();
bool first_iter = true;
while (input->Valid()) {
@ -115,11 +110,7 @@ Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
input->Next();
}
if (!first_iter) {
rep_->pinned_iters_mgr_.PinIterator(input, arena);
} else if (arena) {
input->~InternalIterator();
} else {
delete input;
rep_->pinned_iters_mgr_.PinIterator(input.release(), false /* arena */);
}
return Status::OK();
}

@ -57,7 +57,6 @@ class RangeDelAggregator {
// Adds tombstones to the tombstone aggregation structure maintained by this
// object.
// @return non-OK status if any of the tombstone keys are corrupted.
Status AddTombstones(ScopedArenaIterator input);
Status AddTombstones(std::unique_ptr<InternalIterator> input);
// Writes tombstones covering a range to a table builder.
@ -83,7 +82,6 @@ class RangeDelAggregator {
void AddToBuilder(TableBuilder* builder, const Slice* lower_bound,
const Slice* upper_bound, FileMetaData* meta,
bool bottommost_level = false);
Arena* GetArena() { return &arena_; }
bool IsEmpty();
private:
@ -103,13 +101,11 @@ class RangeDelAggregator {
// once the first range deletion is encountered.
void InitRep(const std::vector<SequenceNumber>& snapshots);
Status AddTombstones(InternalIterator* input, bool arena);
TombstoneMap& GetTombstoneMap(SequenceNumber seq);
SequenceNumber upper_bound_;
Arena arena_; // must be destroyed after pinned_iters_mgr_ which references
// memory in this arena
std::unique_ptr<Rep> rep_;
const InternalKeyComparator icmp_;
};
} // namespace rocksdb

@ -381,7 +381,7 @@ class Repairer {
status = BuildTable(
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
env_options_, table_cache_, iter.get(),
ScopedArenaIterator(mem->NewRangeTombstoneIterator(ro, &arena)),
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
{}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false,

@ -45,10 +45,16 @@ static std::string PrintContents(WriteBatch* b) {
int merge_count = 0;
for (int i = 0; i < 2; ++i) {
Arena arena;
auto iter =
i == 0 ? ScopedArenaIterator(mem->NewIterator(ReadOptions(), &arena))
: ScopedArenaIterator(
mem->NewRangeTombstoneIterator(ReadOptions(), &arena));
ScopedArenaIterator arena_iter_guard;
std::unique_ptr<InternalIterator> iter_guard;
InternalIterator* iter;
if (i == 0) {
iter = mem->NewIterator(ReadOptions(), &arena);
arena_iter_guard.set(iter);
} else {
iter = mem->NewRangeTombstoneIterator(ReadOptions());
iter_guard.reset(iter);
}
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey ikey;
memset((void*)&ikey, 0, sizeof(ikey));

@ -2449,11 +2449,16 @@ TEST_F(MemTableTest, Simple) {
for (int i = 0; i < 2; ++i) {
Arena arena;
ScopedArenaIterator iter =
i == 0
? ScopedArenaIterator(memtable->NewIterator(ReadOptions(), &arena))
: ScopedArenaIterator(
memtable->NewRangeTombstoneIterator(ReadOptions(), &arena));
ScopedArenaIterator arena_iter_guard;
std::unique_ptr<InternalIterator> iter_guard;
InternalIterator* iter;
if (i == 0) {
iter = memtable->NewIterator(ReadOptions(), &arena);
arena_iter_guard.set(iter);
} else {
iter = memtable->NewRangeTombstoneIterator(ReadOptions());
iter_guard.reset(iter);
}
iter->SeekToFirst();
while (iter->Valid()) {
fprintf(stderr, "key: '%s' -> '%s'\n", iter->key().ToString().c_str(),

Loading…
Cancel
Save