From 84a54b76e64a282d51140690db479269d44bcc65 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Sat, 27 May 2023 13:03:44 +0100 Subject: [PATCH] Rename methods to `read`, `send`, `write` & `flush` Refine docs Add `send` method Add deprecated versions of write_message, write_pending, read_message Handle pong WriteBufferFull error Add changelog --- .gitignore | 1 + CHANGELOG.md | 15 ++ README.md | 4 +- benches/write.rs | 6 +- examples/autobahn-client.rs | 6 +- examples/autobahn-server.rs | 4 +- examples/client.rs | 4 +- examples/server.rs | 4 +- examples/srv_accept_unmasked_frames.rs | 2 +- fuzz/fuzz_targets/read_message_client.rs | 2 +- fuzz/fuzz_targets/read_message_server.rs | 2 +- src/protocol/frame/mod.rs | 43 +++--- src/protocol/message.rs | 2 +- src/protocol/mod.rs | 169 ++++++++++++++--------- tests/connection_reset.rs | 38 ++--- tests/no_send_after_close.rs | 6 +- tests/receive_after_init_close.rs | 12 +- 17 files changed, 191 insertions(+), 129 deletions(-) diff --git a/.gitignore b/.gitignore index a9d37c5..c8e9e48 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target Cargo.lock +.vscode diff --git a/CHANGELOG.md b/CHANGELOG.md index 0aa0a41..5a60687 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,18 @@ +# Unreleased (0.20.0) +- Remove many implicit flushing behaviours. In general reading and writing messages will no + longer flush until calling `flush`. An exception is automatic responses (e.g. pongs) + which will continue to be written and flushed when reading and writing. + This allows writing a batch of messages and flushing once. +- Add `WebSocket::read`, `write`, `send`, `flush`. Deprecate `read_message`, `write_message`, `write_pending`. +- Add `FrameSocket::read`, `write`, `send`, `flush`. Remove `read_frame`, `write_frame`, `write_pending`. + Note: Previous use of `write_frame` may be replaced with `send`. +- Add `WebSocketContext::read`, `write`, `flush`. Remove `read_message`, `write_message`, `write_pending`. + Note: Previous use of `write_message` may be replaced with `write` + `flush`. +- Remove `send_queue`, replaced with using the frame write buffer to achieve similar results. + * Add `WebSocketConfig::max_write_buffer_size`. Deprecate `max_send_queue`. + * Add `Error::WriteBufferFull`. Remove `Error::SendQueueFull`. + Note: `WriteBufferFull` returns the message that could not be written as a `Message::Frame`. + # 0.19.0 - Update TLS dependencies. diff --git a/README.md b/README.md index 302ac97..3522fb9 100644 --- a/README.md +++ b/README.md @@ -14,11 +14,11 @@ fn main () { spawn (move || { let mut websocket = accept(stream.unwrap()).unwrap(); loop { - let msg = websocket.read_message().unwrap(); + let msg = websocket.read().unwrap(); // We do not want to send back ping/pong messages. if msg.is_binary() || msg.is_text() { - websocket.write_message(msg).unwrap(); + websocket.send(msg).unwrap(); } } }); diff --git a/benches/write.rs b/benches/write.rs index 7584bfb..8a19874 100644 --- a/benches/write.rs +++ b/benches/write.rs @@ -42,7 +42,7 @@ impl Write for MockSlowFlushWrite { } fn benchmark(c: &mut Criterion) { - // Writes 100k small json text messages then calls `write_pending` + // Writes 100k small json text messages then flushes c.bench_function("write 100k small texts then flush", |b| { let mut ws = WebSocket::from_raw_socket( MockSlowFlushWrite(Vec::with_capacity(MOCK_WRITE_LEN)), @@ -54,9 +54,9 @@ fn benchmark(c: &mut Criterion) { || (0..100_000).map(|i| Message::Text(format!("{{\"id\":{i}}}"))), |batch| { for msg in batch { - ws.write_message(msg).unwrap(); + ws.write(msg).unwrap(); } - ws.write_pending().unwrap(); + ws.flush().unwrap(); }, BatchSize::SmallInput, ) diff --git a/examples/autobahn-client.rs b/examples/autobahn-client.rs index 2538d86..ac7a7d1 100644 --- a/examples/autobahn-client.rs +++ b/examples/autobahn-client.rs @@ -7,7 +7,7 @@ const AGENT: &str = "Tungstenite"; fn get_case_count() -> Result { let (mut socket, _) = connect(Url::parse("ws://localhost:9001/getCaseCount").unwrap())?; - let msg = socket.read_message()?; + let msg = socket.read()?; socket.close(None)?; Ok(msg.into_text()?.parse::().unwrap()) } @@ -26,9 +26,9 @@ fn run_test(case: u32) -> Result<()> { Url::parse(&format!("ws://localhost:9001/runCase?case={}&agent={}", case, AGENT)).unwrap(); let (mut socket, _) = connect(case_url)?; loop { - match socket.read_message()? { + match socket.read()? { msg @ Message::Text(_) | msg @ Message::Binary(_) => { - socket.write_message(msg)?; + socket.send(msg)?; } Message::Ping(_) | Message::Pong(_) | Message::Close(_) | Message::Frame(_) => {} } diff --git a/examples/autobahn-server.rs b/examples/autobahn-server.rs index f002efa..dafe37b 100644 --- a/examples/autobahn-server.rs +++ b/examples/autobahn-server.rs @@ -17,9 +17,9 @@ fn handle_client(stream: TcpStream) -> Result<()> { let mut socket = accept(stream).map_err(must_not_block)?; info!("Running test"); loop { - match socket.read_message()? { + match socket.read()? { msg @ Message::Text(_) | msg @ Message::Binary(_) => { - socket.write_message(msg)?; + socket.send(msg)?; } Message::Ping(_) | Message::Pong(_) | Message::Close(_) | Message::Frame(_) => {} } diff --git a/examples/client.rs b/examples/client.rs index def6a3c..a24f316 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -14,9 +14,9 @@ fn main() { println!("* {}", header); } - socket.write_message(Message::Text("Hello WebSocket".into())).unwrap(); + socket.send(Message::Text("Hello WebSocket".into())).unwrap(); loop { - let msg = socket.read_message().expect("Error reading message"); + let msg = socket.read().expect("Error reading message"); println!("Received: {}", msg); } // socket.close(None); diff --git a/examples/server.rs b/examples/server.rs index 420e5db..2183b96 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -28,9 +28,9 @@ fn main() { let mut websocket = accept_hdr(stream.unwrap(), callback).unwrap(); loop { - let msg = websocket.read_message().unwrap(); + let msg = websocket.read().unwrap(); if msg.is_binary() || msg.is_text() { - websocket.write_message(msg).unwrap(); + websocket.send(msg).unwrap(); } } }); diff --git a/examples/srv_accept_unmasked_frames.rs b/examples/srv_accept_unmasked_frames.rs index 0614214..b65e4f7 100644 --- a/examples/srv_accept_unmasked_frames.rs +++ b/examples/srv_accept_unmasked_frames.rs @@ -38,7 +38,7 @@ fn main() { let mut websocket = accept_hdr_with_config(stream.unwrap(), callback, config).unwrap(); loop { - let msg = websocket.read_message().unwrap(); + let msg = websocket.read().unwrap(); if msg.is_binary() || msg.is_text() { println!("received message {}", msg); } diff --git a/fuzz/fuzz_targets/read_message_client.rs b/fuzz/fuzz_targets/read_message_client.rs index 1c0708b..8e53512 100644 --- a/fuzz/fuzz_targets/read_message_client.rs +++ b/fuzz/fuzz_targets/read_message_client.rs @@ -33,5 +33,5 @@ fuzz_target!(|data: &[u8]| { //let vector: Vec = data.into(); let cursor = Cursor::new(data); let mut socket = WebSocket::from_raw_socket(WriteMoc(cursor), Role::Client, None); - socket.read_message().ok(); + socket.read().ok(); }); diff --git a/fuzz/fuzz_targets/read_message_server.rs b/fuzz/fuzz_targets/read_message_server.rs index d96db96..7f0e7ff 100644 --- a/fuzz/fuzz_targets/read_message_server.rs +++ b/fuzz/fuzz_targets/read_message_server.rs @@ -33,5 +33,5 @@ fuzz_target!(|data: &[u8]| { //let vector: Vec = data.into(); let cursor = Cursor::new(data); let mut socket = WebSocket::from_raw_socket(WriteMoc(cursor), Role::Server, None); - socket.read_message().ok(); + socket.read().ok(); }); diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index 1cdf376..bb72e5a 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -56,7 +56,7 @@ where Stream: Read, { /// Read a frame from stream. - pub fn read_frame(&mut self, max_size: Option) -> Result> { + pub fn read(&mut self, max_size: Option) -> Result> { self.codec.read_frame(&mut self.stream, max_size) } } @@ -65,18 +65,27 @@ impl FrameSocket where Stream: Write, { + /// Writes and immediately flushes a frame. + /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush). + pub fn send(&mut self, frame: Frame) -> Result<()> { + self.write(frame)?; + self.flush() + } + /// Write a frame to stream. /// - /// This function guarantees that the frame is queued regardless of any errors. - /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, - /// call write_pending() afterwards. - pub fn write_frame(&mut self, frame: Frame) -> Result<()> { - self.codec.write_frame(&mut self.stream, frame)?; - Ok(self.stream.flush()?) + /// A subsequent call should be made to [`flush`](Self::flush) to flush writes. + /// + /// This function guarantees that the frame is queued unless [`Error::WriteBufferFull`] + /// is returned. + /// In order to handle WouldBlock or Incomplete, call [`flush`](Self::flush) afterwards. + pub fn write(&mut self, frame: Frame) -> Result<()> { + self.codec.write_frame(&mut self.stream, frame) } - /// Complete pending write, if any. - pub fn write_pending(&mut self) -> Result<()> { + /// Flush writes. + pub fn flush(&mut self) -> Result<()> { + self.codec.write_out_buffer(&mut self.stream)?; Ok(self.stream.flush()?) } } @@ -245,11 +254,11 @@ mod tests { let mut sock = FrameSocket::new(raw); assert_eq!( - sock.read_frame(None).unwrap().unwrap().into_data(), + sock.read(None).unwrap().unwrap().into_data(), vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07] ); - assert_eq!(sock.read_frame(None).unwrap().unwrap().into_data(), vec![0x03, 0x02, 0x01]); - assert!(sock.read_frame(None).unwrap().is_none()); + assert_eq!(sock.read(None).unwrap().unwrap().into_data(), vec![0x03, 0x02, 0x01]); + assert!(sock.read(None).unwrap().is_none()); let (_, rest) = sock.into_inner(); assert_eq!(rest, vec![0x99]); @@ -260,7 +269,7 @@ mod tests { let raw = Cursor::new(vec![0x02, 0x03, 0x04, 0x05, 0x06, 0x07]); let mut sock = FrameSocket::from_partially_read(raw, vec![0x82, 0x07, 0x01]); assert_eq!( - sock.read_frame(None).unwrap().unwrap().into_data(), + sock.read(None).unwrap().unwrap().into_data(), vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07] ); } @@ -270,10 +279,10 @@ mod tests { let mut sock = FrameSocket::new(Vec::new()); let frame = Frame::ping(vec![0x04, 0x05]); - sock.write_frame(frame).unwrap(); + sock.send(frame).unwrap(); let frame = Frame::pong(vec![0x01]); - sock.write_frame(frame).unwrap(); + sock.send(frame).unwrap(); let (buf, _) = sock.into_inner(); assert_eq!(buf, vec![0x89, 0x02, 0x04, 0x05, 0x8a, 0x01, 0x01]); @@ -285,7 +294,7 @@ mod tests { 0x83, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, ]); let mut sock = FrameSocket::new(raw); - let _ = sock.read_frame(None); // should not crash + let _ = sock.read(None); // should not crash } #[test] @@ -293,7 +302,7 @@ mod tests { let raw = Cursor::new(vec![0x82, 0x07, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]); let mut sock = FrameSocket::new(raw); assert!(matches!( - sock.read_frame(Some(5)), + sock.read(Some(5)), Err(Error::Capacity(CapacityError::MessageTooLong { size: 7, max_size: 5 })) )); } diff --git a/src/protocol/message.rs b/src/protocol/message.rs index cdebabc..2b2ed0b 100644 --- a/src/protocol/message.rs +++ b/src/protocol/message.rs @@ -185,7 +185,7 @@ impl Message { Message::Text(string.into()) } - /// Create a new binary WebSocket message by converting to Vec. + /// Create a new binary WebSocket message by converting to `Vec`. pub fn binary(bin: B) -> Message where B: Into>, diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 9839b31..ce97c51 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -75,6 +75,8 @@ impl Default for WebSocketConfig { /// /// This is THE structure you want to create to be able to speak the WebSocket protocol. /// It may be created by calling `connect`, `accept` or `client` functions. +/// +/// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages. #[derive(Debug)] pub struct WebSocket { /// The underlying socket. @@ -148,89 +150,116 @@ impl WebSocket { impl WebSocket { /// Read a message from stream, if possible. /// - /// This will queue responses to ping and close messages to be sent. It will call - /// `write_pending` before trying to read in order to make sure that those responses - /// make progress even if you never call `write_pending`. That does mean that they - /// get sent out earliest on the next call to `read_message`, `write_message` or `write_pending`. + /// This will also queue responses to ping and close messages. These responses + /// will be written and flushed on the next call to [`read`](Self::read), + /// [`write`](Self::write) or [`flush`](Self::flush). /// - /// ## Closing the connection + /// # Closing the connection /// When the remote endpoint decides to close the connection this will return /// the close message with an optional close frame. /// - /// You should continue calling `read_message`, `write_message` or `write_pending` to drive - /// the reply to the close frame until [Error::ConnectionClosed] is returned. Once that happens - /// it is safe to drop the underlying connection. - pub fn read_message(&mut self) -> Result { - self.context.read_message(&mut self.socket) + /// You should continue calling [`read`](Self::read), [`write`](Self::write) or + /// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`] + /// is returned. Once that happens it is safe to drop the underlying connection. + pub fn read(&mut self) -> Result { + self.context.read(&mut self.socket) + } + + /// Writes and immediately flushes a message. + /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush). + pub fn send(&mut self, message: Message) -> Result<()> { + self.write(message)?; + self.flush() } /// Write a message to the provided stream, if possible. /// - /// A subsequent call should be made to [`Self::write_pending`] to flush writes. + /// A subsequent call should be made to [`flush`](Self::flush) to flush writes. /// /// In the event of stream write failure the message frame will be stored - /// in the write buffer and will try again on the next call to [`Self::write_message`] or [`Self::write_pending`]. + /// in the write buffer and will try again on the next call to [`write`](Self::write) + /// or [`flush`](Self::flush). /// /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`] - /// `Err(WriteBufferFull(msg_frame))` is returned. + /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned. /// - /// This call will not flush, except to reply to Ping - /// requests. A Pong reply will flush early because the - /// [websocket RFC](https://tools.ietf.org/html/rfc6455#section-5.5.2) specifies it should be sent - /// as soon as is practical. + /// This call will generally not flush. However, if there are queued automatic messages + /// they will be written and eagerly flushed. /// - /// Note that upon receiving a ping message, tungstenite cues a pong reply automatically. - /// When you call either `read_message`, `write_message` or `write_pending` next it will try to - /// write & flush the pong reply if possible. This means you should not respond to ping frames manually. + /// For example, upon receiving ping messages tungstenite queues pong replies automatically. + /// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush) + /// will write & flush the pong reply. This means you should not respond to ping frames manually. /// /// You can however send pong frames manually in order to indicate a unidirectional heartbeat /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that - /// if `read_message` returns a ping, you should call `write_pending` until it doesn't return - /// WouldBlock before passing a pong to `write_message`, otherwise the response to the - /// ping will not be sent, but rather replaced by your custom pong message. + /// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing + /// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the + /// ping will not be sent as it will be replaced by your custom pong message. /// - /// ## Errors + /// # Errors /// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned - /// along with the equivalent passed message frame. Otherwise, the message is queued and Ok(()) is returned. + /// along with the equivalent passed message frame. /// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`]. - /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from `read_message`, - /// [`Error::AlreadyClosed`] will be returned. This indicates a program error on your part. + /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from + /// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program + /// error on your part. /// - [`Error::Io`] is returned if the underlying connection returns an error /// (consider these fatal except for WouldBlock). /// - [`Error::Capacity`] if your message size is bigger than the configured max message size. - pub fn write_message(&mut self, message: Message) -> Result<()> { - self.context.write_message(&mut self.socket, message) + pub fn write(&mut self, message: Message) -> Result<()> { + self.context.write(&mut self.socket, message) } - /// Flush pending writes. - pub fn write_pending(&mut self) -> Result<()> { - self.context.write_pending(&mut self.socket) + /// Flush writes. + /// + /// Ensures all messages previously passed to [`write`](Self::write) and automatic + /// queued pong responses are written & flushed into the underlying stream. + pub fn flush(&mut self) -> Result<()> { + self.context.flush(&mut self.socket) } /// Close the connection. /// /// This function guarantees that the close frame will be queued. /// There is no need to call it again. Calling this function is - /// the same as calling `write_message(Message::Close(..))`. + /// the same as calling `write(Message::Close(..))`. /// - /// After queuing the close frame you should continue calling `read_message` or - /// `write_pending` to drive the close handshake to completion. + /// After queuing the close frame you should continue calling [`read`](Self::read) or + /// [`flush`](Self::flush) to drive the close handshake to completion. /// /// The websocket RFC defines that the underlying connection should be closed /// by the server. Tungstenite takes care of this asymmetry for you. /// /// When the close handshake is finished (we have both sent and received - /// a close message), `read_message` or `write_pending` will return + /// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return /// [Error::ConnectionClosed] if this endpoint is the server. /// /// If this endpoint is a client, [Error::ConnectionClosed] will only be /// returned after the server has closed the underlying connection. /// /// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed] - /// is returned from `read_message` or `write_pending`. + /// is returned from [`read`](Self::read) or [`flush`](Self::flush). pub fn close(&mut self, code: Option) -> Result<()> { self.context.close(&mut self.socket, code) } + + /// Old name for [`read`](Self::read). + #[deprecated(note = "Use `read`")] + pub fn read_message(&mut self) -> Result { + self.read() + } + + /// Old name for [`send`](Self::send). + #[deprecated(note = "Use `send`")] + pub fn write_message(&mut self, message: Message) -> Result<()> { + self.send(message) + } + + /// Old name for [`flush`](Self::flush). + #[deprecated(note = "Use `flush`")] + pub fn write_pending(&mut self) -> Result<()> { + self.flush() + } } /// A context for managing WebSocket stream. @@ -305,7 +334,7 @@ impl WebSocketContext { /// /// This function sends pong and close responses automatically. /// However, it never blocks on write. - pub fn read_message(&mut self, stream: &mut Stream) -> Result + pub fn read(&mut self, stream: &mut Stream) -> Result where Stream: Read + Write, { @@ -315,14 +344,13 @@ impl WebSocketContext { loop { if self.additional_send.is_some() { // Since we may get ping or close, we need to reply to the messages even during read. - // Thus we call write_pending() but ignore its blocking. - self.write_pending(stream).no_block()?; + // Thus we flush but ignore its blocking. + self.flush(stream).no_block()?; } else if self.role == Role::Server && !self.state.can_read() { self.state = WebSocketState::Terminated; return Err(Error::ConnectionClosed); } - // TODO don't flush writes when reading // If we get here, either write blocks or we have nothing to write. // Thus if read blocks, just let it return WouldBlock. if let Some(message) = self.read_message_frame(stream)? { @@ -332,19 +360,17 @@ impl WebSocketContext { } } - /// Write a message to the provided stream, if possible. + /// Write a message to the provided stream. /// - /// A subsequent call should be made to [`Self::write_pending`] to flush writes. + /// A subsequent call should be made to [`flush`](Self::flush) to flush writes. /// /// In the event of stream write failure the message frame will be stored - /// in the write buffer and will try again on the next call to [`Self::write_message`] or [`Self::write_pending`]. + /// in the write buffer and will try again on the next call to [`write`](Self::write) + /// or [`flush`](Self::flush). /// /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`] - /// `Err(WriteBufferFull(msg_frame))` is returned. - /// - /// Note that only the latest pong frame is stored to be sent, so only the - /// most recent pong frame is sent if multiple pong frames are queued. - pub fn write_message(&mut self, stream: &mut Stream, message: Message) -> Result<()> + /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned. + pub fn write(&mut self, stream: &mut Stream, message: Message) -> Result<()> where Stream: Read + Write, { @@ -363,35 +389,38 @@ impl WebSocketContext { Message::Pong(data) => { self.set_additional(Frame::pong(data)); // Note: user pongs can be user flushed so no need to flush here - return self.write(stream, None).map(|_| ()); + return self._write(stream, None).map(|_| ()); } Message::Close(code) => return self.close(stream, code), Message::Frame(f) => f, }; - let should_flush = self.write(stream, Some(frame))?; + let should_flush = self._write(stream, Some(frame))?; if should_flush { - self.write_pending(stream)?; + self.flush(stream)?; } Ok(()) } - /// Flush pending writes. + /// Flush writes. + /// + /// Ensures all messages previously passed to [`write`](Self::write) and automatically + /// queued pong responses are written & flushed into the `stream`. #[inline] - pub fn write_pending(&mut self, stream: &mut Stream) -> Result<()> + pub fn flush(&mut self, stream: &mut Stream) -> Result<()> where Stream: Read + Write, { - _ = self.write(stream, None)?; + self._write(stream, None)?; Ok(stream.flush()?) } - /// Write send queue & pongs. + /// Writes any data in the out_buffer, `additional_send` and given `data`. /// /// Does **not** flush. /// - /// Returns if the write contents indicate we should flush immediately. - fn write(&mut self, stream: &mut Stream, data: Option) -> Result + /// Returns true if the write contents indicate we should flush immediately. + fn _write(&mut self, stream: &mut Stream, data: Option) -> Result where Stream: Read + Write, { @@ -405,7 +434,15 @@ impl WebSocketContext { // respond with Pong frame as soon as is practical. (RFC 6455) let should_flush = if let Some(msg) = self.additional_send.take() { trace!("Sending pong/close"); - self.write_one_frame(stream, msg)?; + if let Err(err) = self.write_one_frame(stream, msg) { + match err { + // if an system message would exceed the buffer put it back in + // `additional_send` for retry. Otherwise returning this error + // may not make sense to the user, e.g. calling `flush`. + Error::WriteBufferFull(Message::Frame(msg)) => self.set_additional(msg), + err => return Err(err), + } + } true } else { false @@ -438,7 +475,7 @@ impl WebSocketContext { if let WebSocketState::Active = self.state { self.state = WebSocketState::ClosedByUs; let frame = Frame::close(code); - self.write(stream, Some(frame))?; + self._write(stream, Some(frame))?; } else { // Already closed, nothing to do. } @@ -731,10 +768,10 @@ mod tests { 0x03, ]); let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None); - assert_eq!(socket.read_message().unwrap(), Message::Ping(vec![1, 2])); - assert_eq!(socket.read_message().unwrap(), Message::Pong(vec![3])); - assert_eq!(socket.read_message().unwrap(), Message::Text("Hello, World!".into())); - assert_eq!(socket.read_message().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03])); + assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2])); + assert_eq!(socket.read().unwrap(), Message::Pong(vec![3])); + assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into())); + assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03])); } #[test] @@ -747,7 +784,7 @@ mod tests { let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); assert!(matches!( - socket.read_message(), + socket.read(), Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 })) )); } @@ -759,7 +796,7 @@ mod tests { let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit)); assert!(matches!( - socket.read_message(), + socket.read(), Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 })) )); } diff --git a/tests/connection_reset.rs b/tests/connection_reset.rs index 40b7469..015b4c1 100644 --- a/tests/connection_reset.rs +++ b/tests/connection_reset.rs @@ -52,27 +52,27 @@ fn test_server_close() { do_test( 3012, |mut cli_sock| { - cli_sock.write_message(Message::Text("Hello WebSocket".into())).unwrap(); + cli_sock.send(Message::Text("Hello WebSocket".into())).unwrap(); - let message = cli_sock.read_message().unwrap(); // receive close from server + let message = cli_sock.read().unwrap(); // receive close from server assert!(message.is_close()); - let err = cli_sock.read_message().unwrap_err(); // now we should get ConnectionClosed + let err = cli_sock.read().unwrap_err(); // now we should get ConnectionClosed match err { Error::ConnectionClosed => {} _ => panic!("unexpected error: {:?}", err), } }, |mut srv_sock| { - let message = srv_sock.read_message().unwrap(); + let message = srv_sock.read().unwrap(); assert_eq!(message.into_data(), b"Hello WebSocket"); srv_sock.close(None).unwrap(); // send close to client - let message = srv_sock.read_message().unwrap(); // receive acknowledgement + let message = srv_sock.read().unwrap(); // receive acknowledgement assert!(message.is_close()); - let err = srv_sock.read_message().unwrap_err(); // now we should get ConnectionClosed + let err = srv_sock.read().unwrap_err(); // now we should get ConnectionClosed match err { Error::ConnectionClosed => {} _ => panic!("unexpected error: {:?}", err), @@ -86,26 +86,26 @@ fn test_evil_server_close() { do_test( 3013, |mut cli_sock| { - cli_sock.write_message(Message::Text("Hello WebSocket".into())).unwrap(); + cli_sock.send(Message::Text("Hello WebSocket".into())).unwrap(); sleep(Duration::from_secs(1)); - let message = cli_sock.read_message().unwrap(); // receive close from server + let message = cli_sock.read().unwrap(); // receive close from server assert!(message.is_close()); - let err = cli_sock.read_message().unwrap_err(); // now we should get ConnectionClosed + let err = cli_sock.read().unwrap_err(); // now we should get ConnectionClosed match err { Error::ConnectionClosed => {} _ => panic!("unexpected error: {:?}", err), } }, |mut srv_sock| { - let message = srv_sock.read_message().unwrap(); + let message = srv_sock.read().unwrap(); assert_eq!(message.into_data(), b"Hello WebSocket"); srv_sock.close(None).unwrap(); // send close to client - let message = srv_sock.read_message().unwrap(); // receive acknowledgement + let message = srv_sock.read().unwrap(); // receive acknowledgement assert!(message.is_close()); // and now just drop the connection without waiting for `ConnectionClosed` srv_sock.get_mut().set_linger(Some(Duration::from_secs(0))).unwrap(); @@ -119,32 +119,32 @@ fn test_client_close() { do_test( 3014, |mut cli_sock| { - cli_sock.write_message(Message::Text("Hello WebSocket".into())).unwrap(); + cli_sock.send(Message::Text("Hello WebSocket".into())).unwrap(); - let message = cli_sock.read_message().unwrap(); // receive answer from server + let message = cli_sock.read().unwrap(); // receive answer from server assert_eq!(message.into_data(), b"From Server"); cli_sock.close(None).unwrap(); // send close to server - let message = cli_sock.read_message().unwrap(); // receive acknowledgement from server + let message = cli_sock.read().unwrap(); // receive acknowledgement from server assert!(message.is_close()); - let err = cli_sock.read_message().unwrap_err(); // now we should get ConnectionClosed + let err = cli_sock.read().unwrap_err(); // now we should get ConnectionClosed match err { Error::ConnectionClosed => {} _ => panic!("unexpected error: {:?}", err), } }, |mut srv_sock| { - let message = srv_sock.read_message().unwrap(); + let message = srv_sock.read().unwrap(); assert_eq!(message.into_data(), b"Hello WebSocket"); - srv_sock.write_message(Message::Text("From Server".into())).unwrap(); + srv_sock.send(Message::Text("From Server".into())).unwrap(); - let message = srv_sock.read_message().unwrap(); // receive close from client + let message = srv_sock.read().unwrap(); // receive close from client assert!(message.is_close()); - let err = srv_sock.read_message().unwrap_err(); // now we should get ConnectionClosed + let err = srv_sock.read().unwrap_err(); // now we should get ConnectionClosed match err { Error::ConnectionClosed => {} _ => panic!("unexpected error: {:?}", err), diff --git a/tests/no_send_after_close.rs b/tests/no_send_after_close.rs index 2182b92..258d09a 100644 --- a/tests/no_send_after_close.rs +++ b/tests/no_send_after_close.rs @@ -29,10 +29,10 @@ fn test_no_send_after_close() { let client_thread = spawn(move || { let (mut client, _) = connect(Url::parse("ws://localhost:3013/socket").unwrap()).unwrap(); - let message = client.read_message().unwrap(); // receive close from server + let message = client.read().unwrap(); // receive close from server assert!(message.is_close()); - let err = client.read_message().unwrap_err(); // now we should get ConnectionClosed + let err = client.read().unwrap_err(); // now we should get ConnectionClosed match err { Error::ConnectionClosed => {} _ => panic!("unexpected error: {:?}", err), @@ -44,7 +44,7 @@ fn test_no_send_after_close() { client_handler.close(None).unwrap(); // send close to client - let err = client_handler.write_message(Message::Text("Hello WebSocket".into())); + let err = client_handler.send(Message::Text("Hello WebSocket".into())); assert!(err.is_err()); diff --git a/tests/receive_after_init_close.rs b/tests/receive_after_init_close.rs index c8661c8..af867a7 100644 --- a/tests/receive_after_init_close.rs +++ b/tests/receive_after_init_close.rs @@ -29,12 +29,12 @@ fn test_receive_after_init_close() { let client_thread = spawn(move || { let (mut client, _) = connect(Url::parse("ws://localhost:3013/socket").unwrap()).unwrap(); - client.write_message(Message::Text("Hello WebSocket".into())).unwrap(); + client.send(Message::Text("Hello WebSocket".into())).unwrap(); - let message = client.read_message().unwrap(); // receive close from server + let message = client.read().unwrap(); // receive close from server assert!(message.is_close()); - let err = client.read_message().unwrap_err(); // now we should get ConnectionClosed + let err = client.read().unwrap_err(); // now we should get ConnectionClosed match err { Error::ConnectionClosed => {} _ => panic!("unexpected error: {:?}", err), @@ -47,12 +47,12 @@ fn test_receive_after_init_close() { client_handler.close(None).unwrap(); // send close to client // This read should succeed even though we already initiated a close - let message = client_handler.read_message().unwrap(); + let message = client_handler.read().unwrap(); assert_eq!(message.into_data(), b"Hello WebSocket"); - assert!(client_handler.read_message().unwrap().is_close()); // receive acknowledgement + assert!(client_handler.read().unwrap().is_close()); // receive acknowledgement - let err = client_handler.read_message().unwrap_err(); // now we should get ConnectionClosed + let err = client_handler.read().unwrap_err(); // now we should get ConnectionClosed match err { Error::ConnectionClosed => {} _ => panic!("unexpected error: {:?}", err),