fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
				
			
			
		
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							147 lines
						
					
					
						
							4.4 KiB
						
					
					
				
			
		
		
	
	
							147 lines
						
					
					
						
							4.4 KiB
						
					
					
				| //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
 | |
| //  This source code is licensed under the BSD-style license found in the
 | |
| //  LICENSE file in the root directory of this source tree. An additional grant
 | |
| //  of patent rights can be found in the PATENTS file in the same directory.
 | |
| 
 | |
| #include "db/write_thread.h"
 | |
| 
 | |
| namespace rocksdb {
 | |
| 
 | |
| Status WriteThread::EnterWriteThread(WriteThread::Writer* w,
 | |
|                                      uint64_t expiration_time) {
 | |
|   // the following code block pushes the current writer "w" into the writer
 | |
|   // queue "writers_" and wait until one of the following conditions met:
 | |
|   // 1. the job of "w" has been done by some other writers.
 | |
|   // 2. "w" becomes the first writer in "writers_"
 | |
|   // 3. "w" timed-out.
 | |
|   writers_.push_back(w);
 | |
| 
 | |
|   bool timed_out = false;
 | |
|   while (!w->done && w != writers_.front()) {
 | |
|     if (expiration_time == 0) {
 | |
|       w->cv.Wait();
 | |
|     } else if (w->cv.TimedWait(expiration_time)) {
 | |
|       if (w->in_batch_group) {
 | |
|         // then it means the front writer is currently doing the
 | |
|         // write on behalf of this "timed-out" writer.  Then it
 | |
|         // should wait until the write completes.
 | |
|         expiration_time = 0;
 | |
|       } else {
 | |
|         timed_out = true;
 | |
|         break;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (timed_out) {
 | |
| #ifndef NDEBUG
 | |
|     bool found = false;
 | |
| #endif
 | |
|     for (auto iter = writers_.begin(); iter != writers_.end(); iter++) {
 | |
|       if (*iter == w) {
 | |
|         writers_.erase(iter);
 | |
| #ifndef NDEBUG
 | |
|         found = true;
 | |
| #endif
 | |
|         break;
 | |
|       }
 | |
|     }
 | |
| #ifndef NDEBUG
 | |
|     assert(found);
 | |
| #endif
 | |
|     // writers_.front() might still be in cond_wait without a time-out.
 | |
|     // As a result, we need to signal it to wake it up.  Otherwise no
 | |
|     // one else will wake him up, and RocksDB will hang.
 | |
|     if (!writers_.empty()) {
 | |
|       writers_.front()->cv.Signal();
 | |
|     }
 | |
|     return Status::TimedOut();
 | |
|   }
 | |
|   return Status::OK();
 | |
| }
 | |
| 
 | |
| void WriteThread::ExitWriteThread(WriteThread::Writer* w,
 | |
|                                   WriteThread::Writer* last_writer,
 | |
|                                   Status status) {
 | |
|   // Pop out the current writer and all writers being pushed before the
 | |
|   // current writer from the writer queue.
 | |
|   while (!writers_.empty()) {
 | |
|     Writer* ready = writers_.front();
 | |
|     writers_.pop_front();
 | |
|     if (ready != w) {
 | |
|       ready->status = status;
 | |
|       ready->done = true;
 | |
|       ready->cv.Signal();
 | |
|     }
 | |
|     if (ready == last_writer) break;
 | |
|   }
 | |
| 
 | |
|   // Notify new head of write queue
 | |
|   if (!writers_.empty()) {
 | |
|     writers_.front()->cv.Signal();
 | |
|   }
 | |
| }
 | |
| 
 | |
| // This function will be called only when the first writer succeeds.
 | |
| // All writers in the to-be-built batch group will be processed.
 | |
| //
 | |
| // REQUIRES: Writer list must be non-empty
 | |
| // REQUIRES: First writer must have a non-nullptr batch
 | |
| void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
 | |
|                                   autovector<WriteBatch*>* write_batch_group) {
 | |
|   assert(!writers_.empty());
 | |
|   Writer* first = writers_.front();
 | |
|   assert(first->batch != nullptr);
 | |
| 
 | |
|   size_t size = WriteBatchInternal::ByteSize(first->batch);
 | |
|   write_batch_group->push_back(first->batch);
 | |
| 
 | |
|   // Allow the group to grow up to a maximum size, but if the
 | |
|   // original write is small, limit the growth so we do not slow
 | |
|   // down the small write too much.
 | |
|   size_t max_size = 1 << 20;
 | |
|   if (size <= (128<<10)) {
 | |
|     max_size = size + (128<<10);
 | |
|   }
 | |
| 
 | |
|   *last_writer = first;
 | |
|   std::deque<Writer*>::iterator iter = writers_.begin();
 | |
|   ++iter;  // Advance past "first"
 | |
|   for (; iter != writers_.end(); ++iter) {
 | |
|     Writer* w = *iter;
 | |
|     if (w->sync && !first->sync) {
 | |
|       // Do not include a sync write into a batch handled by a non-sync write.
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     if (!w->disableWAL && first->disableWAL) {
 | |
|       // Do not include a write that needs WAL into a batch that has
 | |
|       // WAL disabled.
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     if (w->timeout_hint_us < first->timeout_hint_us) {
 | |
|       // Do not include those writes with shorter timeout.  Otherwise, we might
 | |
|       // execute a write that should instead be aborted because of timeout.
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     if (w->batch == nullptr) {
 | |
|       // Do not include those writes with nullptr batch. Those are not writes,
 | |
|       // those are something else. They want to be alone
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     size += WriteBatchInternal::ByteSize(w->batch);
 | |
|     if (size > max_size) {
 | |
|       // Do not make batch too big
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     write_batch_group->push_back(w->batch);
 | |
|     w->in_batch_group = true;
 | |
|     *last_writer = w;
 | |
|   }
 | |
| }
 | |
| 
 | |
| }  // namespace rocksdb
 | |
| 
 |