diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 94397e9..b841e7f 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -222,6 +222,17 @@ impl WebSocket { pub fn close(&mut self, code: Option) -> Result<()> { self.context.close(&mut self.socket, code) } + + /// Maybe flush pending messages in order to make space in the send queue. [`submit_message()`] is guaranteed + /// to succeed if called directly after this method returns successfully. + pub fn write_pending_ready(&mut self) -> Result<()> { + self.context.write_pending_ready(&mut self.socket) + } + + /// Submits a message to the send queue, without writing it to the stream. Call [`write_pending()`] afterwards. + pub fn submit_message(&mut self, message: Message) -> Result<()> { + self.context.submit_message(message) + } } /// A context for managing WebSocket stream. @@ -326,22 +337,21 @@ impl WebSocketContext { where Stream: Read + Write, { - // When terminated, return AlreadyClosed. - self.state.check_active()?; + // Try to make some room for the new message. + // Do not return here if write would block, ignore WouldBlock silently + // since we must queue the message anyway. + self.write_pending_ready(stream).no_block()?; - // Do not write after sending a close frame. - if !self.state.is_active() { - return Err(Error::Protocol(ProtocolError::SendAfterClosing)); - } + // if `write_pending_ready()` returned WouldBlock then this will return + // Error::SendQueueFull; this is intended behavior + self.submit_message_inner(message)?; - if let Some(max_send_queue) = self.config.max_send_queue { - if self.send_queue.len() >= max_send_queue { - // Try to make some room for the new message. - // Do not return here if write would block, ignore WouldBlock silently - // since we must queue the message anyway. - self.write_pending(stream).no_block()?; - } + self.write_pending(stream) + } + #[inline] + fn submit_message_inner(&mut self, message: Message) -> Result<()> { + if let Some(max_send_queue) = self.config.max_send_queue { if self.send_queue.len() >= max_send_queue { return Err(Error::SendQueueFull(message)); } @@ -353,14 +363,17 @@ impl WebSocketContext { Message::Ping(data) => Frame::ping(data), Message::Pong(data) => { self.pong = Some(Frame::pong(data)); - return self.write_pending(stream); + return Ok(()); + } + Message::Close(code) => { + self.close_inner(code); + return Ok(()); } - Message::Close(code) => return self.close(stream, code), Message::Frame(f) => f, }; self.send_queue.push_back(frame); - self.write_pending(stream) + Ok(()) } /// Flush the pending send queue. @@ -411,6 +424,12 @@ impl WebSocketContext { where Stream: Read + Write, { + self.close_inner(code); + self.write_pending(stream) + } + + #[inline] + fn close_inner(&mut self, code: Option) { if let WebSocketState::Active = self.state { self.state = WebSocketState::ClosedByUs; let frame = Frame::close(code); @@ -418,7 +437,42 @@ impl WebSocketContext { } else { // Already closed, nothing to do. } - self.write_pending(stream) + } + + /// Maybe flush pending messages in order to prepare to send a message. [`submit_message()`] is guaranteed + /// to succeed if called directly after this method returns successfully. + pub fn write_pending_ready(&mut self, stream: &mut Stream) -> Result<()> + where + Stream: Read + Write, + { + // When terminated, return AlreadyClosed. + self.state.check_active()?; + + // Do not write after sending a close frame. + if !self.state.is_active() { + return Err(Error::Protocol(ProtocolError::SendAfterClosing)); + } + + if let Some(max_send_queue) = self.config.max_send_queue { + if self.send_queue.len() >= max_send_queue { + self.write_pending(stream)?; + } + } + + Ok(()) + } + + /// Submits a message to the send queue, without writing it to the stream. Call [`write_pending()`] afterwards. + pub fn submit_message(&mut self, message: Message) -> Result<()> { + // When terminated, return AlreadyClosed. + self.state.check_active()?; + + // Do not write after sending a close frame. + if !self.state.is_active() { + return Err(Error::Protocol(ProtocolError::SendAfterClosing)); + } + + self.submit_message_inner(message) } /// Try to decode one message frame. May return None.