@ -6,13 +6,6 @@ mod message;
pub use self ::{ frame ::CloseFrame , message ::Message } ;
pub use self ::{ frame ::CloseFrame , message ::Message } ;
use log ::* ;
use std ::{
collections ::VecDeque ,
io ::{ ErrorKind as IoErrorKind , Read , Write } ,
mem ::replace ,
} ;
use self ::{
use self ::{
frame ::{
frame ::{
coding ::{ CloseCode , Control as OpCtl , Data as OpData , OpCode } ,
coding ::{ CloseCode , Control as OpCtl , Data as OpData , OpCode } ,
@ -24,6 +17,11 @@ use crate::{
error ::{ Error , ProtocolError , Result } ,
error ::{ Error , ProtocolError , Result } ,
util ::NonBlockingResult ,
util ::NonBlockingResult ,
} ;
} ;
use log ::* ;
use std ::{
io ::{ ErrorKind as IoErrorKind , Read , Write } ,
mem ::replace ,
} ;
/// Indicates a Client or Server role of the websocket
/// Indicates a Client or Server role of the websocket
#[ derive(Debug, Clone, Copy, PartialEq, Eq) ]
#[ derive(Debug, Clone, Copy, PartialEq, Eq) ]
@ -37,10 +35,21 @@ pub enum Role {
/// The configuration for WebSocket connection.
/// The configuration for WebSocket connection.
#[ derive(Debug, Clone, Copy) ]
#[ derive(Debug, Clone, Copy) ]
pub struct WebSocketConfig {
pub struct WebSocketConfig {
/// The size of the send queue. You can use it to turn on/off the backpressure features. `None`
/// Does nothing, instead use `max_write_buffer_size`.
/// means here that the size of the queue is unlimited. The default value is the unlimited
#[ deprecated ]
/// queue.
pub max_send_queue : Option < usize > ,
pub max_send_queue : Option < usize > ,
/// The target minimum size of the write buffer to reach before writing the data
/// to the underlying stream.
/// The default value is 128 KiB.
///
/// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless.
pub write_buffer_size : usize ,
/// The max size of the write buffer in bytes. Setting this can provide backpressure
/// in the case the write buffer is filling up due to write errors.
/// The default value is unlimited.
///
/// Note: Should always be set higher than [`write_buffer_size`](Self::write_buffer_size).
pub max_write_buffer_size : usize ,
/// The maximum size of a message. `None` means no size limit. The default value is 64 MiB
/// The maximum size of a message. `None` means no size limit. The default value is 64 MiB
/// which should be reasonably big for all normal use-cases but small enough to prevent
/// which should be reasonably big for all normal use-cases but small enough to prevent
/// memory eating by a malicious user.
/// memory eating by a malicious user.
@ -60,8 +69,11 @@ pub struct WebSocketConfig {
impl Default for WebSocketConfig {
impl Default for WebSocketConfig {
fn default ( ) -> Self {
fn default ( ) -> Self {
#[ allow(deprecated) ]
WebSocketConfig {
WebSocketConfig {
max_send_queue : None ,
max_send_queue : None ,
write_buffer_size : 128 * 1024 ,
max_write_buffer_size : usize ::MAX ,
max_message_size : Some ( 64 < < 20 ) ,
max_message_size : Some ( 64 < < 20 ) ,
max_frame_size : Some ( 16 < < 20 ) ,
max_frame_size : Some ( 16 < < 20 ) ,
accept_unmasked_frames : false ,
accept_unmasked_frames : false ,
@ -73,6 +85,8 @@ impl Default for WebSocketConfig {
///
///
/// This is THE structure you want to create to be able to speak the WebSocket protocol.
/// This is THE structure you want to create to be able to speak the WebSocket protocol.
/// It may be created by calling `connect`, `accept` or `client` functions.
/// It may be created by calling `connect`, `accept` or `client` functions.
///
/// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages.
#[ derive(Debug) ]
#[ derive(Debug) ]
pub struct WebSocket < Stream > {
pub struct WebSocket < Stream > {
/// The underlying socket.
/// The underlying socket.
@ -146,82 +160,116 @@ impl<Stream> WebSocket<Stream> {
impl < Stream : Read + Write > WebSocket < Stream > {
impl < Stream : Read + Write > WebSocket < Stream > {
/// Read a message from stream, if possible.
/// Read a message from stream, if possible.
///
///
/// This will queue responses to ping and close messages to be sent. It will call
/// This will also queue responses to ping and close messages. These responses
/// `write_pending` before trying to read in order to make sure that those responses
/// will be written and flushed on the next call to [`read`](Self::read),
/// make progress even if you never call `write_pending`. That does mean that they
/// [`write`](Self::write) or [`flush`](Self::flush).
/// get sent out earliest on the next call to `read_message`, `write_message` or `write_pending`.
///
///
/// ## Closing the connection
/// # Closing the connection
/// When the remote endpoint decides to close the connection this will return
/// When the remote endpoint decides to close the connection this will return
/// the close message with an optional close frame.
/// the close message with an optional close frame.
///
///
/// You should continue calling `read_message`, `write_message` or `write_pending` to drive
/// You should continue calling [`read`](Self::read), [`write`](Self::write) or
/// the reply to the close frame until [Error::ConnectionClosed] is returned. Once that happens
/// [`flush`](Self::flush) to drive the reply to the close frame until [` Error::ConnectionClosed` ]
/// it is safe to drop the underlying connection.
/// is returned. Once that happens i t is safe to drop the underlying connection.
pub fn read_message ( & mut self ) -> Result < Message > {
pub fn read ( & mut self ) -> Result < Message > {
self . context . read_message ( & mut self . socket )
self . context . read ( & mut self . socket )
}
}
/// Send a message to stream, if possible.
/// Writes and immediately flushes a message.
/// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
pub fn send ( & mut self , message : Message ) -> Result < ( ) > {
self . write ( message ) ? ;
self . flush ( )
}
/// Write a message to the provided stream, if possible.
///
/// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
///
///
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// In the event of stream write failure the message frame will be stored
/// requests. A Pong reply will jump the queue because the
/// in the write buffer and will try again on the next call to [`write`](Self::write)
/// [websocket RFC](https://tools.ietf.org/html/rfc6455#section-5.5.2) specifies it should be sent
/// or [`flush`](Self::flush).
/// as soon as is practical.
///
///
/// Note that upon receiving a ping message, tungstenite cues a pong reply automatically.
/// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
/// When you call either `read_message`, `write_message` or `write_pending` next it will try to send
/// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
/// that pong out if the underlying connection can take more data. This means you should not
///
/// respond to ping frames manually.
/// This call will generally not flush. However, if there are queued automatic messages
/// they will be written and eagerly flushed.
///
/// For example, upon receiving ping messages tungstenite queues pong replies automatically.
/// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush)
/// will write & flush the pong reply. This means you should not respond to ping frames manually.
///
///
/// You can however send pong frames manually in order to indicate a unidirectional heartbeat
/// You can however send pong frames manually in order to indicate a unidirectional heartbeat
/// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
/// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
/// if `read_message` returns a ping, you should call `write_pending` until it doesn't return
/// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing
/// WouldBlock before passing a pong to `write_message`, otherwise the response to the
/// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the
/// ping will not be sent, but rather replaced by your custom pong message.
/// ping will not be sent as it will be replaced by your custom pong message.
///
///
/// ## Errors
/// # Errors
/// - If the WebSocket's send queue is full, `SendQueueFull` will be returned
/// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
/// along with the equivalent passed message frame.
/// - If the connection is closed and should be dropped, this will return [Error::ConnectionClosed].
/// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`].
/// - If you try again after [Error::ConnectionClosed] was returned either from here or from `read_message`,
/// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from
/// [Error::AlreadyClosed] will be returned. This indicates a program error on your part.
/// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program
/// - [Error::Io] is returned if the underlying connection returns an error
/// error on your part.
/// - [`Error::Io`] is returned if the underlying connection returns an error
/// (consider these fatal except for WouldBlock).
/// (consider these fatal except for WouldBlock).
/// - [Error::Capacity] if your message size is bigger than the configured max message size.
/// - [` Error::Capacity` ] if your message size is bigger than the configured max message size.
pub fn write_message ( & mut self , message : Message ) -> Result < ( ) > {
pub fn write ( & mut self , message : Message ) -> Result < ( ) > {
self . context . write_message ( & mut self . socket , message )
self . context . write ( & mut self . socket , message )
}
}
/// Flush the pending send queue.
/// Flush writes.
pub fn write_pending ( & mut self ) -> Result < ( ) > {
///
self . context . write_pending ( & mut self . socket )
/// Ensures all messages previously passed to [`write`](Self::write) and automatic
/// queued pong responses are written & flushed into the underlying stream.
pub fn flush ( & mut self ) -> Result < ( ) > {
self . context . flush ( & mut self . socket )
}
}
/// Close the connection.
/// Close the connection.
///
///
/// This function guarantees that the close frame will be queued.
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again. Calling this function is
/// There is no need to call it again. Calling this function is
/// the same as calling `write_message (Message::Close(..))`.
/// the same as calling `write(Message::Close(..))`.
///
///
/// After queuing the close frame you should continue calling `read_message` or
/// After queuing the close frame you should continue calling [`read`](Self::read) or
/// `write_pending` to drive the close handshake to completion.
/// [`flush`](Self::flush) to drive the close handshake to completion.
///
///
/// The websocket RFC defines that the underlying connection should be closed
/// The websocket RFC defines that the underlying connection should be closed
/// by the server. Tungstenite takes care of this asymmetry for you.
/// by the server. Tungstenite takes care of this asymmetry for you.
///
///
/// When the close handshake is finished (we have both sent and received
/// When the close handshake is finished (we have both sent and received
/// a close message), `read_message` or `write_pending` will return
/// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return
/// [Error::ConnectionClosed] if this endpoint is the server.
/// [Error::ConnectionClosed] if this endpoint is the server.
///
///
/// If this endpoint is a client, [Error::ConnectionClosed] will only be
/// If this endpoint is a client, [Error::ConnectionClosed] will only be
/// returned after the server has closed the underlying connection.
/// returned after the server has closed the underlying connection.
///
///
/// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
/// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
/// is returned from `read_message` or `write_pending` .
/// is returned from [`read`](Self::read) or [`flush`](Self::flush) .
pub fn close ( & mut self , code : Option < CloseFrame > ) -> Result < ( ) > {
pub fn close ( & mut self , code : Option < CloseFrame > ) -> Result < ( ) > {
self . context . close ( & mut self . socket , code )
self . context . close ( & mut self . socket , code )
}
}
/// Old name for [`read`](Self::read).
#[ deprecated(note = " Use `read` " ) ]
pub fn read_message ( & mut self ) -> Result < Message > {
self . read ( )
}
/// Old name for [`send`](Self::send).
#[ deprecated(note = " Use `send` " ) ]
pub fn write_message ( & mut self , message : Message ) -> Result < ( ) > {
self . send ( message )
}
/// Old name for [`flush`](Self::flush).
#[ deprecated(note = " Use `flush` " ) ]
pub fn write_pending ( & mut self ) -> Result < ( ) > {
self . flush ( )
}
}
}
/// A context for managing WebSocket stream.
/// A context for managing WebSocket stream.
@ -235,10 +283,8 @@ pub struct WebSocketContext {
state : WebSocketState ,
state : WebSocketState ,
/// Receive: an incomplete message being processed.
/// Receive: an incomplete message being processed.
incomplete : Option < IncompleteMessage > ,
incomplete : Option < IncompleteMessage > ,
/// Send: a data send queue.
/// Send in addition to regular messages E.g. "pong" or "close".
send_queue : VecDeque < Frame > ,
additional_send : Option < Frame > ,
/// Send: an OOB pong message.
pong : Option < Frame > ,
/// The configuration for the websocket session.
/// The configuration for the websocket session.
config : WebSocketConfig ,
config : WebSocketConfig ,
}
}
@ -246,28 +292,32 @@ pub struct WebSocketContext {
impl WebSocketContext {
impl WebSocketContext {
/// Create a WebSocket context that manages a post-handshake stream.
/// Create a WebSocket context that manages a post-handshake stream.
pub fn new ( role : Role , config : Option < WebSocketConfig > ) -> Self {
pub fn new ( role : Role , config : Option < WebSocketConfig > ) -> Self {
WebSocketContext {
Self ::_new ( role , FrameCodec ::new ( ) , config . unwrap_or_default ( ) )
role ,
frame : FrameCodec ::new ( ) ,
state : WebSocketState ::Active ,
incomplete : None ,
send_queue : VecDeque ::new ( ) ,
pong : None ,
config : config . unwrap_or_default ( ) ,
}
}
}
/// Create a WebSocket context that manages an post-handshake stream.
/// Create a WebSocket context that manages an post-handshake stream.
pub fn from_partially_read ( part : Vec < u8 > , role : Role , config : Option < WebSocketConfig > ) -> Self {
pub fn from_partially_read ( part : Vec < u8 > , role : Role , config : Option < WebSocketConfig > ) -> Self {
WebSocketContext {
Self ::_new ( role , FrameCodec ::from_partially_read ( part ) , config . unwrap_or_default ( ) )
frame : FrameCodec ::from_partially_read ( part ) ,
}
.. WebSocketContext ::new ( role , config )
fn _new ( role : Role , mut frame : FrameCodec , config : WebSocketConfig ) -> Self {
frame . set_max_out_buffer_len ( config . max_write_buffer_size ) ;
frame . set_out_buffer_write_len ( config . write_buffer_size ) ;
Self {
role ,
frame ,
state : WebSocketState ::Active ,
incomplete : None ,
additional_send : None ,
config ,
}
}
}
}
/// Change the configuration.
/// Change the configuration.
pub fn set_config ( & mut self , set_func : impl FnOnce ( & mut WebSocketConfig ) ) {
pub fn set_config ( & mut self , set_func : impl FnOnce ( & mut WebSocketConfig ) ) {
set_func ( & mut self . config )
set_func ( & mut self . config ) ;
self . frame . set_max_out_buffer_len ( self . config . max_write_buffer_size ) ;
self . frame . set_out_buffer_write_len ( self . config . write_buffer_size ) ;
}
}
/// Read the configuration.
/// Read the configuration.
@ -294,17 +344,23 @@ impl WebSocketContext {
///
///
/// This function sends pong and close responses automatically.
/// This function sends pong and close responses automatically.
/// However, it never blocks on write.
/// However, it never blocks on write.
pub fn read_message < Stream > ( & mut self , stream : & mut Stream ) -> Result < Message >
pub fn read < Stream > ( & mut self , stream : & mut Stream ) -> Result < Message >
where
where
Stream : Read + Write ,
Stream : Read + Write ,
{
{
// Do not read from already closed connections.
// Do not read from already closed connections.
self . state . check_active ( ) ? ;
self . state . check_not_terminated ( ) ? ;
loop {
loop {
// Since we may get ping or close, we need to reply to the messages even during read.
if self . additional_send . is_some ( ) {
// Thus we call write_pending() but ignore its blocking.
// Since we may get ping or close, we need to reply to the messages even during read.
self . write_pending ( stream ) . no_block ( ) ? ;
// Thus we flush but ignore its blocking.
self . flush ( stream ) . no_block ( ) ? ;
} else if self . role = = Role ::Server & & ! self . state . can_read ( ) {
self . state = WebSocketState ::Terminated ;
return Err ( Error ::ConnectionClosed ) ;
}
// If we get here, either write blocks or we have nothing to write.
// If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock.
// Thus if read blocks, just let it return WouldBlock.
if let Some ( message ) = self . read_message_frame ( stream ) ? {
if let Some ( message ) = self . read_message_frame ( stream ) ? {
@ -314,78 +370,94 @@ impl WebSocketContext {
}
}
}
}
/// Send a message to the provided stream, if possible .
/// Write a message to the provided stream .
///
///
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
/// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
///
///
/// Note that only the last pong frame is stored to be sent, and only the
/// In the event of stream write failure the message frame will be stored
/// most recent pong frame is sent if multiple pong frames are queued.
/// in the write buffer and will try again on the next call to [`write`](Self::write)
pub fn write_message < Stream > ( & mut self , stream : & mut Stream , message : Message ) -> Result < ( ) >
/// or [`flush`](Self::flush).
///
/// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
/// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
pub fn write < Stream > ( & mut self , stream : & mut Stream , message : Message ) -> Result < ( ) >
where
where
Stream : Read + Write ,
Stream : Read + Write ,
{
{
// When terminated, return AlreadyClosed.
// When terminated, return AlreadyClosed.
self . state . check_active ( ) ? ;
self . state . check_not_terminated ( ) ? ;
// Do not write after sending a close frame.
// Do not write after sending a close frame.
if ! self . state . is_active ( ) {
if ! self . state . is_active ( ) {
return Err ( Error ::Protocol ( ProtocolError ::SendAfterClosing ) ) ;
return Err ( Error ::Protocol ( ProtocolError ::SendAfterClosing ) ) ;
}
}
if let Some ( max_send_queue ) = self . config . max_send_queue {
if self . send_queue . len ( ) > = max_send_queue {
// Try to make some room for the new message.
// Do not return here if write would block, ignore WouldBlock silently
// since we must queue the message anyway.
self . write_pending ( stream ) . no_block ( ) ? ;
}
if self . send_queue . len ( ) > = max_send_queue {
return Err ( Error ::SendQueueFull ( message ) ) ;
}
}
let frame = match message {
let frame = match message {
Message ::Text ( data ) = > Frame ::message ( data . into ( ) , OpCode ::Data ( OpData ::Text ) , true ) ,
Message ::Text ( data ) = > Frame ::message ( data . into ( ) , OpCode ::Data ( OpData ::Text ) , true ) ,
Message ::Binary ( data ) = > Frame ::message ( data , OpCode ::Data ( OpData ::Binary ) , true ) ,
Message ::Binary ( data ) = > Frame ::message ( data , OpCode ::Data ( OpData ::Binary ) , true ) ,
Message ::Ping ( data ) = > Frame ::ping ( data ) ,
Message ::Ping ( data ) = > Frame ::ping ( data ) ,
Message ::Pong ( data ) = > {
Message ::Pong ( data ) = > {
self . pong = Some ( Frame ::pong ( data ) ) ;
self . set_additional ( Frame ::pong ( data ) ) ;
return self . write_pending ( stream ) ;
// Note: user pongs can be user flushed so no need to flush here
return self . _write ( stream , None ) . map ( | _ | ( ) ) ;
}
}
Message ::Close ( code ) = > return self . close ( stream , code ) ,
Message ::Close ( code ) = > return self . close ( stream , code ) ,
Message ::Frame ( f ) = > f ,
Message ::Frame ( f ) = > f ,
} ;
} ;
self . send_queue . push_back ( frame ) ;
let should_flush = self . _write ( stream , Some ( frame ) ) ? ;
self . write_pending ( stream )
if should_flush {
self . flush ( stream ) ? ;
}
Ok ( ( ) )
}
}
/// Flush the pending send queue.
/// Flush writes.
pub fn write_pending < Stream > ( & mut self , stream : & mut Stream ) -> Result < ( ) >
///
/// Ensures all messages previously passed to [`write`](Self::write) and automatically
/// queued pong responses are written & flushed into the `stream`.
#[ inline ]
pub fn flush < Stream > ( & mut self , stream : & mut Stream ) -> Result < ( ) >
where
where
Stream : Read + Write ,
Stream : Read + Write ,
{
{
// First, make sure we have no pending frame sending.
self . _write ( stream , None ) ? ;
self . frame . write_pending ( stream ) ? ;
self . frame . write_out_buffer ( stream ) ? ;
Ok ( stream . flush ( ) ? )
}
/// Writes any data in the out_buffer, `additional_send` and given `data`.
///
/// Does **not** flush.
///
/// Returns true if the write contents indicate we should flush immediately.
fn _write < Stream > ( & mut self , stream : & mut Stream , data : Option < Frame > ) -> Result < bool >
where
Stream : Read + Write ,
{
if let Some ( data ) = data {
self . buffer_frame ( stream , data ) ? ;
}
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD
// response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455)
// respond with Pong frame as soon as is practical. (RFC 6455)
if let Some ( pong ) = self . pong . take ( ) {
let should_flush = if let Some ( msg ) = self . additional_send . take ( ) {
trace ! ( "Sending pong reply" ) ;
trace ! ( "Sending pong/close" ) ;
self . send_one_frame ( stream , pong ) ? ;
match self . buffer_frame ( stream , msg ) {
}
Err ( Error ::WriteBufferFull ( Message ::Frame ( msg ) ) ) = > {
// If we have any unsent frames, send them.
// if an system message would exceed the buffer put it back in
trace ! ( "Frames still in queue: {}" , self . send_queue . len ( ) ) ;
// `additional_send` for retry. Otherwise returning this error
while let Some ( data ) = self . send_queue . pop_front ( ) {
// may not make sense to the user, e.g. calling `flush`.
self . send_one_frame ( stream , data ) ? ;
self . set_additional ( msg ) ;
}
false
}
// If we get to this point, the send queue is empty and the underlying socket is still
Err ( err ) = > return Err ( err ) ,
// willing to take more data.
Ok ( _ ) = > true ,
}
} else {
false
} ;
// If we're closing and there is nothing to send anymore, we should close the connection.
// If we're closing and there is nothing to send anymore, we should close the connection.
if self . role = = Role ::Server & & ! self . state . can_read ( ) {
if self . role = = Role ::Server & & ! self . state . can_read ( ) {
@ -395,10 +467,11 @@ impl WebSocketContext {
// maximum segment lifetimes (2MSL), while there is no corresponding
// maximum segment lifetimes (2MSL), while there is no corresponding
// server impact as a TIME_WAIT connection is immediately reopened upon
// server impact as a TIME_WAIT connection is immediately reopened upon
// a new SYN with a higher seq number). (RFC 6455)
// a new SYN with a higher seq number). (RFC 6455)
self . frame . write_out_buffer ( stream ) ? ;
self . state = WebSocketState ::Terminated ;
self . state = WebSocketState ::Terminated ;
Err ( Error ::ConnectionClosed )
Err ( Error ::ConnectionClosed )
} else {
} else {
Ok ( ( ) )
Ok ( should_flush )
}
}
}
}
@ -406,7 +479,7 @@ impl WebSocketContext {
///
///
/// This function guarantees that the close frame will be queued.
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again. Calling this function is
/// There is no need to call it again. Calling this function is
/// the same as calling `write (Message::Close(..))`.
/// the same as calling `send (Message::Close(..))`.
pub fn close < Stream > ( & mut self , stream : & mut Stream , code : Option < CloseFrame > ) -> Result < ( ) >
pub fn close < Stream > ( & mut self , stream : & mut Stream , code : Option < CloseFrame > ) -> Result < ( ) >
where
where
Stream : Read + Write ,
Stream : Read + Write ,
@ -414,11 +487,9 @@ impl WebSocketContext {
if let WebSocketState ::Active = self . state {
if let WebSocketState ::Active = self . state {
self . state = WebSocketState ::ClosedByUs ;
self . state = WebSocketState ::ClosedByUs ;
let frame = Frame ::close ( code ) ;
let frame = Frame ::close ( code ) ;
self . send_queue . push_back ( frame ) ;
self . _write ( stream , Some ( frame ) ) ? ;
} else {
// Already closed, nothing to do.
}
}
self . write_pending ( stream )
self . flush ( stream )
}
}
/// Try to decode one message frame. May return None.
/// Try to decode one message frame. May return None.
@ -487,7 +558,7 @@ impl WebSocketContext {
let data = frame . into_data ( ) ;
let data = frame . into_data ( ) ;
// No ping processing after we sent a close frame.
// No ping processing after we sent a close frame.
if self . state . is_active ( ) {
if self . state . is_active ( ) {
self . pong = Some ( Frame ::pong ( data . clone ( ) ) ) ;
self . set_additional ( Frame ::pong ( data . clone ( ) ) ) ;
}
}
Ok ( Some ( Message ::Ping ( data ) ) )
Ok ( Some ( Message ::Ping ( data ) ) )
}
}
@ -571,7 +642,7 @@ impl WebSocketContext {
let reply = Frame ::close ( close . clone ( ) ) ;
let reply = Frame ::close ( close . clone ( ) ) ;
debug ! ( "Replying to close with {:?}" , reply ) ;
debug ! ( "Replying to close with {:?}" , reply ) ;
self . send_queue . push_back ( reply ) ;
self . set_additional ( reply ) ;
Some ( close )
Some ( close )
}
}
@ -588,8 +659,8 @@ impl WebSocketContext {
}
}
}
}
/// Send a single pending frame .
/// Write a single frame into the write-buffer .
fn send_one _frame< Stream > ( & mut self , stream : & mut Stream , mut frame : Frame ) -> Result < ( ) >
fn buffer _frame< Stream > ( & mut self , stream : & mut Stream , mut frame : Frame ) -> Result < ( ) >
where
where
Stream : Read + Write ,
Stream : Read + Write ,
{
{
@ -603,7 +674,18 @@ impl WebSocketContext {
}
}
trace ! ( "Sending frame: {:?}" , frame ) ;
trace ! ( "Sending frame: {:?}" , frame ) ;
self . frame . write_frame ( stream , frame ) . check_connection_reset ( self . state )
self . frame . buffer_frame ( stream , frame ) . check_connection_reset ( self . state )
}
/// Replace `additional_send` if it is currently a `Pong` message.
fn set_additional ( & mut self , add : Frame ) {
let empty_or_pong = self
. additional_send
. as_ref ( )
. map_or ( true , | f | f . header ( ) . opcode = = OpCode ::Control ( OpCtl ::Pong ) ) ;
if empty_or_pong {
self . additional_send . replace ( add ) ;
}
}
}
}
}
@ -636,7 +718,7 @@ impl WebSocketState {
}
}
/// Check if the state is active, return error if not.
/// Check if the state is active, return error if not.
fn check_active ( self ) -> Result < ( ) > {
fn check_not_terminated ( self ) -> Result < ( ) > {
match self {
match self {
WebSocketState ::Terminated = > Err ( Error ::AlreadyClosed ) ,
WebSocketState ::Terminated = > Err ( Error ::AlreadyClosed ) ,
_ = > Ok ( ( ) ) ,
_ = > Ok ( ( ) ) ,
@ -688,64 +770,6 @@ mod tests {
}
}
}
}
struct WouldBlockStreamMoc ;
impl io ::Write for WouldBlockStreamMoc {
fn write ( & mut self , _ : & [ u8 ] ) -> io ::Result < usize > {
Err ( io ::Error ::new ( io ::ErrorKind ::WouldBlock , "would block" ) )
}
fn flush ( & mut self ) -> io ::Result < ( ) > {
Err ( io ::Error ::new ( io ::ErrorKind ::WouldBlock , "would block" ) )
}
}
impl io ::Read for WouldBlockStreamMoc {
fn read ( & mut self , _ : & mut [ u8 ] ) -> io ::Result < usize > {
Err ( io ::Error ::new ( io ::ErrorKind ::WouldBlock , "would block" ) )
}
}
#[ test ]
fn queue_logic ( ) {
// Create a socket with the queue size of 1.
let mut socket = WebSocket ::from_raw_socket (
WouldBlockStreamMoc ,
Role ::Client ,
Some ( WebSocketConfig { max_send_queue : Some ( 1 ) , .. Default ::default ( ) } ) ,
) ;
// Test message that we're going to send.
let message = Message ::Binary ( vec! [ 0xFF ; 1024 ] ) ;
// Helper to check the error.
let assert_would_block = | error | {
if let Error ::Io ( io_error ) = error {
assert_eq! ( io_error . kind ( ) , io ::ErrorKind ::WouldBlock ) ;
} else {
panic! ( "Expected WouldBlock error" ) ;
}
} ;
// The first attempt of writing must not fail, since the queue is empty at start.
// But since the underlying mock object always returns `WouldBlock`, so is the result.
assert_would_block ( dbg! ( socket . write_message ( message . clone ( ) ) . unwrap_err ( ) ) ) ;
// Any subsequent attempts must return an error telling that the queue is full.
for _i in 0 .. 100 {
assert! ( matches! (
socket . write_message ( message . clone ( ) ) . unwrap_err ( ) ,
Error ::SendQueueFull ( .. )
) ) ;
}
// The size of the output buffer must not be bigger than the size of that message
// that we managed to write to the output buffer at first. Since we could not make
// any progress (because of the logic of the moc buffer), the size remains unchanged.
if socket . context . frame . output_buffer_len ( ) > message . len ( ) {
panic! ( "Too many frames in the queue" ) ;
}
}
#[ test ]
#[ test ]
fn receive_messages ( ) {
fn receive_messages ( ) {
let incoming = Cursor ::new ( vec! [
let incoming = Cursor ::new ( vec! [
@ -754,10 +778,10 @@ mod tests {
0x03 ,
0x03 ,
] ) ;
] ) ;
let mut socket = WebSocket ::from_raw_socket ( WriteMoc ( incoming ) , Role ::Client , None ) ;
let mut socket = WebSocket ::from_raw_socket ( WriteMoc ( incoming ) , Role ::Client , None ) ;
assert_eq! ( socket . read_message ( ) . unwrap ( ) , Message ::Ping ( vec! [ 1 , 2 ] ) ) ;
assert_eq! ( socket . read ( ) . unwrap ( ) , Message ::Ping ( vec! [ 1 , 2 ] ) ) ;
assert_eq! ( socket . read_message ( ) . unwrap ( ) , Message ::Pong ( vec! [ 3 ] ) ) ;
assert_eq! ( socket . read ( ) . unwrap ( ) , Message ::Pong ( vec! [ 3 ] ) ) ;
assert_eq! ( socket . read_message ( ) . unwrap ( ) , Message ::Text ( "Hello, World!" . into ( ) ) ) ;
assert_eq! ( socket . read ( ) . unwrap ( ) , Message ::Text ( "Hello, World!" . into ( ) ) ) ;
assert_eq! ( socket . read_message ( ) . unwrap ( ) , Message ::Binary ( vec! [ 0x01 , 0x02 , 0x03 ] ) ) ;
assert_eq! ( socket . read ( ) . unwrap ( ) , Message ::Binary ( vec! [ 0x01 , 0x02 , 0x03 ] ) ) ;
}
}
#[ test ]
#[ test ]
@ -770,7 +794,7 @@ mod tests {
let mut socket = WebSocket ::from_raw_socket ( WriteMoc ( incoming ) , Role ::Client , Some ( limit ) ) ;
let mut socket = WebSocket ::from_raw_socket ( WriteMoc ( incoming ) , Role ::Client , Some ( limit ) ) ;
assert! ( matches! (
assert! ( matches! (
socket . read_message ( ) ,
socket . read ( ) ,
Err ( Error ::Capacity ( CapacityError ::MessageTooLong { size : 13 , max_size : 10 } ) )
Err ( Error ::Capacity ( CapacityError ::MessageTooLong { size : 13 , max_size : 10 } ) )
) ) ;
) ) ;
}
}
@ -782,7 +806,7 @@ mod tests {
let mut socket = WebSocket ::from_raw_socket ( WriteMoc ( incoming ) , Role ::Client , Some ( limit ) ) ;
let mut socket = WebSocket ::from_raw_socket ( WriteMoc ( incoming ) , Role ::Client , Some ( limit ) ) ;
assert! ( matches! (
assert! ( matches! (
socket . read_message ( ) ,
socket . read ( ) ,
Err ( Error ::Capacity ( CapacityError ::MessageTooLong { size : 3 , max_size : 2 } ) )
Err ( Error ::Capacity ( CapacityError ::MessageTooLong { size : 3 , max_size : 2 } ) )
) ) ;
) ) ;
}
}