From 9f1c80b55668e3009a276803c66ac0fa0c5887e3 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 5 Sep 2014 15:20:05 -0700 Subject: [PATCH] Drop column family from write thread Summary: If we drop column family only from (single) write thread, we can be sure that nobody will drop the column family while we're writing (and our mutex is released). This greatly simplifies my patch that's getting rid of MakeRoomForWrite(). Test Plan: make check, but also running stress test Reviewers: ljin, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D22965 --- db/db_impl.cc | 44 +++++++++++++++++++++++--------------------- db/db_impl.h | 27 ++++++++++++++++++++++++++- db/db_impl_debug.cc | 27 +++++++++++++++++++++++++++ db/db_test.cc | 21 +++++++++++++++++++++ 4 files changed, 97 insertions(+), 22 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 1769471cf..b83d60f5e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -77,20 +77,6 @@ const std::string kDefaultColumnFamilyName("default"); void DumpLeveldbBuildVersion(Logger * log); -// Information kept for every waiting writer -struct DBImpl::Writer { - Status status; - WriteBatch* batch; - bool sync; - bool disableWAL; - bool in_batch_group; - bool done; - uint64_t timeout_hint_us; - port::CondVar cv; - - explicit Writer(port::Mutex* mu) : cv(mu) { } -}; - struct DBImpl::WriteContext { autovector superversions_to_free_; autovector logs_to_free_; @@ -3627,6 +3613,14 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { edit.DropColumnFamily(); edit.SetColumnFamily(cfd->GetID()); + Writer w(&mutex_); + w.batch = nullptr; + w.sync = false; + w.disableWAL = false; + w.in_batch_group = false; + w.done = false; + w.timeout_hint_us = kNoTimeOut; + Status s; { MutexLock l(&mutex_); @@ -3634,7 +3628,11 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { s = Status::InvalidArgument("Column family already dropped!\n"); } if (s.ok()) { + // we drop column family from a single write thread + s = BeginWrite(&w, 0); + assert(s.ok() && !w.done); // No timeout and nobody should do our job s = versions_->LogAndApply(cfd, &edit, &mutex_); + EndWrite(&w, &w, s); } } @@ -4173,15 +4171,19 @@ void DBImpl::BuildBatchGroup(Writer** last_writer, break; } - if (w->batch != nullptr) { - size += WriteBatchInternal::ByteSize(w->batch); - if (size > max_size) { - // Do not make batch too big - break; - } + if (w->batch == nullptr) { + // Do not include those writes with nullptr batch. Those are not writes, + // those are something else. They want to be alone + break; + } - write_batch_group->push_back(w->batch); + size += WriteBatchInternal::ByteSize(w->batch); + if (size > max_size) { + // Do not make batch too big + break; } + + write_batch_group->push_back(w->batch); w->in_batch_group = true; *last_writer = w; } diff --git a/db/db_impl.h b/db/db_impl.h index e49b954cc..69fe2eaac 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -203,6 +203,17 @@ class DBImpl : public DB { SequenceNumber* sequence); Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence); + + void TEST_LockMutex(); + + void TEST_UnlockMutex(); + + // REQUIRES: mutex locked + void* TEST_BeginWrite(); + + // REQUIRES: mutex locked + // pass the pointer that you got from TEST_BeginWrite() + void TEST_EndWrite(void* w); #endif // NDEBUG // Structure to store information for candidate files to delete. @@ -309,7 +320,7 @@ class DBImpl : public DB { #endif friend struct SuperVersion; struct CompactionState; - struct Writer; + struct WriteContext; Status NewDB(); @@ -349,6 +360,20 @@ class DBImpl : public DB { uint64_t SlowdownAmount(int n, double bottom, double top); + // Information kept for every waiting writer + struct Writer { + Status status; + WriteBatch* batch; + bool sync; + bool disableWAL; + bool in_batch_group; + bool done; + uint64_t timeout_hint_us; + port::CondVar cv; + + explicit Writer(port::Mutex* mu) : cv(mu) {} + }; + // Before applying write operation (such as DBImpl::Write, DBImpl::Flush) // thread should grab the mutex_ and be the first on writers queue. // BeginWrite is used for it. diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 77d4e0551..5f7a4818d 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -130,5 +130,32 @@ Status DBImpl::TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence) { return ReadFirstLine(fname, sequence); } + +void DBImpl::TEST_LockMutex() { + mutex_.Lock(); +} + +void DBImpl::TEST_UnlockMutex() { + mutex_.Unlock(); +} + +void* DBImpl::TEST_BeginWrite() { + auto w = new Writer(&mutex_); + w->batch = nullptr; + w->sync = false; + w->disableWAL = false; + w->in_batch_group = false; + w->done = false; + w->timeout_hint_us = kNoTimeOut; + Status s = BeginWrite(w, 0); + assert(s.ok() && !w->done); // No timeout and nobody should do our job + return reinterpret_cast(w); +} + +void DBImpl::TEST_EndWrite(void* w) { + auto writer = reinterpret_cast(w); + EndWrite(writer, writer, Status::OK()); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/db/db_test.cc b/db/db_test.cc index 570af31a5..5b913f43c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -7894,6 +7895,26 @@ TEST(DBTest, DBIteratorBoundTest) { } } +TEST(DBTest, WriteSingleThreadEntry) { + std::vector threads; + dbfull()->TEST_LockMutex(); + auto w = dbfull()->TEST_BeginWrite(); + threads.emplace_back([&] { Put("a", "b"); }); + env_->SleepForMicroseconds(10000); + threads.emplace_back([&] { Flush(); }); + env_->SleepForMicroseconds(10000); + dbfull()->TEST_UnlockMutex(); + dbfull()->TEST_LockMutex(); + dbfull()->TEST_EndWrite(w); + dbfull()->TEST_UnlockMutex(); + + for (auto& t : threads) { + t.join(); + } +} + + + } // namespace rocksdb int main(int argc, char** argv) {