diff --git a/db/db_impl.cc b/db/db_impl.cc index 13fec6f85..0b332b72f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1915,14 +1915,6 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options) { - Writer w(&mutex_); - w.batch = nullptr; - w.sync = false; - w.disableWAL = false; - w.in_batch_group = false; - w.done = false; - w.timeout_hint_us = kNoTimeOut; - Status s; { WriteContext context; @@ -1933,7 +1925,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, return Status::OK(); } - s = BeginWrite(&w, 0); + WriteThread::Writer w(&mutex_); + s = write_thread_.EnterWriteThread(&w, 0); assert(s.ok() && !w.done); // No timeout and nobody should do our job // SetNewMemtableAndNewLogFile() will release and reacquire mutex @@ -1942,12 +1935,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, cfd->imm()->FlushRequested(); MaybeScheduleFlushOrCompaction(); - assert(!writers_.empty()); - assert(writers_.front() == &w); - EndWrite(&w, &w, s); + write_thread_.ExitWriteThread(&w, &w, s); } - if (s.ok() && options.wait) { // Wait until the compaction completes s = WaitForFlushMemTable(cfd); @@ -3652,13 +3642,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { edit.DropColumnFamily(); edit.SetColumnFamily(cfd->GetID()); - Writer w(&mutex_); - w.batch = nullptr; - w.sync = false; - w.disableWAL = false; - w.in_batch_group = false; - w.done = false; - w.timeout_hint_us = kNoTimeOut; Status s; { @@ -3668,10 +3651,11 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { } if (s.ok()) { // we drop column family from a single write thread - s = BeginWrite(&w, 0); + WriteThread::Writer w(&mutex_); + s = write_thread_.EnterWriteThread(&w, 0); assert(s.ok() && !w.done); // No timeout and nobody should do our job s = versions_->LogAndApply(cfd, &edit, &mutex_); - EndWrite(&w, &w, s); + write_thread_.ExitWriteThread(&w, &w, s); } } @@ -3891,88 +3875,12 @@ Status DBImpl::Delete(const WriteOptions& options, return DB::Delete(options, column_family, key); } -// REQUIRES: mutex_ is held -Status DBImpl::BeginWrite(Writer* w, uint64_t expiration_time) { - // the following code block pushes the current writer "w" into the writer - // queue "writers_" and wait until one of the following conditions met: - // 1. the job of "w" has been done by some other writers. - // 2. "w" becomes the first writer in "writers_" - // 3. "w" timed-out. - mutex_.AssertHeld(); - writers_.push_back(w); - - bool timed_out = false; - while (!w->done && w != writers_.front()) { - if (expiration_time == 0) { - w->cv.Wait(); - } else if (w->cv.TimedWait(expiration_time)) { - if (w->in_batch_group) { - // then it means the front writer is currently doing the - // write on behalf of this "timed-out" writer. Then it - // should wait until the write completes. - expiration_time = 0; - } else { - timed_out = true; - break; - } - } - } - - if (timed_out) { -#ifndef NDEBUG - bool found = false; -#endif - for (auto iter = writers_.begin(); iter != writers_.end(); iter++) { - if (*iter == w) { - writers_.erase(iter); -#ifndef NDEBUG - found = true; -#endif - break; - } - } -#ifndef NDEBUG - assert(found); -#endif - // writers_.front() might still be in cond_wait without a time-out. - // As a result, we need to signal it to wake it up. Otherwise no - // one else will wake him up, and RocksDB will hang. 
- if (!writers_.empty()) { - writers_.front()->cv.Signal(); - } - return Status::TimedOut(); - } - return Status::OK(); -} - -// REQUIRES: mutex_ is held -void DBImpl::EndWrite(Writer* w, Writer* last_writer, Status status) { - // Pop out the current writer and all writers being pushed before the - // current writer from the writer queue. - mutex_.AssertHeld(); - while (!writers_.empty()) { - Writer* ready = writers_.front(); - writers_.pop_front(); - if (ready != w) { - ready->status = status; - ready->done = true; - ready->cv.Signal(); - } - if (ready == last_writer) break; - } - - // Notify new head of write queue - if (!writers_.empty()) { - writers_.front()->cv.Signal(); - } -} - Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } PERF_TIMER_GUARD(write_pre_and_post_process_time); - Writer w(&mutex_); + WriteThread::Writer w(&mutex_); w.batch = my_batch; w.sync = options.sync; w.disableWAL = options.disableWAL; @@ -3983,7 +3891,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { uint64_t expiration_time = 0; bool has_timeout = false; if (w.timeout_hint_us == 0) { - w.timeout_hint_us = kNoTimeOut; + w.timeout_hint_us = WriteThread::kNoTimeOut; } else { expiration_time = env_->NowMicros() + w.timeout_hint_us; has_timeout = true; @@ -3996,7 +3904,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { WriteContext context; mutex_.Lock(); - Status status = BeginWrite(&w, expiration_time); + Status status = write_thread_.EnterWriteThread(&w, expiration_time); assert(status.ok() || status.IsTimedOut()); if (status.IsTimedOut()) { mutex_.Unlock(); @@ -4066,10 +3974,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } uint64_t last_sequence = versions_->LastSequence(); - Writer* last_writer = &w; + WriteThread::Writer* last_writer = &w; if (status.ok()) { autovector write_batch_group; - BuildBatchGroup(&last_writer, &write_batch_group); + write_thread_.BuildBatchGroup(&last_writer, &write_batch_group); // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging @@ -4161,7 +4069,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { bg_error_ = status; // stop compaction & fail any further writes } - EndWrite(&w, last_writer, status); + write_thread_.ExitWriteThread(&w, last_writer, status); mutex_.Unlock(); if (status.IsTimedOut()) { @@ -4171,68 +4079,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { return status; } -// This function will be called only when the first writer succeeds. -// All writers in the to-be-built batch group will be processed. -// -// REQUIRES: Writer list must be non-empty -// REQUIRES: First writer must have a non-nullptr batch -void DBImpl::BuildBatchGroup(Writer** last_writer, - autovector* write_batch_group) { - assert(!writers_.empty()); - Writer* first = writers_.front(); - assert(first->batch != nullptr); - - size_t size = WriteBatchInternal::ByteSize(first->batch); - write_batch_group->push_back(first->batch); - - // Allow the group to grow up to a maximum size, but if the - // original write is small, limit the growth so we do not slow - // down the small write too much. 
- size_t max_size = 1 << 20; - if (size <= (128<<10)) { - max_size = size + (128<<10); - } - - *last_writer = first; - std::deque::iterator iter = writers_.begin(); - ++iter; // Advance past "first" - for (; iter != writers_.end(); ++iter) { - Writer* w = *iter; - if (w->sync && !first->sync) { - // Do not include a sync write into a batch handled by a non-sync write. - break; - } - - if (!w->disableWAL && first->disableWAL) { - // Do not include a write that needs WAL into a batch that has - // WAL disabled. - break; - } - - if (w->timeout_hint_us < first->timeout_hint_us) { - // Do not include those writes with shorter timeout. Otherwise, we might - // execute a write that should instead be aborted because of timeout. - break; - } - - if (w->batch == nullptr) { - // Do not include those writes with nullptr batch. Those are not writes, - // those are something else. They want to be alone - break; - } - - size += WriteBatchInternal::ByteSize(w->batch); - if (size > max_size) { - // Do not make batch too big - break; - } - - write_batch_group->push_back(w->batch); - w->in_batch_group = true; - *last_writer = w; - } -} - // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue void DBImpl::DelayWrite(uint64_t expiration_time) { diff --git a/db/db_impl.h b/db/db_impl.h index 0336b3af5..d2b0dfc94 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -34,6 +34,7 @@ #include "db/internal_stats.h" #include "db/write_controller.h" #include "db/flush_scheduler.h" +#include "db/write_thread.h" namespace rocksdb { @@ -359,44 +360,6 @@ class DBImpl : public DB { Status WriteLevel0Table(ColumnFamilyData* cfd, autovector& mems, VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer); - // Information kept for every waiting writer - struct Writer { - Status status; - WriteBatch* batch; - bool sync; - bool disableWAL; - bool in_batch_group; - bool done; - uint64_t timeout_hint_us; - port::CondVar cv; - - explicit Writer(port::Mutex* mu) : cv(mu) {} - }; - - // Before applying write operation (such as DBImpl::Write, DBImpl::Flush) - // thread should grab the mutex_ and be the first on writers queue. - // BeginWrite is used for it. - // Be aware! Writer's job can be done by other thread (see DBImpl::Write - // for examples), so check it via w.done before applying changes. - // - // Writer* w: writer to be placed in the queue - // uint64_t expiration_time: maximum time to be in the queue - // See also: EndWrite - Status BeginWrite(Writer* w, uint64_t expiration_time); - - // After doing write job, we need to remove already used writers from - // writers_ queue and notify head of the queue about it. - // EndWrite is used for this. - // - // Writer* w: Writer, that was added by BeginWrite function - // Writer* last_writer: Since we can join a few Writers (as DBImpl::Write - // does) - // we should pass last_writer as a parameter to - // EndWrite - // (if you don't touch other writers, just pass w) - // Status status: Status of write operation - // See also: BeginWrite - void EndWrite(Writer* w, Writer* last_writer, Status status); void DelayWrite(uint64_t expiration_time); @@ -405,9 +368,6 @@ class DBImpl : public DB { Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, WriteContext* context); - void BuildBatchGroup(Writer** last_writer, - autovector* write_batch_group); - // Force current memtable contents to be flushed. 
   Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options);
 
@@ -552,8 +512,8 @@ class DBImpl : public DB {
 
   std::unique_ptr<Directory> db_directory_;
 
-  // Queue of writers.
-  std::deque<Writer*> writers_;
+  WriteThread write_thread_;
+
   WriteBatch tmp_batch_;
 
   WriteController write_controller_;
@@ -627,7 +587,6 @@ class DBImpl : public DB {
   bool flush_on_destroy_;  // Used when disableWAL is true.
 
   static const int KEEP_LOG_FILE_NUM = 1000;
-  static const uint64_t kNoTimeOut = std::numeric_limits<uint64_t>::max();
   std::string db_absolute_path_;
 
   // The options to access storage files
diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc
index 3446571eb..6c073d4d5 100644
--- a/db/db_impl_debug.cc
+++ b/db/db_impl_debug.cc
@@ -140,21 +140,15 @@ void DBImpl::TEST_UnlockMutex() {
 }
 
 void* DBImpl::TEST_BeginWrite() {
-  auto w = new Writer(&mutex_);
-  w->batch = nullptr;
-  w->sync = false;
-  w->disableWAL = false;
-  w->in_batch_group = false;
-  w->done = false;
-  w->timeout_hint_us = kNoTimeOut;
-  Status s = BeginWrite(w, 0);
+  auto w = new WriteThread::Writer(&mutex_);
+  Status s = write_thread_.EnterWriteThread(w, 0);
   assert(s.ok() && !w->done);  // No timeout and nobody should do our job
   return reinterpret_cast<void*>(w);
 }
 
 void DBImpl::TEST_EndWrite(void* w) {
-  auto writer = reinterpret_cast<Writer*>(w);
-  EndWrite(writer, writer, Status::OK());
+  auto writer = reinterpret_cast<WriteThread::Writer*>(w);
+  write_thread_.ExitWriteThread(writer, writer, Status::OK());
   delete writer;
 }
diff --git a/db/write_thread.cc b/db/write_thread.cc
new file mode 100644
index 000000000..052e1209e
--- /dev/null
+++ b/db/write_thread.cc
@@ -0,0 +1,147 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include "db/write_thread.h"
+
+namespace rocksdb {
+
+Status WriteThread::EnterWriteThread(WriteThread::Writer* w,
+                                     uint64_t expiration_time) {
+  // The following code block pushes the current writer "w" into the writer
+  // queue "writers_" and waits until one of the following conditions is met:
+  // 1. the job of "w" has been done by some other writer.
+  // 2. "w" becomes the first writer in "writers_"
+  // 3. "w" timed out.
+  writers_.push_back(w);
+
+  bool timed_out = false;
+  while (!w->done && w != writers_.front()) {
+    if (expiration_time == 0) {
+      w->cv.Wait();
+    } else if (w->cv.TimedWait(expiration_time)) {
+      if (w->in_batch_group) {
+        // The front writer is currently doing the write on behalf of this
+        // "timed-out" writer, so it should wait until that write completes.
+        expiration_time = 0;
+      } else {
+        timed_out = true;
+        break;
+      }
+    }
+  }
+
+  if (timed_out) {
+#ifndef NDEBUG
+    bool found = false;
+#endif
+    for (auto iter = writers_.begin(); iter != writers_.end(); iter++) {
+      if (*iter == w) {
+        writers_.erase(iter);
+#ifndef NDEBUG
+        found = true;
+#endif
+        break;
+      }
+    }
+#ifndef NDEBUG
+    assert(found);
+#endif
+    // writers_.front() might still be in cond_wait without a time-out.
+    // As a result, we need to signal it to wake it up. Otherwise no
+    // one else will wake it up, and RocksDB will hang.
+    if (!writers_.empty()) {
+      writers_.front()->cv.Signal();
+    }
+    return Status::TimedOut();
+  }
+  return Status::OK();
+}
+
+void WriteThread::ExitWriteThread(WriteThread::Writer* w,
+                                  WriteThread::Writer* last_writer,
+                                  Status status) {
+  // Pop out the current writer and all writers being pushed before the
+  // current writer from the writer queue.
+  while (!writers_.empty()) {
+    Writer* ready = writers_.front();
+    writers_.pop_front();
+    if (ready != w) {
+      ready->status = status;
+      ready->done = true;
+      ready->cv.Signal();
+    }
+    if (ready == last_writer) break;
+  }
+
+  // Notify new head of write queue
+  if (!writers_.empty()) {
+    writers_.front()->cv.Signal();
+  }
+}
+
+// This function will be called only when the first writer succeeds.
+// All writers in the to-be-built batch group will be processed.
+//
+// REQUIRES: Writer list must be non-empty
+// REQUIRES: First writer must have a non-nullptr batch
+void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
+                                  autovector<WriteBatch*>* write_batch_group) {
+  assert(!writers_.empty());
+  Writer* first = writers_.front();
+  assert(first->batch != nullptr);
+
+  size_t size = WriteBatchInternal::ByteSize(first->batch);
+  write_batch_group->push_back(first->batch);
+
+  // Allow the group to grow up to a maximum size, but if the
+  // original write is small, limit the growth so we do not slow
+  // down the small write too much.
+  size_t max_size = 1 << 20;
+  if (size <= (128<<10)) {
+    max_size = size + (128<<10);
+  }
+
+  *last_writer = first;
+  std::deque<Writer*>::iterator iter = writers_.begin();
+  ++iter;  // Advance past "first"
+  for (; iter != writers_.end(); ++iter) {
+    Writer* w = *iter;
+    if (w->sync && !first->sync) {
+      // Do not include a sync write into a batch handled by a non-sync write.
+      break;
+    }
+
+    if (!w->disableWAL && first->disableWAL) {
+      // Do not include a write that needs WAL into a batch that has
+      // WAL disabled.
+      break;
+    }
+
+    if (w->timeout_hint_us < first->timeout_hint_us) {
+      // Do not include those writes with shorter timeout. Otherwise, we might
+      // execute a write that should instead be aborted because of timeout.
+      break;
+    }
+
+    if (w->batch == nullptr) {
+      // Do not include those writes with nullptr batch. Those are not writes,
+      // those are something else. They want to be alone
+      break;
+    }
+
+    size += WriteBatchInternal::ByteSize(w->batch);
+    if (size > max_size) {
+      // Do not make batch too big
+      break;
+    }
+
+    write_batch_group->push_back(w->batch);
+    w->in_batch_group = true;
+    *last_writer = w;
+  }
+}
+
+}  // namespace rocksdb
diff --git a/db/write_thread.h b/db/write_thread.h
new file mode 100644
index 000000000..8c5baa664
--- /dev/null
+++ b/db/write_thread.h
@@ -0,0 +1,80 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+
+#include <stdint.h>
+#include <deque>
+#include <limits>
+#include "rocksdb/status.h"
+#include "db/write_batch_internal.h"
+#include "util/autovector.h"
+#include "port/port.h"
+
+namespace rocksdb {
+
+class WriteThread {
+ public:
+  static const uint64_t kNoTimeOut = std::numeric_limits<uint64_t>::max();
+  // Information kept for every waiting writer
+  struct Writer {
+    Status status;
+    WriteBatch* batch;
+    bool sync;
+    bool disableWAL;
+    bool in_batch_group;
+    bool done;
+    uint64_t timeout_hint_us;
+    port::CondVar cv;
+
+    explicit Writer(port::Mutex* mu)
+        : batch(nullptr),
+          sync(false),
+          disableWAL(false),
+          in_batch_group(false),
+          done(false),
+          timeout_hint_us(kNoTimeOut),
+          cv(mu) {}
+  };
+
+  WriteThread() = default;
+  ~WriteThread() = default;
+
+  // Before applying a write operation (such as DBImpl::Write or
+  // DBImpl::FlushMemTable), the thread should grab the mutex_ and be the
+  // first on the writers queue. EnterWriteThread is used for this.
+  // Be aware! A writer's job can be done by another thread (see DBImpl::Write
+  // for an example), so check w.done before applying any changes.
+  //
+  // Writer* w:                writer to be placed in the queue
+  // uint64_t expiration_time: maximum time to be in the queue
+  // See also: ExitWriteThread
+  // REQUIRES: db mutex held
+  Status EnterWriteThread(Writer* w, uint64_t expiration_time);
+
+  // After the write job is done, we need to remove the already-used writers
+  // from the writers_ queue and notify the new head of the queue.
+  // ExitWriteThread is used for this.
+  //
+  // Writer* w:           writer that was added by EnterWriteThread
+  // Writer* last_writer: several writers can be joined into one batch group
+  //                      (as DBImpl::Write does), so pass the last writer of
+  //                      the group; if no other writers were touched,
+  //                      just pass w
+  // Status status:       status of the write operation
+  // See also: EnterWriteThread
+  // REQUIRES: db mutex held
+  void ExitWriteThread(Writer* w, Writer* last_writer, Status status);
+
+  void BuildBatchGroup(Writer** last_writer,
+                       autovector<WriteBatch*>* write_batch_group);
+
+ private:
+  // Queue of writers.
+  std::deque<Writer*> writers_;
+};
+
+}  // namespace rocksdb
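
Reviewer note (not part of the patch): the following is a minimal sketch of the calling convention the new WriteThread interface expects, mirroring how DBImpl::FlushMemTable and DBImpl::DropColumnFamily use it above. The function name DoExclusiveWork and the free-standing mutex parameter are placeholders for illustration only, not RocksDB APIs.

#include <cassert>
#include "db/write_thread.h"
#include "port/port.h"

namespace rocksdb {

// Hypothetical caller: performs work that requires being the only writer.
Status DoExclusiveWork(WriteThread* write_thread, port::Mutex* mu) {
  // Writer's constructor defaults batch to nullptr and timeout_hint_us to
  // WriteThread::kNoTimeOut, so no manual field initialization is needed.
  WriteThread::Writer w(mu);
  mu->Lock();
  // Block until "w" reaches the front of the writer queue (0 = no expiration).
  Status s = write_thread->EnterWriteThread(&w, 0);
  assert(s.ok() && !w.done);  // no timeout, and nobody can do this job for us
  // ... do the single-writer work here ...
  // Pass &w as last_writer because no other writers were grouped with us.
  write_thread->ExitWriteThread(&w, &w, s);
  mu->Unlock();
  return s;
}

}  // namespace rocksdb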
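Reviewer note (not part of the patch): the size cap in WriteThread::BuildBatchGroup keeps a small leading write from being delayed behind a huge group. With max_size = 1 << 20 and the 128 << 10 threshold, a 4 KB leading batch can grow only to 4 KB + 128 KB = 132 KB, while a 300 KB leading batch can grow up to the full 1 MB cap.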