|
|
@ -1035,7 +1035,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit, |
|
|
|
(unsigned long)m->GetLogNumber()); |
|
|
|
(unsigned long)m->GetLogNumber()); |
|
|
|
list.push_back(m->NewIterator()); |
|
|
|
list.push_back(m->NewIterator()); |
|
|
|
} |
|
|
|
} |
|
|
|
Iterator* iter = NewMergingIterator(env_, &internal_comparator_, &list[0], |
|
|
|
Iterator* iter = NewMergingIterator(&internal_comparator_, &list[0], |
|
|
|
list.size()); |
|
|
|
list.size()); |
|
|
|
const SequenceNumber newest_snapshot = snapshots_.GetNewest(); |
|
|
|
const SequenceNumber newest_snapshot = snapshots_.GetNewest(); |
|
|
|
const SequenceNumber earliest_seqno_in_memtable = |
|
|
|
const SequenceNumber earliest_seqno_in_memtable = |
|
|
@ -2519,7 +2519,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, |
|
|
|
// Collect iterators for files in L0 - Ln
|
|
|
|
// Collect iterators for files in L0 - Ln
|
|
|
|
versions_->current()->AddIterators(options, storage_options_, &list); |
|
|
|
versions_->current()->AddIterators(options, storage_options_, &list); |
|
|
|
Iterator* internal_iter = |
|
|
|
Iterator* internal_iter = |
|
|
|
NewMergingIterator(env_, &internal_comparator_, &list[0], list.size()); |
|
|
|
NewMergingIterator(&internal_comparator_, &list[0], list.size()); |
|
|
|
versions_->current()->Ref(); |
|
|
|
versions_->current()->Ref(); |
|
|
|
|
|
|
|
|
|
|
|
cleanup->mu = &mutex_; |
|
|
|
cleanup->mu = &mutex_; |
|
|
@ -2555,8 +2555,6 @@ Status DBImpl::GetImpl(const ReadOptions& options, |
|
|
|
Status s; |
|
|
|
Status s; |
|
|
|
|
|
|
|
|
|
|
|
StopWatch sw(env_, options_.statistics, DB_GET); |
|
|
|
StopWatch sw(env_, options_.statistics, DB_GET); |
|
|
|
StopWatchNano snapshot_timer(env_, false); |
|
|
|
|
|
|
|
StartPerfTimer(&snapshot_timer); |
|
|
|
|
|
|
|
SequenceNumber snapshot; |
|
|
|
SequenceNumber snapshot; |
|
|
|
mutex_.Lock(); |
|
|
|
mutex_.Lock(); |
|
|
|
if (options.snapshot != nullptr) { |
|
|
|
if (options.snapshot != nullptr) { |
|
|
@ -2585,23 +2583,15 @@ Status DBImpl::GetImpl(const ReadOptions& options, |
|
|
|
// s is both in/out. When in, s could either be OK or MergeInProgress.
|
|
|
|
// s is both in/out. When in, s could either be OK or MergeInProgress.
|
|
|
|
// merge_operands will contain the sequence of merges in the latter case.
|
|
|
|
// merge_operands will contain the sequence of merges in the latter case.
|
|
|
|
LookupKey lkey(key, snapshot); |
|
|
|
LookupKey lkey(key, snapshot); |
|
|
|
BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer); |
|
|
|
|
|
|
|
if (mem->Get(lkey, value, &s, &merge_operands, options_)) { |
|
|
|
if (mem->Get(lkey, value, &s, &merge_operands, options_)) { |
|
|
|
// Done
|
|
|
|
// Done
|
|
|
|
} else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { |
|
|
|
} else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { |
|
|
|
// Done
|
|
|
|
// Done
|
|
|
|
} else { |
|
|
|
} else { |
|
|
|
StopWatchNano from_files_timer(env_, false); |
|
|
|
|
|
|
|
StartPerfTimer(&from_files_timer); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
current->Get(options, lkey, value, &s, &merge_operands, &stats, |
|
|
|
current->Get(options, lkey, value, &s, &merge_operands, &stats, |
|
|
|
options_, value_found); |
|
|
|
options_, value_found); |
|
|
|
have_stat_update = true; |
|
|
|
have_stat_update = true; |
|
|
|
BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
StopWatchNano post_process_timer(env_, false); |
|
|
|
|
|
|
|
StartPerfTimer(&post_process_timer); |
|
|
|
|
|
|
|
mutex_.Lock(); |
|
|
|
mutex_.Lock(); |
|
|
|
|
|
|
|
|
|
|
|
if (!options_.disable_seek_compaction && |
|
|
|
if (!options_.disable_seek_compaction && |
|
|
@ -2617,8 +2607,6 @@ Status DBImpl::GetImpl(const ReadOptions& options, |
|
|
|
// Note, tickers are atomic now - no lock protection needed any more.
|
|
|
|
// Note, tickers are atomic now - no lock protection needed any more.
|
|
|
|
RecordTick(options_.statistics, NUMBER_KEYS_READ); |
|
|
|
RecordTick(options_.statistics, NUMBER_KEYS_READ); |
|
|
|
RecordTick(options_.statistics, BYTES_READ, value->size()); |
|
|
|
RecordTick(options_.statistics, BYTES_READ, value->size()); |
|
|
|
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return s; |
|
|
|
return s; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -2627,8 +2615,6 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
std::vector<std::string>* values) { |
|
|
|
std::vector<std::string>* values) { |
|
|
|
|
|
|
|
|
|
|
|
StopWatch sw(env_, options_.statistics, DB_MULTIGET); |
|
|
|
StopWatch sw(env_, options_.statistics, DB_MULTIGET); |
|
|
|
StopWatchNano snapshot_timer(env_, false); |
|
|
|
|
|
|
|
StartPerfTimer(&snapshot_timer); |
|
|
|
|
|
|
|
SequenceNumber snapshot; |
|
|
|
SequenceNumber snapshot; |
|
|
|
mutex_.Lock(); |
|
|
|
mutex_.Lock(); |
|
|
|
if (options.snapshot != nullptr) { |
|
|
|
if (options.snapshot != nullptr) { |
|
|
@ -2660,7 +2646,6 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
|
|
|
|
|
|
|
|
// Keep track of bytes that we read for statistics-recording later
|
|
|
|
// Keep track of bytes that we read for statistics-recording later
|
|
|
|
uint64_t bytesRead = 0; |
|
|
|
uint64_t bytesRead = 0; |
|
|
|
BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// For each of the given keys, apply the entire "get" process as follows:
|
|
|
|
// For each of the given keys, apply the entire "get" process as follows:
|
|
|
|
// First look in the memtable, then in the immutable memtable (if any).
|
|
|
|
// First look in the memtable, then in the immutable memtable (if any).
|
|
|
@ -2687,8 +2672,6 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Post processing (decrement reference counts and record statistics)
|
|
|
|
// Post processing (decrement reference counts and record statistics)
|
|
|
|
StopWatchNano post_process_timer(env_, false); |
|
|
|
|
|
|
|
StartPerfTimer(&post_process_timer); |
|
|
|
|
|
|
|
mutex_.Lock(); |
|
|
|
mutex_.Lock(); |
|
|
|
if (!options_.disable_seek_compaction && |
|
|
|
if (!options_.disable_seek_compaction && |
|
|
|
have_stat_update && current->UpdateStats(stats)) { |
|
|
|
have_stat_update && current->UpdateStats(stats)) { |
|
|
@ -2703,7 +2686,6 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS); |
|
|
|
RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS); |
|
|
|
RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys); |
|
|
|
RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys); |
|
|
|
RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead); |
|
|
|
RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead); |
|
|
|
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return statList; |
|
|
|
return statList; |
|
|
|
} |
|
|
|
} |
|
|
@ -2772,8 +2754,6 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
StopWatchNano pre_post_process_timer(env_, false); |
|
|
|
|
|
|
|
StartPerfTimer(&pre_post_process_timer); |
|
|
|
|
|
|
|
Writer w(&mutex_); |
|
|
|
Writer w(&mutex_); |
|
|
|
w.batch = my_batch; |
|
|
|
w.batch = my_batch; |
|
|
|
w.sync = options.sync; |
|
|
|
w.sync = options.sync; |
|
|
@ -2820,13 +2800,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
if (options.disableWAL) { |
|
|
|
if (options.disableWAL) { |
|
|
|
flush_on_destroy_ = true; |
|
|
|
flush_on_destroy_ = true; |
|
|
|
} |
|
|
|
} |
|
|
|
BumpPerfTime(&perf_context.write_pre_and_post_process_time, |
|
|
|
|
|
|
|
&pre_post_process_timer); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (!options.disableWAL) { |
|
|
|
if (!options.disableWAL) { |
|
|
|
StopWatchNano timer(env_); |
|
|
|
StopWatchNano timer(env_); |
|
|
|
StartPerfTimer(&timer); |
|
|
|
StartPerfTimer(&timer); |
|
|
|
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); |
|
|
|
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); |
|
|
|
|
|
|
|
BumpPerfTime(&perf_context.wal_write_time, &timer); |
|
|
|
if (status.ok() && options.sync) { |
|
|
|
if (status.ok() && options.sync) { |
|
|
|
if (options_.use_fsync) { |
|
|
|
if (options_.use_fsync) { |
|
|
|
StopWatch(env_, options_.statistics, WAL_FILE_SYNC_MICROS); |
|
|
|
StopWatch(env_, options_.statistics, WAL_FILE_SYNC_MICROS); |
|
|
@ -2836,14 +2815,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
status = log_->file()->Sync(); |
|
|
|
status = log_->file()->Sync(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
BumpPerfTime(&perf_context.write_wal_time, &timer); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
if (status.ok()) { |
|
|
|
if (status.ok()) { |
|
|
|
StopWatchNano write_memtable_timer(env_, false); |
|
|
|
|
|
|
|
StartPerfTimer(&write_memtable_timer); |
|
|
|
|
|
|
|
status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this, |
|
|
|
status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this, |
|
|
|
options_.filter_deletes); |
|
|
|
options_.filter_deletes); |
|
|
|
BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer); |
|
|
|
|
|
|
|
if (!status.ok()) { |
|
|
|
if (!status.ok()) { |
|
|
|
// Panic for in-memory corruptions
|
|
|
|
// Panic for in-memory corruptions
|
|
|
|
// Note that existing logic was not sound. Any partial failure writing
|
|
|
|
// Note that existing logic was not sound. Any partial failure writing
|
|
|
@ -2853,7 +2828,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
} |
|
|
|
} |
|
|
|
SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence); |
|
|
|
SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence); |
|
|
|
} |
|
|
|
} |
|
|
|
StartPerfTimer(&pre_post_process_timer); |
|
|
|
|
|
|
|
LogFlush(options_.info_log); |
|
|
|
LogFlush(options_.info_log); |
|
|
|
mutex_.Lock(); |
|
|
|
mutex_.Lock(); |
|
|
|
if (status.ok()) { |
|
|
|
if (status.ok()) { |
|
|
@ -2881,8 +2855,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
if (!writers_.empty()) { |
|
|
|
if (!writers_.empty()) { |
|
|
|
writers_.front()->cv.Signal(); |
|
|
|
writers_.front()->cv.Signal(); |
|
|
|
} |
|
|
|
} |
|
|
|
BumpPerfTime(&perf_context.write_pre_and_post_process_time, |
|
|
|
|
|
|
|
&pre_post_process_timer); |
|
|
|
|
|
|
|
return status; |
|
|
|
return status; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|