Fix transaction locking

Summary: Broke transaction locking in 4.4 in D52197.  Will cherry-pick this change into 4.4 (which hasn't yet been fully released).  Repro'd using db_bench.

Test Plan: unit tests and db_Bench

Reviewers: sdong, yhchiang, kradhakrishnan, ngbronson

Reviewed By: ngbronson

Subscribers: ngbronson, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D54021
main
agiardullo 9 years ago
parent 730a422c3a
commit d08d50295c
  1. 33
      utilities/transactions/transaction_db_mutex_impl.cc

@ -18,20 +18,19 @@ namespace rocksdb {
class TransactionDBMutexImpl : public TransactionDBMutex { class TransactionDBMutexImpl : public TransactionDBMutex {
public: public:
TransactionDBMutexImpl() : lock_(mutex_, std::defer_lock) {} TransactionDBMutexImpl() {}
~TransactionDBMutexImpl() {} ~TransactionDBMutexImpl() {}
Status Lock() override; Status Lock() override;
Status TryLockFor(int64_t timeout_time) override; Status TryLockFor(int64_t timeout_time) override;
void UnLock() override { lock_.unlock(); } void UnLock() override { mutex_.unlock(); }
friend class TransactionDBCondVarImpl; friend class TransactionDBCondVarImpl;
private: private:
std::mutex mutex_; // Do not acquire mutex_ directly. Use lock_. std::mutex mutex_;
std::unique_lock<std::mutex> lock_;
}; };
class TransactionDBCondVarImpl : public TransactionDBCondVar { class TransactionDBCondVarImpl : public TransactionDBCondVar {
@ -63,7 +62,7 @@ TransactionDBMutexFactoryImpl::AllocateCondVar() {
} }
Status TransactionDBMutexImpl::Lock() { Status TransactionDBMutexImpl::Lock() {
lock_.lock(); mutex_.lock();
return Status::OK(); return Status::OK();
} }
@ -71,7 +70,7 @@ Status TransactionDBMutexImpl::TryLockFor(int64_t timeout_time) {
bool locked = true; bool locked = true;
if (timeout_time == 0) { if (timeout_time == 0) {
locked = lock_.try_lock(); locked = mutex_.try_lock();
} else { } else {
// Previously, this code used a std::timed_mutex. However, this was changed // Previously, this code used a std::timed_mutex. However, this was changed
// due to known bugs in gcc versions < 4.9. // due to known bugs in gcc versions < 4.9.
@ -80,7 +79,7 @@ Status TransactionDBMutexImpl::TryLockFor(int64_t timeout_time) {
// Since this mutex isn't held for long and only a single mutex is ever // Since this mutex isn't held for long and only a single mutex is ever
// held at a time, it is reasonable to ignore the lock timeout_time here // held at a time, it is reasonable to ignore the lock timeout_time here
// and only check it when waiting on the condition_variable. // and only check it when waiting on the condition_variable.
lock_.lock(); mutex_.lock();
} }
if (!locked) { if (!locked) {
@ -95,30 +94,40 @@ Status TransactionDBCondVarImpl::Wait(
std::shared_ptr<TransactionDBMutex> mutex) { std::shared_ptr<TransactionDBMutex> mutex) {
auto mutex_impl = reinterpret_cast<TransactionDBMutexImpl*>(mutex.get()); auto mutex_impl = reinterpret_cast<TransactionDBMutexImpl*>(mutex.get());
cv_.wait(mutex_impl->lock_); std::unique_lock<std::mutex> lock(mutex_impl->mutex_, std::adopt_lock);
cv_.wait(lock);
// Make sure unique_lock doesn't unlock mutex when it destructs
lock.release();
return Status::OK(); return Status::OK();
} }
Status TransactionDBCondVarImpl::WaitFor( Status TransactionDBCondVarImpl::WaitFor(
std::shared_ptr<TransactionDBMutex> mutex, int64_t timeout_time) { std::shared_ptr<TransactionDBMutex> mutex, int64_t timeout_time) {
Status s;
auto mutex_impl = reinterpret_cast<TransactionDBMutexImpl*>(mutex.get()); auto mutex_impl = reinterpret_cast<TransactionDBMutexImpl*>(mutex.get());
std::unique_lock<std::mutex> lock(mutex_impl->mutex_, std::adopt_lock);
if (timeout_time < 0) { if (timeout_time < 0) {
// If timeout is negative, do not use a timeout // If timeout is negative, do not use a timeout
cv_.wait(mutex_impl->lock_); cv_.wait(lock);
} else { } else {
auto duration = std::chrono::microseconds(timeout_time); auto duration = std::chrono::microseconds(timeout_time);
auto cv_status = cv_.wait_for(mutex_impl->lock_, duration); auto cv_status = cv_.wait_for(lock, duration);
// Check if the wait stopped due to timing out. // Check if the wait stopped due to timing out.
if (cv_status == std::cv_status::timeout) { if (cv_status == std::cv_status::timeout) {
return Status::TimedOut(Status::SubCode::kMutexTimeout); s = Status::TimedOut(Status::SubCode::kMutexTimeout);
} }
} }
// Make sure unique_lock doesn't unlock mutex when it destructs
lock.release();
// CV was signaled, or we spuriously woke up (but didn't time out) // CV was signaled, or we spuriously woke up (but didn't time out)
return Status::OK(); return s;
} }
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save