//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <assert.h>
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <type_traits>
#include <vector>

#include "db/dbformat.h"
#include "db/pre_release_callback.h"
#include "db/write_callback.h"
#include "monitoring/instrumented_mutex.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"

namespace rocksdb {

class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer.  This is a Writer that is
    // waiting in JoinBatchGroup.  This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel and needs this Writer to apply its batch (->
    // STATE_PARALLEL_FOLLOWER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group.  Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case.  This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // The state used to inform a waiting writer that it has become the
    // leader of memtable writer group. The leader will either write
    // memtable for the whole group, or launch a parallel group write
    // to memtable by calling LaunchParallelMemTableWrite.
    STATE_MEMTABLE_WRITER_LEADER = 4,

    // The state used to inform a waiting writer that it has become a
    // parallel memtable writer. It can be the group leader who launch the
    // parallel writer group, or one of the followers. The writer should then
    // apply its batch to the memtable concurrently and call
    // CompleteParallelMemTableWriter.
    STATE_PARALLEL_MEMTABLE_WRITER = 8,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work.  This is a terminal
    // state.
    STATE_COMPLETED = 16,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCondVar()
    STATE_LOCKED_WAITING = 32,
  };

  struct Writer;

  struct WriteGroup {
    Writer* leader = nullptr;
    Writer* last_writer = nullptr;
    SequenceNumber last_sequence;
    // before running goes to zero, status needs leader->StateMutex()
    Status status;
    std::atomic<size_t> running;
    size_t size = 0;

    struct Iterator {
      Writer* writer;
      Writer* last_writer;

      explicit Iterator(Writer* w, Writer* last)
          : writer(w), last_writer(last) {}

      Writer* operator*() const { return writer; }

      Iterator& operator++() {
        assert(writer != nullptr);
        if (writer == last_writer) {
          writer = nullptr;
        } else {
          writer = writer->link_newer;
        }
        return *this;
      }

      bool operator!=(const Iterator& other) const {
        return writer != other.writer;
      }
    };

    Iterator begin() const { return Iterator(leader, last_writer); }
    Iterator end() const { return Iterator(nullptr, nullptr); }
  };

  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool no_slowdown;
    bool disable_wal;
    bool disable_memtable;
    size_t batch_cnt;  // if non-zero, number of sub-batches in the write batch
    PreReleaseCallback* pre_release_callback;
    uint64_t log_used;  // log number that this batch was inserted into
    uint64_t log_ref;   // log number that memtable insert should reference
    WriteCallback* callback;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    WriteGroup* write_group;
    SequenceNumber sequence;  // the sequence number to use for the first key
    Status status;
    Status callback_status;   // status returned by callback->Callback()

    std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
    std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader

    Writer()
        : batch(nullptr),
          sync(false),
          no_slowdown(false),
          disable_wal(false),
          disable_memtable(false),
          batch_cnt(0),
          pre_release_callback(nullptr),
          log_used(0),
          log_ref(0),
          callback(nullptr),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    Writer(const WriteOptions& write_options, WriteBatch* _batch,
           WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
           size_t _batch_cnt = 0,
           PreReleaseCallback* _pre_release_callback = nullptr)
        : batch(_batch),
          sync(write_options.sync),
          no_slowdown(write_options.no_slowdown),
          disable_wal(write_options.disableWAL),
          disable_memtable(_disable_memtable),
          batch_cnt(_batch_cnt),
          pre_release_callback(_pre_release_callback),
          log_used(0),
          log_ref(_log_ref),
          callback(_callback),
          made_waitable(false),
          state(STATE_INIT),
          write_group(nullptr),
          sequence(kMaxSequenceNumber),
          link_older(nullptr),
          link_newer(nullptr) {}

    ~Writer() {
      if (made_waitable) {
        StateMutex().~mutex();
        StateCV().~condition_variable();
      }
    }

    bool CheckCallback(DB* db) {
      if (callback != nullptr) {
        callback_status = callback->Callback(db);
      }
      return callback_status.ok();
    }

    void CreateMutex() {
      if (!made_waitable) {
        // Note that made_waitable is tracked separately from state
        // transitions, because we can't atomically create the mutex and
        // link into the list.
        made_waitable = true;
        new (&state_mutex_bytes) std::mutex;
        new (&state_cv_bytes) std::condition_variable;
      }
    }

    // returns the aggregate status of this Writer
    Status FinalStatus() {
      if (!status.ok()) {
        // a non-ok memtable write status takes presidence
        assert(callback == nullptr || callback_status.ok());
        return status;
      } else if (!callback_status.ok()) {
        // if the callback failed then that is the status we want
        // because a memtable insert should not have been attempted
        assert(callback != nullptr);
        assert(status.ok());
        return callback_status;
      } else {
        // if there is no callback then we only care about
        // the memtable insert status
        assert(callback == nullptr || callback_status.ok());
        return status;
      }
    }

    bool CallbackFailed() {
      return (callback != nullptr) && !callback_status.ok();
    }

    bool ShouldWriteToMemtable() {
      return status.ok() && !CallbackFailed() && !disable_memtable;
    }

    bool ShouldWriteToWAL() {
      return status.ok() && !CallbackFailed() && !disable_wal;
    }

    // No other mutexes may be acquired while holding StateMutex(), it is
    // always last in the order
    std::mutex& StateMutex() {
      assert(made_waitable);
      return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
    }

    std::condition_variable& StateCV() {
      assert(made_waitable);
      return *static_cast<std::condition_variable*>(
                 static_cast<void*>(&state_cv_bytes));
    }
  };

  struct AdaptationContext {
    const char* name;
    std::atomic<int32_t> value;

    explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
  };

  explicit WriteThread(const ImmutableDBOptions& db_options);

  virtual ~WriteThread() = default;

  // IMPORTANT: None of the methods in this class rely on the db mutex
  // for correctness. All of the methods except JoinBatchGroup and
  // EnterUnbatched may be called either with or without the db mutex held.
  // Correctness is maintained by ensuring that only a single thread is
  // a leader at a time.

  // Registers w as ready to become part of a batch group, waits until the
  // caller should perform some work, and returns the current state of the
  // writer.  If w has become the leader of a write batch group, returns
  // STATE_GROUP_LEADER.  If w has been made part of a sequential batch
  // group and the leader has performed the write, returns STATE_DONE.
  // If w has been made part of a parallel batch group and is responsible
  // for updating the memtable, returns STATE_PARALLEL_FOLLOWER.
  //
  // The db mutex SHOULD NOT be held when calling this function, because
  // it will block.
  //
  // Writer* w:        Writer to be executed as part of a batch group
  void JoinBatchGroup(Writer* w);

  // Constructs a write batch group led by leader, which should be a
  // Writer passed to JoinBatchGroup on the current thread.
  //
  // Writer* leader:          Writer that is STATE_GROUP_LEADER
  // WriteGroup* write_group: Out-param of group members
  // returns:                 Total batch group byte size
  size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);

  // Unlinks the Writer-s in a batch group, wakes up the non-leaders,
  // and wakes up the next leader (if any).
  //
  // WriteGroup* write_group: the write group
  // Status status:           Status of write operation
  void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status);

  // Exit batch group on behalf of batch group leader.
  void ExitAsBatchGroupFollower(Writer* w);

  // Constructs a write batch group led by leader from newest_memtable_writers_
  // list. The leader should either write memtable for the whole group and
  // call ExitAsMemTableWriter, or launch parallel memtable write through
  // LaunchParallelMemTableWriters.
  void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_grup);

  // Memtable writer group leader, or the last finished writer in a parallel
  // write group, exit from the newest_memtable_writers_ list, and wake up
  // the next leader if needed.
  void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);

  // Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the
  // non-leader members of this write batch group.  Sets Writer::sequence
  // before waking them up.
  //
  // WriteGroup* write_group: Extra state used to coordinate the parallel add
  void LaunchParallelMemTableWriters(WriteGroup* write_group);

  // Reports the completion of w's batch to the parallel group leader, and
  // waits for the rest of the parallel batch to complete.  Returns true
  // if this thread is the last to complete, and hence should advance
  // the sequence number and then call EarlyExitParallelGroup, false if
  // someone else has already taken responsibility for that.
  bool CompleteParallelMemTableWriter(Writer* w);

  // Waits for all preceding writers (unlocking mu while waiting), then
  // registers w as the currently proceeding writer.
  //
  // Writer* w:              A Writer not eligible for batching
  // InstrumentedMutex* mu:  The db mutex, to unlock while waiting
  // REQUIRES: db mutex held
  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);

  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
  // writers.
  void ExitUnbatched(Writer* w);

  // Wait for all parallel memtable writers to finish, in case pipelined
  // write is enabled.
  void WaitForMemTableWriters();

  SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
    if (sequence > last_sequence_) {
      last_sequence_ = sequence;
    }
    return last_sequence_;
  }

  // Insert a dummy writer at the tail of the write queue to indicate a write
  // stall, and fail any writers in the queue with no_slowdown set to true
  void BeginWriteStall();

  // Remove the dummy writer and wake up waiting writers
  void EndWriteStall();

 private:
  // See AwaitState.
  const uint64_t max_yield_usec_;
  const uint64_t slow_yield_usec_;

  // Allow multiple writers write to memtable concurrently.
  const bool allow_concurrent_memtable_write_;

  // Enable pipelined write to WAL and memtable.
  const bool enable_pipelined_write_;

  // The maximum limit of number of bytes that are written in a single batch
  // of WAL or memtable write. It is followed when the leader write size
  // is larger than 1/8 of this limit.
  const uint64_t max_write_batch_group_size_bytes;

  // Points to the newest pending writer. Only leader can remove
  // elements, adding can be done lock-free by anybody.
  std::atomic<Writer*> newest_writer_;

  // Points to the newest pending memtable writer. Used only when pipelined
  // write is enabled.
  std::atomic<Writer*> newest_memtable_writer_;

  // The last sequence that have been consumed by a writer. The sequence
  // is not necessary visible to reads because the writer can be ongoing.
  SequenceNumber last_sequence_;

  // A dummy writer to indicate a write stall condition. This will be inserted
  // at the tail of the writer queue by the leader, so newer writers can just
  // check for this and bail
  Writer write_stall_dummy_;

  // Mutex and condvar for writers to block on a write stall. During a write
  // stall, writers with no_slowdown set to false will wait on this rather
  // on the writer queue
  port::Mutex stall_mu_;
  port::CondVar stall_cv_;

  // Waits for w->state & goal_mask using w->StateMutex().  Returns
  // the state that satisfies goal_mask.
  uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);

  // Blocks until w->state & goal_mask, returning the state value
  // that satisfied the predicate.  Uses ctx to adaptively use
  // std::this_thread::yield() to avoid mutex overheads.  ctx should be
  // a context-dependent static.
  uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);

  // Set writer state and wake the writer up if it is waiting.
  void SetState(Writer* w, uint8_t new_state);

  // Links w into the newest_writer list. Return true if w was linked directly
  // into the leader position.  Safe to call from multiple threads without
  // external locking.
  bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);

  // Link write group into the newest_writer list as a whole, while keeping the
  // order of the writers unchanged. Return true if the group was linked
  // directly into the leader position.
  bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);

  // Computes any missing link_newer links.  Should not be called
  // concurrently with itself.
  void CreateMissingNewerLinks(Writer* head);

  // Starting from a pending writer, follow link_older to search for next
  // leader, until we hit boundary.
  Writer* FindNextLeader(Writer* pending_writer, Writer* boundary);

  // Set the leader in write_group to completed state and remove it from the
  // write group.
  void CompleteLeader(WriteGroup& write_group);

  // Set a follower in write_group to completed state and remove it from the
  // write group.
  void CompleteFollower(Writer* w, WriteGroup& write_group);
};

}  // namespace rocksdb