diff --git a/db/db_impl.cc b/db/db_impl.cc index 1769471cf..b83d60f5e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -77,20 +77,6 @@ const std::string kDefaultColumnFamilyName("default"); void DumpLeveldbBuildVersion(Logger * log); -// Information kept for every waiting writer -struct DBImpl::Writer { - Status status; - WriteBatch* batch; - bool sync; - bool disableWAL; - bool in_batch_group; - bool done; - uint64_t timeout_hint_us; - port::CondVar cv; - - explicit Writer(port::Mutex* mu) : cv(mu) { } -}; - struct DBImpl::WriteContext { autovector superversions_to_free_; autovector logs_to_free_; @@ -3627,6 +3613,14 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { edit.DropColumnFamily(); edit.SetColumnFamily(cfd->GetID()); + Writer w(&mutex_); + w.batch = nullptr; + w.sync = false; + w.disableWAL = false; + w.in_batch_group = false; + w.done = false; + w.timeout_hint_us = kNoTimeOut; + Status s; { MutexLock l(&mutex_); @@ -3634,7 +3628,11 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { s = Status::InvalidArgument("Column family already dropped!\n"); } if (s.ok()) { + // we drop column family from a single write thread + s = BeginWrite(&w, 0); + assert(s.ok() && !w.done); // No timeout and nobody should do our job s = versions_->LogAndApply(cfd, &edit, &mutex_); + EndWrite(&w, &w, s); } } @@ -4173,15 +4171,19 @@ void DBImpl::BuildBatchGroup(Writer** last_writer, break; } - if (w->batch != nullptr) { - size += WriteBatchInternal::ByteSize(w->batch); - if (size > max_size) { - // Do not make batch too big - break; - } + if (w->batch == nullptr) { + // Do not include those writes with nullptr batch. Those are not writes, + // those are something else. They want to be alone + break; + } - write_batch_group->push_back(w->batch); + size += WriteBatchInternal::ByteSize(w->batch); + if (size > max_size) { + // Do not make batch too big + break; } + + write_batch_group->push_back(w->batch); w->in_batch_group = true; *last_writer = w; } diff --git a/db/db_impl.h b/db/db_impl.h index e49b954cc..69fe2eaac 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -203,6 +203,17 @@ class DBImpl : public DB { SequenceNumber* sequence); Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence); + + void TEST_LockMutex(); + + void TEST_UnlockMutex(); + + // REQUIRES: mutex locked + void* TEST_BeginWrite(); + + // REQUIRES: mutex locked + // pass the pointer that you got from TEST_BeginWrite() + void TEST_EndWrite(void* w); #endif // NDEBUG // Structure to store information for candidate files to delete. @@ -309,7 +320,7 @@ class DBImpl : public DB { #endif friend struct SuperVersion; struct CompactionState; - struct Writer; + struct WriteContext; Status NewDB(); @@ -349,6 +360,20 @@ class DBImpl : public DB { uint64_t SlowdownAmount(int n, double bottom, double top); + // Information kept for every waiting writer + struct Writer { + Status status; + WriteBatch* batch; + bool sync; + bool disableWAL; + bool in_batch_group; + bool done; + uint64_t timeout_hint_us; + port::CondVar cv; + + explicit Writer(port::Mutex* mu) : cv(mu) {} + }; + // Before applying write operation (such as DBImpl::Write, DBImpl::Flush) // thread should grab the mutex_ and be the first on writers queue. // BeginWrite is used for it. diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 77d4e0551..5f7a4818d 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -130,5 +130,32 @@ Status DBImpl::TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence) { return ReadFirstLine(fname, sequence); } + +void DBImpl::TEST_LockMutex() { + mutex_.Lock(); +} + +void DBImpl::TEST_UnlockMutex() { + mutex_.Unlock(); +} + +void* DBImpl::TEST_BeginWrite() { + auto w = new Writer(&mutex_); + w->batch = nullptr; + w->sync = false; + w->disableWAL = false; + w->in_batch_group = false; + w->done = false; + w->timeout_hint_us = kNoTimeOut; + Status s = BeginWrite(w, 0); + assert(s.ok() && !w->done); // No timeout and nobody should do our job + return reinterpret_cast(w); +} + +void DBImpl::TEST_EndWrite(void* w) { + auto writer = reinterpret_cast(w); + EndWrite(writer, writer, Status::OK()); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/db/db_test.cc b/db/db_test.cc index 570af31a5..5b913f43c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -7894,6 +7895,26 @@ TEST(DBTest, DBIteratorBoundTest) { } } +TEST(DBTest, WriteSingleThreadEntry) { + std::vector threads; + dbfull()->TEST_LockMutex(); + auto w = dbfull()->TEST_BeginWrite(); + threads.emplace_back([&] { Put("a", "b"); }); + env_->SleepForMicroseconds(10000); + threads.emplace_back([&] { Flush(); }); + env_->SleepForMicroseconds(10000); + dbfull()->TEST_UnlockMutex(); + dbfull()->TEST_LockMutex(); + dbfull()->TEST_EndWrite(w); + dbfull()->TEST_UnlockMutex(); + + for (auto& t : threads) { + t.join(); + } +} + + + } // namespace rocksdb int main(int argc, char** argv) {