Add methods on WebSocket to better implement Sink::poll_ready

pull/342/head
Noa 2 years ago
parent 67e25fdd68
commit b3b115826f
No known key found for this signature in database
GPG Key ID: 7F9F7DB1768C59CF
  1. 88
      src/protocol/mod.rs

@ -222,6 +222,17 @@ impl<Stream: Read + Write> WebSocket<Stream> {
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
self.context.close(&mut self.socket, code)
}
/// Maybe flush pending messages in order to make space in the send queue. [`submit_message()`] is guaranteed
/// to succeed if called directly after this method returns successfully.
pub fn write_pending_ready(&mut self) -> Result<()> {
self.context.write_pending_ready(&mut self.socket)
}
/// Submits a message to the send queue, without writing it to the stream. Call [`write_pending()`] afterwards.
pub fn submit_message(&mut self, message: Message) -> Result<()> {
self.context.submit_message(message)
}
}
/// A context for managing WebSocket stream.
@ -326,22 +337,21 @@ impl WebSocketContext {
where
Stream: Read + Write,
{
// When terminated, return AlreadyClosed.
self.state.check_active()?;
// Try to make some room for the new message.
// Do not return here if write would block, ignore WouldBlock silently
// since we must queue the message anyway.
self.write_pending_ready(stream).no_block()?;
// Do not write after sending a close frame.
if !self.state.is_active() {
return Err(Error::Protocol(ProtocolError::SendAfterClosing));
}
// if `write_pending_ready()` returned WouldBlock then this will return
// Error::SendQueueFull; this is intended behavior
self.submit_message_inner(message)?;
if let Some(max_send_queue) = self.config.max_send_queue {
if self.send_queue.len() >= max_send_queue {
// Try to make some room for the new message.
// Do not return here if write would block, ignore WouldBlock silently
// since we must queue the message anyway.
self.write_pending(stream).no_block()?;
}
self.write_pending(stream)
}
#[inline]
fn submit_message_inner(&mut self, message: Message) -> Result<()> {
if let Some(max_send_queue) = self.config.max_send_queue {
if self.send_queue.len() >= max_send_queue {
return Err(Error::SendQueueFull(message));
}
@ -353,14 +363,17 @@ impl WebSocketContext {
Message::Ping(data) => Frame::ping(data),
Message::Pong(data) => {
self.pong = Some(Frame::pong(data));
return self.write_pending(stream);
return Ok(());
}
Message::Close(code) => {
self.close_inner(code);
return Ok(());
}
Message::Close(code) => return self.close(stream, code),
Message::Frame(f) => f,
};
self.send_queue.push_back(frame);
self.write_pending(stream)
Ok(())
}
/// Flush the pending send queue.
@ -411,6 +424,12 @@ impl WebSocketContext {
where
Stream: Read + Write,
{
self.close_inner(code);
self.write_pending(stream)
}
#[inline]
fn close_inner(&mut self, code: Option<CloseFrame>) {
if let WebSocketState::Active = self.state {
self.state = WebSocketState::ClosedByUs;
let frame = Frame::close(code);
@ -418,7 +437,42 @@ impl WebSocketContext {
} else {
// Already closed, nothing to do.
}
self.write_pending(stream)
}
/// Maybe flush pending messages in order to prepare to send a message. [`submit_message()`] is guaranteed
/// to succeed if called directly after this method returns successfully.
pub fn write_pending_ready<Stream>(&mut self, stream: &mut Stream) -> Result<()>
where
Stream: Read + Write,
{
// When terminated, return AlreadyClosed.
self.state.check_active()?;
// Do not write after sending a close frame.
if !self.state.is_active() {
return Err(Error::Protocol(ProtocolError::SendAfterClosing));
}
if let Some(max_send_queue) = self.config.max_send_queue {
if self.send_queue.len() >= max_send_queue {
self.write_pending(stream)?;
}
}
Ok(())
}
/// Submits a message to the send queue, without writing it to the stream. Call [`write_pending()`] afterwards.
pub fn submit_message(&mut self, message: Message) -> Result<()> {
// When terminated, return AlreadyClosed.
self.state.check_active()?;
// Do not write after sending a close frame.
if !self.state.is_active() {
return Err(Error::Protocol(ProtocolError::SendAfterClosing));
}
self.submit_message_inner(message)
}
/// Try to decode one message frame. May return None.

Loading…
Cancel
Save