/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1

#include "thrift/lib/cpp/Thrift.h"
#include "thrift/lib/cpp/server/TServer.h"
#include "thrift/lib/cpp/transport/TBufferTransports.h"
#include "thrift/lib/cpp/transport/TSocket.h"
#include "thrift/lib/cpp/concurrency/ThreadManager.h"
#include "thrift/lib/cpp/concurrency/Thread.h"
#include "thrift/lib/cpp/concurrency/PosixThreadFactory.h"
#include "thrift/lib/cpp/concurrency/Mutex.h"
#include "thrift/lib/cpp/concurrency/ThreadLocal.h"
#include <stack>
#include <vector>
#include <string>
#include <assert.h>
#include <errno.h>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <limits.h>
#include <event.h>  // libevent
#include <tr1/unordered_set>


namespace apache { namespace thrift { namespace server {

using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TSocket;
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
using apache::thrift::concurrency::PosixThreadFactory;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::Mutex;
using apache::thrift::concurrency::Guard;

/**
 * This is a high-performance, non-blocking server in C++ that operates a
 * set of IO threads (by default only one). It assumes that all incoming
 * requests are framed with a 4-byte length indicator and writes out
 * responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with libevent.
 *
 */
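
/*
 * Example usage (a minimal sketch; MyHandler and MyServiceProcessor stand in
 * for thrift-generated code and are not part of this header):
 *
 *   boost::shared_ptr<MyHandler> handler(new MyHandler());
 *   boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
 *
 *   TNonblockingServer server(processor, 9090);
 *   server.setNumIOThreads(4);  // must be called before serve()
 *   server.serve();             // blocks until stop() is called
 *
 * Because requests and responses are framed, clients must wrap their
 * transport in a TFramedTransport (or equivalent) to talk to this server.
 */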


/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE, ///< Drop some tasks from head of task queue
  T_OVERLOAD_PAUSE_ACCEPTING   ///< Do not accept any new connections
};
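
/*
 * Example (a sketch): configure an overload policy via the setters on
 * TNonblockingServer below, where `server` is a TNonblockingServer instance.
 * Drop new connections while overloaded, clearing the overload state once
 * load falls back below 75% of the limits:
 *
 *   server.setOverloadAction(T_OVERLOAD_CLOSE_ON_ACCEPT);
 *   server.setOverloadHysteresis(0.75);
 *
 * Or pause the listen socket instead, re-checking every 10 ms:
 *
 *   server.setOverloadAction(T_OVERLOAD_PAUSE_ACCEPTING);
 *   server.setPauseAcceptDuration(10);
 */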

class TNonblockingIOThread;
class TNonblockingServerObserver;

class TNonblockingServer : public TServer {
 private:
  class TConnection;

  friend class TNonblockingIOThread;
 private:
  /// Listen backlog
  static const int LISTEN_BACKLOG = 1024;

  /// Default limit on size of idle connection pool
  static const int CONNECTION_STACK_LIMIT = -1;

  /// Default limit on frame size
  static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;

  /// Default limit on total number of connected sockets
  static const int MAX_CONNECTIONS = INT_MAX;

  /// Default limit on connections in handler/task processing
  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;

  /// Default limit on the number of active requests on the server.
  static const int MAX_ACTIVE_REQUESTS = INT_MAX;

  /// Default limit on memory usage
  static const uint64_t MAX_MEMORY_USAGE_BYTES = ULONG_MAX;

  /// Default size of write buffer
  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;

  /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_READ_BUFFER_LIMIT = 8192;

  /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_WRITE_BUFFER_LIMIT = 0;

  /// # of calls before resizing oversized buffers (0 = check only on close)
  static const int RESIZE_BUFFER_EVERY_N = 0;

  /// # of IO threads to use by default
  static const int DEFAULT_IO_THREADS = 1;

  /// File descriptor of an invalid socket
  static const int INVALID_SOCKET = -1;

  /// # of IO threads this server will use
  size_t numIOThreads_;

  /// Whether to set high scheduling priority for IO threads
  bool useHighPriorityIOThreads_;

  /// Server socket file descriptor
  int serverSocket_;

  /// Port server runs on
  int port_;

  /// For processing via thread pool, may be NULL
  boost::shared_ptr<ThreadManager> threadManager_;

  /// Is thread pool processing?
  bool threadPoolProcessing_;

  // Factory to create the IO threads
  boost::shared_ptr<PosixThreadFactory> ioThreadFactory_;

  // Vector of IOThread objects that will handle our IO
  std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;

  // Index of next IO Thread to be used (for round-robin)
  int nextIOThread_;

  // Synchronizes access to connection stack and similar data, such as
  // numActiveRequests_.
  Mutex connMutex_;

  /// Number of TConnection objects we've created
  size_t numTConnections_;

  /// Number of Connections processing or waiting to process
  size_t numActiveProcessors_;

  /// Number of active/outstanding requests on the server; i.e., the sum of
  /// requests that are being processed, waiting to be processed, or waiting
  /// for the reply to be sent out over the wire.
  size_t numActiveRequests_;

  /// Limit for the number of requests that are being processed, waiting to be
  /// processed, or are waiting for the reply to be sent out over the wire.
  size_t maxActiveRequests_;

  /// listen backlog
  int32_t listenBacklog_;

  /// Limit for how many TConnection objects to cache
  /// 0 = infinite, -1 = none.
  size_t connectionStackLimit_;

  /// Limit for number of connections processing or waiting to process
  size_t maxActiveProcessors_;

  /// Limit for number of open connections
  size_t maxConnections_;

  /// Limit for memory usage
  uint64_t maxMemoryUsageBytes_;

  /// Limit for frame size
  size_t maxFrameSize_;

  /// Time in milliseconds before an unperformed task expires (0 == infinite).
  int64_t taskExpireTime_;

  /// Time in milliseconds to pause accepts for T_OVERLOAD_PAUSE_ACCEPTING
  int pauseAcceptDuration_;

  /**
   * Hysteresis for overload state.  This is the fraction of the overload
   * value that needs to be reached before the overload state is cleared;
   * must be <= 1.0.
   */
  double overloadHysteresis_;

  /// Action to take when we're overloaded.
  TOverloadAction overloadAction_;

  /**
   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
   * and found to be exceeded, reinitialized) to this size.
   */
  size_t writeBufferDefaultSize_;

  /**
   * Max read buffer size for an idle TConnection.  When we place an idle
   * TConnection into connectionStack_, or every resizeBufferEveryN_ calls,
   * we free the buffer (so that it will be reinitialized by the next
   * received frame) if it has exceeded this limit.  0 disables this check.
   */
  size_t idleReadBufferLimit_;

  /**
   * Max write buffer size for an idle connection.  When we place an idle
   * TConnection into connectionStack_, or every resizeBufferEveryN_ calls,
   * we ensure that its write buffer is no larger than this size; otherwise
   * we replace it with a new one of writeBufferDefaultSize_ bytes to ensure
   * that idle connections don't hog memory. 0 disables this check.
   */
  size_t idleWriteBufferLimit_;

  /**
   * Every N calls we check the buffer size limits on a connected TConnection.
   * 0 disables (i.e. the checks are only done when a connection closes).
   */
  int32_t resizeBufferEveryN_;

  /// Set if we are currently in an overloaded state.
  bool overloaded_;

  /// Count of connections dropped since overload started
  uint32_t nConnectionsDropped_;

  /// Count of connections dropped on overload since server started
  uint64_t nTotalConnectionsDropped_;

  // Notification of various server events
  boost::shared_ptr<TNonblockingServerObserver> observer_;

  /**
   * This is a stack of all the objects that have been created but that
   * are NOT currently in use. When we close a connection, we place it on this
   * stack so that the object can be reused later, rather than freeing the
   * memory and reallocating a new object later.
   */
  std::stack<TConnection*> connectionStack_;

  /**
   * This contains all existing TConnections, including the ones in the
   * connectionStack_.
   */
  std::tr1::unordered_set<TConnection*>  connectionSet_;

  /**
   * Helper holding socket options to apply to each
   * TNonblockingServer::TConnection's underlying TSocket.
   */
  TSocket::Options sockOptHelper_;

  /**
   * Thread-local data storage to track the current connection being processed.
   */
  typedef concurrency::ThreadLocal<
      TConnection,
      concurrency::NoopThreadLocalManager<TConnection> >
    ThreadLocalConnection;
  ThreadLocalConnection currentConnection_;

  /**
   * Called when something happens on the server socket.  We accept all
   * waiting client connections on the listen socket fd and assign
   * TConnection objects to handle those requests.
   *
   * @param fd the listen socket.
   * @param which the event flag that triggered the handler.
   */
  void handleEvent(int fd, short which);

  void init(int port) {
    serverSocket_ = -1;
    numIOThreads_ = DEFAULT_IO_THREADS;
    nextIOThread_ = 0;
    useHighPriorityIOThreads_ = false;
    port_ = port;
    threadPoolProcessing_ = false;
    numTConnections_ = 0;
    numActiveProcessors_ = 0;
    numActiveRequests_ = 0;
    connectionStackLimit_ = CONNECTION_STACK_LIMIT;
    maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
    maxActiveRequests_ = MAX_ACTIVE_REQUESTS;
    maxConnections_ = MAX_CONNECTIONS;
    maxMemoryUsageBytes_ = MAX_MEMORY_USAGE_BYTES;
    maxFrameSize_ = MAX_FRAME_SIZE;
    taskExpireTime_ = 0;
    pauseAcceptDuration_ = 1;
    overloadHysteresis_ = 0.8;
    overloadAction_ = T_OVERLOAD_NO_ACTION;
    writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
    idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
    idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
    resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
    overloaded_ = false;
    nConnectionsDropped_ = 0;
    nTotalConnectionsDropped_ = 0;
    listenBacklog_ = LISTEN_BACKLOG;
  }

  size_t getWriteBufferBytesImpl() const;
  size_t getReadBufferBytesImpl() const;

 protected:

  /// Increment the count of connections currently processing.
  void incrementActiveProcessors() {
    Guard g(connMutex_);
    ++numActiveProcessors_;
  }

  /// Decrement the count of connections currently processing.
  void decrementActiveProcessors() {
    Guard g(connMutex_);
    if (numActiveProcessors_ > 0) {
      --numActiveProcessors_;
    }
  }

  /// Increment the number of active requests on the server.
  void incrementNumActiveRequests() {
    Guard g(connMutex_);
    ++numActiveRequests_;
  }

  /// Decrement the number of active requests on the server.
  void decrementNumActiveRequests() {
    Guard g(connMutex_);
    if (numActiveRequests_ > 0) {
      --numActiveRequests_;
    }
  }

 public:
  template<typename ProcessorFactory>
  TNonblockingServer(
      const boost::shared_ptr<ProcessorFactory>& processorFactory,
      int port,
      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
    TServer(processorFactory) {
    init(port);
  }

  template<typename Processor>
  TNonblockingServer(const boost::shared_ptr<Processor>& processor,
                     int port,
                     THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
    TServer(processor) {
    init(port);
  }

  template<typename ProcessorFactory>
  TNonblockingServer(
      const boost::shared_ptr<ProcessorFactory>& processorFactory,
      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
      int port,
      const boost::shared_ptr<ThreadManager>& threadManager =
        boost::shared_ptr<ThreadManager>(),
      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
    TServer(processorFactory) {

    init(port);

    setProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  template<typename ProcessorFactory>
  TNonblockingServer(
      const boost::shared_ptr<ProcessorFactory>& processorFactory,
      const boost::shared_ptr<TDuplexProtocolFactory>& duplexProtocolFactory,
      int port,
      const boost::shared_ptr<ThreadManager>& threadManager =
        boost::shared_ptr<ThreadManager>(),
      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
    TServer(processorFactory) {

    init(port);

    setDuplexProtocolFactory(duplexProtocolFactory);
    setThreadManager(threadManager);
  }

  template<typename Processor>
  TNonblockingServer(
      const boost::shared_ptr<Processor>& processor,
      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
      int port,
      const boost::shared_ptr<ThreadManager>& threadManager =
        boost::shared_ptr<ThreadManager>(),
      THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
    TServer(processor) {

    init(port);

    setProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  template<typename Processor>
  TNonblockingServer(
      const boost::shared_ptr<Processor>& processor,
      const boost::shared_ptr<TDuplexProtocolFactory>& duplexProtocolFactory,
      int port,
      const boost::shared_ptr<ThreadManager>& threadManager =
        boost::shared_ptr<ThreadManager>(),
      THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
    TServer(processor) {

    init(port);

    setDuplexProtocolFactory(duplexProtocolFactory);
    setThreadManager(threadManager);
  }

  template<typename ProcessorFactory>
  TNonblockingServer(
      const boost::shared_ptr<ProcessorFactory>& processorFactory,
      const boost::shared_ptr<TDuplexTransportFactory>& duplexTransportFactory,
      const boost::shared_ptr<TDuplexProtocolFactory>& duplexProtocolFactory,
      int port,
      const boost::shared_ptr<ThreadManager>& threadManager =
        boost::shared_ptr<ThreadManager>(),
      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)):
    TServer(processorFactory) {

    init(port);

    setDuplexTransportFactory(duplexTransportFactory);
    setDuplexProtocolFactory(duplexProtocolFactory);
    setThreadManager(threadManager);
  }

  template<typename Processor>
  TNonblockingServer(
      const boost::shared_ptr<Processor>& processor,
      const boost::shared_ptr<TDuplexTransportFactory>& duplexTransportFactory,
      const boost::shared_ptr<TDuplexProtocolFactory>& duplexProtocolFactory,
      int port,
      const boost::shared_ptr<ThreadManager>& threadManager =
        boost::shared_ptr<ThreadManager>(),
      THRIFT_OVERLOAD_IF(Processor, TProcessor)):
    TServer(processor) {

    init(port);

    setDuplexTransportFactory(duplexTransportFactory);
    setDuplexProtocolFactory(duplexProtocolFactory);
    setThreadManager(threadManager);
  }

  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);

  boost::shared_ptr<ThreadManager> getThreadManager() {
    return threadManager_;
  }
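
  /*
   * Example (a sketch): offload request processing to a pool of 8 worker
   * threads using the standard concurrency helpers:
   *
   *   boost::shared_ptr<ThreadManager> threadManager =
   *       ThreadManager::newSimpleThreadManager(8);
   *   threadManager->threadFactory(
   *       boost::shared_ptr<PosixThreadFactory>(new PosixThreadFactory()));
   *   threadManager->start();
   *   server.setThreadManager(threadManager);
   */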

  /**
   * Sets the number of IO threads used by this server. Can only be used
   * before the call to serve() and has no effect afterwards.  We always use
   * a PosixThreadFactory for the IO worker threads, because they must be
   * joinable for clean shutdown.
   */
  void setNumIOThreads(size_t numThreads) {
    numIOThreads_ = numThreads;
  }

  /** Return whether the IO threads will get high scheduling priority */
  bool useHighPriorityIOThreads() const {
    return useHighPriorityIOThreads_;
  }

  /** Set whether the IO threads will get high scheduling priority. */
  void setUseHighPriorityIOThreads(bool val) {
    useHighPriorityIOThreads_ = val;
  }

  /** Return the number of IO threads used by this server. */
  size_t getNumIOThreads() const {
    return numIOThreads_;
  }

  /**
   * Get the maximum number of unused TConnection objects we will hold in
   * reserve.
   *
   * @return the current limit on TConnection pool size.
   */
  size_t getConnectionStackLimit() const {
    return connectionStackLimit_;
  }

  /**
   * Set the maximum number of unused TConnection objects we will hold in
   * reserve.
   *
   * @param sz the new limit for TConnection pool size.
   */
  void setConnectionStackLimit(size_t sz) {
    connectionStackLimit_ = sz;
  }

  bool isThreadPoolProcessing() const {
    return threadPoolProcessing_;
  }

  /**
   * Set the listen backlog.
   *
   * @param listenBacklog the new listen backlog
   */
  void setListenBacklog(int32_t listenBacklog) {
    listenBacklog_ = listenBacklog;
  }

  /**
   * Return the number of currently connected sockets.
   *
   * @return count of connected sockets.
   */
  size_t getNumConnections() const {
    return numTConnections_;
  }

  /**
   * Return the number of connections currently active: total connections
   * minus the idle ones sitting in the connection pool.
   *
   * @return count of active connections.
   */
  size_t getNumActiveConnections() const {
    return getNumConnections() - getNumIdleConnections();
  }

  /**
   * Return the total bytes in all connections' write buffers.
   *
   * @return bytes
   */
  size_t getWriteBufferBytes() const {
    concurrency::Guard g(connMutex_);
    return getWriteBufferBytesImpl();
  }

  /**
   * Return the total bytes in all connections' read buffers.
   *
   * @return bytes
   */
  size_t getReadBufferBytes() const {
    concurrency::Guard g(connMutex_);
    return getReadBufferBytesImpl();
  }

  /**
   * Return the count of connection objects allocated but not in use.
   *
   * @return count of idle connection objects.
   */
  size_t getNumIdleConnections() const {
    return connectionStack_.size();
  }

  /**
   * Return the number of connections which are currently processing.
   * This is defined as a connection where all data has been received and
   * either assigned a task (when threading) or passed to a handler (when
   * not threading), and where the handler has not yet returned.
   *
   * @return # of connections currently processing.
   */
  size_t getNumActiveProcessors() const {
    return numActiveProcessors_;
  }

  /**
   * Return the number of active/outstanding requests on the server. This
   * includes requests that are being processed, waiting to be processed, or
   * have been processed but are waiting for the reply to be sent out over
   * the wire.
   *
   * @return # of currently active requests.
   */
  size_t getNumActiveRequests() const {
    return numActiveRequests_;
  }
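
  /*
   * Example (a sketch): sampling the load counters, e.g. from a stats
   * thread:
   *
   *   size_t open     = server.getNumConnections();
   *   size_t idle     = server.getNumIdleConnections();
   *   size_t active   = server.getNumActiveConnections();  // open - idle
   *   size_t inFlight = server.getNumActiveRequests();
   */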

  /**
   * Get the maximum number of active requests.
   *
   * @return current setting.
   */
  size_t getMaxActiveRequests() const {
    return maxActiveRequests_;
  }

  /**
   * Set the maximum # of active requests allowed before overload.
   *
   * @param maxActiveRequests new setting for maximum # of active requests.
   */
  void setMaxActiveRequests(size_t maxActiveRequests) {
    maxActiveRequests_ = maxActiveRequests;
  }

  /**
   * Get the maximum # of connections allowed before overload.
   *
   * @return current setting.
   */
  size_t getMaxConnections() const {
    return maxConnections_;
  }

  /**
   * Set the maximum # of connections allowed before overload.
   *
   * @param maxConnections new setting for maximum # of connections.
   */
  void setMaxConnections(size_t maxConnections) {
    maxConnections_ = maxConnections;
  }

  /**
   * Get the maximum # of connections waiting in handler/task before overload.
   *
   * @return current setting.
   */
  size_t getMaxActiveProcessors() const {
    return maxActiveProcessors_;
  }

  /**
   * Set the maximum # of connections waiting in handler/task before overload.
   *
   * @param maxActiveProcessors new setting for maximum # of active processors.
   */
  void setMaxActiveProcessors(size_t maxActiveProcessors) {
    maxActiveProcessors_ = maxActiveProcessors;
  }

  /**
   * Get the maximum memory usage allowed before overload.
   *
   * @return current setting.
   */
  uint64_t getMaxMemoryUsageBytes() const {
    return maxMemoryUsageBytes_;
  }

  /**
   * Set the maximum memory usage allowed before overload.
   *
   * @param maxMemoryUsage new setting for maximum memory usage.
   */
  void setMaxMemoryUsageBytes(uint64_t maxMemoryUsageBytes) {
    maxMemoryUsageBytes_ = maxMemoryUsageBytes;
  }

  /**
   * Get the maximum allowed frame size.
   *
   * If a client tries to send a message larger than this limit,
   * its connection will be closed.
   *
   * @return Maximum frame size, in bytes.
   */
  size_t getMaxFrameSize() const {
    return maxFrameSize_;
  }

  /**
   * Set the maximum allowed frame size, up to 2^31-1
   *
   * @param maxFrameSize The new maximum frame size.
   */
  void setMaxFrameSize(size_t maxFrameSize) {
    if (maxFrameSize > 0x7fffffff) {
      maxFrameSize = 0x7fffffff;
    }
    maxFrameSize_ = maxFrameSize;
  }

  /**
   * Get fraction of maximum limits before an overload condition is cleared.
   *
   * @return hysteresis fraction
   */
  double getOverloadHysteresis() const {
    return overloadHysteresis_;
  }

  /**
   * Set fraction of maximum limits before an overload condition is cleared.
   * A good value would probably be between 0.5 and 0.9.
   *
   * @param hysteresisFraction fraction in (0.0, 1.0]; out-of-range values
   *     are ignored.
   */
  void setOverloadHysteresis(double hysteresisFraction) {
    if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
      overloadHysteresis_ = hysteresisFraction;
    }
  }

  /**
   * Get the duration, in milliseconds, for which accepts are paused when
   * the server is overloaded.
   *
   * @return pauseAcceptDuration_
   */
  int getPauseAcceptDuration() const {
    return pauseAcceptDuration_;
  }

  /**
   * Set the accept pause duration (used when overloaded), in milliseconds.
   *
   * @param pauseDuration duration in ms; must be > 0 or the call is ignored.
   */
  void setPauseAcceptDuration(int pauseDuration) {
    if (pauseDuration > 0) {
      pauseAcceptDuration_ = pauseDuration;
    }
  }

  /**
   * Get the action the server will take on overload.
   *
   * @return a TOverloadAction enum value for the currently set action.
   */
  TOverloadAction getOverloadAction() const {
    return overloadAction_;
  }

  /**
   * Set the action the server is to take on overload.
   *
   * @param overloadAction a TOverloadAction enum value for the action.
   */
  void setOverloadAction(TOverloadAction overloadAction) {
    overloadAction_ = overloadAction;
  }

  /**
   * Get the time in milliseconds after which a task expires (0 == infinite).
   *
   * @return a 64-bit time in milliseconds.
   */
  int64_t getTaskExpireTime() const {
    return taskExpireTime_;
  }

  /**
   * Set the time in milliseconds after which a task expires (0 == infinite).
   *
   * @param taskExpireTime a 64-bit time in milliseconds.
   */
  void setTaskExpireTime(int64_t taskExpireTime) {
    taskExpireTime_ = taskExpireTime;
  }

  /**
   * Determine if the server is currently overloaded.
   * This function checks the maximums for open connections and connections
   * currently in processing, and sets an overload condition if they are
   * exceeded.  The overload will persist until both values are below the
   * current hysteresis fraction of their maximums.
   *
   * @return true if an overload condition exists, false if not.
   */
  bool serverOverloaded();

  /**
   * Check if the server is overloaded and overloadAction_ is
   * T_OVERLOAD_PAUSE_ACCEPTING, and if so, pause accepting on the listening
   * socket.
   *
   * @return true if overloadAction_ != T_OVERLOAD_PAUSE_ACCEPTING, or
   * if no overload condition exists.
   */
  bool canContinueToAccept();

  /**
   * Check if the server is overloaded, and if so, take the necessary action.
   *
   * @param conn  The connection currently being processed that is triggering
   *     the overload check.
   *
   * Returns true if the current connection has been closed due to overload
   * processing, and false otherwise.  (Note that a false return value does not
   * mean that the server is not overloaded.)
   */
  bool checkForOverload(TConnection* conn);


  /**
   * Pop and discard the next task on the thread pool's wait queue.
   *
   * @return true if a task was discarded, false if the wait queue was empty.
   */
  bool drainPendingTask();

  /**
   * Get the starting size of a TConnection object's write buffer.
   *
   * @return # bytes we initialize a TConnection object's write buffer to.
   */
  size_t getWriteBufferDefaultSize() const {
    return writeBufferDefaultSize_;
  }

  /**
   * Set the starting size of a TConnection object's write buffer.
   *
   * @param size # bytes we initialize a TConnection object's write buffer to.
   */
  void setWriteBufferDefaultSize(size_t size) {
    writeBufferDefaultSize_ = size;
  }

  /**
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleReadBufferLimit() const {
    return idleReadBufferLimit_;
  }

  /**
   * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleBufferMemLimit() const {
    return idleReadBufferLimit_;
  }

  /**
   * Set the maximum size of the read buffer allocated to idle TConnection
   * objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleReadBufferLimit(size_t limit) {
    idleReadBufferLimit_ = limit;
  }

  /**
   * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
   * Set the maximum size of the read buffer allocated to idle TConnection
   * objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleBufferMemLimit(size_t limit) {
    idleReadBufferLimit_ = limit;
  }

  /**
   * Get the maximum size of write buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will reallocate buffers when checked.
   */
  size_t getIdleWriteBufferLimit() const {
    return idleWriteBufferLimit_;
  }

  /**
   * Set the maximum size of the write buffer allocated to idle TConnection
   * objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its write buffer, we destroy and construct that buffer with
   * writeBufferDefaultSize_ bytes.
   *
   * @param limit of bytes beyond which we will shrink buffers when idle.
   */
  void setIdleWriteBufferLimit(size_t limit) {
    idleWriteBufferLimit_ = limit;
  }

  /**
   * Get # of calls made between buffer size checks.  0 means disabled.
   *
   * @return # of calls between buffer size checks.
   */
  int32_t getResizeBufferEveryN() const {
    return resizeBufferEveryN_;
  }

  /**
   * Check buffer sizes every "count" calls.  This allows buffer limits
   * to be enforced for persistent connections with a controllable degree
   * of overhead. 0 disables checks except at connection close.
   *
   * @param count the number of calls between checks, or 0 to disable
   */
  void setResizeBufferEveryN(int32_t count) {
    resizeBufferEveryN_ = count;
  }
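
  /*
   * Example (a sketch): bound per-connection memory on long-lived
   * connections by checking buffers every 64 calls rather than only at
   * connection close:
   *
   *   server.setResizeBufferEveryN(64);
   *   server.setIdleReadBufferLimit(64 * 1024);  // free read buffers > 64 KiB
   *   server.setIdleWriteBufferLimit(8 * 1024);  // reset write buffers > 8 KiB
   */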

  /**
   * Sets sockOptHelper_.  Socket options on the underlying TSockets are
   * applied by passing sockOptHelper_ to the TSocket::setSocketOptions()
   * function in the TConnection constructor.
   */
  void setSocketOptions(const TSocket::Options& oh) {
    sockOptHelper_ = oh;
  }

  void setObserver(
               const boost::shared_ptr<TNonblockingServerObserver>& observer) {
    observer_ = observer;
  }

  /**
   * Main workhorse function, starts up the server listening on a port and
   * loops over the libevent handler.
   */
  void serve();

  /**
   * Causes the server to terminate gracefully (can be called from any thread).
   */
  void stop();
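
  /*
   * Example (a sketch): shut down gracefully from a control thread while
   * another thread is blocked in serve():
   *
   *   server.stop();  // the event loops exit and serve() returns
   */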

  TConnectionContext* getConnectionContext() const;

 private:
  void addTask(boost::shared_ptr<Runnable> task) {
    threadManager_->add(task, 0LL, taskExpireTime_);
  }

  void setCurrentConnection(TConnection* conn) {
    assert(currentConnection_.get() == NULL);
    currentConnection_.set(conn);
  }
  void clearCurrentConnection() {
    currentConnection_.clear();
  }

  /**
   * Callback function that the ThreadManager calls when a task reaches
   * its expiration time.  It is needed to clean up the expired connection.
   *
   * @param task the runnable associated with the expired task.
   */
  void expireClose(boost::shared_ptr<Runnable> task);

  const boost::shared_ptr<TNonblockingServerObserver>& getObserver() const {
    return observer_;
  }

  /// Creates a socket to listen on and binds it to the local port.
  void createAndListenOnSocket();

  /**
   * Takes a socket created by createAndListenOnSocket() and sets various
   * options on it to prepare for use in the server.
   *
   * @param fd descriptor of socket to be initialized.
   */
  void listenSocket(int fd);

  /**
   * Return an initialized connection object.  Creates a TConnection, or
   * recovers one from the pool, and initializes it with the provided
   * socket FD and flags.
   *
   * @param socket FD of socket associated with this connection.
   * @param addr the sockaddr of the client
   * @param addrLen the length of addr
   * @return pointer to initialized TConnection object.
   */
  TConnection* createConnection(int socket, const sockaddr* addr,
                                            socklen_t addrLen);

  /**
   * Returns a connection to the pool, or deletes it.  If the connection
   * pool (a stack) isn't full, place the connection object on it; otherwise
   * just delete it.
   *
   * @param connection the TConnection being returned.
   */
  void returnConnection(TConnection* connection);
};

class TNonblockingIOThread : public Runnable {
 public:
  // Creates an IO thread and sets up the event base.  The listenSocket should
  // be a valid FD on which listen() has already been called.  If the
  // listenSocket is < 0, accepting will not be done.
  TNonblockingIOThread(TNonblockingServer* server,
                       int number,
                       int listenSocket,
                       bool useHighPriority);

  ~TNonblockingIOThread();

  // Returns the event-base for this thread.
  event_base* getEventBase() const { return eventBase_; }

  // Returns the server for this thread.
  TNonblockingServer* getServer() const { return server_; }

  // Returns the number of this IO thread.
  int getThreadNumber() const { return number_; }

  // Returns the thread id associated with this object.  This should
  // only be called after the thread has been started.
  pthread_t getThreadId() const { return threadId_; }

  // Returns the send-fd for task complete notifications.
  int getNotificationSendFD() const { return notificationPipeFDs_[1]; }

  // Returns the read-fd for task complete notifications.
  int getNotificationRecvFD() const { return notificationPipeFDs_[0]; }

  // Returns the actual thread object associated with this IO thread.
  boost::shared_ptr<Thread> getThread() const { return thread_; }

  // Sets the actual thread object associated with this IO thread.
  void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }

  // Used by TConnection objects to indicate processing has finished.
  bool notify(TNonblockingServer::TConnection* conn);

  // Enters the event loop and does not return until a call to stop().
  virtual void run();

  // Exits the event loop as soon as possible.
  void stop();

  // Ensures that the event-loop thread is fully finished and shut down.
  void join();

  /// Maintains a count of requests for sampling purposes.
  /// The request counter is maintained modulo the sample rate.
  uint32_t incrementRequestCounter(uint32_t sampleRate);

  /// Return true if the currently running thread is this I/O thread
  bool isInIOThread() const {
    return pthread_equal(pthread_self(), threadId_);
  }

  /// Pauses accept event handling for pauseDuration milliseconds
  void pauseAcceptHandling(int pauseDuration);

 private:
  /**
   * C-callable event handler for signaling task completion.  Provides a
   * callback that libevent can understand that will read a connection
   * object's address from a pipe and call connection->transition() for
   * that object.
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   */
  static void notifyHandler(int fd, short which, void* v);

  /**
   * C-callable event handler for listener events.  Provides a callback
   * that libevent can understand which invokes server->handleEvent().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TNonblockingServer's "this".
   */
  static void listenHandler(int fd, short which, void* v) {
    ((TNonblockingServer*)v)->handleEvent(fd, which);
  }

  /**
   * C-callable event handler to re-enable accept handling.
   * Provides a callback that libevent can understand.
   *
   * @param fd unused.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TNonblockingIOThread's "this".
   */
  static void reenableAcceptHandler(int fd, short which, void* v) {
    ((TNonblockingIOThread*)v)->reenableAccept();
  }

  /// Re-enables accept event handling.
  void reenableAccept();

  /// Exits the loop ASAP in case of shutdown or error.
  void breakLoop(bool error);

  /// Registers the events for the notification & listen sockets
  void registerEvents();

  /// Create the pipe used to notify I/O process of task completion.
  void createNotificationPipe();

  /// Unregisters our events for notification and listen sockets.
  void cleanupEvents();

  /// Sets (or clears) high priority scheduling status for the current thread.
  void setCurrentThreadHighPriority(bool value);

 private:
  /// associated server
  TNonblockingServer* server_;

  /// thread number (for debugging).
  const int number_;

  /// The actual physical thread id.
  pthread_t threadId_;

  /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
  int listenSocket_;

  /// Sets a high scheduling priority when running
  bool useHighPriority_;

  /// pointer to eventbase to be used for looping
  event_base* eventBase_;

  /// Whether the server is currently accepting connections
  bool acceptingConnections_;

  /// Used with eventBase_ for connection events (only in listener thread)
  struct event serverEvent_;

  /// Used with eventBase_ for backing off on accept handling until there are
  /// available fds
  struct event acceptBackoffEvent_;

  /// Used with eventBase_ for task completion notification
  struct event notificationEvent_;

  /// File descriptors for the pipe used for task completion notification.
  int notificationPipeFDs_[2];

  /// Actual IO Thread
  boost::shared_ptr<Thread> thread_;

  /// Call counter
  uint32_t requestCounter_;
};


}}} // apache::thrift::server

#endif // #ifndef THRIFT_SERVER_TNONBLOCKINGSERVER_H_