//! Generic WebSocket message stream. pub mod frame; mod message; pub use self::{frame::CloseFrame, message::Message}; use self::{ frame::{ coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode}, Frame, FrameCodec, }, message::{IncompleteMessage, IncompleteMessageType}, }; use crate::{ error::{Error, ProtocolError, Result}, util::NonBlockingResult, }; use log::*; use std::{ io::{ErrorKind as IoErrorKind, Read, Write}, mem::replace, }; /// Indicates a Client or Server role of the websocket #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Role { /// This socket is a server Server, /// This socket is a client Client, } /// The configuration for WebSocket connection. #[derive(Debug, Clone, Copy)] pub struct WebSocketConfig { /// Does nothing, instead use `max_write_buffer_size`. #[deprecated] pub max_send_queue: Option, /// The max size of the write buffer in bytes. Setting this can provide backpressure. /// The default value is unlimited. pub max_write_buffer_size: usize, /// The maximum size of a message. `None` means no size limit. The default value is 64 MiB /// which should be reasonably big for all normal use-cases but small enough to prevent /// memory eating by a malicious user. pub max_message_size: Option, /// The maximum size of a single message frame. `None` means no size limit. The limit is for /// frame payload NOT including the frame header. The default value is 16 MiB which should /// be reasonably big for all normal use-cases but small enough to prevent memory eating /// by a malicious user. pub max_frame_size: Option, /// When set to `true`, the server will accept and handle unmasked frames /// from the client. According to the RFC 6455, the server must close the /// connection to the client in such cases, however it seems like there are /// some popular libraries that are sending unmasked frames, ignoring the RFC. /// By default this option is set to `false`, i.e. according to RFC 6455. pub accept_unmasked_frames: bool, } impl Default for WebSocketConfig { fn default() -> Self { #[allow(deprecated)] WebSocketConfig { max_send_queue: None, max_write_buffer_size: usize::MAX, max_message_size: Some(64 << 20), max_frame_size: Some(16 << 20), accept_unmasked_frames: false, } } } /// WebSocket input-output stream. /// /// This is THE structure you want to create to be able to speak the WebSocket protocol. /// It may be created by calling `connect`, `accept` or `client` functions. /// /// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages. #[derive(Debug)] pub struct WebSocket { /// The underlying socket. socket: Stream, /// The context for managing a WebSocket. context: WebSocketContext, } impl WebSocket { /// Convert a raw socket into a WebSocket without performing a handshake. /// /// Call this function if you're using Tungstenite as a part of a web framework /// or together with an existing one. If you need an initial handshake, use /// `connect()` or `accept()` functions of the crate to construct a websocket. pub fn from_raw_socket(stream: Stream, role: Role, config: Option) -> Self { WebSocket { socket: stream, context: WebSocketContext::new(role, config) } } /// Convert a raw socket into a WebSocket without performing a handshake. /// /// Call this function if you're using Tungstenite as a part of a web framework /// or together with an existing one. If you need an initial handshake, use /// `connect()` or `accept()` functions of the crate to construct a websocket. pub fn from_partially_read( stream: Stream, part: Vec, role: Role, config: Option, ) -> Self { WebSocket { socket: stream, context: WebSocketContext::from_partially_read(part, role, config), } } /// Returns a shared reference to the inner stream. pub fn get_ref(&self) -> &Stream { &self.socket } /// Returns a mutable reference to the inner stream. pub fn get_mut(&mut self) -> &mut Stream { &mut self.socket } /// Change the configuration. pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { self.context.set_config(set_func) } /// Read the configuration. pub fn get_config(&self) -> &WebSocketConfig { self.context.get_config() } /// Check if it is possible to read messages. /// /// Reading is impossible after receiving `Message::Close`. It is still possible after /// sending close frame since the peer still may send some data before confirming close. pub fn can_read(&self) -> bool { self.context.can_read() } /// Check if it is possible to write messages. /// /// Writing gets impossible immediately after sending or receiving `Message::Close`. pub fn can_write(&self) -> bool { self.context.can_write() } } impl WebSocket { /// Read a message from stream, if possible. /// /// This will also queue responses to ping and close messages. These responses /// will be written and flushed on the next call to [`read`](Self::read), /// [`write`](Self::write) or [`flush`](Self::flush). /// /// # Closing the connection /// When the remote endpoint decides to close the connection this will return /// the close message with an optional close frame. /// /// You should continue calling [`read`](Self::read), [`write`](Self::write) or /// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`] /// is returned. Once that happens it is safe to drop the underlying connection. pub fn read(&mut self) -> Result { self.context.read(&mut self.socket) } /// Writes and immediately flushes a message. /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush). pub fn send(&mut self, message: Message) -> Result<()> { self.write(message)?; self.flush() } /// Write a message to the provided stream, if possible. /// /// A subsequent call should be made to [`flush`](Self::flush) to flush writes. /// /// In the event of stream write failure the message frame will be stored /// in the write buffer and will try again on the next call to [`write`](Self::write) /// or [`flush`](Self::flush). /// /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`] /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned. /// /// This call will generally not flush. However, if there are queued automatic messages /// they will be written and eagerly flushed. /// /// For example, upon receiving ping messages tungstenite queues pong replies automatically. /// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush) /// will write & flush the pong reply. This means you should not respond to ping frames manually. /// /// You can however send pong frames manually in order to indicate a unidirectional heartbeat /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that /// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing /// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the /// ping will not be sent as it will be replaced by your custom pong message. /// /// # Errors /// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned /// along with the equivalent passed message frame. /// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`]. /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from /// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program /// error on your part. /// - [`Error::Io`] is returned if the underlying connection returns an error /// (consider these fatal except for WouldBlock). /// - [`Error::Capacity`] if your message size is bigger than the configured max message size. pub fn write(&mut self, message: Message) -> Result<()> { self.context.write(&mut self.socket, message) } /// Flush writes. /// /// Ensures all messages previously passed to [`write`](Self::write) and automatic /// queued pong responses are written & flushed into the underlying stream. pub fn flush(&mut self) -> Result<()> { self.context.flush(&mut self.socket) } /// Close the connection. /// /// This function guarantees that the close frame will be queued. /// There is no need to call it again. Calling this function is /// the same as calling `write(Message::Close(..))`. /// /// After queuing the close frame you should continue calling [`read`](Self::read) or /// [`flush`](Self::flush) to drive the close handshake to completion. /// /// The websocket RFC defines that the underlying connection should be closed /// by the server. Tungstenite takes care of this asymmetry for you. /// /// When the close handshake is finished (we have both sent and received /// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return /// [Error::ConnectionClosed] if this endpoint is the server. /// /// If this endpoint is a client, [Error::ConnectionClosed] will only be /// returned after the server has closed the underlying connection. /// /// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed] /// is returned from [`read`](Self::read) or [`flush`](Self::flush). pub fn close(&mut self, code: Option) -> Result<()> { self.context.close(&mut self.socket, code) } /// Old name for [`read`](Self::read). #[deprecated(note = "Use `read`")] pub fn read_message(&mut self) -> Result { self.read() } /// Old name for [`send`](Self::send). #[deprecated(note = "Use `send`")] pub fn write_message(&mut self, message: Message) -> Result<()> { self.send(message) } /// Old name for [`flush`](Self::flush). #[deprecated(note = "Use `flush`")] pub fn write_pending(&mut self) -> Result<()> { self.flush() } } /// A context for managing WebSocket stream. #[derive(Debug)] pub struct WebSocketContext { /// Server or client? role: Role, /// encoder/decoder of frame. frame: FrameCodec, /// The state of processing, either "active" or "closing". state: WebSocketState, /// Receive: an incomplete message being processed. incomplete: Option, /// Send in addition to regular messages E.g. "pong" or "close". additional_send: Option, /// The configuration for the websocket session. config: WebSocketConfig, } impl WebSocketContext { /// Create a WebSocket context that manages a post-handshake stream. pub fn new(role: Role, config: Option) -> Self { let config = config.unwrap_or_default(); WebSocketContext { role, frame: FrameCodec::new().with_max_out_buffer_len(config.max_write_buffer_size), state: WebSocketState::Active, incomplete: None, additional_send: None, config, } } /// Create a WebSocket context that manages an post-handshake stream. pub fn from_partially_read(part: Vec, role: Role, config: Option) -> Self { let config = config.unwrap_or_default(); WebSocketContext { frame: FrameCodec::from_partially_read(part) .with_max_out_buffer_len(config.max_write_buffer_size), ..WebSocketContext::new(role, Some(config)) } } /// Change the configuration. pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { set_func(&mut self.config); self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size); } /// Read the configuration. pub fn get_config(&self) -> &WebSocketConfig { &self.config } /// Check if it is possible to read messages. /// /// Reading is impossible after receiving `Message::Close`. It is still possible after /// sending close frame since the peer still may send some data before confirming close. pub fn can_read(&self) -> bool { self.state.can_read() } /// Check if it is possible to write messages. /// /// Writing gets impossible immediately after sending or receiving `Message::Close`. pub fn can_write(&self) -> bool { self.state.is_active() } /// Read a message from the provided stream, if possible. /// /// This function sends pong and close responses automatically. /// However, it never blocks on write. pub fn read(&mut self, stream: &mut Stream) -> Result where Stream: Read + Write, { // Do not read from already closed connections. self.state.check_not_terminated()?; loop { if self.additional_send.is_some() { // Since we may get ping or close, we need to reply to the messages even during read. // Thus we flush but ignore its blocking. self.flush(stream).no_block()?; } else if self.role == Role::Server && !self.state.can_read() { self.state = WebSocketState::Terminated; return Err(Error::ConnectionClosed); } // If we get here, either write blocks or we have nothing to write. // Thus if read blocks, just let it return WouldBlock. if let Some(message) = self.read_message_frame(stream)? { trace!("Received message {}", message); return Ok(message); } } } /// Write a message to the provided stream. /// /// A subsequent call should be made to [`flush`](Self::flush) to flush writes. /// /// In the event of stream write failure the message frame will be stored /// in the write buffer and will try again on the next call to [`write`](Self::write) /// or [`flush`](Self::flush). /// /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`] /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned. pub fn write(&mut self, stream: &mut Stream, message: Message) -> Result<()> where Stream: Read + Write, { // When terminated, return AlreadyClosed. self.state.check_not_terminated()?; // Do not write after sending a close frame. if !self.state.is_active() { return Err(Error::Protocol(ProtocolError::SendAfterClosing)); } let frame = match message { Message::Text(data) => Frame::message(data.into(), OpCode::Data(OpData::Text), true), Message::Binary(data) => Frame::message(data, OpCode::Data(OpData::Binary), true), Message::Ping(data) => Frame::ping(data), Message::Pong(data) => { self.set_additional(Frame::pong(data)); // Note: user pongs can be user flushed so no need to flush here return self._write(stream, None).map(|_| ()); } Message::Close(code) => return self.close(stream, code), Message::Frame(f) => f, }; let should_flush = self._write(stream, Some(frame))?; if should_flush { self.flush(stream)?; } Ok(()) } /// Flush writes. /// /// Ensures all messages previously passed to [`write`](Self::write) and automatically /// queued pong responses are written & flushed into the `stream`. #[inline] pub fn flush(&mut self, stream: &mut Stream) -> Result<()> where Stream: Read + Write, { self._write(stream, None)?; Ok(stream.flush()?) } /// Writes any data in the out_buffer, `additional_send` and given `data`. /// /// Does **not** flush. /// /// Returns true if the write contents indicate we should flush immediately. fn _write(&mut self, stream: &mut Stream, data: Option) -> Result where Stream: Read + Write, { match data { Some(data) => self.write_one_frame(stream, data)?, None => self.frame.write_out_buffer(stream)?, } // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // response, unless it already received a Close frame. It SHOULD // respond with Pong frame as soon as is practical. (RFC 6455) let should_flush = if let Some(msg) = self.additional_send.take() { trace!("Sending pong/close"); match self.write_one_frame(stream, msg) { Err(Error::WriteBufferFull(Message::Frame(msg))) => { // if an system message would exceed the buffer put it back in // `additional_send` for retry. Otherwise returning this error // may not make sense to the user, e.g. calling `flush`. self.set_additional(msg); false } Err(err) => return Err(err), Ok(_) => true, } } else { false }; // If we're closing and there is nothing to send anymore, we should close the connection. if self.role == Role::Server && !self.state.can_read() { // The underlying TCP connection, in most normal cases, SHOULD be closed // first by the server, so that it holds the TIME_WAIT state and not the // client (as this would prevent it from re-opening the connection for 2 // maximum segment lifetimes (2MSL), while there is no corresponding // server impact as a TIME_WAIT connection is immediately reopened upon // a new SYN with a higher seq number). (RFC 6455) self.state = WebSocketState::Terminated; Err(Error::ConnectionClosed) } else { Ok(should_flush) } } /// Close the connection. /// /// This function guarantees that the close frame will be queued. /// There is no need to call it again. Calling this function is /// the same as calling `write(Message::Close(..))`. pub fn close(&mut self, stream: &mut Stream, code: Option) -> Result<()> where Stream: Read + Write, { if let WebSocketState::Active = self.state { self.state = WebSocketState::ClosedByUs; let frame = Frame::close(code); self._write(stream, Some(frame))?; } else { // Already closed, nothing to do. } Ok(stream.flush()?) } /// Try to decode one message frame. May return None. fn read_message_frame(&mut self, stream: &mut Stream) -> Result> where Stream: Read + Write, { if let Some(mut frame) = self .frame .read_frame(stream, self.config.max_frame_size) .check_connection_reset(self.state)? { if !self.state.can_read() { return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing)); } // MUST be 0 unless an extension is negotiated that defines meanings // for non-zero values. If a nonzero value is received and none of // the negotiated extensions defines the meaning of such a nonzero // value, the receiving endpoint MUST _Fail the WebSocket // Connection_. { let hdr = frame.header(); if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 { return Err(Error::Protocol(ProtocolError::NonZeroReservedBits)); } } match self.role { Role::Server => { if frame.is_masked() { // A server MUST remove masking for data frames received from a client // as described in Section 5.3. (RFC 6455) frame.apply_mask() } else if !self.config.accept_unmasked_frames { // The server MUST close the connection upon receiving a // frame that is not masked. (RFC 6455) // The only exception here is if the user explicitly accepts given // stream by setting WebSocketConfig.accept_unmasked_frames to true return Err(Error::Protocol(ProtocolError::UnmaskedFrameFromClient)); } } Role::Client => { if frame.is_masked() { // A client MUST close a connection if it detects a masked frame. (RFC 6455) return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer)); } } } match frame.header().opcode { OpCode::Control(ctl) => { match ctl { // All control frames MUST have a payload length of 125 bytes or less // and MUST NOT be fragmented. (RFC 6455) _ if !frame.header().is_final => { Err(Error::Protocol(ProtocolError::FragmentedControlFrame)) } _ if frame.payload().len() > 125 => { Err(Error::Protocol(ProtocolError::ControlFrameTooBig)) } OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)), OpCtl::Reserved(i) => { Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i))) } OpCtl::Ping => { let data = frame.into_data(); // No ping processing after we sent a close frame. if self.state.is_active() { self.set_additional(Frame::pong(data.clone())); } Ok(Some(Message::Ping(data))) } OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))), } } OpCode::Data(data) => { let fin = frame.header().is_final; match data { OpData::Continue => { if let Some(ref mut msg) = self.incomplete { msg.extend(frame.into_data(), self.config.max_message_size)?; } else { return Err(Error::Protocol( ProtocolError::UnexpectedContinueFrame, )); } if fin { Ok(Some(self.incomplete.take().unwrap().complete()?)) } else { Ok(None) } } c if self.incomplete.is_some() => { Err(Error::Protocol(ProtocolError::ExpectedFragment(c))) } OpData::Text | OpData::Binary => { let msg = { let message_type = match data { OpData::Text => IncompleteMessageType::Text, OpData::Binary => IncompleteMessageType::Binary, _ => panic!("Bug: message is not text nor binary"), }; let mut m = IncompleteMessage::new(message_type); m.extend(frame.into_data(), self.config.max_message_size)?; m }; if fin { Ok(Some(msg.complete()?)) } else { self.incomplete = Some(msg); Ok(None) } } OpData::Reserved(i) => { Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i))) } } } } // match opcode } else { // Connection closed by peer match replace(&mut self.state, WebSocketState::Terminated) { WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => { Err(Error::ConnectionClosed) } _ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)), } } } /// Received a close frame. Tells if we need to return a close frame to the user. #[allow(clippy::option_option)] fn do_close<'t>(&mut self, close: Option>) -> Option>> { debug!("Received close frame: {:?}", close); match self.state { WebSocketState::Active => { self.state = WebSocketState::ClosedByPeer; let close = close.map(|frame| { if !frame.code.is_allowed() { CloseFrame { code: CloseCode::Protocol, reason: "Protocol violation".into(), } } else { frame } }); let reply = Frame::close(close.clone()); debug!("Replying to close with {:?}", reply); self.set_additional(reply); Some(close) } WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => { // It is already closed, just ignore. None } WebSocketState::ClosedByUs => { // We received a reply. self.state = WebSocketState::CloseAcknowledged; Some(close) } WebSocketState::Terminated => unreachable!(), } } /// Write a single frame into the stream via the write-buffer. fn write_one_frame(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> where Stream: Read + Write, { match self.role { Role::Server => {} Role::Client => { // 5. If the data is being sent by the client, the frame(s) MUST be // masked as defined in Section 5.3. (RFC 6455) frame.set_random_mask(); } } trace!("Sending frame: {:?}", frame); self.frame.write_frame(stream, frame).check_connection_reset(self.state) } /// Replace `additional_send` if it is currently a `Pong` message. fn set_additional(&mut self, add: Frame) { let empty_or_pong = self .additional_send .as_ref() .map_or(true, |f| f.header().opcode == OpCode::Control(OpCtl::Pong)); if empty_or_pong { self.additional_send.replace(add); } } } /// The current connection state. #[derive(Debug, PartialEq, Eq, Clone, Copy)] enum WebSocketState { /// The connection is active. Active, /// We initiated a close handshake. ClosedByUs, /// The peer initiated a close handshake. ClosedByPeer, /// The peer replied to our close handshake. CloseAcknowledged, /// The connection does not exist anymore. Terminated, } impl WebSocketState { /// Tell if we're allowed to process normal messages. fn is_active(self) -> bool { matches!(self, WebSocketState::Active) } /// Tell if we should process incoming data. Note that if we send a close frame /// but the remote hasn't confirmed, they might have sent data before they receive our /// close frame, so we should still pass those to client code, hence ClosedByUs is valid. fn can_read(self) -> bool { matches!(self, WebSocketState::Active | WebSocketState::ClosedByUs) } /// Check if the state is active, return error if not. fn check_not_terminated(self) -> Result<()> { match self { WebSocketState::Terminated => Err(Error::AlreadyClosed), _ => Ok(()), } } } /// Translate "Connection reset by peer" into `ConnectionClosed` if appropriate. trait CheckConnectionReset { fn check_connection_reset(self, state: WebSocketState) -> Self; } impl CheckConnectionReset for Result { fn check_connection_reset(self, state: WebSocketState) -> Self { match self { Err(Error::Io(io_error)) => Err({ if !state.can_read() && io_error.kind() == IoErrorKind::ConnectionReset { Error::ConnectionClosed } else { Error::Io(io_error) } }), x => x, } } } #[cfg(test)] mod tests { use super::{Message, Role, WebSocket, WebSocketConfig}; use crate::error::{CapacityError, Error}; use std::{io, io::Cursor}; struct WriteMoc(Stream); impl io::Write for WriteMoc { fn write(&mut self, buf: &[u8]) -> io::Result { Ok(buf.len()) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } impl io::Read for WriteMoc { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } } #[test] fn receive_messages() { let incoming = Cursor::new(vec![ 0x89, 0x02, 0x01, 0x02, 0x8a, 0x01, 0x03, 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21, 0x82, 0x03, 0x01, 0x02, 0x03, ]); let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None); assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2])); assert_eq!(socket.read().unwrap(), Message::Pong(vec![3])); assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into())); assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03])); } #[test] fn size_limiting_text_fragmented() { let incoming = Cursor::new(vec![ 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21, ]); let limit = WebSocketConfig { max_message_size: Some(10), ..WebSocketConfig::default() }; let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); assert!(matches!( socket.read(), Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 })) )); } #[test] fn size_limiting_binary() { let incoming = Cursor::new(vec![0x82, 0x03, 0x01, 0x02, 0x03]); let limit = WebSocketConfig { max_message_size: Some(2), ..WebSocketConfig::default() }; let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); assert!(matches!( socket.read(), Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 })) )); } }