|
|
@ -6,6 +6,13 @@ mod message; |
|
|
|
|
|
|
|
|
|
|
|
pub use self::{frame::CloseFrame, message::Message}; |
|
|
|
pub use self::{frame::CloseFrame, message::Message}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
use log::*; |
|
|
|
|
|
|
|
use std::{ |
|
|
|
|
|
|
|
collections::VecDeque, |
|
|
|
|
|
|
|
io::{ErrorKind as IoErrorKind, Read, Write}, |
|
|
|
|
|
|
|
mem::replace, |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
use self::{ |
|
|
|
use self::{ |
|
|
|
frame::{ |
|
|
|
frame::{ |
|
|
|
coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode}, |
|
|
|
coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode}, |
|
|
@ -15,13 +22,9 @@ use self::{ |
|
|
|
}; |
|
|
|
}; |
|
|
|
use crate::{ |
|
|
|
use crate::{ |
|
|
|
error::{Error, ProtocolError, Result}, |
|
|
|
error::{Error, ProtocolError, Result}, |
|
|
|
|
|
|
|
extensions::Extensions, |
|
|
|
util::NonBlockingResult, |
|
|
|
util::NonBlockingResult, |
|
|
|
}; |
|
|
|
}; |
|
|
|
use log::*; |
|
|
|
|
|
|
|
use std::{ |
|
|
|
|
|
|
|
io::{ErrorKind as IoErrorKind, Read, Write}, |
|
|
|
|
|
|
|
mem::replace, |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Indicates a Client or Server role of the websocket
|
|
|
|
/// Indicates a Client or Server role of the websocket
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)] |
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)] |
|
|
@ -35,21 +38,10 @@ pub enum Role { |
|
|
|
/// The configuration for WebSocket connection.
|
|
|
|
/// The configuration for WebSocket connection.
|
|
|
|
#[derive(Debug, Clone, Copy)] |
|
|
|
#[derive(Debug, Clone, Copy)] |
|
|
|
pub struct WebSocketConfig { |
|
|
|
pub struct WebSocketConfig { |
|
|
|
/// Does nothing, instead use `max_write_buffer_size`.
|
|
|
|
/// The size of the send queue. You can use it to turn on/off the backpressure features. `None`
|
|
|
|
#[deprecated] |
|
|
|
/// means here that the size of the queue is unlimited. The default value is the unlimited
|
|
|
|
|
|
|
|
/// queue.
|
|
|
|
pub max_send_queue: Option<usize>, |
|
|
|
pub max_send_queue: Option<usize>, |
|
|
|
/// The target minimum size of the write buffer to reach before writing the data
|
|
|
|
|
|
|
|
/// to the underlying stream.
|
|
|
|
|
|
|
|
/// The default value is 128 KiB.
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless.
|
|
|
|
|
|
|
|
pub write_buffer_size: usize, |
|
|
|
|
|
|
|
/// The max size of the write buffer in bytes. Setting this can provide backpressure
|
|
|
|
|
|
|
|
/// in the case the write buffer is filling up due to write errors.
|
|
|
|
|
|
|
|
/// The default value is unlimited.
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// Note: Should always be set higher than [`write_buffer_size`](Self::write_buffer_size).
|
|
|
|
|
|
|
|
pub max_write_buffer_size: usize, |
|
|
|
|
|
|
|
/// The maximum size of a message. `None` means no size limit. The default value is 64 MiB
|
|
|
|
/// The maximum size of a message. `None` means no size limit. The default value is 64 MiB
|
|
|
|
/// which should be reasonably big for all normal use-cases but small enough to prevent
|
|
|
|
/// which should be reasonably big for all normal use-cases but small enough to prevent
|
|
|
|
/// memory eating by a malicious user.
|
|
|
|
/// memory eating by a malicious user.
|
|
|
@ -65,18 +57,76 @@ pub struct WebSocketConfig { |
|
|
|
/// some popular libraries that are sending unmasked frames, ignoring the RFC.
|
|
|
|
/// some popular libraries that are sending unmasked frames, ignoring the RFC.
|
|
|
|
/// By default this option is set to `false`, i.e. according to RFC 6455.
|
|
|
|
/// By default this option is set to `false`, i.e. according to RFC 6455.
|
|
|
|
pub accept_unmasked_frames: bool, |
|
|
|
pub accept_unmasked_frames: bool, |
|
|
|
|
|
|
|
/// Optional configuration for Per-Message Compression Extension.
|
|
|
|
|
|
|
|
#[cfg(feature = "deflate")] |
|
|
|
|
|
|
|
pub compression: Option<crate::extensions::DeflateConfig>, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl Default for WebSocketConfig { |
|
|
|
impl Default for WebSocketConfig { |
|
|
|
fn default() -> Self { |
|
|
|
fn default() -> Self { |
|
|
|
#[allow(deprecated)] |
|
|
|
|
|
|
|
WebSocketConfig { |
|
|
|
WebSocketConfig { |
|
|
|
max_send_queue: None, |
|
|
|
max_send_queue: None, |
|
|
|
write_buffer_size: 128 * 1024, |
|
|
|
|
|
|
|
max_write_buffer_size: usize::MAX, |
|
|
|
|
|
|
|
max_message_size: Some(64 << 20), |
|
|
|
max_message_size: Some(64 << 20), |
|
|
|
max_frame_size: Some(16 << 20), |
|
|
|
max_frame_size: Some(16 << 20), |
|
|
|
accept_unmasked_frames: false, |
|
|
|
accept_unmasked_frames: false, |
|
|
|
|
|
|
|
#[cfg(feature = "deflate")] |
|
|
|
|
|
|
|
compression: None, |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl WebSocketConfig { |
|
|
|
|
|
|
|
// Generate extension negotiation offers for configured extensions.
|
|
|
|
|
|
|
|
// Only `permessage-deflate` is supported at the moment.
|
|
|
|
|
|
|
|
pub(crate) fn generate_offers(&self) -> Option<headers::SecWebsocketExtensions> { |
|
|
|
|
|
|
|
#[cfg(feature = "deflate")] |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
let mut offers = Vec::new(); |
|
|
|
|
|
|
|
if let Some(compression) = self.compression.map(|c| c.generate_offer()) { |
|
|
|
|
|
|
|
offers.push(compression); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if offers.is_empty() { |
|
|
|
|
|
|
|
None |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
Some(headers::SecWebsocketExtensions::new(offers)) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
#[cfg(not(feature = "deflate"))] |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
None |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// This can be used with `WebSocket::from_raw_socket_with_extensions` for integration.
|
|
|
|
|
|
|
|
/// Returns negotiation response based on offers and `Extensions` to manage extensions.
|
|
|
|
|
|
|
|
pub fn accept_offers( |
|
|
|
|
|
|
|
&self, |
|
|
|
|
|
|
|
_offers: &headers::SecWebsocketExtensions, |
|
|
|
|
|
|
|
) -> Option<(headers::SecWebsocketExtensions, Extensions)> { |
|
|
|
|
|
|
|
#[cfg(feature = "deflate")] |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
// To support more extensions, store extension context in `Extensions` and
|
|
|
|
|
|
|
|
// concatenate negotiation responses from each extension.
|
|
|
|
|
|
|
|
let mut agreed_extensions = Vec::new(); |
|
|
|
|
|
|
|
let mut extensions = Extensions::default(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if let Some(compression) = &self.compression { |
|
|
|
|
|
|
|
if let Some((agreed, compression)) = compression.accept_offer(_offers) { |
|
|
|
|
|
|
|
agreed_extensions.push(agreed); |
|
|
|
|
|
|
|
extensions.compression = Some(compression); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if agreed_extensions.is_empty() { |
|
|
|
|
|
|
|
None |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
Some((headers::SecWebsocketExtensions::new(agreed_extensions), extensions)) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(not(feature = "deflate"))] |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
None |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -85,8 +135,6 @@ impl Default for WebSocketConfig { |
|
|
|
///
|
|
|
|
///
|
|
|
|
/// This is THE structure you want to create to be able to speak the WebSocket protocol.
|
|
|
|
/// This is THE structure you want to create to be able to speak the WebSocket protocol.
|
|
|
|
/// It may be created by calling `connect`, `accept` or `client` functions.
|
|
|
|
/// It may be created by calling `connect`, `accept` or `client` functions.
|
|
|
|
///
|
|
|
|
|
|
|
|
/// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages.
|
|
|
|
|
|
|
|
#[derive(Debug)] |
|
|
|
#[derive(Debug)] |
|
|
|
pub struct WebSocket<Stream> { |
|
|
|
pub struct WebSocket<Stream> { |
|
|
|
/// The underlying socket.
|
|
|
|
/// The underlying socket.
|
|
|
@ -105,6 +153,18 @@ impl<Stream> WebSocket<Stream> { |
|
|
|
WebSocket { socket: stream, context: WebSocketContext::new(role, config) } |
|
|
|
WebSocket { socket: stream, context: WebSocketContext::new(role, config) } |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Convert a raw socket into a WebSocket without performing a handshake.
|
|
|
|
|
|
|
|
pub fn from_raw_socket_with_extensions( |
|
|
|
|
|
|
|
stream: Stream, |
|
|
|
|
|
|
|
role: Role, |
|
|
|
|
|
|
|
config: Option<WebSocketConfig>, |
|
|
|
|
|
|
|
extensions: Option<Extensions>, |
|
|
|
|
|
|
|
) -> Self { |
|
|
|
|
|
|
|
let mut context = WebSocketContext::new(role, config); |
|
|
|
|
|
|
|
context.extensions = extensions; |
|
|
|
|
|
|
|
WebSocket { socket: stream, context } |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Convert a raw socket into a WebSocket without performing a handshake.
|
|
|
|
/// Convert a raw socket into a WebSocket without performing a handshake.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// Call this function if you're using Tungstenite as a part of a web framework
|
|
|
|
/// Call this function if you're using Tungstenite as a part of a web framework
|
|
|
@ -122,6 +182,21 @@ impl<Stream> WebSocket<Stream> { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub(crate) fn from_partially_read_with_extensions( |
|
|
|
|
|
|
|
stream: Stream, |
|
|
|
|
|
|
|
part: Vec<u8>, |
|
|
|
|
|
|
|
role: Role, |
|
|
|
|
|
|
|
config: Option<WebSocketConfig>, |
|
|
|
|
|
|
|
extensions: Option<Extensions>, |
|
|
|
|
|
|
|
) -> Self { |
|
|
|
|
|
|
|
WebSocket { |
|
|
|
|
|
|
|
socket: stream, |
|
|
|
|
|
|
|
context: WebSocketContext::from_partially_read_with_extensions( |
|
|
|
|
|
|
|
part, role, config, extensions, |
|
|
|
|
|
|
|
), |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Returns a shared reference to the inner stream.
|
|
|
|
/// Returns a shared reference to the inner stream.
|
|
|
|
pub fn get_ref(&self) -> &Stream { |
|
|
|
pub fn get_ref(&self) -> &Stream { |
|
|
|
&self.socket |
|
|
|
&self.socket |
|
|
@ -160,116 +235,82 @@ impl<Stream> WebSocket<Stream> { |
|
|
|
impl<Stream: Read + Write> WebSocket<Stream> { |
|
|
|
impl<Stream: Read + Write> WebSocket<Stream> { |
|
|
|
/// Read a message from stream, if possible.
|
|
|
|
/// Read a message from stream, if possible.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// This will also queue responses to ping and close messages. These responses
|
|
|
|
/// This will queue responses to ping and close messages to be sent. It will call
|
|
|
|
/// will be written and flushed on the next call to [`read`](Self::read),
|
|
|
|
/// `write_pending` before trying to read in order to make sure that those responses
|
|
|
|
/// [`write`](Self::write) or [`flush`](Self::flush).
|
|
|
|
/// make progress even if you never call `write_pending`. That does mean that they
|
|
|
|
|
|
|
|
/// get sent out earliest on the next call to `read_message`, `write_message` or `write_pending`.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// # Closing the connection
|
|
|
|
/// ## Closing the connection
|
|
|
|
/// When the remote endpoint decides to close the connection this will return
|
|
|
|
/// When the remote endpoint decides to close the connection this will return
|
|
|
|
/// the close message with an optional close frame.
|
|
|
|
/// the close message with an optional close frame.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// You should continue calling [`read`](Self::read), [`write`](Self::write) or
|
|
|
|
/// You should continue calling `read_message`, `write_message` or `write_pending` to drive
|
|
|
|
/// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`]
|
|
|
|
/// the reply to the close frame until [Error::ConnectionClosed] is returned. Once that happens
|
|
|
|
/// is returned. Once that happens it is safe to drop the underlying connection.
|
|
|
|
/// it is safe to drop the underlying connection.
|
|
|
|
pub fn read(&mut self) -> Result<Message> { |
|
|
|
pub fn read_message(&mut self) -> Result<Message> { |
|
|
|
self.context.read(&mut self.socket) |
|
|
|
self.context.read_message(&mut self.socket) |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Writes and immediately flushes a message.
|
|
|
|
|
|
|
|
/// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
|
|
|
|
|
|
|
|
pub fn send(&mut self, message: Message) -> Result<()> { |
|
|
|
|
|
|
|
self.write(message)?; |
|
|
|
|
|
|
|
self.flush() |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Write a message to the provided stream, if possible.
|
|
|
|
/// Send a message to stream, if possible.
|
|
|
|
///
|
|
|
|
|
|
|
|
/// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// In the event of stream write failure the message frame will be stored
|
|
|
|
|
|
|
|
/// in the write buffer and will try again on the next call to [`write`](Self::write)
|
|
|
|
|
|
|
|
/// or [`flush`](Self::flush).
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
|
|
|
|
|
|
|
|
/// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
|
|
|
|
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// This call will generally not flush. However, if there are queued automatic messages
|
|
|
|
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
|
|
|
|
/// they will be written and eagerly flushed.
|
|
|
|
/// requests. A Pong reply will jump the queue because the
|
|
|
|
|
|
|
|
/// [websocket RFC](https://tools.ietf.org/html/rfc6455#section-5.5.2) specifies it should be sent
|
|
|
|
|
|
|
|
/// as soon as is practical.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// For example, upon receiving ping messages tungstenite queues pong replies automatically.
|
|
|
|
/// Note that upon receiving a ping message, tungstenite cues a pong reply automatically.
|
|
|
|
/// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush)
|
|
|
|
/// When you call either `read_message`, `write_message` or `write_pending` next it will try to send
|
|
|
|
/// will write & flush the pong reply. This means you should not respond to ping frames manually.
|
|
|
|
/// that pong out if the underlying connection can take more data. This means you should not
|
|
|
|
|
|
|
|
/// respond to ping frames manually.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// You can however send pong frames manually in order to indicate a unidirectional heartbeat
|
|
|
|
/// You can however send pong frames manually in order to indicate a unidirectional heartbeat
|
|
|
|
/// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
|
|
|
|
/// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
|
|
|
|
/// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing
|
|
|
|
/// if `read_message` returns a ping, you should call `write_pending` until it doesn't return
|
|
|
|
/// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the
|
|
|
|
/// WouldBlock before passing a pong to `write_message`, otherwise the response to the
|
|
|
|
/// ping will not be sent as it will be replaced by your custom pong message.
|
|
|
|
/// ping will not be sent, but rather replaced by your custom pong message.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// # Errors
|
|
|
|
/// ## Errors
|
|
|
|
/// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned
|
|
|
|
/// - If the WebSocket's send queue is full, `SendQueueFull` will be returned
|
|
|
|
/// along with the equivalent passed message frame.
|
|
|
|
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
|
|
|
|
/// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`].
|
|
|
|
/// - If the connection is closed and should be dropped, this will return [Error::ConnectionClosed].
|
|
|
|
/// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from
|
|
|
|
/// - If you try again after [Error::ConnectionClosed] was returned either from here or from `read_message`,
|
|
|
|
/// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program
|
|
|
|
/// [Error::AlreadyClosed] will be returned. This indicates a program error on your part.
|
|
|
|
/// error on your part.
|
|
|
|
/// - [Error::Io] is returned if the underlying connection returns an error
|
|
|
|
/// - [`Error::Io`] is returned if the underlying connection returns an error
|
|
|
|
|
|
|
|
/// (consider these fatal except for WouldBlock).
|
|
|
|
/// (consider these fatal except for WouldBlock).
|
|
|
|
/// - [`Error::Capacity`] if your message size is bigger than the configured max message size.
|
|
|
|
/// - [Error::Capacity] if your message size is bigger than the configured max message size.
|
|
|
|
pub fn write(&mut self, message: Message) -> Result<()> { |
|
|
|
pub fn write_message(&mut self, message: Message) -> Result<()> { |
|
|
|
self.context.write(&mut self.socket, message) |
|
|
|
self.context.write_message(&mut self.socket, message) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Flush writes.
|
|
|
|
/// Flush the pending send queue.
|
|
|
|
///
|
|
|
|
pub fn write_pending(&mut self) -> Result<()> { |
|
|
|
/// Ensures all messages previously passed to [`write`](Self::write) and automatic
|
|
|
|
self.context.write_pending(&mut self.socket) |
|
|
|
/// queued pong responses are written & flushed into the underlying stream.
|
|
|
|
|
|
|
|
pub fn flush(&mut self) -> Result<()> { |
|
|
|
|
|
|
|
self.context.flush(&mut self.socket) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Close the connection.
|
|
|
|
/// Close the connection.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// This function guarantees that the close frame will be queued.
|
|
|
|
/// This function guarantees that the close frame will be queued.
|
|
|
|
/// There is no need to call it again. Calling this function is
|
|
|
|
/// There is no need to call it again. Calling this function is
|
|
|
|
/// the same as calling `write(Message::Close(..))`.
|
|
|
|
/// the same as calling `write_message(Message::Close(..))`.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// After queuing the close frame you should continue calling [`read`](Self::read) or
|
|
|
|
/// After queuing the close frame you should continue calling `read_message` or
|
|
|
|
/// [`flush`](Self::flush) to drive the close handshake to completion.
|
|
|
|
/// `write_pending` to drive the close handshake to completion.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// The websocket RFC defines that the underlying connection should be closed
|
|
|
|
/// The websocket RFC defines that the underlying connection should be closed
|
|
|
|
/// by the server. Tungstenite takes care of this asymmetry for you.
|
|
|
|
/// by the server. Tungstenite takes care of this asymmetry for you.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// When the close handshake is finished (we have both sent and received
|
|
|
|
/// When the close handshake is finished (we have both sent and received
|
|
|
|
/// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return
|
|
|
|
/// a close message), `read_message` or `write_pending` will return
|
|
|
|
/// [Error::ConnectionClosed] if this endpoint is the server.
|
|
|
|
/// [Error::ConnectionClosed] if this endpoint is the server.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// If this endpoint is a client, [Error::ConnectionClosed] will only be
|
|
|
|
/// If this endpoint is a client, [Error::ConnectionClosed] will only be
|
|
|
|
/// returned after the server has closed the underlying connection.
|
|
|
|
/// returned after the server has closed the underlying connection.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
|
|
|
|
/// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
|
|
|
|
/// is returned from [`read`](Self::read) or [`flush`](Self::flush).
|
|
|
|
/// is returned from `read_message` or `write_pending`.
|
|
|
|
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> { |
|
|
|
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> { |
|
|
|
self.context.close(&mut self.socket, code) |
|
|
|
self.context.close(&mut self.socket, code) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Old name for [`read`](Self::read).
|
|
|
|
|
|
|
|
#[deprecated(note = "Use `read`")] |
|
|
|
|
|
|
|
pub fn read_message(&mut self) -> Result<Message> { |
|
|
|
|
|
|
|
self.read() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Old name for [`send`](Self::send).
|
|
|
|
|
|
|
|
#[deprecated(note = "Use `send`")] |
|
|
|
|
|
|
|
pub fn write_message(&mut self, message: Message) -> Result<()> { |
|
|
|
|
|
|
|
self.send(message) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Old name for [`flush`](Self::flush).
|
|
|
|
|
|
|
|
#[deprecated(note = "Use `flush`")] |
|
|
|
|
|
|
|
pub fn write_pending(&mut self) -> Result<()> { |
|
|
|
|
|
|
|
self.flush() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// A context for managing WebSocket stream.
|
|
|
|
/// A context for managing WebSocket stream.
|
|
|
@ -283,41 +324,55 @@ pub struct WebSocketContext { |
|
|
|
state: WebSocketState, |
|
|
|
state: WebSocketState, |
|
|
|
/// Receive: an incomplete message being processed.
|
|
|
|
/// Receive: an incomplete message being processed.
|
|
|
|
incomplete: Option<IncompleteMessage>, |
|
|
|
incomplete: Option<IncompleteMessage>, |
|
|
|
/// Send in addition to regular messages E.g. "pong" or "close".
|
|
|
|
/// Send: a data send queue.
|
|
|
|
additional_send: Option<Frame>, |
|
|
|
send_queue: VecDeque<Frame>, |
|
|
|
|
|
|
|
/// Send: an OOB pong message.
|
|
|
|
|
|
|
|
pong: Option<Frame>, |
|
|
|
/// The configuration for the websocket session.
|
|
|
|
/// The configuration for the websocket session.
|
|
|
|
config: WebSocketConfig, |
|
|
|
config: WebSocketConfig, |
|
|
|
|
|
|
|
// Container for extensions.
|
|
|
|
|
|
|
|
pub(crate) extensions: Option<Extensions>, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl WebSocketContext { |
|
|
|
impl WebSocketContext { |
|
|
|
/// Create a WebSocket context that manages a post-handshake stream.
|
|
|
|
/// Create a WebSocket context that manages a post-handshake stream.
|
|
|
|
pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self { |
|
|
|
pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self { |
|
|
|
Self::_new(role, FrameCodec::new(), config.unwrap_or_default()) |
|
|
|
WebSocketContext { |
|
|
|
|
|
|
|
role, |
|
|
|
|
|
|
|
frame: FrameCodec::new(), |
|
|
|
|
|
|
|
state: WebSocketState::Active, |
|
|
|
|
|
|
|
incomplete: None, |
|
|
|
|
|
|
|
send_queue: VecDeque::new(), |
|
|
|
|
|
|
|
pong: None, |
|
|
|
|
|
|
|
config: config.unwrap_or_default(), |
|
|
|
|
|
|
|
extensions: None, |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Create a WebSocket context that manages an post-handshake stream.
|
|
|
|
/// Create a WebSocket context that manages an post-handshake stream.
|
|
|
|
pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self { |
|
|
|
pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self { |
|
|
|
Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default()) |
|
|
|
WebSocketContext { |
|
|
|
|
|
|
|
frame: FrameCodec::from_partially_read(part), |
|
|
|
|
|
|
|
..WebSocketContext::new(role, config) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self { |
|
|
|
pub(crate) fn from_partially_read_with_extensions( |
|
|
|
frame.set_max_out_buffer_len(config.max_write_buffer_size); |
|
|
|
part: Vec<u8>, |
|
|
|
frame.set_out_buffer_write_len(config.write_buffer_size); |
|
|
|
role: Role, |
|
|
|
Self { |
|
|
|
config: Option<WebSocketConfig>, |
|
|
|
role, |
|
|
|
extensions: Option<Extensions>, |
|
|
|
frame, |
|
|
|
) -> Self { |
|
|
|
state: WebSocketState::Active, |
|
|
|
WebSocketContext { |
|
|
|
incomplete: None, |
|
|
|
frame: FrameCodec::from_partially_read(part), |
|
|
|
additional_send: None, |
|
|
|
extensions, |
|
|
|
config, |
|
|
|
..WebSocketContext::new(role, config) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Change the configuration.
|
|
|
|
/// Change the configuration.
|
|
|
|
pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { |
|
|
|
pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { |
|
|
|
set_func(&mut self.config); |
|
|
|
set_func(&mut self.config) |
|
|
|
self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size); |
|
|
|
|
|
|
|
self.frame.set_out_buffer_write_len(self.config.write_buffer_size); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Read the configuration.
|
|
|
|
/// Read the configuration.
|
|
|
@ -344,23 +399,17 @@ impl WebSocketContext { |
|
|
|
///
|
|
|
|
///
|
|
|
|
/// This function sends pong and close responses automatically.
|
|
|
|
/// This function sends pong and close responses automatically.
|
|
|
|
/// However, it never blocks on write.
|
|
|
|
/// However, it never blocks on write.
|
|
|
|
pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message> |
|
|
|
pub fn read_message<Stream>(&mut self, stream: &mut Stream) -> Result<Message> |
|
|
|
where |
|
|
|
where |
|
|
|
Stream: Read + Write, |
|
|
|
Stream: Read + Write, |
|
|
|
{ |
|
|
|
{ |
|
|
|
// Do not read from already closed connections.
|
|
|
|
// Do not read from already closed connections.
|
|
|
|
self.state.check_not_terminated()?; |
|
|
|
self.state.check_active()?; |
|
|
|
|
|
|
|
|
|
|
|
loop { |
|
|
|
loop { |
|
|
|
if self.additional_send.is_some() { |
|
|
|
|
|
|
|
// Since we may get ping or close, we need to reply to the messages even during read.
|
|
|
|
// Since we may get ping or close, we need to reply to the messages even during read.
|
|
|
|
// Thus we flush but ignore its blocking.
|
|
|
|
// Thus we call write_pending() but ignore its blocking.
|
|
|
|
self.flush(stream).no_block()?; |
|
|
|
self.write_pending(stream).no_block()?; |
|
|
|
} else if self.role == Role::Server && !self.state.can_read() { |
|
|
|
|
|
|
|
self.state = WebSocketState::Terminated; |
|
|
|
|
|
|
|
return Err(Error::ConnectionClosed); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// If we get here, either write blocks or we have nothing to write.
|
|
|
|
// If we get here, either write blocks or we have nothing to write.
|
|
|
|
// Thus if read blocks, just let it return WouldBlock.
|
|
|
|
// Thus if read blocks, just let it return WouldBlock.
|
|
|
|
if let Some(message) = self.read_message_frame(stream)? { |
|
|
|
if let Some(message) = self.read_message_frame(stream)? { |
|
|
@ -370,94 +419,89 @@ impl WebSocketContext { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Write a message to the provided stream.
|
|
|
|
/// Send a message to the provided stream, if possible.
|
|
|
|
///
|
|
|
|
|
|
|
|
/// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
|
|
|
|
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// In the event of stream write failure the message frame will be stored
|
|
|
|
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
|
|
|
|
/// in the write buffer and will try again on the next call to [`write`](Self::write)
|
|
|
|
/// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned
|
|
|
|
/// or [`flush`](Self::flush).
|
|
|
|
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
|
|
|
|
/// Note that only the last pong frame is stored to be sent, and only the
|
|
|
|
/// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
|
|
|
|
/// most recent pong frame is sent if multiple pong frames are queued.
|
|
|
|
pub fn write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()> |
|
|
|
pub fn write_message<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()> |
|
|
|
where |
|
|
|
where |
|
|
|
Stream: Read + Write, |
|
|
|
Stream: Read + Write, |
|
|
|
{ |
|
|
|
{ |
|
|
|
// When terminated, return AlreadyClosed.
|
|
|
|
// When terminated, return AlreadyClosed.
|
|
|
|
self.state.check_not_terminated()?; |
|
|
|
self.state.check_active()?; |
|
|
|
|
|
|
|
|
|
|
|
// Do not write after sending a close frame.
|
|
|
|
// Do not write after sending a close frame.
|
|
|
|
if !self.state.is_active() { |
|
|
|
if !self.state.is_active() { |
|
|
|
return Err(Error::Protocol(ProtocolError::SendAfterClosing)); |
|
|
|
return Err(Error::Protocol(ProtocolError::SendAfterClosing)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if let Some(max_send_queue) = self.config.max_send_queue { |
|
|
|
|
|
|
|
if self.send_queue.len() >= max_send_queue { |
|
|
|
|
|
|
|
// Try to make some room for the new message.
|
|
|
|
|
|
|
|
// Do not return here if write would block, ignore WouldBlock silently
|
|
|
|
|
|
|
|
// since we must queue the message anyway.
|
|
|
|
|
|
|
|
self.write_pending(stream).no_block()?; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.send_queue.len() >= max_send_queue { |
|
|
|
|
|
|
|
return Err(Error::SendQueueFull(message)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
let frame = match message { |
|
|
|
let frame = match message { |
|
|
|
Message::Text(data) => Frame::message(data.into(), OpCode::Data(OpData::Text), true), |
|
|
|
Message::Text(data) => self.prepare_data_frame(data.into(), OpData::Text)?, |
|
|
|
Message::Binary(data) => Frame::message(data, OpCode::Data(OpData::Binary), true), |
|
|
|
Message::Binary(data) => self.prepare_data_frame(data, OpData::Binary)?, |
|
|
|
Message::Ping(data) => Frame::ping(data), |
|
|
|
Message::Ping(data) => Frame::ping(data), |
|
|
|
Message::Pong(data) => { |
|
|
|
Message::Pong(data) => { |
|
|
|
self.set_additional(Frame::pong(data)); |
|
|
|
self.pong = Some(Frame::pong(data)); |
|
|
|
// Note: user pongs can be user flushed so no need to flush here
|
|
|
|
return self.write_pending(stream); |
|
|
|
return self._write(stream, None).map(|_| ()); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
Message::Close(code) => return self.close(stream, code), |
|
|
|
Message::Close(code) => return self.close(stream, code), |
|
|
|
Message::Frame(f) => f, |
|
|
|
Message::Frame(f) => f, |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
let should_flush = self._write(stream, Some(frame))?; |
|
|
|
self.send_queue.push_back(frame); |
|
|
|
if should_flush { |
|
|
|
self.write_pending(stream) |
|
|
|
self.flush(stream)?; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Flush writes.
|
|
|
|
fn prepare_data_frame(&mut self, data: Vec<u8>, opdata: OpData) -> Result<Frame> { |
|
|
|
///
|
|
|
|
debug_assert!(matches!(opdata, OpData::Text | OpData::Binary), "Invalid data frame kind"); |
|
|
|
/// Ensures all messages previously passed to [`write`](Self::write) and automatically
|
|
|
|
let opcode = OpCode::Data(opdata); |
|
|
|
/// queued pong responses are written & flushed into the `stream`.
|
|
|
|
let is_final = true; |
|
|
|
#[inline] |
|
|
|
#[cfg(feature = "deflate")] |
|
|
|
pub fn flush<Stream>(&mut self, stream: &mut Stream) -> Result<()> |
|
|
|
if let Some(pmce) = self.extensions.as_mut().and_then(|e| e.compression.as_mut()) { |
|
|
|
where |
|
|
|
return Ok(Frame::compressed_message(pmce.compress(&data)?, opcode, is_final)); |
|
|
|
Stream: Read + Write, |
|
|
|
} |
|
|
|
{ |
|
|
|
Ok(Frame::message(data, opcode, is_final)) |
|
|
|
self._write(stream, None)?; |
|
|
|
|
|
|
|
self.frame.write_out_buffer(stream)?; |
|
|
|
|
|
|
|
Ok(stream.flush()?) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Writes any data in the out_buffer, `additional_send` and given `data`.
|
|
|
|
/// Flush the pending send queue.
|
|
|
|
///
|
|
|
|
pub fn write_pending<Stream>(&mut self, stream: &mut Stream) -> Result<()> |
|
|
|
/// Does **not** flush.
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// Returns true if the write contents indicate we should flush immediately.
|
|
|
|
|
|
|
|
fn _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool> |
|
|
|
|
|
|
|
where |
|
|
|
where |
|
|
|
Stream: Read + Write, |
|
|
|
Stream: Read + Write, |
|
|
|
{ |
|
|
|
{ |
|
|
|
if let Some(data) = data { |
|
|
|
// First, make sure we have no pending frame sending.
|
|
|
|
self.buffer_frame(stream, data)?; |
|
|
|
self.frame.write_pending(stream)?; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
|
|
|
|
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
|
|
|
|
// response, unless it already received a Close frame. It SHOULD
|
|
|
|
// response, unless it already received a Close frame. It SHOULD
|
|
|
|
// respond with Pong frame as soon as is practical. (RFC 6455)
|
|
|
|
// respond with Pong frame as soon as is practical. (RFC 6455)
|
|
|
|
let should_flush = if let Some(msg) = self.additional_send.take() { |
|
|
|
if let Some(pong) = self.pong.take() { |
|
|
|
trace!("Sending pong/close"); |
|
|
|
trace!("Sending pong reply"); |
|
|
|
match self.buffer_frame(stream, msg) { |
|
|
|
self.send_one_frame(stream, pong)?; |
|
|
|
Err(Error::WriteBufferFull(Message::Frame(msg))) => { |
|
|
|
|
|
|
|
// if an system message would exceed the buffer put it back in
|
|
|
|
|
|
|
|
// `additional_send` for retry. Otherwise returning this error
|
|
|
|
|
|
|
|
// may not make sense to the user, e.g. calling `flush`.
|
|
|
|
|
|
|
|
self.set_additional(msg); |
|
|
|
|
|
|
|
false |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
Err(err) => return Err(err), |
|
|
|
// If we have any unsent frames, send them.
|
|
|
|
Ok(_) => true, |
|
|
|
trace!("Frames still in queue: {}", self.send_queue.len()); |
|
|
|
|
|
|
|
while let Some(data) = self.send_queue.pop_front() { |
|
|
|
|
|
|
|
self.send_one_frame(stream, data)?; |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
|
|
|
|
false |
|
|
|
// If we get to this point, the send queue is empty and the underlying socket is still
|
|
|
|
}; |
|
|
|
// willing to take more data.
|
|
|
|
|
|
|
|
|
|
|
|
// If we're closing and there is nothing to send anymore, we should close the connection.
|
|
|
|
// If we're closing and there is nothing to send anymore, we should close the connection.
|
|
|
|
if self.role == Role::Server && !self.state.can_read() { |
|
|
|
if self.role == Role::Server && !self.state.can_read() { |
|
|
@ -467,11 +511,10 @@ impl WebSocketContext { |
|
|
|
// maximum segment lifetimes (2MSL), while there is no corresponding
|
|
|
|
// maximum segment lifetimes (2MSL), while there is no corresponding
|
|
|
|
// server impact as a TIME_WAIT connection is immediately reopened upon
|
|
|
|
// server impact as a TIME_WAIT connection is immediately reopened upon
|
|
|
|
// a new SYN with a higher seq number). (RFC 6455)
|
|
|
|
// a new SYN with a higher seq number). (RFC 6455)
|
|
|
|
self.frame.write_out_buffer(stream)?; |
|
|
|
|
|
|
|
self.state = WebSocketState::Terminated; |
|
|
|
self.state = WebSocketState::Terminated; |
|
|
|
Err(Error::ConnectionClosed) |
|
|
|
Err(Error::ConnectionClosed) |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
Ok(should_flush) |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -479,7 +522,7 @@ impl WebSocketContext { |
|
|
|
///
|
|
|
|
///
|
|
|
|
/// This function guarantees that the close frame will be queued.
|
|
|
|
/// This function guarantees that the close frame will be queued.
|
|
|
|
/// There is no need to call it again. Calling this function is
|
|
|
|
/// There is no need to call it again. Calling this function is
|
|
|
|
/// the same as calling `send(Message::Close(..))`.
|
|
|
|
/// the same as calling `write(Message::Close(..))`.
|
|
|
|
pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()> |
|
|
|
pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()> |
|
|
|
where |
|
|
|
where |
|
|
|
Stream: Read + Write, |
|
|
|
Stream: Read + Write, |
|
|
@ -487,9 +530,11 @@ impl WebSocketContext { |
|
|
|
if let WebSocketState::Active = self.state { |
|
|
|
if let WebSocketState::Active = self.state { |
|
|
|
self.state = WebSocketState::ClosedByUs; |
|
|
|
self.state = WebSocketState::ClosedByUs; |
|
|
|
let frame = Frame::close(code); |
|
|
|
let frame = Frame::close(code); |
|
|
|
self._write(stream, Some(frame))?; |
|
|
|
self.send_queue.push_back(frame); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
// Already closed, nothing to do.
|
|
|
|
} |
|
|
|
} |
|
|
|
self.flush(stream) |
|
|
|
self.write_pending(stream) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Try to decode one message frame. May return None.
|
|
|
|
/// Try to decode one message frame. May return None.
|
|
|
@ -510,12 +555,14 @@ impl WebSocketContext { |
|
|
|
// the negotiated extensions defines the meaning of such a nonzero
|
|
|
|
// the negotiated extensions defines the meaning of such a nonzero
|
|
|
|
// value, the receiving endpoint MUST _Fail the WebSocket
|
|
|
|
// value, the receiving endpoint MUST _Fail the WebSocket
|
|
|
|
// Connection_.
|
|
|
|
// Connection_.
|
|
|
|
{ |
|
|
|
let is_compressed = { |
|
|
|
let hdr = frame.header(); |
|
|
|
let hdr = frame.header(); |
|
|
|
if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 { |
|
|
|
if (hdr.rsv1 && !self.has_compression()) || hdr.rsv2 || hdr.rsv3 { |
|
|
|
return Err(Error::Protocol(ProtocolError::NonZeroReservedBits)); |
|
|
|
return Err(Error::Protocol(ProtocolError::NonZeroReservedBits)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
hdr.rsv1 |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
match self.role { |
|
|
|
match self.role { |
|
|
|
Role::Server => { |
|
|
|
Role::Server => { |
|
|
@ -550,6 +597,10 @@ impl WebSocketContext { |
|
|
|
_ if frame.payload().len() > 125 => { |
|
|
|
_ if frame.payload().len() > 125 => { |
|
|
|
Err(Error::Protocol(ProtocolError::ControlFrameTooBig)) |
|
|
|
Err(Error::Protocol(ProtocolError::ControlFrameTooBig)) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Control frames must not have compress bit.
|
|
|
|
|
|
|
|
_ if is_compressed => { |
|
|
|
|
|
|
|
Err(Error::Protocol(ProtocolError::CompressedControlFrame)) |
|
|
|
|
|
|
|
} |
|
|
|
OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)), |
|
|
|
OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)), |
|
|
|
OpCtl::Reserved(i) => { |
|
|
|
OpCtl::Reserved(i) => { |
|
|
|
Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i))) |
|
|
|
Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i))) |
|
|
@ -558,7 +609,7 @@ impl WebSocketContext { |
|
|
|
let data = frame.into_data(); |
|
|
|
let data = frame.into_data(); |
|
|
|
// No ping processing after we sent a close frame.
|
|
|
|
// No ping processing after we sent a close frame.
|
|
|
|
if self.state.is_active() { |
|
|
|
if self.state.is_active() { |
|
|
|
self.set_additional(Frame::pong(data.clone())); |
|
|
|
self.pong = Some(Frame::pong(data.clone())); |
|
|
|
} |
|
|
|
} |
|
|
|
Ok(Some(Message::Ping(data))) |
|
|
|
Ok(Some(Message::Ping(data))) |
|
|
|
} |
|
|
|
} |
|
|
@ -570,39 +621,34 @@ impl WebSocketContext { |
|
|
|
let fin = frame.header().is_final; |
|
|
|
let fin = frame.header().is_final; |
|
|
|
match data { |
|
|
|
match data { |
|
|
|
OpData::Continue => { |
|
|
|
OpData::Continue => { |
|
|
|
if let Some(ref mut msg) = self.incomplete { |
|
|
|
if self.incomplete.is_some() && is_compressed { |
|
|
|
msg.extend(frame.into_data(), self.config.max_message_size)?; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
return Err(Error::Protocol( |
|
|
|
return Err(Error::Protocol( |
|
|
|
ProtocolError::UnexpectedContinueFrame, |
|
|
|
ProtocolError::CompressedContinueFrame, |
|
|
|
)); |
|
|
|
)); |
|
|
|
} |
|
|
|
} |
|
|
|
if fin { |
|
|
|
|
|
|
|
Ok(Some(self.incomplete.take().unwrap().complete()?)) |
|
|
|
let msg = self |
|
|
|
} else { |
|
|
|
.incomplete |
|
|
|
Ok(None) |
|
|
|
.take() |
|
|
|
} |
|
|
|
.ok_or(Error::Protocol(ProtocolError::UnexpectedContinueFrame))?; |
|
|
|
|
|
|
|
self.extend_incomplete(msg, frame.into_data(), fin) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
c if self.incomplete.is_some() => { |
|
|
|
c if self.incomplete.is_some() => { |
|
|
|
Err(Error::Protocol(ProtocolError::ExpectedFragment(c))) |
|
|
|
Err(Error::Protocol(ProtocolError::ExpectedFragment(c))) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
OpData::Text | OpData::Binary => { |
|
|
|
OpData::Text | OpData::Binary => { |
|
|
|
let msg = { |
|
|
|
|
|
|
|
let message_type = match data { |
|
|
|
let message_type = match data { |
|
|
|
OpData::Text => IncompleteMessageType::Text, |
|
|
|
OpData::Text => IncompleteMessageType::Text, |
|
|
|
OpData::Binary => IncompleteMessageType::Binary, |
|
|
|
OpData::Binary => IncompleteMessageType::Binary, |
|
|
|
_ => panic!("Bug: message is not text nor binary"), |
|
|
|
_ => panic!("Bug: message is not text nor binary"), |
|
|
|
}; |
|
|
|
}; |
|
|
|
let mut m = IncompleteMessage::new(message_type); |
|
|
|
#[cfg(feature = "deflate")] |
|
|
|
m.extend(frame.into_data(), self.config.max_message_size)?; |
|
|
|
let msg = IncompleteMessage::new(message_type, is_compressed); |
|
|
|
m |
|
|
|
#[cfg(not(feature = "deflate"))] |
|
|
|
}; |
|
|
|
let msg = IncompleteMessage::new(message_type); |
|
|
|
if fin { |
|
|
|
self.extend_incomplete(msg, frame.into_data(), fin) |
|
|
|
Ok(Some(msg.complete()?)) |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
self.incomplete = Some(msg); |
|
|
|
|
|
|
|
Ok(None) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
OpData::Reserved(i) => { |
|
|
|
OpData::Reserved(i) => { |
|
|
|
Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i))) |
|
|
|
Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i))) |
|
|
@ -621,6 +667,32 @@ impl WebSocketContext { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn extend_incomplete( |
|
|
|
|
|
|
|
&mut self, |
|
|
|
|
|
|
|
mut msg: IncompleteMessage, |
|
|
|
|
|
|
|
data: Vec<u8>, |
|
|
|
|
|
|
|
is_final: bool, |
|
|
|
|
|
|
|
) -> Result<Option<Message>> { |
|
|
|
|
|
|
|
#[cfg(feature = "deflate")] |
|
|
|
|
|
|
|
let data = if msg.compressed() { |
|
|
|
|
|
|
|
// `msg.compressed()` is only true when compression is enabled so it's safe to unwrap
|
|
|
|
|
|
|
|
self.extensions |
|
|
|
|
|
|
|
.as_mut() |
|
|
|
|
|
|
|
.and_then(|x| x.compression.as_mut()) |
|
|
|
|
|
|
|
.unwrap() |
|
|
|
|
|
|
|
.decompress(data, is_final)? |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
data |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
msg.extend(data, self.config.max_message_size)?; |
|
|
|
|
|
|
|
if is_final { |
|
|
|
|
|
|
|
Ok(Some(msg.complete()?)) |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
self.incomplete = Some(msg); |
|
|
|
|
|
|
|
Ok(None) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Received a close frame. Tells if we need to return a close frame to the user.
|
|
|
|
/// Received a close frame. Tells if we need to return a close frame to the user.
|
|
|
|
#[allow(clippy::option_option)] |
|
|
|
#[allow(clippy::option_option)] |
|
|
|
fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> { |
|
|
|
fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> { |
|
|
@ -642,7 +714,7 @@ impl WebSocketContext { |
|
|
|
|
|
|
|
|
|
|
|
let reply = Frame::close(close.clone()); |
|
|
|
let reply = Frame::close(close.clone()); |
|
|
|
debug!("Replying to close with {:?}", reply); |
|
|
|
debug!("Replying to close with {:?}", reply); |
|
|
|
self.set_additional(reply); |
|
|
|
self.send_queue.push_back(reply); |
|
|
|
|
|
|
|
|
|
|
|
Some(close) |
|
|
|
Some(close) |
|
|
|
} |
|
|
|
} |
|
|
@ -659,8 +731,8 @@ impl WebSocketContext { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Write a single frame into the write-buffer.
|
|
|
|
/// Send a single pending frame.
|
|
|
|
fn buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> |
|
|
|
fn send_one_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> |
|
|
|
where |
|
|
|
where |
|
|
|
Stream: Read + Write, |
|
|
|
Stream: Read + Write, |
|
|
|
{ |
|
|
|
{ |
|
|
@ -674,17 +746,17 @@ impl WebSocketContext { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
trace!("Sending frame: {:?}", frame); |
|
|
|
trace!("Sending frame: {:?}", frame); |
|
|
|
self.frame.buffer_frame(stream, frame).check_connection_reset(self.state) |
|
|
|
self.frame.write_frame(stream, frame).check_connection_reset(self.state) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Replace `additional_send` if it is currently a `Pong` message.
|
|
|
|
fn has_compression(&self) -> bool { |
|
|
|
fn set_additional(&mut self, add: Frame) { |
|
|
|
#[cfg(feature = "deflate")] |
|
|
|
let empty_or_pong = self |
|
|
|
{ |
|
|
|
.additional_send |
|
|
|
self.extensions.as_ref().and_then(|c| c.compression.as_ref()).is_some() |
|
|
|
.as_ref() |
|
|
|
} |
|
|
|
.map_or(true, |f| f.header().opcode == OpCode::Control(OpCtl::Pong)); |
|
|
|
#[cfg(not(feature = "deflate"))] |
|
|
|
if empty_or_pong { |
|
|
|
{ |
|
|
|
self.additional_send.replace(add); |
|
|
|
false |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -718,7 +790,7 @@ impl WebSocketState { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/// Check if the state is active, return error if not.
|
|
|
|
/// Check if the state is active, return error if not.
|
|
|
|
fn check_not_terminated(self) -> Result<()> { |
|
|
|
fn check_active(self) -> Result<()> { |
|
|
|
match self { |
|
|
|
match self { |
|
|
|
WebSocketState::Terminated => Err(Error::AlreadyClosed), |
|
|
|
WebSocketState::Terminated => Err(Error::AlreadyClosed), |
|
|
|
_ => Ok(()), |
|
|
|
_ => Ok(()), |
|
|
@ -770,6 +842,64 @@ mod tests { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct WouldBlockStreamMoc; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl io::Write for WouldBlockStreamMoc { |
|
|
|
|
|
|
|
fn write(&mut self, _: &[u8]) -> io::Result<usize> { |
|
|
|
|
|
|
|
Err(io::Error::new(io::ErrorKind::WouldBlock, "would block")) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
fn flush(&mut self) -> io::Result<()> { |
|
|
|
|
|
|
|
Err(io::Error::new(io::ErrorKind::WouldBlock, "would block")) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl io::Read for WouldBlockStreamMoc { |
|
|
|
|
|
|
|
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { |
|
|
|
|
|
|
|
Err(io::Error::new(io::ErrorKind::WouldBlock, "would block")) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
|
|
|
|
fn queue_logic() { |
|
|
|
|
|
|
|
// Create a socket with the queue size of 1.
|
|
|
|
|
|
|
|
let mut socket = WebSocket::from_raw_socket( |
|
|
|
|
|
|
|
WouldBlockStreamMoc, |
|
|
|
|
|
|
|
Role::Client, |
|
|
|
|
|
|
|
Some(WebSocketConfig { max_send_queue: Some(1), ..Default::default() }), |
|
|
|
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Test message that we're going to send.
|
|
|
|
|
|
|
|
let message = Message::Binary(vec![0xFF; 1024]); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Helper to check the error.
|
|
|
|
|
|
|
|
let assert_would_block = |error| { |
|
|
|
|
|
|
|
if let Error::Io(io_error) = error { |
|
|
|
|
|
|
|
assert_eq!(io_error.kind(), io::ErrorKind::WouldBlock); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
panic!("Expected WouldBlock error"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// The first attempt of writing must not fail, since the queue is empty at start.
|
|
|
|
|
|
|
|
// But since the underlying mock object always returns `WouldBlock`, so is the result.
|
|
|
|
|
|
|
|
assert_would_block(dbg!(socket.write_message(message.clone()).unwrap_err())); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Any subsequent attempts must return an error telling that the queue is full.
|
|
|
|
|
|
|
|
for _i in 0..100 { |
|
|
|
|
|
|
|
assert!(matches!( |
|
|
|
|
|
|
|
socket.write_message(message.clone()).unwrap_err(), |
|
|
|
|
|
|
|
Error::SendQueueFull(..) |
|
|
|
|
|
|
|
)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// The size of the output buffer must not be bigger than the size of that message
|
|
|
|
|
|
|
|
// that we managed to write to the output buffer at first. Since we could not make
|
|
|
|
|
|
|
|
// any progress (because of the logic of the moc buffer), the size remains unchanged.
|
|
|
|
|
|
|
|
if socket.context.frame.output_buffer_len() > message.len() { |
|
|
|
|
|
|
|
panic!("Too many frames in the queue"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
#[test] |
|
|
|
fn receive_messages() { |
|
|
|
fn receive_messages() { |
|
|
|
let incoming = Cursor::new(vec![ |
|
|
|
let incoming = Cursor::new(vec![ |
|
|
@ -778,10 +908,10 @@ mod tests { |
|
|
|
0x03, |
|
|
|
0x03, |
|
|
|
]); |
|
|
|
]); |
|
|
|
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None); |
|
|
|
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None); |
|
|
|
assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2])); |
|
|
|
assert_eq!(socket.read_message().unwrap(), Message::Ping(vec![1, 2])); |
|
|
|
assert_eq!(socket.read().unwrap(), Message::Pong(vec![3])); |
|
|
|
assert_eq!(socket.read_message().unwrap(), Message::Pong(vec![3])); |
|
|
|
assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into())); |
|
|
|
assert_eq!(socket.read_message().unwrap(), Message::Text("Hello, World!".into())); |
|
|
|
assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03])); |
|
|
|
assert_eq!(socket.read_message().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03])); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
#[test] |
|
|
@ -794,7 +924,7 @@ mod tests { |
|
|
|
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); |
|
|
|
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); |
|
|
|
|
|
|
|
|
|
|
|
assert!(matches!( |
|
|
|
assert!(matches!( |
|
|
|
socket.read(), |
|
|
|
socket.read_message(), |
|
|
|
Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 })) |
|
|
|
Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 })) |
|
|
|
)); |
|
|
|
)); |
|
|
|
} |
|
|
|
} |
|
|
@ -806,7 +936,7 @@ mod tests { |
|
|
|
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); |
|
|
|
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); |
|
|
|
|
|
|
|
|
|
|
|
assert!(matches!( |
|
|
|
assert!(matches!( |
|
|
|
socket.read(), |
|
|
|
socket.read_message(), |
|
|
|
Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 })) |
|
|
|
Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 })) |
|
|
|
)); |
|
|
|
)); |
|
|
|
} |
|
|
|
} |
|
|
|