From 5742c822ee14668b7948c4cc8b725042451ab821 Mon Sep 17 00:00:00 2001 From: Teddy DeRego Date: Fri, 23 Mar 2018 17:40:08 -0700 Subject: [PATCH] WebSocket::write_message will no longer buffer unlimited messages - it will only buffer a configurable number of outbound messages. Needed for snapview/tokio-tungstenite#35. --- Cargo.toml | 4 ++-- src/error.rs | 5 ++++ src/protocol/mod.rs | 56 ++++++++++++++++++++++++++------------------- 3 files changed, 39 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9c69d14..b347b4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ authors = ["Alexey Galakhov"] license = "MIT/Apache-2.0" readme = "README.md" homepage = "https://github.com/snapview/tungstenite-rs" -documentation = "https://docs.rs/tungstenite/0.5.3" +documentation = "https://docs.rs/tungstenite/0.5.4" repository = "https://github.com/snapview/tungstenite-rs" -version = "0.5.3" +version = "0.5.4" [features] default = ["tls"] diff --git a/src/error.rs b/src/error.rs index 4b027fd..543774c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,6 +12,7 @@ use std::string; use httparse; use protocol::frame::CloseFrame; +use protocol::Message; #[cfg(feature="tls")] pub mod tls { @@ -36,6 +37,8 @@ pub enum Error { Capacity(Cow<'static, str>), /// Protocol violation Protocol(Cow<'static, str>), + /// Message send queue full + SendQueueFull(Message), /// UTF coding error Utf8, /// Invlid URL. @@ -59,6 +62,7 @@ impl fmt::Display for Error { Error::Tls(ref err) => write!(f, "TLS error: {}", err), Error::Capacity(ref msg) => write!(f, "Space limit exceeded: {}", msg), Error::Protocol(ref msg) => write!(f, "WebSocket protocol error: {}", msg), + Error::SendQueueFull(_) => write!(f, "Send queue is full"), Error::Utf8 => write!(f, "UTF-8 encoding error"), Error::Url(ref msg) => write!(f, "URL error: {}", msg), Error::Http(code) => write!(f, "HTTP code: {}", code), @@ -75,6 +79,7 @@ impl ErrorTrait for Error { Error::Tls(ref err) => err.description(), Error::Capacity(ref msg) => msg.borrow(), Error::Protocol(ref msg) => msg.borrow(), + Error::SendQueueFull(_) => "Send queue is full", Error::Utf8 => "", Error::Url(ref msg) => msg.borrow(), Error::Http(_) => "", diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 4b78ba2..a7ea9b2 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -42,6 +42,7 @@ pub struct WebSocket { incomplete: Option, /// Send: a data send queue. send_queue: VecDeque, + max_send_queue: usize, /// Send: an OOB pong message. pong: Option, } @@ -74,6 +75,7 @@ impl WebSocket { state: WebSocketState::Active, incomplete: None, send_queue: VecDeque::new(), + max_send_queue: 1, pong: None, } } @@ -101,13 +103,20 @@ impl WebSocket { /// Send a message to stream, if possible. /// - /// This function guarantees that the frame is queued regardless of any errors. - /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, - /// call write_pending() afterwards. + /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping + /// and Close requests. If the WebSocket's send queue is full, SendQueueFull will be returned + /// along with the passed message. Otherwise, the message is queued and Ok(()) is returned. /// /// Note that only the last pong frame is stored to be sent, and only the - /// most recent pong frame is sent if multiple pong frames are queued up. + /// most recent pong frame is sent if multiple pong frames are queued pub fn write_message(&mut self, message: Message) -> Result<()> { + // Try to make some room for the new message + self.write_pending().no_block()?; + + if self.send_queue.len() >= self.max_send_queue { + return Err(Error::SendQueueFull(message)); + } + let frame = match message { Message::Text(data) => { Frame::message(data.into(), OpCode::Data(OpData::Text), true) @@ -122,13 +131,13 @@ impl WebSocket { } }; self.send_queue.push_back(frame); - self.write_pending() + Ok(()) } /// Close the connection. /// /// This function guarantees that the close frame will be queued. - /// There is no need to call it again, just like write_message(). + /// There is no need to call it again. pub fn close(&mut self, code: Option) -> Result<()> { if let WebSocketState::Active = self.state { self.state = WebSocketState::ClosedByUs; @@ -151,7 +160,7 @@ impl WebSocket { // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // response, unless it already received a Close frame. It SHOULD // respond with Pong frame as soon as is practical. (RFC 6455) - if let Some(pong) = replace(&mut self.pong, None) { + if let Some(pong) = self.pong.take() { self.send_one_frame(pong)?; } // If we have any unsent frames, send them. @@ -159,21 +168,20 @@ impl WebSocket { self.send_one_frame(data)?; } + // If we get to this point, the send queue is empty and the underlying socket is still + // willing to take more data. + // If we're closing and there is nothing to send anymore, we should close the connection. - if self.send_queue.is_empty() { - if let WebSocketState::ClosedByPeer(ref mut frame) = self.state { - // The underlying TCP connection, in most normal cases, SHOULD be closed - // first by the server, so that it holds the TIME_WAIT state and not the - // client (as this would prevent it from re-opening the connection for 2 - // maximum segment lifetimes (2MSL), while there is no corresponding - // server impact as a TIME_WAIT connection is immediately reopened upon - // a new SYN with a higher seq number). (RFC 6455) - match self.role { - Role::Client => Ok(()), - Role::Server => Err(Error::ConnectionClosed(replace(frame, None))), - } - } else { - Ok(()) + if let WebSocketState::ClosedByPeer(ref mut frame) = self.state { + // The underlying TCP connection, in most normal cases, SHOULD be closed + // first by the server, so that it holds the TIME_WAIT state and not the + // client (as this would prevent it from re-opening the connection for 2 + // maximum segment lifetimes (2MSL), while there is no corresponding + // server impact as a TIME_WAIT connection is immediately reopened upon + // a new SYN with a higher seq number). (RFC 6455) + match self.role { + Role::Client => Ok(()), + Role::Server => Err(Error::ConnectionClosed(frame.take())), } } else { Ok(()) @@ -262,7 +270,7 @@ impl WebSocket { return Err(Error::Protocol("Continue frame but nothing to continue".into())) } if fin { - Ok(Some(replace(&mut self.incomplete, None).unwrap().complete()?)) + Ok(Some(self.incomplete.take().unwrap().complete()?)) } else { Ok(None) } @@ -381,9 +389,9 @@ impl WebSocket { if err.kind() == IoErrorKind::ConnectionReset { match self.state { WebSocketState::ClosedByPeer(ref mut frame) => - Error::ConnectionClosed(replace(frame, None)), + Error::ConnectionClosed(frame.take()), WebSocketState::CloseAcknowledged(ref mut frame) => - Error::ConnectionClosed(replace(frame, None)), + Error::ConnectionClosed(frame.take()), _ => Error::Io(err), } } else {