Merge pull request #291 from RustUser246/buffering

add support for batching websocket messages
pull/292/head
Alexey Galakhov 2 years ago committed by GitHub
commit 3edc4c286b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 43
      src/protocol/frame/mod.rs
  2. 64
      src/protocol/mod.rs

@ -66,13 +66,9 @@ impl<Stream> FrameSocket<Stream>
where where
Stream: Write, Stream: Write,
{ {
/// Write a frame to stream. /// Add a frame to the end of the output buffer.
/// pub fn queue_frame(&mut self, frame: Frame) {
/// This function guarantees that the frame is queued regardless of any errors. self.codec.queue_frame(frame);
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
/// call write_pending() afterwards.
pub fn write_frame(&mut self, frame: Frame) -> Result<()> {
self.codec.write_frame(&mut self.stream, frame)
} }
/// Complete pending write, if any. /// Complete pending write, if any.
@ -98,6 +94,10 @@ impl FrameCodec {
Self { in_buffer: ReadBuffer::new(), out_buffer: Vec::new(), header: None } Self { in_buffer: ReadBuffer::new(), out_buffer: Vec::new(), header: None }
} }
pub(super) fn has_unsent(&self) -> bool {
!self.out_buffer.is_empty()
}
/// Create a new frame codec from partially read data. /// Create a new frame codec from partially read data.
pub(super) fn from_partially_read(part: Vec<u8>) -> Self { pub(super) fn from_partially_read(part: Vec<u8>) -> Self {
Self { Self {
@ -165,15 +165,12 @@ impl FrameCodec {
Ok(Some(frame)) Ok(Some(frame))
} }
/// Write a frame to the provided stream. /// Add a frame to the end of the output buffer.
pub(super) fn write_frame<Stream>(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> pub(super) fn queue_frame(&mut self, frame: Frame)
where
Stream: Write,
{ {
trace!("writing frame {}", frame); trace!("writing frame {}", frame);
self.out_buffer.reserve(frame.len()); self.out_buffer.reserve(frame.len());
frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector");
self.write_pending(stream)
} }
/// Complete pending write, if any. /// Complete pending write, if any.
@ -241,10 +238,28 @@ mod tests {
let mut sock = FrameSocket::new(Vec::new()); let mut sock = FrameSocket::new(Vec::new());
let frame = Frame::ping(vec![0x04, 0x05]); let frame = Frame::ping(vec![0x04, 0x05]);
sock.write_frame(frame).unwrap(); sock.queue_frame(frame);
sock.write_pending().unwrap();
let frame = Frame::pong(vec![0x01]);
sock.queue_frame(frame);
sock.write_pending().unwrap();
let (buf, _) = sock.into_inner();
assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]);
}
#[test]
fn queue_frames() {
let mut sock = FrameSocket::new(Vec::new());
let frame = Frame::ping(vec![0x04, 0x05]);
sock.queue_frame(frame);
let frame = Frame::pong(vec![0x01]); let frame = Frame::pong(vec![0x01]);
sock.write_frame(frame).unwrap(); sock.queue_frame(frame);
sock.write_pending().unwrap();
let (buf, _) = sock.into_inner(); let (buf, _) = sock.into_inner();
assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]); assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]);

@ -190,7 +190,41 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// (consider these fatal except for WouldBlock). /// (consider these fatal except for WouldBlock).
/// - [Error::Capacity] if your message size is bigger than the configured max message size. /// - [Error::Capacity] if your message size is bigger than the configured max message size.
pub fn write_message(&mut self, message: Message) -> Result<()> { pub fn write_message(&mut self, message: Message) -> Result<()> {
self.context.write_message(&mut self.socket, message) self.context.queue_message(&mut self.socket, message)?;
self.context.write_pending(&mut self.socket)
}
/// Add a message to the send queue. Differs from write_message in that this method does not
/// flush queued messages to the stream.
///
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// requests. A Pong reply will jump the queue because the
/// [websocket RFC](https://tools.ietf.org/html/rfc6455#section-5.5.2) specifies it should be sent
/// as soon as is practical.
///
/// Note that upon receiving a ping message, tungstenite cues a pong reply automatically.
/// When you call either `read_message`, `write_message` or `write_pending` next it will try to send
/// that pong out if the underlying connection can take more data. This means you should not
/// respond to ping frames manually.
///
/// You can however send pong frames manually in order to indicate a unidirectional heartbeat
/// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
/// if `read_message` returns a ping, you should call `write_pending` until it doesn't return
/// WouldBlock before passing a pong to `write_message`, otherwise the response to the
/// ping will not be sent, but rather replaced by your custom pong message.
///
/// ## Errors
/// - If the WebSocket's send queue is full, `SendQueueFull` will be returned
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
/// - If the connection is closed and should be dropped, this will return [Error::ConnectionClosed].
/// - If you try again after [Error::ConnectionClosed] was returned either from here or from `read_message`,
/// [Error::AlreadyClosed] will be returned. This indicates a program error on your part.
/// - [Error::Io] is returned if the underlying connection returns an error
/// (consider these fatal except for WouldBlock).
/// - [Error::Capacity] if your message size is bigger than the configured max message size.
pub fn queue_message(&mut self, message: Message) -> Result<()> {
self.context.queue_message(&mut self.socket, message)
} }
/// Flush the pending send queue. /// Flush the pending send queue.
@ -314,7 +348,7 @@ impl WebSocketContext {
} }
} }
/// Send a message to the provided stream, if possible. /// Queue up a message without flushing to the stream.
/// ///
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned /// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned
@ -322,7 +356,7 @@ impl WebSocketContext {
/// ///
/// Note that only the last pong frame is stored to be sent, and only the /// Note that only the last pong frame is stored to be sent, and only the
/// most recent pong frame is sent if multiple pong frames are queued. /// most recent pong frame is sent if multiple pong frames are queued.
pub fn write_message<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()> pub fn queue_message<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
where where
Stream: Read + Write, Stream: Read + Write,
{ {
@ -360,7 +394,7 @@ impl WebSocketContext {
}; };
self.send_queue.push_back(frame); self.send_queue.push_back(frame);
self.write_pending(stream) Ok(())
} }
/// Flush the pending send queue. /// Flush the pending send queue.
@ -368,20 +402,22 @@ impl WebSocketContext {
where where
Stream: Read + Write, Stream: Read + Write,
{ {
// First, make sure we have no pending frame sending.
self.frame.write_pending(stream)?;
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD // response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455) // respond with Pong frame as soon as is practical. (RFC 6455)
if let Some(pong) = self.pong.take() { if let Some(pong) = self.pong.take() {
trace!("Sending pong reply"); trace!("Sending pong reply");
self.send_one_frame(stream, pong)?; self.queue_one_frame(pong);
} }
// If we have any unsent frames, send them. // If we have any frames queued, add them to the outgoing buffer
trace!("Frames still in queue: {}", self.send_queue.len()); trace!("Frames still in queue: {}", self.send_queue.len());
while let Some(data) = self.send_queue.pop_front() { while let Some(data) = self.send_queue.pop_front() {
self.send_one_frame(stream, data)?; self.queue_one_frame(data);
}
// if the outgoing buffer isn't empty then try write some data
if self.frame.has_unsent() {
self.frame.write_pending(stream)?;
} }
// If we get to this point, the send queue is empty and the underlying socket is still // If we get to this point, the send queue is empty and the underlying socket is still
@ -588,10 +624,8 @@ impl WebSocketContext {
} }
} }
/// Send a single pending frame. /// Add a single frame to the end of the output buffer.
fn send_one_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> fn queue_one_frame(&mut self, mut frame: Frame)
where
Stream: Read + Write,
{ {
match self.role { match self.role {
Role::Server => {} Role::Server => {}
@ -603,7 +637,7 @@ impl WebSocketContext {
} }
trace!("Sending frame: {:?}", frame); trace!("Sending frame: {:?}", frame);
self.frame.write_frame(stream, frame).check_connection_reset(self.state) self.frame.queue_frame(frame);
} }
} }

Loading…
Cancel
Save