// Copyright (c) 2013, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. #include "db/write_thread.h" namespace rocksdb { Status WriteThread::EnterWriteThread(WriteThread::Writer* w, uint64_t expiration_time) { // the following code block pushes the current writer "w" into the writer // queue "writers_" and wait until one of the following conditions met: // 1. the job of "w" has been done by some other writers. // 2. "w" becomes the first writer in "writers_" // 3. "w" timed-out. writers_.push_back(w); bool timed_out = false; while (!w->done && w != writers_.front()) { if (expiration_time == 0) { w->cv.Wait(); } else if (w->cv.TimedWait(expiration_time)) { if (w->in_batch_group) { // then it means the front writer is currently doing the // write on behalf of this "timed-out" writer. Then it // should wait until the write completes. expiration_time = 0; } else { timed_out = true; break; } } } if (timed_out) { #ifndef NDEBUG bool found = false; #endif for (auto iter = writers_.begin(); iter != writers_.end(); iter++) { if (*iter == w) { writers_.erase(iter); #ifndef NDEBUG found = true; #endif break; } } #ifndef NDEBUG assert(found); #endif // writers_.front() might still be in cond_wait without a time-out. // As a result, we need to signal it to wake it up. Otherwise no // one else will wake him up, and RocksDB will hang. if (!writers_.empty()) { writers_.front()->cv.Signal(); } return Status::TimedOut(); } return Status::OK(); } void WriteThread::ExitWriteThread(WriteThread::Writer* w, WriteThread::Writer* last_writer, Status status) { // Pop out the current writer and all writers being pushed before the // current writer from the writer queue. while (!writers_.empty()) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != w) { ready->status = status; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; } // Notify new head of write queue if (!writers_.empty()) { writers_.front()->cv.Signal(); } } // This function will be called only when the first writer succeeds. // All writers in the to-be-built batch group will be processed. // // REQUIRES: Writer list must be non-empty // REQUIRES: First writer must have a non-nullptr batch void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer, autovector* write_batch_group) { assert(!writers_.empty()); Writer* first = writers_.front(); assert(first->batch != nullptr); size_t size = WriteBatchInternal::ByteSize(first->batch); write_batch_group->push_back(first->batch); // Allow the group to grow up to a maximum size, but if the // original write is small, limit the growth so we do not slow // down the small write too much. size_t max_size = 1 << 20; if (size <= (128<<10)) { max_size = size + (128<<10); } *last_writer = first; std::deque::iterator iter = writers_.begin(); ++iter; // Advance past "first" for (; iter != writers_.end(); ++iter) { Writer* w = *iter; if (w->sync && !first->sync) { // Do not include a sync write into a batch handled by a non-sync write. break; } if (!w->disableWAL && first->disableWAL) { // Do not include a write that needs WAL into a batch that has // WAL disabled. break; } if (w->timeout_hint_us < first->timeout_hint_us) { // Do not include those writes with shorter timeout. Otherwise, we might // execute a write that should instead be aborted because of timeout. break; } if (w->batch == nullptr) { // Do not include those writes with nullptr batch. Those are not writes, // those are something else. They want to be alone break; } size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { // Do not make batch too big break; } write_batch_group->push_back(w->batch); w->in_batch_group = true; *last_writer = w; } } } // namespace rocksdb