From 68daa29b196e00c505c773af1a60ad779653d423 Mon Sep 17 00:00:00 2001 From: RustUser246 Date: Tue, 26 Jul 2022 23:46:06 +0100 Subject: [PATCH] add support for batching websocket messages --- src/protocol/frame/mod.rs | 43 +++++++++++++++++--------- src/protocol/mod.rs | 64 ++++++++++++++++++++++++++++++--------- 2 files changed, 78 insertions(+), 29 deletions(-) diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index 3c45dd9..f909475 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -66,13 +66,9 @@ impl FrameSocket where Stream: Write, { - /// Write a frame to stream. - /// - /// This function guarantees that the frame is queued regardless of any errors. - /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, - /// call write_pending() afterwards. - pub fn write_frame(&mut self, frame: Frame) -> Result<()> { - self.codec.write_frame(&mut self.stream, frame) + /// Add a frame to the end of the output buffer. + pub fn queue_frame(&mut self, frame: Frame) { + self.codec.queue_frame(frame); } /// Complete pending write, if any. @@ -98,6 +94,10 @@ impl FrameCodec { Self { in_buffer: ReadBuffer::new(), out_buffer: Vec::new(), header: None } } + pub(super) fn has_unsent(&self) -> bool { + !self.out_buffer.is_empty() + } + /// Create a new frame codec from partially read data. pub(super) fn from_partially_read(part: Vec) -> Self { Self { @@ -165,15 +165,12 @@ impl FrameCodec { Ok(Some(frame)) } - /// Write a frame to the provided stream. - pub(super) fn write_frame(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> - where - Stream: Write, + /// Add a frame to the end of the output buffer. + pub(super) fn queue_frame(&mut self, frame: Frame) { trace!("writing frame {}", frame); self.out_buffer.reserve(frame.len()); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); - self.write_pending(stream) } /// Complete pending write, if any. @@ -241,10 +238,28 @@ mod tests { let mut sock = FrameSocket::new(Vec::new()); let frame = Frame::ping(vec![0x04, 0x05]); - sock.write_frame(frame).unwrap(); + sock.queue_frame(frame); + sock.write_pending().unwrap(); + + let frame = Frame::pong(vec![0x01]); + sock.queue_frame(frame); + sock.write_pending().unwrap(); + + let (buf, _) = sock.into_inner(); + assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]); + } + + #[test] + fn queue_frames() { + let mut sock = FrameSocket::new(Vec::new()); + + let frame = Frame::ping(vec![0x04, 0x05]); + sock.queue_frame(frame); let frame = Frame::pong(vec![0x01]); - sock.write_frame(frame).unwrap(); + sock.queue_frame(frame); + + sock.write_pending().unwrap(); let (buf, _) = sock.into_inner(); assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index a3fa9c2..dfc0d48 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -190,7 +190,41 @@ impl WebSocket { /// (consider these fatal except for WouldBlock). /// - [Error::Capacity] if your message size is bigger than the configured max message size. pub fn write_message(&mut self, message: Message) -> Result<()> { - self.context.write_message(&mut self.socket, message) + self.context.queue_message(&mut self.socket, message)?; + self.context.write_pending(&mut self.socket) + } + + + /// Add a message to the send queue. Differs from write_message in that this method does not + /// flush queued messages to the stream. + /// + /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping + /// requests. A Pong reply will jump the queue because the + /// [websocket RFC](https://tools.ietf.org/html/rfc6455#section-5.5.2) specifies it should be sent + /// as soon as is practical. + /// + /// Note that upon receiving a ping message, tungstenite cues a pong reply automatically. + /// When you call either `read_message`, `write_message` or `write_pending` next it will try to send + /// that pong out if the underlying connection can take more data. This means you should not + /// respond to ping frames manually. + /// + /// You can however send pong frames manually in order to indicate a unidirectional heartbeat + /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that + /// if `read_message` returns a ping, you should call `write_pending` until it doesn't return + /// WouldBlock before passing a pong to `write_message`, otherwise the response to the + /// ping will not be sent, but rather replaced by your custom pong message. + /// + /// ## Errors + /// - If the WebSocket's send queue is full, `SendQueueFull` will be returned + /// along with the passed message. Otherwise, the message is queued and Ok(()) is returned. + /// - If the connection is closed and should be dropped, this will return [Error::ConnectionClosed]. + /// - If you try again after [Error::ConnectionClosed] was returned either from here or from `read_message`, + /// [Error::AlreadyClosed] will be returned. This indicates a program error on your part. + /// - [Error::Io] is returned if the underlying connection returns an error + /// (consider these fatal except for WouldBlock). + /// - [Error::Capacity] if your message size is bigger than the configured max message size. + pub fn queue_message(&mut self, message: Message) -> Result<()> { + self.context.queue_message(&mut self.socket, message) } /// Flush the pending send queue. @@ -314,7 +348,7 @@ impl WebSocketContext { } } - /// Send a message to the provided stream, if possible. + /// Queue up a message without flushing to the stream. /// /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping /// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned @@ -322,7 +356,7 @@ impl WebSocketContext { /// /// Note that only the last pong frame is stored to be sent, and only the /// most recent pong frame is sent if multiple pong frames are queued. - pub fn write_message(&mut self, stream: &mut Stream, message: Message) -> Result<()> + pub fn queue_message(&mut self, stream: &mut Stream, message: Message) -> Result<()> where Stream: Read + Write, { @@ -360,7 +394,7 @@ impl WebSocketContext { }; self.send_queue.push_back(frame); - self.write_pending(stream) + Ok(()) } /// Flush the pending send queue. @@ -368,20 +402,22 @@ impl WebSocketContext { where Stream: Read + Write, { - // First, make sure we have no pending frame sending. - self.frame.write_pending(stream)?; - // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // response, unless it already received a Close frame. It SHOULD // respond with Pong frame as soon as is practical. (RFC 6455) if let Some(pong) = self.pong.take() { trace!("Sending pong reply"); - self.send_one_frame(stream, pong)?; + self.queue_one_frame(pong); } - // If we have any unsent frames, send them. + // If we have any frames queued, add them to the outgoing buffer trace!("Frames still in queue: {}", self.send_queue.len()); while let Some(data) = self.send_queue.pop_front() { - self.send_one_frame(stream, data)?; + self.queue_one_frame(data); + } + + // if the outgoing buffer isn't empty then try write some data + if self.frame.has_unsent() { + self.frame.write_pending(stream)?; } // If we get to this point, the send queue is empty and the underlying socket is still @@ -588,10 +624,8 @@ impl WebSocketContext { } } - /// Send a single pending frame. - fn send_one_frame(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> - where - Stream: Read + Write, + /// Add a single frame to the end of the output buffer. + fn queue_one_frame(&mut self, mut frame: Frame) { match self.role { Role::Server => {} @@ -603,7 +637,7 @@ impl WebSocketContext { } trace!("Sending frame: {:?}", frame); - self.frame.write_frame(stream, frame).check_connection_reset(self.state) + self.frame.queue_frame(frame); } }