WebSocket::write_message will no longer buffer unlimited messages - it will only buffer a configurable number of outbound messages. Needed for snapview/tokio-tungstenite#35.

pull/26/head
Teddy DeRego 7 years ago
parent 5d5ea88b12
commit 5742c822ee
  1. 4
      Cargo.toml
  2. 5
      src/error.rs
  3. 56
      src/protocol/mod.rs

@ -7,9 +7,9 @@ authors = ["Alexey Galakhov"]
license = "MIT/Apache-2.0"
readme = "README.md"
homepage = "https://github.com/snapview/tungstenite-rs"
documentation = "https://docs.rs/tungstenite/0.5.3"
documentation = "https://docs.rs/tungstenite/0.5.4"
repository = "https://github.com/snapview/tungstenite-rs"
version = "0.5.3"
version = "0.5.4"
[features]
default = ["tls"]

@ -12,6 +12,7 @@ use std::string;
use httparse;
use protocol::frame::CloseFrame;
use protocol::Message;
#[cfg(feature="tls")]
pub mod tls {
@ -36,6 +37,8 @@ pub enum Error {
Capacity(Cow<'static, str>),
/// Protocol violation
Protocol(Cow<'static, str>),
/// Message send queue full
SendQueueFull(Message),
/// UTF coding error
Utf8,
/// Invlid URL.
@ -59,6 +62,7 @@ impl fmt::Display for Error {
Error::Tls(ref err) => write!(f, "TLS error: {}", err),
Error::Capacity(ref msg) => write!(f, "Space limit exceeded: {}", msg),
Error::Protocol(ref msg) => write!(f, "WebSocket protocol error: {}", msg),
Error::SendQueueFull(_) => write!(f, "Send queue is full"),
Error::Utf8 => write!(f, "UTF-8 encoding error"),
Error::Url(ref msg) => write!(f, "URL error: {}", msg),
Error::Http(code) => write!(f, "HTTP code: {}", code),
@ -75,6 +79,7 @@ impl ErrorTrait for Error {
Error::Tls(ref err) => err.description(),
Error::Capacity(ref msg) => msg.borrow(),
Error::Protocol(ref msg) => msg.borrow(),
Error::SendQueueFull(_) => "Send queue is full",
Error::Utf8 => "",
Error::Url(ref msg) => msg.borrow(),
Error::Http(_) => "",

@ -42,6 +42,7 @@ pub struct WebSocket<Stream> {
incomplete: Option<IncompleteMessage>,
/// Send: a data send queue.
send_queue: VecDeque<Frame>,
max_send_queue: usize,
/// Send: an OOB pong message.
pong: Option<Frame>,
}
@ -74,6 +75,7 @@ impl<Stream> WebSocket<Stream> {
state: WebSocketState::Active,
incomplete: None,
send_queue: VecDeque::new(),
max_send_queue: 1,
pong: None,
}
}
@ -101,13 +103,20 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// Send a message to stream, if possible.
///
/// This function guarantees that the frame is queued regardless of any errors.
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
/// call write_pending() afterwards.
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// and Close requests. If the WebSocket's send queue is full, SendQueueFull will be returned
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
///
/// Note that only the last pong frame is stored to be sent, and only the
/// most recent pong frame is sent if multiple pong frames are queued up.
/// most recent pong frame is sent if multiple pong frames are queued
pub fn write_message(&mut self, message: Message) -> Result<()> {
// Try to make some room for the new message
self.write_pending().no_block()?;
if self.send_queue.len() >= self.max_send_queue {
return Err(Error::SendQueueFull(message));
}
let frame = match message {
Message::Text(data) => {
Frame::message(data.into(), OpCode::Data(OpData::Text), true)
@ -122,13 +131,13 @@ impl<Stream: Read + Write> WebSocket<Stream> {
}
};
self.send_queue.push_back(frame);
self.write_pending()
Ok(())
}
/// Close the connection.
///
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again, just like write_message().
/// There is no need to call it again.
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
if let WebSocketState::Active = self.state {
self.state = WebSocketState::ClosedByUs;
@ -151,7 +160,7 @@ impl<Stream: Read + Write> WebSocket<Stream> {
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455)
if let Some(pong) = replace(&mut self.pong, None) {
if let Some(pong) = self.pong.take() {
self.send_one_frame(pong)?;
}
// If we have any unsent frames, send them.
@ -159,21 +168,20 @@ impl<Stream: Read + Write> WebSocket<Stream> {
self.send_one_frame(data)?;
}
// If we get to this point, the send queue is empty and the underlying socket is still
// willing to take more data.
// If we're closing and there is nothing to send anymore, we should close the connection.
if self.send_queue.is_empty() {
if let WebSocketState::ClosedByPeer(ref mut frame) = self.state {
// The underlying TCP connection, in most normal cases, SHOULD be closed
// first by the server, so that it holds the TIME_WAIT state and not the
// client (as this would prevent it from re-opening the connection for 2
// maximum segment lifetimes (2MSL), while there is no corresponding
// server impact as a TIME_WAIT connection is immediately reopened upon
// a new SYN with a higher seq number). (RFC 6455)
match self.role {
Role::Client => Ok(()),
Role::Server => Err(Error::ConnectionClosed(replace(frame, None))),
}
} else {
Ok(())
if let WebSocketState::ClosedByPeer(ref mut frame) = self.state {
// The underlying TCP connection, in most normal cases, SHOULD be closed
// first by the server, so that it holds the TIME_WAIT state and not the
// client (as this would prevent it from re-opening the connection for 2
// maximum segment lifetimes (2MSL), while there is no corresponding
// server impact as a TIME_WAIT connection is immediately reopened upon
// a new SYN with a higher seq number). (RFC 6455)
match self.role {
Role::Client => Ok(()),
Role::Server => Err(Error::ConnectionClosed(frame.take())),
}
} else {
Ok(())
@ -262,7 +270,7 @@ impl<Stream: Read + Write> WebSocket<Stream> {
return Err(Error::Protocol("Continue frame but nothing to continue".into()))
}
if fin {
Ok(Some(replace(&mut self.incomplete, None).unwrap().complete()?))
Ok(Some(self.incomplete.take().unwrap().complete()?))
} else {
Ok(None)
}
@ -381,9 +389,9 @@ impl<Stream: Read + Write> WebSocket<Stream> {
if err.kind() == IoErrorKind::ConnectionReset {
match self.state {
WebSocketState::ClosedByPeer(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)),
Error::ConnectionClosed(frame.take()),
WebSocketState::CloseAcknowledged(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)),
Error::ConnectionClosed(frame.take()),
_ => Error::Io(err),
}
} else {

Loading…
Cancel
Save