fork of https://github.com/rust-rocksdb/rust-rocksdb for nextgraph
				
			
			
		
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							464 lines
						
					
					
						
							17 KiB
						
					
					
				
			
		
		
	
	
							464 lines
						
					
					
						
							17 KiB
						
					
					
				| //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
 | |
| //  This source code is licensed under both the GPLv2 (found in the
 | |
| //  COPYING file in the root directory) and Apache 2.0 License
 | |
| //  (found in the LICENSE.Apache file in the root directory).
 | |
| 
 | |
| #pragma once
 | |
| 
 | |
| #include <atomic>
 | |
| #include <cassert>
 | |
| #include <chrono>
 | |
| #include <condition_variable>
 | |
| #include <cstdint>
 | |
| #include <mutex>
 | |
| #include <type_traits>
 | |
| #include <vector>
 | |
| 
 | |
| #include "db/dbformat.h"
 | |
| #include "db/post_memtable_callback.h"
 | |
| #include "db/pre_release_callback.h"
 | |
| #include "db/write_callback.h"
 | |
| #include "monitoring/instrumented_mutex.h"
 | |
| #include "rocksdb/options.h"
 | |
| #include "rocksdb/status.h"
 | |
| #include "rocksdb/types.h"
 | |
| #include "rocksdb/write_batch.h"
 | |
| #include "util/autovector.h"
 | |
| 
 | |
| namespace ROCKSDB_NAMESPACE {
 | |
| 
 | |
| class WriteThread {
 | |
|  public:
 | |
|   enum State : uint8_t {
 | |
|     // The initial state of a writer.  This is a Writer that is
 | |
|     // waiting in JoinBatchGroup.  This state can be left when another
 | |
|     // thread informs the waiter that it has become a group leader
 | |
|     // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
 | |
|     // non-parallel informs a follower that its writes have been committed
 | |
|     // (-> STATE_COMPLETED), or when a leader that has chosen to perform
 | |
|     // updates in parallel and needs this Writer to apply its batch (->
 | |
|     // STATE_PARALLEL_MEMTABLE_WRITER).
 | |
|     STATE_INIT = 1,
 | |
| 
 | |
|     // The state used to inform a waiting Writer that it has become the
 | |
|     // leader, and it should now build a write batch group.  Tricky:
 | |
|     // this state is not used if newest_writer_ is empty when a writer
 | |
|     // enqueues itself, because there is no need to wait (or even to
 | |
|     // create the mutex and condvar used to wait) in that case.  This is
 | |
|     // a terminal state unless the leader chooses to make this a parallel
 | |
|     // batch, in which case the last parallel worker to finish will move
 | |
|     // the leader to STATE_COMPLETED.
 | |
|     STATE_GROUP_LEADER = 2,
 | |
| 
 | |
|     // The state used to inform a waiting writer that it has become the
 | |
|     // leader of memtable writer group. The leader will either write
 | |
|     // memtable for the whole group, or launch a parallel group write
 | |
|     // to memtable by calling LaunchParallelMemTableWrite.
 | |
|     STATE_MEMTABLE_WRITER_LEADER = 4,
 | |
| 
 | |
|     // The state used to inform a waiting writer that it has become a
 | |
|     // parallel memtable writer. It can be the group leader who launch the
 | |
|     // parallel writer group, or one of the followers. The writer should then
 | |
|     // apply its batch to the memtable concurrently and call
 | |
|     // CompleteParallelMemTableWriter.
 | |
|     STATE_PARALLEL_MEMTABLE_WRITER = 8,
 | |
| 
 | |
|     // A follower whose writes have been applied, or a parallel leader
 | |
|     // whose followers have all finished their work.  This is a terminal
 | |
|     // state.
 | |
|     STATE_COMPLETED = 16,
 | |
| 
 | |
|     // A state indicating that the thread may be waiting using StateMutex()
 | |
|     // and StateCondVar()
 | |
|     STATE_LOCKED_WAITING = 32,
 | |
|   };
 | |
| 
 | |
|   struct Writer;
 | |
| 
 | |
|   struct WriteGroup {
 | |
|     Writer* leader = nullptr;
 | |
|     Writer* last_writer = nullptr;
 | |
|     SequenceNumber last_sequence;
 | |
|     // before running goes to zero, status needs leader->StateMutex()
 | |
|     Status status;
 | |
|     std::atomic<size_t> running;
 | |
|     size_t size = 0;
 | |
| 
 | |
|     struct Iterator {
 | |
|       Writer* writer;
 | |
|       Writer* last_writer;
 | |
| 
 | |
|       explicit Iterator(Writer* w, Writer* last)
 | |
|           : writer(w), last_writer(last) {}
 | |
| 
 | |
|       Writer* operator*() const { return writer; }
 | |
| 
 | |
|       Iterator& operator++() {
 | |
|         assert(writer != nullptr);
 | |
|         if (writer == last_writer) {
 | |
|           writer = nullptr;
 | |
|         } else {
 | |
|           writer = writer->link_newer;
 | |
|         }
 | |
|         return *this;
 | |
|       }
 | |
| 
 | |
|       bool operator!=(const Iterator& other) const {
 | |
|         return writer != other.writer;
 | |
|       }
 | |
|     };
 | |
| 
 | |
|     Iterator begin() const { return Iterator(leader, last_writer); }
 | |
|     Iterator end() const { return Iterator(nullptr, nullptr); }
 | |
|   };
 | |
| 
 | |
|   // Information kept for every waiting writer.
 | |
|   struct Writer {
 | |
|     WriteBatch* batch;
 | |
|     bool sync;
 | |
|     bool no_slowdown;
 | |
|     bool disable_wal;
 | |
|     Env::IOPriority rate_limiter_priority;
 | |
|     bool disable_memtable;
 | |
|     size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
 | |
|     size_t protection_bytes_per_key;
 | |
|     PreReleaseCallback* pre_release_callback;
 | |
|     PostMemTableCallback* post_memtable_callback;
 | |
|     uint64_t log_used;  // log number that this batch was inserted into
 | |
|     uint64_t log_ref;   // log number that memtable insert should reference
 | |
|     WriteCallback* callback;
 | |
|     bool made_waitable;          // records lazy construction of mutex and cv
 | |
|     std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
 | |
|     WriteGroup* write_group;
 | |
|     SequenceNumber sequence;  // the sequence number to use for the first key
 | |
|     Status status;
 | |
|     Status callback_status;  // status returned by callback->Callback()
 | |
| 
 | |
|     std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
 | |
|     std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
 | |
|     Writer* link_older;  // read/write only before linking, or as leader
 | |
|     Writer* link_newer;  // lazy, read/write only before linking, or as leader
 | |
| 
 | |
|     Writer()
 | |
|         : batch(nullptr),
 | |
|           sync(false),
 | |
|           no_slowdown(false),
 | |
|           disable_wal(false),
 | |
|           rate_limiter_priority(Env::IOPriority::IO_TOTAL),
 | |
|           disable_memtable(false),
 | |
|           batch_cnt(0),
 | |
|           protection_bytes_per_key(0),
 | |
|           pre_release_callback(nullptr),
 | |
|           post_memtable_callback(nullptr),
 | |
|           log_used(0),
 | |
|           log_ref(0),
 | |
|           callback(nullptr),
 | |
|           made_waitable(false),
 | |
|           state(STATE_INIT),
 | |
|           write_group(nullptr),
 | |
|           sequence(kMaxSequenceNumber),
 | |
|           link_older(nullptr),
 | |
|           link_newer(nullptr) {}
 | |
| 
 | |
|     Writer(const WriteOptions& write_options, WriteBatch* _batch,
 | |
|            WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
 | |
|            size_t _batch_cnt = 0,
 | |
|            PreReleaseCallback* _pre_release_callback = nullptr,
 | |
|            PostMemTableCallback* _post_memtable_callback = nullptr)
 | |
|         : batch(_batch),
 | |
|           sync(write_options.sync),
 | |
|           no_slowdown(write_options.no_slowdown),
 | |
|           disable_wal(write_options.disableWAL),
 | |
|           rate_limiter_priority(write_options.rate_limiter_priority),
 | |
|           disable_memtable(_disable_memtable),
 | |
|           batch_cnt(_batch_cnt),
 | |
|           protection_bytes_per_key(_batch->GetProtectionBytesPerKey()),
 | |
|           pre_release_callback(_pre_release_callback),
 | |
|           post_memtable_callback(_post_memtable_callback),
 | |
|           log_used(0),
 | |
|           log_ref(_log_ref),
 | |
|           callback(_callback),
 | |
|           made_waitable(false),
 | |
|           state(STATE_INIT),
 | |
|           write_group(nullptr),
 | |
|           sequence(kMaxSequenceNumber),
 | |
|           link_older(nullptr),
 | |
|           link_newer(nullptr) {}
 | |
| 
 | |
|     ~Writer() {
 | |
|       if (made_waitable) {
 | |
|         StateMutex().~mutex();
 | |
|         StateCV().~condition_variable();
 | |
|       }
 | |
|       status.PermitUncheckedError();
 | |
|       callback_status.PermitUncheckedError();
 | |
|     }
 | |
| 
 | |
|     bool CheckCallback(DB* db) {
 | |
|       if (callback != nullptr) {
 | |
|         callback_status = callback->Callback(db);
 | |
|       }
 | |
|       return callback_status.ok();
 | |
|     }
 | |
| 
 | |
|     void CreateMutex() {
 | |
|       if (!made_waitable) {
 | |
|         // Note that made_waitable is tracked separately from state
 | |
|         // transitions, because we can't atomically create the mutex and
 | |
|         // link into the list.
 | |
|         made_waitable = true;
 | |
|         new (&state_mutex_bytes) std::mutex;
 | |
|         new (&state_cv_bytes) std::condition_variable;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     // returns the aggregate status of this Writer
 | |
|     Status FinalStatus() {
 | |
|       if (!status.ok()) {
 | |
|         // a non-ok memtable write status takes presidence
 | |
|         assert(callback == nullptr || callback_status.ok());
 | |
|         return status;
 | |
|       } else if (!callback_status.ok()) {
 | |
|         // if the callback failed then that is the status we want
 | |
|         // because a memtable insert should not have been attempted
 | |
|         assert(callback != nullptr);
 | |
|         assert(status.ok());
 | |
|         return callback_status;
 | |
|       } else {
 | |
|         // if there is no callback then we only care about
 | |
|         // the memtable insert status
 | |
|         assert(callback == nullptr || callback_status.ok());
 | |
|         return status;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     bool CallbackFailed() {
 | |
|       return (callback != nullptr) && !callback_status.ok();
 | |
|     }
 | |
| 
 | |
|     bool ShouldWriteToMemtable() {
 | |
|       return status.ok() && !CallbackFailed() && !disable_memtable;
 | |
|     }
 | |
| 
 | |
|     bool ShouldWriteToWAL() {
 | |
|       return status.ok() && !CallbackFailed() && !disable_wal;
 | |
|     }
 | |
| 
 | |
|     // No other mutexes may be acquired while holding StateMutex(), it is
 | |
|     // always last in the order
 | |
|     std::mutex& StateMutex() {
 | |
|       assert(made_waitable);
 | |
|       return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
 | |
|     }
 | |
| 
 | |
|     std::condition_variable& StateCV() {
 | |
|       assert(made_waitable);
 | |
|       return *static_cast<std::condition_variable*>(
 | |
|           static_cast<void*>(&state_cv_bytes));
 | |
|     }
 | |
|   };
 | |
| 
 | |
|   struct AdaptationContext {
 | |
|     const char* name;
 | |
|     std::atomic<int32_t> value;
 | |
| 
 | |
|     explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
 | |
|   };
 | |
| 
 | |
|   explicit WriteThread(const ImmutableDBOptions& db_options);
 | |
| 
 | |
|   virtual ~WriteThread() = default;
 | |
| 
 | |
|   // IMPORTANT: None of the methods in this class rely on the db mutex
 | |
|   // for correctness. All of the methods except JoinBatchGroup and
 | |
|   // EnterUnbatched may be called either with or without the db mutex held.
 | |
|   // Correctness is maintained by ensuring that only a single thread is
 | |
|   // a leader at a time.
 | |
| 
 | |
|   // Registers w as ready to become part of a batch group, waits until the
 | |
|   // caller should perform some work, and returns the current state of the
 | |
|   // writer.  If w has become the leader of a write batch group, returns
 | |
|   // STATE_GROUP_LEADER.  If w has been made part of a sequential batch
 | |
|   // group and the leader has performed the write, returns STATE_DONE.
 | |
|   // If w has been made part of a parallel batch group and is responsible
 | |
|   // for updating the memtable, returns STATE_PARALLEL_MEMTABLE_WRITER.
 | |
|   //
 | |
|   // The db mutex SHOULD NOT be held when calling this function, because
 | |
|   // it will block.
 | |
|   //
 | |
|   // Writer* w:        Writer to be executed as part of a batch group
 | |
|   void JoinBatchGroup(Writer* w);
 | |
| 
 | |
|   // Constructs a write batch group led by leader, which should be a
 | |
|   // Writer passed to JoinBatchGroup on the current thread.
 | |
|   //
 | |
|   // Writer* leader:          Writer that is STATE_GROUP_LEADER
 | |
|   // WriteGroup* write_group: Out-param of group members
 | |
|   // returns:                 Total batch group byte size
 | |
|   size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);
 | |
| 
 | |
|   // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
 | |
|   // and wakes up the next leader (if any).
 | |
|   //
 | |
|   // WriteGroup* write_group: the write group
 | |
|   // Status status:           Status of write operation
 | |
|   void ExitAsBatchGroupLeader(WriteGroup& write_group, Status& status);
 | |
| 
 | |
|   // Exit batch group on behalf of batch group leader.
 | |
|   void ExitAsBatchGroupFollower(Writer* w);
 | |
| 
 | |
|   // Constructs a write batch group led by leader from newest_memtable_writers_
 | |
|   // list. The leader should either write memtable for the whole group and
 | |
|   // call ExitAsMemTableWriter, or launch parallel memtable write through
 | |
|   // LaunchParallelMemTableWriters.
 | |
|   void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_grup);
 | |
| 
 | |
|   // Memtable writer group leader, or the last finished writer in a parallel
 | |
|   // write group, exit from the newest_memtable_writers_ list, and wake up
 | |
|   // the next leader if needed.
 | |
|   void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);
 | |
| 
 | |
|   // Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all of
 | |
|   // the non-leader members of this write batch group.  Sets Writer::sequence
 | |
|   // before waking them up.
 | |
|   //
 | |
|   // WriteGroup* write_group: Extra state used to coordinate the parallel add
 | |
|   void LaunchParallelMemTableWriters(WriteGroup* write_group);
 | |
| 
 | |
|   // Reports the completion of w's batch to the parallel group leader, and
 | |
|   // waits for the rest of the parallel batch to complete.  Returns true
 | |
|   // if this thread is the last to complete, and hence should advance
 | |
|   // the sequence number and then call EarlyExitParallelGroup, false if
 | |
|   // someone else has already taken responsibility for that.
 | |
|   bool CompleteParallelMemTableWriter(Writer* w);
 | |
| 
 | |
|   // Waits for all preceding writers (unlocking mu while waiting), then
 | |
|   // registers w as the currently proceeding writer.
 | |
|   //
 | |
|   // Writer* w:              A Writer not eligible for batching
 | |
|   // InstrumentedMutex* mu:  The db mutex, to unlock while waiting
 | |
|   // REQUIRES: db mutex held
 | |
|   void EnterUnbatched(Writer* w, InstrumentedMutex* mu);
 | |
| 
 | |
|   // Completes a Writer begun with EnterUnbatched, unblocking subsequent
 | |
|   // writers.
 | |
|   void ExitUnbatched(Writer* w);
 | |
| 
 | |
|   // Wait for all parallel memtable writers to finish, in case pipelined
 | |
|   // write is enabled.
 | |
|   void WaitForMemTableWriters();
 | |
| 
 | |
|   SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
 | |
|     if (sequence > last_sequence_) {
 | |
|       last_sequence_ = sequence;
 | |
|     }
 | |
|     return last_sequence_;
 | |
|   }
 | |
| 
 | |
|   // Insert a dummy writer at the tail of the write queue to indicate a write
 | |
|   // stall, and fail any writers in the queue with no_slowdown set to true
 | |
|   // REQUIRES: db mutex held, no other stall on this queue outstanding
 | |
|   void BeginWriteStall();
 | |
| 
 | |
|   // Remove the dummy writer and wake up waiting writers
 | |
|   // REQUIRES: db mutex held
 | |
|   void EndWriteStall();
 | |
| 
 | |
|   // Number of BeginWriteStall(), or 0 if there is no active stall in the
 | |
|   // write queue.
 | |
|   // REQUIRES: db mutex held
 | |
|   uint64_t GetBegunCountOfOutstandingStall();
 | |
| 
 | |
|   // Wait for number of completed EndWriteStall() to reach >= `stall_count`,
 | |
|   // which will generally have come from GetBegunCountOfOutstandingStall().
 | |
|   // (Does not require db mutex held)
 | |
|   void WaitForStallEndedCount(uint64_t stall_count);
 | |
| 
 | |
|  private:
 | |
|   // See AwaitState.
 | |
|   const uint64_t max_yield_usec_;
 | |
|   const uint64_t slow_yield_usec_;
 | |
| 
 | |
|   // Allow multiple writers write to memtable concurrently.
 | |
|   const bool allow_concurrent_memtable_write_;
 | |
| 
 | |
|   // Enable pipelined write to WAL and memtable.
 | |
|   const bool enable_pipelined_write_;
 | |
| 
 | |
|   // The maximum limit of number of bytes that are written in a single batch
 | |
|   // of WAL or memtable write. It is followed when the leader write size
 | |
|   // is larger than 1/8 of this limit.
 | |
|   const uint64_t max_write_batch_group_size_bytes;
 | |
| 
 | |
|   // Points to the newest pending writer. Only leader can remove
 | |
|   // elements, adding can be done lock-free by anybody.
 | |
|   std::atomic<Writer*> newest_writer_;
 | |
| 
 | |
|   // Points to the newest pending memtable writer. Used only when pipelined
 | |
|   // write is enabled.
 | |
|   std::atomic<Writer*> newest_memtable_writer_;
 | |
| 
 | |
|   // The last sequence that have been consumed by a writer. The sequence
 | |
|   // is not necessary visible to reads because the writer can be ongoing.
 | |
|   SequenceNumber last_sequence_;
 | |
| 
 | |
|   // A dummy writer to indicate a write stall condition. This will be inserted
 | |
|   // at the tail of the writer queue by the leader, so newer writers can just
 | |
|   // check for this and bail
 | |
|   Writer write_stall_dummy_;
 | |
| 
 | |
|   // Mutex and condvar for writers to block on a write stall. During a write
 | |
|   // stall, writers with no_slowdown set to false will wait on this rather
 | |
|   // on the writer queue
 | |
|   port::Mutex stall_mu_;
 | |
|   port::CondVar stall_cv_;
 | |
| 
 | |
|   // Count the number of stalls begun, so that we can check whether
 | |
|   // a particular stall has cleared (even if caught in another stall).
 | |
|   // Controlled by DB mutex.
 | |
|   // Because of the contract on BeginWriteStall() / EndWriteStall(),
 | |
|   // stall_ended_count_ <= stall_begun_count_ <= stall_ended_count_ + 1.
 | |
|   uint64_t stall_begun_count_ = 0;
 | |
|   // Count the number of stalls ended, so that we can check whether
 | |
|   // a particular stall has cleared (even if caught in another stall).
 | |
|   // Writes controlled by DB mutex + stall_mu_, signalled by stall_cv_.
 | |
|   // Read with stall_mu or DB mutex.
 | |
|   uint64_t stall_ended_count_ = 0;
 | |
| 
 | |
|   // Waits for w->state & goal_mask using w->StateMutex().  Returns
 | |
|   // the state that satisfies goal_mask.
 | |
|   uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);
 | |
| 
 | |
|   // Blocks until w->state & goal_mask, returning the state value
 | |
|   // that satisfied the predicate.  Uses ctx to adaptively use
 | |
|   // std::this_thread::yield() to avoid mutex overheads.  ctx should be
 | |
|   // a context-dependent static.
 | |
|   uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
 | |
| 
 | |
|   // Set writer state and wake the writer up if it is waiting.
 | |
|   void SetState(Writer* w, uint8_t new_state);
 | |
| 
 | |
|   // Links w into the newest_writer list. Return true if w was linked directly
 | |
|   // into the leader position.  Safe to call from multiple threads without
 | |
|   // external locking.
 | |
|   bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);
 | |
| 
 | |
|   // Link write group into the newest_writer list as a whole, while keeping the
 | |
|   // order of the writers unchanged. Return true if the group was linked
 | |
|   // directly into the leader position.
 | |
|   bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);
 | |
| 
 | |
|   // Computes any missing link_newer links.  Should not be called
 | |
|   // concurrently with itself.
 | |
|   void CreateMissingNewerLinks(Writer* head);
 | |
| 
 | |
|   // Set the leader in write_group to completed state and remove it from the
 | |
|   // write group.
 | |
|   void CompleteLeader(WriteGroup& write_group);
 | |
| 
 | |
|   // Set a follower in write_group to completed state and remove it from the
 | |
|   // write group.
 | |
|   void CompleteFollower(Writer* w, WriteGroup& write_group);
 | |
| };
 | |
| 
 | |
| }  // namespace ROCKSDB_NAMESPACE
 | |
| 
 |