@ -6,13 +6,6 @@ mod message;
pub use self ::{ frame ::CloseFrame , message ::Message } ;
use log ::* ;
use std ::{
collections ::VecDeque ,
io ::{ ErrorKind as IoErrorKind , Read , Write } ,
mem ::replace ,
} ;
use self ::{
frame ::{
coding ::{ CloseCode , Control as OpCtl , Data as OpData , OpCode } ,
@ -22,9 +15,13 @@ use self::{
} ;
use crate ::{
error ::{ Error , ProtocolError , Result } ,
extensions ::Extensions ,
util ::NonBlockingResult ,
} ;
use log ::* ;
use std ::{
io ::{ ErrorKind as IoErrorKind , Read , Write } ,
mem ::replace ,
} ;
/// Indicates a Client or Server role of the websocket
#[ derive(Debug, Clone, Copy, PartialEq, Eq) ]
@ -38,10 +35,21 @@ pub enum Role {
/// The configuration for WebSocket connection.
#[ derive(Debug, Clone, Copy) ]
pub struct WebSocketConfig {
/// The size of the send queue. You can use it to turn on/off the backpressure features. `None`
/// means here that the size of the queue is unlimited. The default value is the unlimited
/// queue.
/// Does nothing, instead use `max_write_buffer_size`.
#[ deprecated ]
pub max_send_queue : Option < usize > ,
/// The target minimum size of the write buffer to reach before writing the data
/// to the underlying stream.
/// The default value is 128 KiB.
///
/// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless.
pub write_buffer_size : usize ,
/// The max size of the write buffer in bytes. Setting this can provide backpressure
/// in the case the write buffer is filling up due to write errors.
/// The default value is unlimited.
///
/// Note: Should always be set higher than [`write_buffer_size`](Self::write_buffer_size).
pub max_write_buffer_size : usize ,
/// The maximum size of a message. `None` means no size limit. The default value is 64 MiB
/// which should be reasonably big for all normal use-cases but small enough to prevent
/// memory eating by a malicious user.
@ -57,76 +65,18 @@ pub struct WebSocketConfig {
/// some popular libraries that are sending unmasked frames, ignoring the RFC.
/// By default this option is set to `false`, i.e. according to RFC 6455.
pub accept_unmasked_frames : bool ,
/// Optional configuration for Per-Message Compression Extension.
#[ cfg(feature = " deflate " ) ]
pub compression : Option < crate ::extensions ::DeflateConfig > ,
}
impl Default for WebSocketConfig {
fn default ( ) -> Self {
#[ allow(deprecated) ]
WebSocketConfig {
max_send_queue : None ,
write_buffer_size : 128 * 1024 ,
max_write_buffer_size : usize ::MAX ,
max_message_size : Some ( 64 < < 20 ) ,
max_frame_size : Some ( 16 < < 20 ) ,
accept_unmasked_frames : false ,
#[ cfg(feature = " deflate " ) ]
compression : None ,
}
}
}
impl WebSocketConfig {
// Generate extension negotiation offers for configured extensions.
// Only `permessage-deflate` is supported at the moment.
pub ( crate ) fn generate_offers ( & self ) -> Option < headers ::SecWebsocketExtensions > {
#[ cfg(feature = " deflate " ) ]
{
let mut offers = Vec ::new ( ) ;
if let Some ( compression ) = self . compression . map ( | c | c . generate_offer ( ) ) {
offers . push ( compression ) ;
}
if offers . is_empty ( ) {
None
} else {
Some ( headers ::SecWebsocketExtensions ::new ( offers ) )
}
}
#[ cfg(not(feature = " deflate " )) ]
{
None
}
}
// This can be used with `WebSocket::from_raw_socket_with_extensions` for integration.
/// Returns negotiation response based on offers and `Extensions` to manage extensions.
pub fn accept_offers (
& self ,
_offers : & headers ::SecWebsocketExtensions ,
) -> Option < ( headers ::SecWebsocketExtensions , Extensions ) > {
#[ cfg(feature = " deflate " ) ]
{
// To support more extensions, store extension context in `Extensions` and
// concatenate negotiation responses from each extension.
let mut agreed_extensions = Vec ::new ( ) ;
let mut extensions = Extensions ::default ( ) ;
if let Some ( compression ) = & self . compression {
if let Some ( ( agreed , compression ) ) = compression . accept_offer ( _offers ) {
agreed_extensions . push ( agreed ) ;
extensions . compression = Some ( compression ) ;
}
}
if agreed_extensions . is_empty ( ) {
None
} else {
Some ( ( headers ::SecWebsocketExtensions ::new ( agreed_extensions ) , extensions ) )
}
}
#[ cfg(not(feature = " deflate " )) ]
{
None
}
}
}
@ -135,6 +85,8 @@ impl WebSocketConfig {
///
/// This is THE structure you want to create to be able to speak the WebSocket protocol.
/// It may be created by calling `connect`, `accept` or `client` functions.
///
/// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages.
#[ derive(Debug) ]
pub struct WebSocket < Stream > {
/// The underlying socket.
@ -153,18 +105,6 @@ impl<Stream> WebSocket<Stream> {
WebSocket { socket : stream , context : WebSocketContext ::new ( role , config ) }
}
/// Convert a raw socket into a WebSocket without performing a handshake.
pub fn from_raw_socket_with_extensions (
stream : Stream ,
role : Role ,
config : Option < WebSocketConfig > ,
extensions : Option < Extensions > ,
) -> Self {
let mut context = WebSocketContext ::new ( role , config ) ;
context . extensions = extensions ;
WebSocket { socket : stream , context }
}
/// Convert a raw socket into a WebSocket without performing a handshake.
///
/// Call this function if you're using Tungstenite as a part of a web framework
@ -182,21 +122,6 @@ impl<Stream> WebSocket<Stream> {
}
}
pub ( crate ) fn from_partially_read_with_extensions (
stream : Stream ,
part : Vec < u8 > ,
role : Role ,
config : Option < WebSocketConfig > ,
extensions : Option < Extensions > ,
) -> Self {
WebSocket {
socket : stream ,
context : WebSocketContext ::from_partially_read_with_extensions (
part , role , config , extensions ,
) ,
}
}
/// Returns a shared reference to the inner stream.
pub fn get_ref ( & self ) -> & Stream {
& self . socket
@ -235,82 +160,116 @@ impl<Stream> WebSocket<Stream> {
impl < Stream : Read + Write > WebSocket < Stream > {
/// Read a message from stream, if possible.
///
/// This will queue responses to ping and close messages to be sent. It will call
/// `write_pending` before trying to read in order to make sure that those responses
/// make progress even if you never call `write_pending`. That does mean that they
/// get sent out earliest on the next call to `read_message`, `write_message` or `write_pending`.
/// This will also queue responses to ping and close messages. These responses
/// will be written and flushed on the next call to [`read`](Self::read),
/// [`write`](Self::write) or [`flush`](Self::flush).
///
/// ## Closing the connection
/// # Closing the connection
/// When the remote endpoint decides to close the connection this will return
/// the close message with an optional close frame.
///
/// You should continue calling `read_message`, `write_message` or `write_pending` to drive
/// the reply to the close frame until [Error::ConnectionClosed] is returned. Once that happens
/// it is safe to drop the underlying connection.
pub fn read_message ( & mut self ) -> Result < Message > {
self . context . read_message ( & mut self . socket )
/// You should continue calling [`read`](Self::read), [`write`](Self::write) or
/// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`]
/// is returned. Once that happens it is safe to drop the underlying connection.
pub fn read ( & mut self ) -> Result < Message > {
self . context . read ( & mut self . socket )
}
/// Writes and immediately flushes a message.
/// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
pub fn send ( & mut self , message : Message ) -> Result < ( ) > {
self . write ( message ) ? ;
self . flush ( )
}
/// Send a message to stream, if possible.
/// Write a message to the provided stream, if possible.
///
/// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
///
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// requests. A Pong reply will jump the queue because the
/// [websocket RFC](https://tools.ietf.org/html/rfc6455#section-5.5.2) specifies it should be sent
/// as soon as is practical.
/// In the event of stream write failure the message frame will be stored
/// in the write buffer and will try again on the next call to [`write`](Self::write)
/// or [`flush`](Self::flush).
///
/// Note that upon receiving a ping message, tungstenite cues a pong reply automatically.
/// When you call either `read_message`, `write_message` or `write_pending` next it will try to send
/// that pong out if the underlying connection can take more data. This means you should not
/// respond to ping frames manually.
/// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
/// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
///
/// This call will generally not flush. However, if there are queued automatic messages
/// they will be written and eagerly flushed.
///
/// For example, upon receiving ping messages tungstenite queues pong replies automatically.
/// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush)
/// will write & flush the pong reply. This means you should not respond to ping frames manually.
///
/// You can however send pong frames manually in order to indicate a unidirectional heartbeat
/// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
/// if `read_message` returns a ping, you should call `write_pending` until it doesn't return
/// WouldBlock before passing a pong to `write_message`, otherwise the response to the
/// ping will not be sent, but rather replaced by your custom pong message.
///
/// ## Errors
/// - If the WebSocket's send queue is full, `SendQueueFull` will be returned
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
/// - If the connection is closed and should be dropped, this will return [Error::ConnectionClosed].
/// - If you try again after [Error::ConnectionClosed] was returned either from here or from `read_message`,
/// [Error::AlreadyClosed] will be returned. This indicates a program error on your part.
/// - [Error::Io] is returned if the underlying connection returns an error
/// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing
/// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the
/// ping will not be sent as it will be replaced by your custom pong message.
///
/// # Errors
/// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned
/// along with the equivalent passed message frame.
/// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`].
/// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from
/// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program
/// error on your part.
/// - [`Error::Io`] is returned if the underlying connection returns an error
/// (consider these fatal except for WouldBlock).
/// - [Error::Capacity] if your message size is bigger than the configured max message size.
pub fn write_message ( & mut self , message : Message ) -> Result < ( ) > {
self . context . write_message ( & mut self . socket , message )
/// - [` Error::Capacity` ] if your message size is bigger than the configured max message size.
pub fn write ( & mut self , message : Message ) -> Result < ( ) > {
self . context . write ( & mut self . socket , message )
}
/// Flush the pending send queue.
pub fn write_pending ( & mut self ) -> Result < ( ) > {
self . context . write_pending ( & mut self . socket )
/// Flush writes.
///
/// Ensures all messages previously passed to [`write`](Self::write) and automatic
/// queued pong responses are written & flushed into the underlying stream.
pub fn flush ( & mut self ) -> Result < ( ) > {
self . context . flush ( & mut self . socket )
}
/// Close the connection.
///
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again. Calling this function is
/// the same as calling `write_message (Message::Close(..))`.
/// the same as calling `write(Message::Close(..))`.
///
/// After queuing the close frame you should continue calling `read_message` or
/// `write_pending` to drive the close handshake to completion.
/// After queuing the close frame you should continue calling [`read`](Self::read) or
/// [`flush`](Self::flush) to drive the close handshake to completion.
///
/// The websocket RFC defines that the underlying connection should be closed
/// by the server. Tungstenite takes care of this asymmetry for you.
///
/// When the close handshake is finished (we have both sent and received
/// a close message), `read_message` or `write_pending` will return
/// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return
/// [Error::ConnectionClosed] if this endpoint is the server.
///
/// If this endpoint is a client, [Error::ConnectionClosed] will only be
/// returned after the server has closed the underlying connection.
///
/// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
/// is returned from `read_message` or `write_pending` .
/// is returned from [`read`](Self::read) or [`flush`](Self::flush) .
pub fn close ( & mut self , code : Option < CloseFrame > ) -> Result < ( ) > {
self . context . close ( & mut self . socket , code )
}
/// Old name for [`read`](Self::read).
#[ deprecated(note = " Use `read` " ) ]
pub fn read_message ( & mut self ) -> Result < Message > {
self . read ( )
}
/// Old name for [`send`](Self::send).
#[ deprecated(note = " Use `send` " ) ]
pub fn write_message ( & mut self , message : Message ) -> Result < ( ) > {
self . send ( message )
}
/// Old name for [`flush`](Self::flush).
#[ deprecated(note = " Use `flush` " ) ]
pub fn write_pending ( & mut self ) -> Result < ( ) > {
self . flush ( )
}
}
/// A context for managing WebSocket stream.
@ -324,55 +283,41 @@ pub struct WebSocketContext {
state : WebSocketState ,
/// Receive: an incomplete message being processed.
incomplete : Option < IncompleteMessage > ,
/// Send: a data send queue.
send_queue : VecDeque < Frame > ,
/// Send: an OOB pong message.
pong : Option < Frame > ,
/// Send in addition to regular messages E.g. "pong" or "close".
additional_send : Option < Frame > ,
/// The configuration for the websocket session.
config : WebSocketConfig ,
// Container for extensions.
pub ( crate ) extensions : Option < Extensions > ,
}
impl WebSocketContext {
/// Create a WebSocket context that manages a post-handshake stream.
pub fn new ( role : Role , config : Option < WebSocketConfig > ) -> Self {
WebSocketContext {
role ,
frame : FrameCodec ::new ( ) ,
state : WebSocketState ::Active ,
incomplete : None ,
send_queue : VecDeque ::new ( ) ,
pong : None ,
config : config . unwrap_or_default ( ) ,
extensions : None ,
}
Self ::_new ( role , FrameCodec ::new ( ) , config . unwrap_or_default ( ) )
}
/// Create a WebSocket context that manages an post-handshake stream.
pub fn from_partially_read ( part : Vec < u8 > , role : Role , config : Option < WebSocketConfig > ) -> Self {
WebSocketContext {
frame : FrameCodec ::from_partially_read ( part ) ,
.. WebSocketContext ::new ( role , config )
}
Self ::_new ( role , FrameCodec ::from_partially_read ( part ) , config . unwrap_or_default ( ) )
}
pub ( crate ) fn from_partially_read_with_extensions (
part : Vec < u8 > ,
role : Role ,
config : Option < WebSocketConfig > ,
extensions : Option < Extensions > ,
) -> Self {
WebSocketContext {
frame : FrameCodec ::from_partially_read ( part ) ,
extensions ,
.. WebSocketContext ::new ( role , config )
fn _new ( role : Role , mut frame : FrameCodec , config : WebSocketConfig ) -> Self {
frame . set_max_out_buffer_len ( config . max_write_buffer_size ) ;
frame . set_out_buffer_write_len ( config . write_buffer_size ) ;
Self {
role ,
frame ,
state : WebSocketState ::Active ,
incomplete : None ,
additional_send : None ,
config ,
}
}
/// Change the configuration.
pub fn set_config ( & mut self , set_func : impl FnOnce ( & mut WebSocketConfig ) ) {
set_func ( & mut self . config )
set_func ( & mut self . config ) ;
self . frame . set_max_out_buffer_len ( self . config . max_write_buffer_size ) ;
self . frame . set_out_buffer_write_len ( self . config . write_buffer_size ) ;
}
/// Read the configuration.
@ -399,17 +344,23 @@ impl WebSocketContext {
///
/// This function sends pong and close responses automatically.
/// However, it never blocks on write.
pub fn read_message < Stream > ( & mut self , stream : & mut Stream ) -> Result < Message >
pub fn read < Stream > ( & mut self , stream : & mut Stream ) -> Result < Message >
where
Stream : Read + Write ,
{
// Do not read from already closed connections.
self . state . check_active ( ) ? ;
self . state . check_not_terminated ( ) ? ;
loop {
// Since we may get ping or close, we need to reply to the messages even during read.
// Thus we call write_pending() but ignore its blocking.
self . write_pending ( stream ) . no_block ( ) ? ;
if self . additional_send . is_some ( ) {
// Since we may get ping or close, we need to reply to the messages even during read.
// Thus we flush but ignore its blocking.
self . flush ( stream ) . no_block ( ) ? ;
} else if self . role = = Role ::Server & & ! self . state . can_read ( ) {
self . state = WebSocketState ::Terminated ;
return Err ( Error ::ConnectionClosed ) ;
}
// If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock.
if let Some ( message ) = self . read_message_frame ( stream ) ? {
@ -419,89 +370,94 @@ impl WebSocketContext {
}
}
/// Send a message to the provided stream, if possible.
/// Write a message to the provided stream.
///
/// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
///
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned .
/// In the event of stream write failure the message frame will be stored
/// in the write buffer and will try again on the next call to [`write`](Self::write)
/// or [`flush`](Self::flush) .
///
/// Note that only the last pong frame is stored to be sent, and only the
/// most recent pong frame is sent if multiple pong frames are queu ed.
pub fn write_message < Stream > ( & mut self , stream : & mut Stream , message : Message ) -> Result < ( ) >
/// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
/// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is return ed.
pub fn write < Stream > ( & mut self , stream : & mut Stream , message : Message ) -> Result < ( ) >
where
Stream : Read + Write ,
{
// When terminated, return AlreadyClosed.
self . state . check_active ( ) ? ;
self . state . check_not_terminated ( ) ? ;
// Do not write after sending a close frame.
if ! self . state . is_active ( ) {
return Err ( Error ::Protocol ( ProtocolError ::SendAfterClosing ) ) ;
}
if let Some ( max_send_queue ) = self . config . max_send_queue {
if self . send_queue . len ( ) > = max_send_queue {
// Try to make some room for the new message.
// Do not return here if write would block, ignore WouldBlock silently
// since we must queue the message anyway.
self . write_pending ( stream ) . no_block ( ) ? ;
}
if self . send_queue . len ( ) > = max_send_queue {
return Err ( Error ::SendQueueFull ( message ) ) ;
}
}
let frame = match message {
Message ::Text ( data ) = > self . prepare_data_fram e( data . into ( ) , OpData ::Text ) ? ,
Message ::Binary ( data ) = > self . prepare_data_frame ( data , OpData ::Binary ) ? ,
Message ::Text ( data ) = > Frame ::message ( data . into ( ) , OpCode ::Data ( OpData ::Text ) , true ) ,
Message ::Binary ( data ) = > Frame ::message ( data , OpCode ::Data ( OpData ::Binary ) , true ) ,
Message ::Ping ( data ) = > Frame ::ping ( data ) ,
Message ::Pong ( data ) = > {
self . pong = Some ( Frame ::pong ( data ) ) ;
return self . write_pending ( stream ) ;
self . set_additional ( Frame ::pong ( data ) ) ;
// Note: user pongs can be user flushed so no need to flush here
return self . _write ( stream , None ) . map ( | _ | ( ) ) ;
}
Message ::Close ( code ) = > return self . close ( stream , code ) ,
Message ::Frame ( f ) = > f ,
} ;
self . send_queue . push_back ( frame ) ;
self . write_pending ( stream )
let should_flush = self . _write ( stream , Some ( frame ) ) ? ;
if should_flush {
self . flush ( stream ) ? ;
}
Ok ( ( ) )
}
fn prepare_data_frame ( & mut self , data : Vec < u8 > , opdata : OpData ) -> Result < Frame > {
debug_assert! ( matches! ( opdata , OpData ::Text | OpData ::Binary ) , "Invalid data frame kind" ) ;
let opcode = OpCode ::Data ( opdata ) ;
let is_final = true ;
#[ cfg(feature = " deflate " ) ]
if let Some ( pmce ) = self . extensions . as_mut ( ) . and_then ( | e | e . compression . as_mut ( ) ) {
return Ok ( Frame ::compressed_message ( pmce . compress ( & data ) ? , opcode , is_final ) ) ;
}
Ok ( Frame ::message ( data , opcode , is_final ) )
/// Flush writes.
///
/// Ensures all messages previously passed to [`write`](Self::write) and automatically
/// queued pong responses are written & flushed into the `stream`.
#[ inline ]
pub fn flush < Stream > ( & mut self , stream : & mut Stream ) -> Result < ( ) >
where
Stream : Read + Write ,
{
self . _write ( stream , None ) ? ;
self . frame . write_out_buffer ( stream ) ? ;
Ok ( stream . flush ( ) ? )
}
/// Flush the pending send queue.
pub fn write_pending < Stream > ( & mut self , stream : & mut Stream ) -> Result < ( ) >
/// Writes any data in the out_buffer, `additional_send` and given `data`.
///
/// Does **not** flush.
///
/// Returns true if the write contents indicate we should flush immediately.
fn _write < Stream > ( & mut self , stream : & mut Stream , data : Option < Frame > ) -> Result < bool >
where
Stream : Read + Write ,
{
// First, make sure we have no pending frame sending.
self . frame . write_pending ( stream ) ? ;
if let Some ( data ) = data {
self . buffer_frame ( stream , data ) ? ;
}
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455)
if let Some ( pong ) = self . pong . take ( ) {
trace ! ( "Sending pong reply" ) ;
self . send_one_frame ( stream , pong ) ? ;
}
// If we have any unsent frames, send them.
trace ! ( "Frames still in queue: {}" , self . send_queue . len ( ) ) ;
while let Some ( data ) = self . send_queue . pop_front ( ) {
self . send_one_frame ( stream , data ) ? ;
}
// If we get to this point, the send queue is empty and the underlying socket is still
// willing to take more data.
let should_flush = if let Some ( msg ) = self . additional_send . take ( ) {
trace ! ( "Sending pong/close" ) ;
match self . buffer_frame ( stream , msg ) {
Err ( Error ::WriteBufferFull ( Message ::Frame ( msg ) ) ) = > {
// if an system message would exceed the buffer put it back in
// `additional_send` for retry. Otherwise returning this error
// may not make sense to the user, e.g. calling `flush`.
self . set_additional ( msg ) ;
false
}
Err ( err ) = > return Err ( err ) ,
Ok ( _ ) = > true ,
}
} else {
false
} ;
// If we're closing and there is nothing to send anymore, we should close the connection.
if self . role = = Role ::Server & & ! self . state . can_read ( ) {
@ -511,10 +467,11 @@ impl WebSocketContext {
// maximum segment lifetimes (2MSL), while there is no corresponding
// server impact as a TIME_WAIT connection is immediately reopened upon
// a new SYN with a higher seq number). (RFC 6455)
self . frame . write_out_buffer ( stream ) ? ;
self . state = WebSocketState ::Terminated ;
Err ( Error ::ConnectionClosed )
} else {
Ok ( ( ) )
Ok ( should_flush )
}
}
@ -522,7 +479,7 @@ impl WebSocketContext {
///
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again. Calling this function is
/// the same as calling `write (Message::Close(..))`.
/// the same as calling `send (Message::Close(..))`.
pub fn close < Stream > ( & mut self , stream : & mut Stream , code : Option < CloseFrame > ) -> Result < ( ) >
where
Stream : Read + Write ,
@ -530,11 +487,9 @@ impl WebSocketContext {
if let WebSocketState ::Active = self . state {
self . state = WebSocketState ::ClosedByUs ;
let frame = Frame ::close ( code ) ;
self . send_queue . push_back ( frame ) ;
} else {
// Already closed, nothing to do.
self . _write ( stream , Some ( frame ) ) ? ;
}
self . write_pending ( stream )
self . flush ( stream )
}
/// Try to decode one message frame. May return None.
@ -555,14 +510,12 @@ impl WebSocketContext {
// the negotiated extensions defines the meaning of such a nonzero
// value, the receiving endpoint MUST _Fail the WebSocket
// Connection_.
let is_compressed = {
{
let hdr = frame . header ( ) ;
if ( hdr . rsv1 & & ! self . has_compression ( ) ) | | hdr . rsv2 | | hdr . rsv3 {
if hdr . rsv1 | | hdr . rsv2 | | hdr . rsv3 {
return Err ( Error ::Protocol ( ProtocolError ::NonZeroReservedBits ) ) ;
}
hdr . rsv1
} ;
}
match self . role {
Role ::Server = > {
@ -597,10 +550,6 @@ impl WebSocketContext {
_ if frame . payload ( ) . len ( ) > 125 = > {
Err ( Error ::Protocol ( ProtocolError ::ControlFrameTooBig ) )
}
// Control frames must not have compress bit.
_ if is_compressed = > {
Err ( Error ::Protocol ( ProtocolError ::CompressedControlFrame ) )
}
OpCtl ::Close = > Ok ( self . do_close ( frame . into_close ( ) ? ) . map ( Message ::Close ) ) ,
OpCtl ::Reserved ( i ) = > {
Err ( Error ::Protocol ( ProtocolError ::UnknownControlFrameType ( i ) ) )
@ -609,7 +558,7 @@ impl WebSocketContext {
let data = frame . into_data ( ) ;
// No ping processing after we sent a close frame.
if self . state . is_active ( ) {
self . pong = Some ( Frame ::pong ( data . clone ( ) ) ) ;
self . set_additional ( Frame ::pong ( data . clone ( ) ) ) ;
}
Ok ( Some ( Message ::Ping ( data ) ) )
}
@ -621,34 +570,39 @@ impl WebSocketContext {
let fin = frame . header ( ) . is_final ;
match data {
OpData ::Continue = > {
if self . incomplete . is_some ( ) & & is_compressed {
if let Some ( ref mut msg ) = self . incomplete {
msg . extend ( frame . into_data ( ) , self . config . max_message_size ) ? ;
} else {
return Err ( Error ::Protocol (
ProtocolError ::CompressedContinueFrame ,
ProtocolError ::Unexpect edContinueFrame,
) ) ;
}
let msg = self
. incomplete
. take ( )
. ok_or ( Error ::Protocol ( ProtocolError ::UnexpectedContinueFrame ) ) ? ;
self . extend_incomplete ( msg , frame . into_data ( ) , fin )
if fin {
Ok ( Some ( self . incomplete . take ( ) . unwrap ( ) . complete ( ) ? ) )
} else {
Ok ( None )
}
}
c if self . incomplete . is_some ( ) = > {
Err ( Error ::Protocol ( ProtocolError ::ExpectedFragment ( c ) ) )
}
OpData ::Text | OpData ::Binary = > {
let message_type = match data {
OpData ::Text = > IncompleteMessageType ::Text ,
OpData ::Binary = > IncompleteMessageType ::Binary ,
_ = > panic! ( "Bug: message is not text nor binary" ) ,
let msg = {
let message_type = match data {
OpData ::Text = > IncompleteMessageType ::Text ,
OpData ::Binary = > IncompleteMessageType ::Binary ,
_ = > panic! ( "Bug: message is not text nor binary" ) ,
} ;
let mut m = IncompleteMessage ::new ( message_type ) ;
m . extend ( frame . into_data ( ) , self . config . max_message_size ) ? ;
m
} ;
#[ cfg(feature = " deflate " ) ]
let msg = IncompleteMessage ::new ( message_type , is_compressed ) ;
#[ cfg(not(feature = " deflate " )) ]
let msg = IncompleteMessage ::new ( message_type ) ;
self . extend_incomplete ( msg , frame . into_data ( ) , fin )
if fin {
Ok ( Some ( msg . complete ( ) ? ) )
} else {
self . incomplete = Some ( msg ) ;
Ok ( None )
}
}
OpData ::Reserved ( i ) = > {
Err ( Error ::Protocol ( ProtocolError ::UnknownDataFrameType ( i ) ) )
@ -667,32 +621,6 @@ impl WebSocketContext {
}
}
fn extend_incomplete (
& mut self ,
mut msg : IncompleteMessage ,
data : Vec < u8 > ,
is_final : bool ,
) -> Result < Option < Message > > {
#[ cfg(feature = " deflate " ) ]
let data = if msg . compressed ( ) {
// `msg.compressed()` is only true when compression is enabled so it's safe to unwrap
self . extensions
. as_mut ( )
. and_then ( | x | x . compression . as_mut ( ) )
. unwrap ( )
. decompress ( data , is_final ) ?
} else {
data
} ;
msg . extend ( data , self . config . max_message_size ) ? ;
if is_final {
Ok ( Some ( msg . complete ( ) ? ) )
} else {
self . incomplete = Some ( msg ) ;
Ok ( None )
}
}
/// Received a close frame. Tells if we need to return a close frame to the user.
#[ allow(clippy::option_option) ]
fn do_close < ' t > ( & mut self , close : Option < CloseFrame < ' t > > ) -> Option < Option < CloseFrame < ' t > > > {
@ -714,7 +642,7 @@ impl WebSocketContext {
let reply = Frame ::close ( close . clone ( ) ) ;
debug ! ( "Replying to close with {:?}" , reply ) ;
self . send_queue . push_back ( reply ) ;
self . set_additional ( reply ) ;
Some ( close )
}
@ -731,8 +659,8 @@ impl WebSocketContext {
}
}
/// Send a single pending frame .
fn send_one _frame< Stream > ( & mut self , stream : & mut Stream , mut frame : Frame ) -> Result < ( ) >
/// Write a single frame into the write-buffer .
fn buffer _frame< Stream > ( & mut self , stream : & mut Stream , mut frame : Frame ) -> Result < ( ) >
where
Stream : Read + Write ,
{
@ -746,17 +674,17 @@ impl WebSocketContext {
}
trace ! ( "Sending frame: {:?}" , frame ) ;
self . frame . write _frame( stream , frame ) . check_connection_reset ( self . state )
self . frame . buffer _frame( stream , frame ) . check_connection_reset ( self . state )
}
fn has_compression ( & self ) -> bool {
#[ cfg(feature = " deflate " ) ]
{
self . extensions . as_ref ( ) . and_then ( | c | c . compression . as_ref ( ) ) . is_some ( )
}
#[ cfg(not(feature = " deflate " )) ]
{
false
/// Replace `additional_send` if it is currently a `Pong` message.
fn set_additional ( & mut self , add : Frame ) {
let empty_or_pong = self
. additional_send
. as_ref ( )
. map_or ( true , | f | f . header ( ) . opcode = = OpCode ::Control ( OpCtl ::Pong ) ) ;
if empty_or_pong {
self . additional_send . replace ( add ) ;
}
}
}
@ -790,7 +718,7 @@ impl WebSocketState {
}
/// Check if the state is active, return error if not.
fn check_active ( self ) -> Result < ( ) > {
fn check_not_terminated ( self ) -> Result < ( ) > {
match self {
WebSocketState ::Terminated = > Err ( Error ::AlreadyClosed ) ,
_ = > Ok ( ( ) ) ,
@ -842,64 +770,6 @@ mod tests {
}
}
struct WouldBlockStreamMoc ;
impl io ::Write for WouldBlockStreamMoc {
fn write ( & mut self , _ : & [ u8 ] ) -> io ::Result < usize > {
Err ( io ::Error ::new ( io ::ErrorKind ::WouldBlock , "would block" ) )
}
fn flush ( & mut self ) -> io ::Result < ( ) > {
Err ( io ::Error ::new ( io ::ErrorKind ::WouldBlock , "would block" ) )
}
}
impl io ::Read for WouldBlockStreamMoc {
fn read ( & mut self , _ : & mut [ u8 ] ) -> io ::Result < usize > {
Err ( io ::Error ::new ( io ::ErrorKind ::WouldBlock , "would block" ) )
}
}
#[ test ]
fn queue_logic ( ) {
// Create a socket with the queue size of 1.
let mut socket = WebSocket ::from_raw_socket (
WouldBlockStreamMoc ,
Role ::Client ,
Some ( WebSocketConfig { max_send_queue : Some ( 1 ) , .. Default ::default ( ) } ) ,
) ;
// Test message that we're going to send.
let message = Message ::Binary ( vec! [ 0xFF ; 1024 ] ) ;
// Helper to check the error.
let assert_would_block = | error | {
if let Error ::Io ( io_error ) = error {
assert_eq! ( io_error . kind ( ) , io ::ErrorKind ::WouldBlock ) ;
} else {
panic! ( "Expected WouldBlock error" ) ;
}
} ;
// The first attempt of writing must not fail, since the queue is empty at start.
// But since the underlying mock object always returns `WouldBlock`, so is the result.
assert_would_block ( dbg! ( socket . write_message ( message . clone ( ) ) . unwrap_err ( ) ) ) ;
// Any subsequent attempts must return an error telling that the queue is full.
for _i in 0 .. 100 {
assert! ( matches! (
socket . write_message ( message . clone ( ) ) . unwrap_err ( ) ,
Error ::SendQueueFull ( .. )
) ) ;
}
// The size of the output buffer must not be bigger than the size of that message
// that we managed to write to the output buffer at first. Since we could not make
// any progress (because of the logic of the moc buffer), the size remains unchanged.
if socket . context . frame . output_buffer_len ( ) > message . len ( ) {
panic! ( "Too many frames in the queue" ) ;
}
}
#[ test ]
fn receive_messages ( ) {
let incoming = Cursor ::new ( vec! [
@ -908,10 +778,10 @@ mod tests {
0x03 ,
] ) ;
let mut socket = WebSocket ::from_raw_socket ( WriteMoc ( incoming ) , Role ::Client , None ) ;
assert_eq! ( socket . read_message ( ) . unwrap ( ) , Message ::Ping ( vec! [ 1 , 2 ] ) ) ;
assert_eq! ( socket . read_message ( ) . unwrap ( ) , Message ::Pong ( vec! [ 3 ] ) ) ;
assert_eq! ( socket . read_message ( ) . unwrap ( ) , Message ::Text ( "Hello, World!" . into ( ) ) ) ;
assert_eq! ( socket . read_message ( ) . unwrap ( ) , Message ::Binary ( vec! [ 0x01 , 0x02 , 0x03 ] ) ) ;
assert_eq! ( socket . read ( ) . unwrap ( ) , Message ::Ping ( vec! [ 1 , 2 ] ) ) ;
assert_eq! ( socket . read ( ) . unwrap ( ) , Message ::Pong ( vec! [ 3 ] ) ) ;
assert_eq! ( socket . read ( ) . unwrap ( ) , Message ::Text ( "Hello, World!" . into ( ) ) ) ;
assert_eq! ( socket . read ( ) . unwrap ( ) , Message ::Binary ( vec! [ 0x01 , 0x02 , 0x03 ] ) ) ;
}
#[ test ]
@ -924,7 +794,7 @@ mod tests {
let mut socket = WebSocket ::from_raw_socket ( WriteMoc ( incoming ) , Role ::Client , Some ( limit ) ) ;
assert! ( matches! (
socket . read_message ( ) ,
socket . read ( ) ,
Err ( Error ::Capacity ( CapacityError ::MessageTooLong { size : 13 , max_size : 10 } ) )
) ) ;
}
@ -936,7 +806,7 @@ mod tests {
let mut socket = WebSocket ::from_raw_socket ( WriteMoc ( incoming ) , Role ::Client , Some ( limit ) ) ;
assert! ( matches! (
socket . read_message ( ) ,
socket . read ( ) ,
Err ( Error ::Capacity ( CapacityError ::MessageTooLong { size : 3 , max_size : 2 } ) )
) ) ;
}