@ -2209,7 +2209,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
StopWatch sw ( env_ , options_ . statistics , DB_GET ) ;
StopWatch sw ( env_ , options_ . statistics , DB_GET ) ;
SequenceNumber snapshot ;
SequenceNumber snapshot ;
MutexLock l ( & mutex_ ) ;
mutex_ . Lock ( ) ;
if ( options . snapshot ! = nullptr ) {
if ( options . snapshot ! = nullptr ) {
snapshot = reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_ ;
snapshot = reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_ ;
} else {
} else {
@ -2254,6 +2254,9 @@ Status DBImpl::GetImpl(const ReadOptions& options,
mem - > Unref ( ) ;
mem - > Unref ( ) ;
imm . UnrefAll ( ) ;
imm . UnrefAll ( ) ;
current - > Unref ( ) ;
current - > Unref ( ) ;
mutex_ . Unlock ( ) ;
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick ( options_ . statistics , NUMBER_KEYS_READ ) ;
RecordTick ( options_ . statistics , NUMBER_KEYS_READ ) ;
RecordTick ( options_ . statistics , BYTES_READ , value - > size ( ) ) ;
RecordTick ( options_ . statistics , BYTES_READ , value - > size ( ) ) ;
return s ;
return s ;
@ -2265,7 +2268,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
StopWatch sw ( env_ , options_ . statistics , DB_MULTIGET ) ;
StopWatch sw ( env_ , options_ . statistics , DB_MULTIGET ) ;
SequenceNumber snapshot ;
SequenceNumber snapshot ;
MutexLock l ( & mutex_ ) ;
mutex_ . Lock ( ) ;
if ( options . snapshot ! = nullptr ) {
if ( options . snapshot ! = nullptr ) {
snapshot = reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_ ;
snapshot = reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_ ;
} else {
} else {
@ -2329,6 +2332,8 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
mem - > Unref ( ) ;
mem - > Unref ( ) ;
imm . UnrefAll ( ) ;
imm . UnrefAll ( ) ;
current - > Unref ( ) ;
current - > Unref ( ) ;
mutex_ . Unlock ( ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_CALLS ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_CALLS ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_KEYS_READ , numKeys ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_KEYS_READ , numKeys ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_BYTES_READ , bytesRead ) ;
RecordTick ( options_ . statistics , NUMBER_MULTIGET_BYTES_READ , bytesRead ) ;
@ -2413,22 +2418,28 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
uint64_t last_sequence = versions_ - > LastSequence ( ) ;
uint64_t last_sequence = versions_ - > LastSequence ( ) ;
Writer * last_writer = & w ;
Writer * last_writer = & w ;
if ( status . ok ( ) & & my_batch ! = nullptr ) { // nullptr batch is for compactions
if ( status . ok ( ) & & my_batch ! = nullptr ) { // nullptr batch is for compactions
// TODO: BuildBatchGroup physically concatenate/copy all write batches into
// a new one. Mem copy is done with the lock held. Ideally, we only need
// the lock to obtain the last_writer and the references to all batches.
// Creation (copy) of the merged batch could have been done outside of the
// lock protected region.
WriteBatch * updates = BuildBatchGroup ( & last_writer ) ;
WriteBatch * updates = BuildBatchGroup ( & last_writer ) ;
const SequenceNumber current_sequence = last_sequence + 1 ;
WriteBatchInternal : : SetSequence ( updates , current_sequence ) ;
int my_batch_count = WriteBatchInternal : : Count ( updates ) ;
last_sequence + = my_batch_count ;
// Record statistics
RecordTick ( options_ . statistics , NUMBER_KEYS_WRITTEN , my_batch_count ) ;
RecordTick ( options_ . statistics ,
BYTES_WRITTEN ,
WriteBatchInternal : : ByteSize ( updates ) ) ;
// Add to log and apply to memtable. We can release the lock
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// and protects against concurrent loggers and concurrent writes
// into mem_.
// into mem_.
{
{
mutex_ . Unlock ( ) ;
mutex_ . Unlock ( ) ;
const SequenceNumber current_sequence = last_sequence + 1 ;
WriteBatchInternal : : SetSequence ( updates , current_sequence ) ;
int my_batch_count = WriteBatchInternal : : Count ( updates ) ;
last_sequence + = my_batch_count ;
// Record statistics
RecordTick ( options_ . statistics , NUMBER_KEYS_WRITTEN , my_batch_count ) ;
RecordTick ( options_ . statistics ,
BYTES_WRITTEN ,
WriteBatchInternal : : ByteSize ( updates ) ) ;
if ( options . disableWAL ) {
if ( options . disableWAL ) {
flush_on_destroy_ = true ;
flush_on_destroy_ = true ;
}
}