diff --git a/db/db_impl.cc b/db/db_impl.cc index 20cfa7160..f19ecba6c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2209,7 +2209,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, StopWatch sw(env_, options_.statistics, DB_GET); SequenceNumber snapshot; - MutexLock l(&mutex_); + mutex_.Lock(); if (options.snapshot != nullptr) { snapshot = reinterpret_cast(options.snapshot)->number_; } else { @@ -2254,6 +2254,9 @@ Status DBImpl::GetImpl(const ReadOptions& options, mem->Unref(); imm.UnrefAll(); current->Unref(); + mutex_.Unlock(); + + // Note, tickers are atomic now - no lock protection needed any more. RecordTick(options_.statistics, NUMBER_KEYS_READ); RecordTick(options_.statistics, BYTES_READ, value->size()); return s; @@ -2265,7 +2268,7 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, StopWatch sw(env_, options_.statistics, DB_MULTIGET); SequenceNumber snapshot; - MutexLock l(&mutex_); + mutex_.Lock(); if (options.snapshot != nullptr) { snapshot = reinterpret_cast(options.snapshot)->number_; } else { @@ -2329,6 +2332,8 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, mem->Unref(); imm.UnrefAll(); current->Unref(); + mutex_.Unlock(); + RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS); RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys); RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead); @@ -2413,22 +2418,28 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions + // TODO: BuildBatchGroup physically concatenate/copy all write batches into + // a new one. Mem copy is done with the lock held. Ideally, we only need + // the lock to obtain the last_writer and the references to all batches. + // Creation (copy) of the merged batch could have been done outside of the + // lock protected region. WriteBatch* updates = BuildBatchGroup(&last_writer); - const SequenceNumber current_sequence = last_sequence + 1; - WriteBatchInternal::SetSequence(updates, current_sequence); - int my_batch_count = WriteBatchInternal::Count(updates); - last_sequence += my_batch_count; - // Record statistics - RecordTick(options_.statistics, NUMBER_KEYS_WRITTEN, my_batch_count); - RecordTick(options_.statistics, - BYTES_WRITTEN, - WriteBatchInternal::ByteSize(updates)); + // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes // into mem_. { mutex_.Unlock(); + const SequenceNumber current_sequence = last_sequence + 1; + WriteBatchInternal::SetSequence(updates, current_sequence); + int my_batch_count = WriteBatchInternal::Count(updates); + last_sequence += my_batch_count; + // Record statistics + RecordTick(options_.statistics, NUMBER_KEYS_WRITTEN, my_batch_count); + RecordTick(options_.statistics, + BYTES_WRITTEN, + WriteBatchInternal::ByteSize(updates)); if (options.disableWAL) { flush_on_destroy_ = true; }