fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
				
			
			
		
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							201 lines
						
					
					
						
							6.4 KiB
						
					
					
				
			
		
		
	
	
							201 lines
						
					
					
						
							6.4 KiB
						
					
					
				| //  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
 | |
| //  This source code is licensed under the BSD-style license found in the
 | |
| //  LICENSE file in the root directory of this source tree. An additional grant
 | |
| //  of patent rights can be found in the PATENTS file in the same directory.
 | |
| 
 | |
| #include "db/write_thread.h"
 | |
| 
 | |
| namespace rocksdb {
 | |
| 
 | |
| void WriteThread::Await(Writer* w) {
 | |
|   std::unique_lock<std::mutex> guard(w->JoinMutex());
 | |
|   w->JoinCV().wait(guard, [w] { return w->joined; });
 | |
| }
 | |
| 
 | |
| void WriteThread::MarkJoined(Writer* w) {
 | |
|   std::lock_guard<std::mutex> guard(w->JoinMutex());
 | |
|   assert(!w->joined);
 | |
|   w->joined = true;
 | |
|   w->JoinCV().notify_one();
 | |
| }
 | |
| 
 | |
| void WriteThread::LinkOne(Writer* w, bool* wait_needed) {
 | |
|   assert(!w->joined && !w->done);
 | |
| 
 | |
|   Writer* writers = newest_writer_.load(std::memory_order_relaxed);
 | |
|   while (true) {
 | |
|     w->link_older = writers;
 | |
|     if (writers != nullptr) {
 | |
|       w->CreateMutex();
 | |
|     }
 | |
|     if (newest_writer_.compare_exchange_strong(writers, w)) {
 | |
|       // Success.
 | |
|       *wait_needed = (writers != nullptr);
 | |
|       return;
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WriteThread::CreateMissingNewerLinks(Writer* head) {
 | |
|   while (true) {
 | |
|     Writer* next = head->link_older;
 | |
|     if (next == nullptr || next->link_newer != nullptr) {
 | |
|       assert(next == nullptr || next->link_newer == head);
 | |
|       break;
 | |
|     }
 | |
|     next->link_newer = head;
 | |
|     head = next;
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WriteThread::JoinBatchGroup(Writer* w) {
 | |
|   assert(w->batch != nullptr);
 | |
|   bool wait_needed;
 | |
|   LinkOne(w, &wait_needed);
 | |
|   if (wait_needed) {
 | |
|     Await(w);
 | |
|   }
 | |
| }
 | |
| 
 | |
| size_t WriteThread::EnterAsBatchGroupLeader(
 | |
|     Writer* leader, WriteThread::Writer** last_writer,
 | |
|     autovector<WriteBatch*>* write_batch_group) {
 | |
|   assert(leader->link_older == nullptr);
 | |
|   assert(leader->batch != nullptr);
 | |
| 
 | |
|   size_t size = WriteBatchInternal::ByteSize(leader->batch);
 | |
|   write_batch_group->push_back(leader->batch);
 | |
| 
 | |
|   // Allow the group to grow up to a maximum size, but if the
 | |
|   // original write is small, limit the growth so we do not slow
 | |
|   // down the small write too much.
 | |
|   size_t max_size = 1 << 20;
 | |
|   if (size <= (128 << 10)) {
 | |
|     max_size = size + (128 << 10);
 | |
|   }
 | |
| 
 | |
|   *last_writer = leader;
 | |
| 
 | |
|   if (leader->has_callback) {
 | |
|     // TODO(agiardullo:) Batching not currently supported as this write may
 | |
|     // fail if the callback function decides to abort this write.
 | |
|     return size;
 | |
|   }
 | |
| 
 | |
|   Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
 | |
| 
 | |
|   // This is safe regardless of any db mutex status of the caller. Previous
 | |
|   // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
 | |
|   // (they emptied the list and then we added ourself as leader) or had to
 | |
|   // explicitly wake up us (the list was non-empty when we added ourself,
 | |
|   // so we have already received our MarkJoined).
 | |
|   CreateMissingNewerLinks(newest_writer);
 | |
| 
 | |
|   // Tricky. Iteration start (leader) is exclusive and finish
 | |
|   // (newest_writer) is inclusive. Iteration goes from old to new.
 | |
|   Writer* w = leader;
 | |
|   while (w != newest_writer) {
 | |
|     w = w->link_newer;
 | |
| 
 | |
|     if (w->sync && !leader->sync) {
 | |
|       // Do not include a sync write into a batch handled by a non-sync write.
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     if (!w->disableWAL && leader->disableWAL) {
 | |
|       // Do not include a write that needs WAL into a batch that has
 | |
|       // WAL disabled.
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     if (w->has_callback) {
 | |
|       // Do not include writes which may be aborted if the callback does not
 | |
|       // succeed.
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     if (w->batch == nullptr) {
 | |
|       // Do not include those writes with nullptr batch. Those are not writes,
 | |
|       // those are something else. They want to be alone
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     auto batch_size = WriteBatchInternal::ByteSize(w->batch);
 | |
|     if (size + batch_size > max_size) {
 | |
|       // Do not make batch too big
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     size += batch_size;
 | |
|     write_batch_group->push_back(w->batch);
 | |
|     w->in_batch_group = true;
 | |
|     *last_writer = w;
 | |
|   }
 | |
|   return size;
 | |
| }
 | |
| 
 | |
| void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
 | |
|                                          Status status) {
 | |
|   assert(leader->link_older == nullptr);
 | |
| 
 | |
|   Writer* head = newest_writer_.load(std::memory_order_acquire);
 | |
|   if (head != last_writer ||
 | |
|       !newest_writer_.compare_exchange_strong(head, nullptr)) {
 | |
|     // Either w wasn't the head during the load(), or it was the head
 | |
|     // during the load() but somebody else pushed onto the list before
 | |
|     // we did the compare_exchange_strong (causing it to fail).  In the
 | |
|     // latter case compare_exchange_strong has the effect of re-reading
 | |
|     // its first param (head).  No need to retry a failing CAS, because
 | |
|     // only a departing leader (which we are at the moment) can remove
 | |
|     // nodes from the list.
 | |
|     assert(head != last_writer);
 | |
| 
 | |
|     // After walking link_older starting from head (if not already done)
 | |
|     // we will be able to traverse w->link_newer below. This function
 | |
|     // can only be called from an active leader, only a leader can
 | |
|     // clear newest_writer_, we didn't, and only a clear newest_writer_
 | |
|     // could cause the next leader to start their work without a call
 | |
|     // to MarkJoined, so we can definitely conclude that no other leader
 | |
|     // work is going on here (with or without db mutex).
 | |
|     CreateMissingNewerLinks(head);
 | |
|     assert(last_writer->link_newer->link_older == last_writer);
 | |
|     last_writer->link_newer->link_older = nullptr;
 | |
| 
 | |
|     // Next leader didn't self-identify, because newest_writer_ wasn't
 | |
|     // nullptr when they enqueued (we were definitely enqueued before them
 | |
|     // and are still in the list).  That means leader handoff occurs when
 | |
|     // we call MarkJoined
 | |
|     MarkJoined(last_writer->link_newer);
 | |
|   }
 | |
|   // else nobody else was waiting, although there might already be a new
 | |
|   // leader now
 | |
| 
 | |
|   while (last_writer != leader) {
 | |
|     last_writer->status = status;
 | |
|     last_writer->done = true;
 | |
|     // We must read link_older before calling MarkJoined, because as
 | |
|     // soon as it is marked the other thread's AwaitJoined may return
 | |
|     // and deallocate the Writer.
 | |
|     auto next = last_writer->link_older;
 | |
|     MarkJoined(last_writer);
 | |
|     last_writer = next;
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
 | |
|   assert(w->batch == nullptr);
 | |
|   bool wait_needed;
 | |
|   LinkOne(w, &wait_needed);
 | |
|   if (wait_needed) {
 | |
|     mu->Unlock();
 | |
|     Await(w);
 | |
|     mu->Lock();
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WriteThread::ExitUnbatched(Writer* w) {
 | |
|   Status dummy_status;
 | |
|   ExitAsBatchGroupLeader(w, w, dummy_status);
 | |
| }
 | |
| 
 | |
| }  // namespace rocksdb
 | |
| 
 |