When flushing mem tables, create iterators out of mutex

Summary:
creating new iterators of mem tables can be expensive. Move them out of mutex.
DBImpl::WriteLevel0Table()'s mems seems to be a local vector and is only used by flushing. memtables to flush are also immutable, so it should be safe to do so.

Test Plan: make all check

Reviewers: haobo, dhruba, kailiu

Reviewed By: dhruba

CC: igor, leveldb

Differential Revision: https://reviews.facebook.net/D14577
main
Siying Dong 11 years ago
parent 3c02c363b3
commit 95a411d853
  1. 35
      db/db_impl.cc

@ -1054,27 +1054,26 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
*filenumber = meta.number; *filenumber = meta.number;
pending_outputs_.insert(meta.number); pending_outputs_.insert(meta.number);
std::vector<Iterator*> list;
for (MemTable* m : mems) {
Log(options_.info_log,
"Flushing memtable with log file: %lu\n",
(unsigned long)m->GetLogNumber());
list.push_back(m->NewIterator());
}
Iterator* iter = NewMergingIterator(env_, &internal_comparator_, &list[0],
list.size());
const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable = const SequenceNumber earliest_seqno_in_memtable =
mems[0]->GetFirstSequenceNumber(); mems[0]->GetFirstSequenceNumber();
Log(options_.info_log,
"Level-0 flush table #%lu: started",
(unsigned long)meta.number);
Version* base = versions_->current(); Version* base = versions_->current();
base->Ref(); // it is likely that we do not need this reference base->Ref(); // it is likely that we do not need this reference
Status s; Status s;
{ {
mutex_.Unlock(); mutex_.Unlock();
std::vector<Iterator*> list;
for (MemTable* m : mems) {
Log(options_.info_log,
"Flushing memtable with log file: %lu\n",
(unsigned long)m->GetLogNumber());
list.push_back(m->NewIterator());
}
Iterator* iter = NewMergingIterator(env_, &internal_comparator_, &list[0],
list.size());
Log(options_.info_log,
"Level-0 flush table #%lu: started",
(unsigned long)meta.number);
// We skip compression if universal compression is used and the size // We skip compression if universal compression is used and the size
// threshold is set for compression. // threshold is set for compression.
bool enable_compression = (options_.compaction_style bool enable_compression = (options_.compaction_style
@ -1085,15 +1084,15 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
user_comparator(), newest_snapshot, user_comparator(), newest_snapshot,
earliest_seqno_in_memtable, enable_compression); earliest_seqno_in_memtable, enable_compression);
LogFlush(options_.info_log); LogFlush(options_.info_log);
delete iter;
Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s",
(unsigned long) meta.number,
(unsigned long) meta.file_size,
s.ToString().c_str());
mutex_.Lock(); mutex_.Lock();
} }
base->Unref(); base->Unref();
Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s",
(unsigned long) meta.number,
(unsigned long) meta.file_size,
s.ToString().c_str());
delete iter;
// re-acquire the most current version // re-acquire the most current version
base = versions_->current(); base = versions_->current();

Loading…
Cancel
Save