Merge pull request #26 from tedsta/master

`WebSocket::write_message` only buffers one message at a time to apply back-pressure
pull/28/head
Daniel Abramov 7 years ago committed by GitHub
commit 1f037abc34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      Cargo.toml
  2. 5
      src/error.rs
  3. 56
      src/protocol/mod.rs

@ -7,9 +7,9 @@ authors = ["Alexey Galakhov"]
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
readme = "README.md" readme = "README.md"
homepage = "https://github.com/snapview/tungstenite-rs" homepage = "https://github.com/snapview/tungstenite-rs"
documentation = "https://docs.rs/tungstenite/0.5.3" documentation = "https://docs.rs/tungstenite/0.5.4"
repository = "https://github.com/snapview/tungstenite-rs" repository = "https://github.com/snapview/tungstenite-rs"
version = "0.5.3" version = "0.5.4"
[features] [features]
default = ["tls"] default = ["tls"]

@ -12,6 +12,7 @@ use std::string;
use httparse; use httparse;
use protocol::frame::CloseFrame; use protocol::frame::CloseFrame;
use protocol::Message;
#[cfg(feature="tls")] #[cfg(feature="tls")]
pub mod tls { pub mod tls {
@ -36,6 +37,8 @@ pub enum Error {
Capacity(Cow<'static, str>), Capacity(Cow<'static, str>),
/// Protocol violation /// Protocol violation
Protocol(Cow<'static, str>), Protocol(Cow<'static, str>),
/// Message send queue full
SendQueueFull(Message),
/// UTF coding error /// UTF coding error
Utf8, Utf8,
/// Invlid URL. /// Invlid URL.
@ -59,6 +62,7 @@ impl fmt::Display for Error {
Error::Tls(ref err) => write!(f, "TLS error: {}", err), Error::Tls(ref err) => write!(f, "TLS error: {}", err),
Error::Capacity(ref msg) => write!(f, "Space limit exceeded: {}", msg), Error::Capacity(ref msg) => write!(f, "Space limit exceeded: {}", msg),
Error::Protocol(ref msg) => write!(f, "WebSocket protocol error: {}", msg), Error::Protocol(ref msg) => write!(f, "WebSocket protocol error: {}", msg),
Error::SendQueueFull(_) => write!(f, "Send queue is full"),
Error::Utf8 => write!(f, "UTF-8 encoding error"), Error::Utf8 => write!(f, "UTF-8 encoding error"),
Error::Url(ref msg) => write!(f, "URL error: {}", msg), Error::Url(ref msg) => write!(f, "URL error: {}", msg),
Error::Http(code) => write!(f, "HTTP code: {}", code), Error::Http(code) => write!(f, "HTTP code: {}", code),
@ -75,6 +79,7 @@ impl ErrorTrait for Error {
Error::Tls(ref err) => err.description(), Error::Tls(ref err) => err.description(),
Error::Capacity(ref msg) => msg.borrow(), Error::Capacity(ref msg) => msg.borrow(),
Error::Protocol(ref msg) => msg.borrow(), Error::Protocol(ref msg) => msg.borrow(),
Error::SendQueueFull(_) => "Send queue is full",
Error::Utf8 => "", Error::Utf8 => "",
Error::Url(ref msg) => msg.borrow(), Error::Url(ref msg) => msg.borrow(),
Error::Http(_) => "", Error::Http(_) => "",

@ -42,6 +42,7 @@ pub struct WebSocket<Stream> {
incomplete: Option<IncompleteMessage>, incomplete: Option<IncompleteMessage>,
/// Send: a data send queue. /// Send: a data send queue.
send_queue: VecDeque<Frame>, send_queue: VecDeque<Frame>,
max_send_queue: usize,
/// Send: an OOB pong message. /// Send: an OOB pong message.
pong: Option<Frame>, pong: Option<Frame>,
} }
@ -74,6 +75,7 @@ impl<Stream> WebSocket<Stream> {
state: WebSocketState::Active, state: WebSocketState::Active,
incomplete: None, incomplete: None,
send_queue: VecDeque::new(), send_queue: VecDeque::new(),
max_send_queue: 1,
pong: None, pong: None,
} }
} }
@ -101,13 +103,20 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// Send a message to stream, if possible. /// Send a message to stream, if possible.
/// ///
/// This function guarantees that the frame is queued regardless of any errors. /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, /// and Close requests. If the WebSocket's send queue is full, SendQueueFull will be returned
/// call write_pending() afterwards. /// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
/// ///
/// Note that only the last pong frame is stored to be sent, and only the /// Note that only the last pong frame is stored to be sent, and only the
/// most recent pong frame is sent if multiple pong frames are queued up. /// most recent pong frame is sent if multiple pong frames are queued
pub fn write_message(&mut self, message: Message) -> Result<()> { pub fn write_message(&mut self, message: Message) -> Result<()> {
// Try to make some room for the new message
self.write_pending().no_block()?;
if self.send_queue.len() >= self.max_send_queue {
return Err(Error::SendQueueFull(message));
}
let frame = match message { let frame = match message {
Message::Text(data) => { Message::Text(data) => {
Frame::message(data.into(), OpCode::Data(OpData::Text), true) Frame::message(data.into(), OpCode::Data(OpData::Text), true)
@ -122,13 +131,13 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} }
}; };
self.send_queue.push_back(frame); self.send_queue.push_back(frame);
self.write_pending() Ok(())
} }
/// Close the connection. /// Close the connection.
/// ///
/// This function guarantees that the close frame will be queued. /// This function guarantees that the close frame will be queued.
/// There is no need to call it again, just like write_message(). /// There is no need to call it again.
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> { pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
if let WebSocketState::Active = self.state { if let WebSocketState::Active = self.state {
self.state = WebSocketState::ClosedByUs; self.state = WebSocketState::ClosedByUs;
@ -151,7 +160,7 @@ impl<Stream: Read + Write> WebSocket<Stream> {
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD // response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455) // respond with Pong frame as soon as is practical. (RFC 6455)
if let Some(pong) = replace(&mut self.pong, None) { if let Some(pong) = self.pong.take() {
self.send_one_frame(pong)?; self.send_one_frame(pong)?;
} }
// If we have any unsent frames, send them. // If we have any unsent frames, send them.
@ -159,21 +168,20 @@ impl<Stream: Read + Write> WebSocket<Stream> {
self.send_one_frame(data)?; self.send_one_frame(data)?;
} }
// If we get to this point, the send queue is empty and the underlying socket is still
// willing to take more data.
// If we're closing and there is nothing to send anymore, we should close the connection. // If we're closing and there is nothing to send anymore, we should close the connection.
if self.send_queue.is_empty() { if let WebSocketState::ClosedByPeer(ref mut frame) = self.state {
if let WebSocketState::ClosedByPeer(ref mut frame) = self.state { // The underlying TCP connection, in most normal cases, SHOULD be closed
// The underlying TCP connection, in most normal cases, SHOULD be closed // first by the server, so that it holds the TIME_WAIT state and not the
// first by the server, so that it holds the TIME_WAIT state and not the // client (as this would prevent it from re-opening the connection for 2
// client (as this would prevent it from re-opening the connection for 2 // maximum segment lifetimes (2MSL), while there is no corresponding
// maximum segment lifetimes (2MSL), while there is no corresponding // server impact as a TIME_WAIT connection is immediately reopened upon
// server impact as a TIME_WAIT connection is immediately reopened upon // a new SYN with a higher seq number). (RFC 6455)
// a new SYN with a higher seq number). (RFC 6455) match self.role {
match self.role { Role::Client => Ok(()),
Role::Client => Ok(()), Role::Server => Err(Error::ConnectionClosed(frame.take())),
Role::Server => Err(Error::ConnectionClosed(replace(frame, None))),
}
} else {
Ok(())
} }
} else { } else {
Ok(()) Ok(())
@ -262,7 +270,7 @@ impl<Stream: Read + Write> WebSocket<Stream> {
return Err(Error::Protocol("Continue frame but nothing to continue".into())) return Err(Error::Protocol("Continue frame but nothing to continue".into()))
} }
if fin { if fin {
Ok(Some(replace(&mut self.incomplete, None).unwrap().complete()?)) Ok(Some(self.incomplete.take().unwrap().complete()?))
} else { } else {
Ok(None) Ok(None)
} }
@ -381,9 +389,9 @@ impl<Stream: Read + Write> WebSocket<Stream> {
if err.kind() == IoErrorKind::ConnectionReset { if err.kind() == IoErrorKind::ConnectionReset {
match self.state { match self.state {
WebSocketState::ClosedByPeer(ref mut frame) => WebSocketState::ClosedByPeer(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)), Error::ConnectionClosed(frame.take()),
WebSocketState::CloseAcknowledged(ref mut frame) => WebSocketState::CloseAcknowledged(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)), Error::ConnectionClosed(frame.take()),
_ => Error::Io(err), _ => Error::Io(err),
} }
} else { } else {

Loading…
Cancel
Save