From 3cf0b839495d058d21a4a7c9f0431a04bd3ffd79 Mon Sep 17 00:00:00 2001 From: SirCipher Date: Wed, 23 Sep 2020 16:44:11 +0100 Subject: [PATCH 1/4] Adds support for outbound message chunking --- src/protocol/mod.rs | 123 ++++++++++++++++++++++++++++++++------------ 1 file changed, 90 insertions(+), 33 deletions(-) diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index bb695e5..939b9db 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -19,6 +19,7 @@ use crate::error::{Error, Result}; use crate::extensions::uncompressed::UncompressedExt; use crate::extensions::WebSocketExtension; use crate::util::NonBlockingResult; +use crate::protocol::frame::coding::Data; pub(crate) const MAX_MESSAGE_SIZE: usize = 64 << 20; @@ -34,8 +35,8 @@ pub enum Role { /// The configuration for WebSocket connection. #[derive(Debug, Copy, Clone)] pub struct WebSocketConfig -where - E: WebSocketExtension, + where + E: WebSocketExtension, { /// The size of the send queue. You can use it to turn on/off the backpressure features. `None` /// means here that the size of the queue is unlimited. The default value is the unlimited @@ -51,8 +52,8 @@ where } impl Default for WebSocketConfig -where - E: WebSocketExtension, + where + E: WebSocketExtension, { fn default() -> Self { WebSocketConfig { @@ -64,8 +65,8 @@ where } impl WebSocketConfig -where - E: WebSocketExtension, + where + E: WebSocketExtension, { /// Creates a `WebSocketConfig` instance using the default configuration and the provided /// encoder for new connections. @@ -84,8 +85,8 @@ where /// It may be created by calling `connect`, `accept` or `client` functions. #[derive(Debug)] pub struct WebSocket -where - Ext: WebSocketExtension, + where + Ext: WebSocketExtension, { /// The underlying socket. socket: Stream, @@ -94,8 +95,8 @@ where } impl WebSocket -where - Ext: WebSocketExtension, + where + Ext: WebSocketExtension, { /// Convert a raw socket into a WebSocket without performing a handshake. /// @@ -167,9 +168,9 @@ where } impl WebSocket -where - Stream: Read + Write, - Ext: WebSocketExtension, + where + Stream: Read + Write, + Ext: WebSocketExtension, { /// Read a message from stream, if possible. /// @@ -254,8 +255,8 @@ where /// A context for managing WebSocket stream. #[derive(Debug)] pub struct WebSocketContext -where - Ext: WebSocketExtension, + where + Ext: WebSocketExtension, { /// Server or client? role: Role, @@ -274,8 +275,8 @@ where } impl WebSocketContext -where - Ext: WebSocketExtension, + where + Ext: WebSocketExtension, { /// Create a WebSocket context that manages a post-handshake stream. pub fn new(role: Role, config: Option>) -> Self { @@ -334,8 +335,8 @@ where /// This function sends pong and close responses automatically. /// However, it never blocks on write. pub fn read_message(&mut self, stream: &mut Stream) -> Result - where - Stream: Read + Write, + where + Stream: Read + Write, { // Do not read from already closed connections. self.state.check_active()?; @@ -362,8 +363,8 @@ where /// Note that only the last pong frame is stored to be sent, and only the /// most recent pong frame is sent if multiple pong frames are queued. pub fn write_message(&mut self, stream: &mut Stream, message: Message) -> Result<()> - where - Stream: Read + Write, + where + Stream: Read + Write, { // When terminated, return AlreadyClosed. self.state.check_active()?; @@ -405,8 +406,8 @@ where /// Flush the pending send queue. pub fn write_pending(&mut self, stream: &mut Stream) -> Result<()> - where - Stream: Read + Write, + where + Stream: Read + Write, { // First, make sure we have no pending frame sending. self.frame.write_pending(stream)?; @@ -448,8 +449,8 @@ where /// There is no need to call it again. Calling this function is /// the same as calling `write(Message::Close(..))`. pub fn close(&mut self, stream: &mut Stream, code: Option) -> Result<()> - where - Stream: Read + Write, + where + Stream: Read + Write, { if let WebSocketState::Active = self.state { self.state = WebSocketState::ClosedByUs; @@ -463,8 +464,8 @@ where /// Try to decode one message frame. May return None. fn read_message_frame(&mut self, stream: &mut Stream) -> Result> - where - Stream: Read + Write, + where + Stream: Read + Write, { if let Some(mut frame) = self .frame @@ -589,8 +590,8 @@ where /// Send a single pending frame. fn send_one_frame(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> - where - Stream: Read + Write, + where + Stream: Read + Write, { match self.role { Role::Server => {} @@ -608,10 +609,27 @@ where }; } - trace!("Sending frame: {:?}", frame); - self.frame - .write_frame(stream, frame) - .check_connection_reset(self.state) + let max_frame_size = self.config.max_frame_size.unwrap_or_else(usize::max_value); + if frame.payload().len() > max_frame_size { + let mut chunks = frame.payload().chunks(self.config.max_frame_size.unwrap_or_else(usize::max_value)).peekable(); + let data_frame = Frame::message(Vec::from(chunks.next().unwrap()), frame.header().opcode, false); + self.frame.write_frame(stream, data_frame).check_connection_reset(self.state)?; + + while let Some(chunk) = chunks.next() { + let frame = Frame::message(Vec::from(chunk), OpCode::Data(Data::Continue), chunks.peek().is_none()); + + trace!("Sending frame: {:?}", frame); + + self.frame.write_frame(stream, frame).check_connection_reset(self.state)?; + } + + Ok(()) + } else { + trace!("Sending frame: {:?}", frame); + self.frame + .write_frame(stream, frame) + .check_connection_reset(self.state) + } } } @@ -685,6 +703,8 @@ mod tests { use crate::extensions::uncompressed::UncompressedExt; use std::io; use std::io::Cursor; + use crate::protocol::frame::Frame; + use crate::protocol::frame::coding::{OpCode, Data}; struct WriteMoc(Stream); @@ -756,4 +776,41 @@ mod tests { "Space limit exceeded: Message too big: 0 + 3 > 2" ); } + + #[test] + fn fragmented_tx() { + let max_message_size = 2; + let input_str = "hello unit test"; + + let limit = WebSocketConfig { + max_send_queue: None, + max_frame_size: Some(2), + encoder: UncompressedExt::new(Some(max_message_size)), + }; + + let mut socket = WebSocket::from_raw_socket(Cursor::new(Vec::new()), Role::Client, Some(limit)); + + socket.write_message(Message::text(input_str)).unwrap(); + socket.socket.set_position(0); + + let WebSocket { mut socket, mut context } = socket; + + let vec = input_str.chars().collect::>(); + let mut iter = vec.chunks(max_message_size).map(|c| c.iter().collect::()) + .into_iter().peekable(); + + let frame_eq = |expected: Frame, actual: Frame| { + assert_eq!(expected.payload(), actual.payload()); + assert_eq!(expected.header().opcode, actual.header().opcode); + assert_eq!(expected.header().rsv1, actual.header().rsv1); + }; + + let expected = Frame::message(iter.next().unwrap().into(), OpCode::Data(Data::Text), false); + frame_eq(expected, context.frame.read_frame(&mut socket, Some(max_message_size)).unwrap().unwrap()); + + while let Some(chars) = iter.next() { + let expected = Frame::message(chars.into(), OpCode::Data(Data::Continue), iter.peek().is_none()); + frame_eq(expected, context.frame.read_frame(&mut socket, Some(max_message_size)).unwrap().unwrap()); + } + } } From 2744d1be4fd659b873bcdf28eb03e4f46bee14fd Mon Sep 17 00:00:00 2001 From: SirCipher Date: Wed, 23 Sep 2020 16:52:01 +0100 Subject: [PATCH 2/4] Reformat --- src/protocol/mod.rs | 128 +++++++++++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 44 deletions(-) diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 939b9db..f5bc684 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -18,8 +18,8 @@ use self::message::IncompleteMessage; use crate::error::{Error, Result}; use crate::extensions::uncompressed::UncompressedExt; use crate::extensions::WebSocketExtension; -use crate::util::NonBlockingResult; use crate::protocol::frame::coding::Data; +use crate::util::NonBlockingResult; pub(crate) const MAX_MESSAGE_SIZE: usize = 64 << 20; @@ -35,8 +35,8 @@ pub enum Role { /// The configuration for WebSocket connection. #[derive(Debug, Copy, Clone)] pub struct WebSocketConfig - where - E: WebSocketExtension, +where + E: WebSocketExtension, { /// The size of the send queue. You can use it to turn on/off the backpressure features. `None` /// means here that the size of the queue is unlimited. The default value is the unlimited @@ -52,8 +52,8 @@ pub struct WebSocketConfig } impl Default for WebSocketConfig - where - E: WebSocketExtension, +where + E: WebSocketExtension, { fn default() -> Self { WebSocketConfig { @@ -65,8 +65,8 @@ impl Default for WebSocketConfig } impl WebSocketConfig - where - E: WebSocketExtension, +where + E: WebSocketExtension, { /// Creates a `WebSocketConfig` instance using the default configuration and the provided /// encoder for new connections. @@ -85,8 +85,8 @@ impl WebSocketConfig /// It may be created by calling `connect`, `accept` or `client` functions. #[derive(Debug)] pub struct WebSocket - where - Ext: WebSocketExtension, +where + Ext: WebSocketExtension, { /// The underlying socket. socket: Stream, @@ -95,8 +95,8 @@ pub struct WebSocket } impl WebSocket - where - Ext: WebSocketExtension, +where + Ext: WebSocketExtension, { /// Convert a raw socket into a WebSocket without performing a handshake. /// @@ -168,9 +168,9 @@ impl WebSocket } impl WebSocket - where - Stream: Read + Write, - Ext: WebSocketExtension, +where + Stream: Read + Write, + Ext: WebSocketExtension, { /// Read a message from stream, if possible. /// @@ -255,8 +255,8 @@ impl WebSocket /// A context for managing WebSocket stream. #[derive(Debug)] pub struct WebSocketContext - where - Ext: WebSocketExtension, +where + Ext: WebSocketExtension, { /// Server or client? role: Role, @@ -275,8 +275,8 @@ pub struct WebSocketContext } impl WebSocketContext - where - Ext: WebSocketExtension, +where + Ext: WebSocketExtension, { /// Create a WebSocket context that manages a post-handshake stream. pub fn new(role: Role, config: Option>) -> Self { @@ -335,8 +335,8 @@ impl WebSocketContext /// This function sends pong and close responses automatically. /// However, it never blocks on write. pub fn read_message(&mut self, stream: &mut Stream) -> Result - where - Stream: Read + Write, + where + Stream: Read + Write, { // Do not read from already closed connections. self.state.check_active()?; @@ -363,8 +363,8 @@ impl WebSocketContext /// Note that only the last pong frame is stored to be sent, and only the /// most recent pong frame is sent if multiple pong frames are queued. pub fn write_message(&mut self, stream: &mut Stream, message: Message) -> Result<()> - where - Stream: Read + Write, + where + Stream: Read + Write, { // When terminated, return AlreadyClosed. self.state.check_active()?; @@ -406,8 +406,8 @@ impl WebSocketContext /// Flush the pending send queue. pub fn write_pending(&mut self, stream: &mut Stream) -> Result<()> - where - Stream: Read + Write, + where + Stream: Read + Write, { // First, make sure we have no pending frame sending. self.frame.write_pending(stream)?; @@ -449,8 +449,8 @@ impl WebSocketContext /// There is no need to call it again. Calling this function is /// the same as calling `write(Message::Close(..))`. pub fn close(&mut self, stream: &mut Stream, code: Option) -> Result<()> - where - Stream: Read + Write, + where + Stream: Read + Write, { if let WebSocketState::Active = self.state { self.state = WebSocketState::ClosedByUs; @@ -464,8 +464,8 @@ impl WebSocketContext /// Try to decode one message frame. May return None. fn read_message_frame(&mut self, stream: &mut Stream) -> Result> - where - Stream: Read + Write, + where + Stream: Read + Write, { if let Some(mut frame) = self .frame @@ -590,8 +590,8 @@ impl WebSocketContext /// Send a single pending frame. fn send_one_frame(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> - where - Stream: Read + Write, + where + Stream: Read + Write, { match self.role { Role::Server => {} @@ -611,16 +611,31 @@ impl WebSocketContext let max_frame_size = self.config.max_frame_size.unwrap_or_else(usize::max_value); if frame.payload().len() > max_frame_size { - let mut chunks = frame.payload().chunks(self.config.max_frame_size.unwrap_or_else(usize::max_value)).peekable(); - let data_frame = Frame::message(Vec::from(chunks.next().unwrap()), frame.header().opcode, false); - self.frame.write_frame(stream, data_frame).check_connection_reset(self.state)?; + let mut chunks = frame + .payload() + .chunks(self.config.max_frame_size.unwrap_or_else(usize::max_value)) + .peekable(); + let data_frame = Frame::message( + Vec::from(chunks.next().unwrap()), + frame.header().opcode, + false, + ); + self.frame + .write_frame(stream, data_frame) + .check_connection_reset(self.state)?; while let Some(chunk) = chunks.next() { - let frame = Frame::message(Vec::from(chunk), OpCode::Data(Data::Continue), chunks.peek().is_none()); + let frame = Frame::message( + Vec::from(chunk), + OpCode::Data(Data::Continue), + chunks.peek().is_none(), + ); trace!("Sending frame: {:?}", frame); - self.frame.write_frame(stream, frame).check_connection_reset(self.state)?; + self.frame + .write_frame(stream, frame) + .check_connection_reset(self.state)?; } Ok(()) @@ -701,10 +716,10 @@ mod tests { use super::{Message, Role, WebSocket, WebSocketConfig}; use crate::extensions::uncompressed::UncompressedExt; + use crate::protocol::frame::coding::{Data, OpCode}; + use crate::protocol::frame::Frame; use std::io; use std::io::Cursor; - use crate::protocol::frame::Frame; - use crate::protocol::frame::coding::{OpCode, Data}; struct WriteMoc(Stream); @@ -788,16 +803,23 @@ mod tests { encoder: UncompressedExt::new(Some(max_message_size)), }; - let mut socket = WebSocket::from_raw_socket(Cursor::new(Vec::new()), Role::Client, Some(limit)); + let mut socket = + WebSocket::from_raw_socket(Cursor::new(Vec::new()), Role::Client, Some(limit)); socket.write_message(Message::text(input_str)).unwrap(); socket.socket.set_position(0); - let WebSocket { mut socket, mut context } = socket; + let WebSocket { + mut socket, + mut context, + } = socket; let vec = input_str.chars().collect::>(); - let mut iter = vec.chunks(max_message_size).map(|c| c.iter().collect::()) - .into_iter().peekable(); + let mut iter = vec + .chunks(max_message_size) + .map(|c| c.iter().collect::()) + .into_iter() + .peekable(); let frame_eq = |expected: Frame, actual: Frame| { assert_eq!(expected.payload(), actual.payload()); @@ -806,11 +828,29 @@ mod tests { }; let expected = Frame::message(iter.next().unwrap().into(), OpCode::Data(Data::Text), false); - frame_eq(expected, context.frame.read_frame(&mut socket, Some(max_message_size)).unwrap().unwrap()); + frame_eq( + expected, + context + .frame + .read_frame(&mut socket, Some(max_message_size)) + .unwrap() + .unwrap(), + ); while let Some(chars) = iter.next() { - let expected = Frame::message(chars.into(), OpCode::Data(Data::Continue), iter.peek().is_none()); - frame_eq(expected, context.frame.read_frame(&mut socket, Some(max_message_size)).unwrap().unwrap()); + let expected = Frame::message( + chars.into(), + OpCode::Data(Data::Continue), + iter.peek().is_none(), + ); + frame_eq( + expected, + context + .frame + .read_frame(&mut socket, Some(max_message_size)) + .unwrap() + .unwrap(), + ); } } } From fcf56c31c5340ab4e34f8e9e99f7b1f4717734ae Mon Sep 17 00:00:00 2001 From: SirCipher Date: Wed, 23 Sep 2020 16:53:06 +0100 Subject: [PATCH 3/4] Renames ambiguous extension type parameters --- src/protocol/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index f5bc684..17d1d9c 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -51,26 +51,26 @@ where pub encoder: E, } -impl Default for WebSocketConfig +impl Default for WebSocketConfig where - E: WebSocketExtension, + Ext: WebSocketExtension, { fn default() -> Self { WebSocketConfig { max_send_queue: None, max_frame_size: Some(16 << 20), - encoder: E::new(Some(MAX_MESSAGE_SIZE)), + encoder: Ext::new(Some(MAX_MESSAGE_SIZE)), } } } -impl WebSocketConfig +impl WebSocketConfig where - E: WebSocketExtension, + Ext: WebSocketExtension, { /// Creates a `WebSocketConfig` instance using the default configuration and the provided /// encoder for new connections. - pub fn default_with_encoder(encoder: E) -> WebSocketConfig { + pub fn default_with_encoder(encoder: Ext) -> WebSocketConfig { WebSocketConfig { max_send_queue: None, max_frame_size: Some(16 << 20), From 710be47e1d69788a858defb02295beddab19168e Mon Sep 17 00:00:00 2001 From: SirCipher Date: Wed, 23 Sep 2020 17:09:09 +0100 Subject: [PATCH 4/4] Changes chunk iterator to use field for chunk size --- src/protocol/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 17d1d9c..9af32c7 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -611,10 +611,7 @@ where let max_frame_size = self.config.max_frame_size.unwrap_or_else(usize::max_value); if frame.payload().len() > max_frame_size { - let mut chunks = frame - .payload() - .chunks(self.config.max_frame_size.unwrap_or_else(usize::max_value)) - .peekable(); + let mut chunks = frame.payload().chunks(max_frame_size).peekable(); let data_frame = Frame::message( Vec::from(chunks.next().unwrap()), frame.header().opcode,