@ -1035,7 +1035,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
( unsigned long ) m - > GetLogNumber ( ) ) ;
( unsigned long ) m - > GetLogNumber ( ) ) ;
list . push_back ( m - > NewIterator ( ) ) ;
list . push_back ( m - > NewIterator ( ) ) ;
}
}
Iterator * iter = NewMergingIterator ( & internal_comparator_ , & list [ 0 ] ,
Iterator * iter = NewMergingIterator ( env_ , & internal_comparator_ , & list [ 0 ] ,
list . size ( ) ) ;
list . size ( ) ) ;
const SequenceNumber newest_snapshot = snapshots_ . GetNewest ( ) ;
const SequenceNumber newest_snapshot = snapshots_ . GetNewest ( ) ;
const SequenceNumber earliest_seqno_in_memtable =
const SequenceNumber earliest_seqno_in_memtable =
@ -2519,7 +2519,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
// Collect iterators for files in L0 - Ln
// Collect iterators for files in L0 - Ln
versions_ - > current ( ) - > AddIterators ( options , storage_options_ , & list ) ;
versions_ - > current ( ) - > AddIterators ( options , storage_options_ , & list ) ;
Iterator * internal_iter =
Iterator * internal_iter =
NewMergingIterator ( & internal_comparator_ , & list [ 0 ] , list . size ( ) ) ;
NewMergingIterator ( env_ , & internal_comparator_ , & list [ 0 ] , list . size ( ) ) ;
versions_ - > current ( ) - > Ref ( ) ;
versions_ - > current ( ) - > Ref ( ) ;
cleanup - > mu = & mutex_ ;
cleanup - > mu = & mutex_ ;
@ -2555,6 +2555,8 @@ Status DBImpl::GetImpl(const ReadOptions& options,
Status s ;
Status s ;
StopWatch sw ( env_ , options_ . statistics , DB_GET ) ;
StopWatch sw ( env_ , options_ . statistics , DB_GET ) ;
StopWatchNano snapshot_timer ( env_ , false ) ;
StartPerfTimer ( & snapshot_timer ) ;
SequenceNumber snapshot ;
SequenceNumber snapshot ;
mutex_ . Lock ( ) ;
mutex_ . Lock ( ) ;
if ( options . snapshot ! = nullptr ) {
if ( options . snapshot ! = nullptr ) {
@ -2583,15 +2585,23 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// s is both in/out. When in, s could either be OK or MergeInProgress.
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey ( key , snapshot ) ;
LookupKey lkey ( key , snapshot ) ;
BumpPerfTime ( & perf_context . get_snapshot_time , & snapshot_timer ) ;
if ( mem - > Get ( lkey , value , & s , & merge_operands , options_ ) ) {
if ( mem - > Get ( lkey , value , & s , & merge_operands , options_ ) ) {
// Done
// Done
} else if ( imm . Get ( lkey , value , & s , & merge_operands , options_ ) ) {
} else if ( imm . Get ( lkey , value , & s , & merge_operands , options_ ) ) {
// Done
// Done
} else {
} else {
StopWatchNano from_files_timer ( env_ , false ) ;
StartPerfTimer ( & from_files_timer ) ;
current - > Get ( options , lkey , value , & s , & merge_operands , & stats ,
current - > Get ( options , lkey , value , & s , & merge_operands , & stats ,
options_ , value_found ) ;
options_ , value_found ) ;
have_stat_update = true ;
have_stat_update = true ;
BumpPerfTime ( & perf_context . get_from_output_files_time , & from_files_timer ) ;
}
}
StopWatchNano post_process_timer ( env_ , false ) ;
StartPerfTimer ( & post_process_timer ) ;
mutex_ . Lock ( ) ;
mutex_ . Lock ( ) ;
if ( ! options_ . disable_seek_compaction & &
if ( ! options_ . disable_seek_compaction & &
@ -2607,6 +2617,8 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// Note, tickers are atomic now - no lock protection needed any more.
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick ( options_ . statistics , NUMBER_KEYS_READ ) ;
RecordTick ( options_ . statistics , NUMBER_KEYS_READ ) ;
RecordTick ( options_ . statistics , BYTES_READ , value - > size ( ) ) ;
RecordTick ( options_ . statistics , BYTES_READ , value - > size ( ) ) ;
BumpPerfTime ( & perf_context . get_post_process_time , & post_process_timer ) ;
return s ;
return s ;
}
}
@ -2615,6 +2627,8 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
std : : vector < std : : string > * values ) {
std : : vector < std : : string > * values ) {
StopWatch sw ( env_ , options_ . statistics , DB_MULTIGET ) ;
StopWatch sw ( env_ , options_ . statistics , DB_MULTIGET ) ;
StopWatchNano snapshot_timer ( env_ , false ) ;
StartPerfTimer ( & snapshot_timer ) ;
SequenceNumber snapshot ;
SequenceNumber snapshot ;
mutex_ . Lock ( ) ;
mutex_ . Lock ( ) ;
if ( options . snapshot ! = nullptr ) {
if ( options . snapshot ! = nullptr ) {
@ -2646,6 +2660,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
// Keep track of bytes that we read for statistics-recording later
// Keep track of bytes that we read for statistics-recording later
uint64_t bytesRead = 0 ;
uint64_t bytesRead = 0 ;
BumpPerfTime ( & perf_context . get_snapshot_time , & snapshot_timer ) ;
// For each of the given keys, apply the entire "get" process as follows:
// For each of the given keys, apply the entire "get" process as follows:
// First look in the memtable, then in the immutable memtable (if any).
// First look in the memtable, then in the immutable memtable (if any).
@ -2672,6 +2687,8 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
}
}
// Post processing (decrement reference counts and record statistics)
// Post processing (decrement reference counts and record statistics)
StopWatchNano post_process_timer ( env_ , false ) ;
StartPerfTimer ( & post_process_timer ) ;
mutex_ . Lock ( ) ;
mutex_ . Lock ( ) ;
if ( ! options_ . disable_seek_compaction & &
if ( ! options_ . disable_seek_compaction & &
have_stat_update & & current - > UpdateStats ( stats ) ) {
have_stat_update & & current - > UpdateStats ( stats ) ) {
@ -2686,6 +2703,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
RecordTick ( options_ . statistics , NUMBER_MULTIGET_CALLS ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_CALLS ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_KEYS_READ , numKeys ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_KEYS_READ , numKeys ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_BYTES_READ , bytesRead ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_BYTES_READ , bytesRead ) ;
BumpPerfTime ( & perf_context . get_post_process_time , & post_process_timer ) ;
return statList ;
return statList ;
}
}
@ -2754,6 +2772,8 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
}
}
Status DBImpl : : Write ( const WriteOptions & options , WriteBatch * my_batch ) {
Status DBImpl : : Write ( const WriteOptions & options , WriteBatch * my_batch ) {
StopWatchNano pre_post_process_timer ( env_ , false ) ;
StartPerfTimer ( & pre_post_process_timer ) ;
Writer w ( & mutex_ ) ;
Writer w ( & mutex_ ) ;
w . batch = my_batch ;
w . batch = my_batch ;
w . sync = options . sync ;
w . sync = options . sync ;
@ -2800,12 +2820,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if ( options . disableWAL ) {
if ( options . disableWAL ) {
flush_on_destroy_ = true ;
flush_on_destroy_ = true ;
}
}
BumpPerfTime ( & perf_context . write_pre_and_post_process_time ,
& pre_post_process_timer ) ;
if ( ! options . disableWAL ) {
if ( ! options . disableWAL ) {
StopWatchNano timer ( env_ ) ;
StopWatchNano timer ( env_ ) ;
StartPerfTimer ( & timer ) ;
StartPerfTimer ( & timer ) ;
status = log_ - > AddRecord ( WriteBatchInternal : : Contents ( updates ) ) ;
status = log_ - > AddRecord ( WriteBatchInternal : : Contents ( updates ) ) ;
BumpPerfTime ( & perf_context . wal_write_time , & timer ) ;
if ( status . ok ( ) & & options . sync ) {
if ( status . ok ( ) & & options . sync ) {
if ( options_ . use_fsync ) {
if ( options_ . use_fsync ) {
StopWatch ( env_ , options_ . statistics , WAL_FILE_SYNC_MICROS ) ;
StopWatch ( env_ , options_ . statistics , WAL_FILE_SYNC_MICROS ) ;
@ -2815,10 +2836,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
status = log_ - > file ( ) - > Sync ( ) ;
status = log_ - > file ( ) - > Sync ( ) ;
}
}
}
}
BumpPerfTime ( & perf_context . write_wal_time , & timer ) ;
}
}
if ( status . ok ( ) ) {
if ( status . ok ( ) ) {
StopWatchNano write_memtable_timer ( env_ , false ) ;
StartPerfTimer ( & write_memtable_timer ) ;
status = WriteBatchInternal : : InsertInto ( updates , mem_ , & options_ , this ,
status = WriteBatchInternal : : InsertInto ( updates , mem_ , & options_ , this ,
options_ . filter_deletes ) ;
options_ . filter_deletes ) ;
BumpPerfTime ( & perf_context . write_memtable_time , & write_memtable_timer ) ;
if ( ! status . ok ( ) ) {
if ( ! status . ok ( ) ) {
// Panic for in-memory corruptions
// Panic for in-memory corruptions
// Note that existing logic was not sound. Any partial failure writing
// Note that existing logic was not sound. Any partial failure writing
@ -2828,6 +2853,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
}
SetTickerCount ( options_ . statistics , SEQUENCE_NUMBER , last_sequence ) ;
SetTickerCount ( options_ . statistics , SEQUENCE_NUMBER , last_sequence ) ;
}
}
StartPerfTimer ( & pre_post_process_timer ) ;
LogFlush ( options_ . info_log ) ;
LogFlush ( options_ . info_log ) ;
mutex_ . Lock ( ) ;
mutex_ . Lock ( ) ;
if ( status . ok ( ) ) {
if ( status . ok ( ) ) {
@ -2855,6 +2881,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if ( ! writers_ . empty ( ) ) {
if ( ! writers_ . empty ( ) ) {
writers_ . front ( ) - > cv . Signal ( ) ;
writers_ . front ( ) - > cv . Signal ( ) ;
}
}
BumpPerfTime ( & perf_context . write_pre_and_post_process_time ,
& pre_post_process_timer ) ;
return status ;
return status ;
}
}