From 5e9e5a4702d121e65c547b2e0ed1321e09462494 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Wed, 8 Nov 2017 19:33:12 -0800 Subject: [PATCH] Blob DB: Fix race condition between flush and write Summary: A race condition will happen when: * a user thread writes a value, but it hits the write stop condition because there are too many un-flushed memtables, while holding blob_db_impl.write_mutex_. * Flush is triggered and call flush begin listener and try to acquire blob_db_impl.write_mutex_. Fixing it. Closes https://github.com/facebook/rocksdb/pull/3149 Differential Revision: D6279805 Pulled By: yiwu-arbug fbshipit-source-id: 0e3c58afb78795ebe3360a2c69e05651e3908c40 --- utilities/blob_db/blob_db_impl.cc | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index c65ff679d..c17815d0a 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -745,13 +745,22 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler { }; Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) { - MutexLock l(&write_mutex_); uint32_t default_cf_id = reinterpret_cast(DefaultColumnFamily())->GetID(); + // TODO(yiwu): In case there are multiple writers the latest sequence would + // not be the actually sequence we are writting. Need to get the sequence + // from write batch after DB write instead. SequenceNumber current_seq = GetLatestSequenceNumber() + 1; + Status s; BlobInserter blob_inserter(options, this, default_cf_id, current_seq); - Status s = updates->Iterate(&blob_inserter); + { + // Release write_mutex_ before DB write to avoid race condition with + // flush begin listener, which also require write_mutex_ to sync + // blob files. + MutexLock l(&write_mutex_); + s = updates->Iterate(&blob_inserter); + } if (!s.ok()) { return s; } @@ -759,7 +768,6 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) { if (!s.ok()) { return s; } - assert(blob_inserter.sequence() == GetLatestSequenceNumber() + 1); // add deleted key to list of keys that have been deleted for book-keeping class DeleteBookkeeper : public WriteBatch::Handler { @@ -849,10 +857,19 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options, Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t expiration) { TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start"); - MutexLock l(&write_mutex_); - SequenceNumber sequence = GetLatestSequenceNumber() + 1; + Status s; WriteBatch batch; - Status s = PutBlobValue(options, key, value, expiration, sequence, &batch); + { + // Release write_mutex_ before DB write to avoid race condition with + // flush begin listener, which also require write_mutex_ to sync + // blob files. + MutexLock l(&write_mutex_); + // TODO(yiwu): In case there are multiple writers the latest sequence would + // not be the actually sequence we are writting. Need to get the sequence + // from write batch after DB write instead. + SequenceNumber sequence = GetLatestSequenceNumber() + 1; + s = PutBlobValue(options, key, value, expiration, sequence, &batch); + } if (s.ok()) { s = db_->Write(options, &batch); } @@ -1198,8 +1215,6 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, return Status::Corruption("Corruption. Blob CRC mismatch"); } - // TODO(yiwu): Should use compression flag in the blob file instead of - // current compression option. if (bfile->compression() != kNoCompression) { BlockContents contents; auto cfh = reinterpret_cast(DefaultColumnFamily());