diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index 9d17338..830146f 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -18,12 +18,8 @@ use error::{Error, Result}; pub struct FrameSocket { /// The underlying network stream. stream: Stream, - /// Buffer to read data from the stream. - in_buffer: InputBuffer, - /// Buffer to send packets to the network. - out_buffer: Vec, - /// Header and remaining size of the incoming packet being processed. - header: Option<(FrameHeader, u64)>, + /// Codec for reading/writing frames. + codec: FrameCodec, } impl FrameSocket { @@ -31,9 +27,7 @@ impl FrameSocket { pub fn new(stream: Stream) -> Self { FrameSocket { stream, - in_buffer: InputBuffer::with_capacity(MIN_READ), - out_buffer: Vec::new(), - header: None, + codec: FrameCodec::new(), } } @@ -41,15 +35,13 @@ impl FrameSocket { pub fn from_partially_read(stream: Stream, part: Vec) -> Self { FrameSocket { stream, - in_buffer: InputBuffer::from_partially_read(part), - out_buffer: Vec::new(), - header: None, + codec: FrameCodec::from_partially_read(part), } } /// Extract a stream from the socket. pub fn into_inner(self) -> (Stream, Vec) { - (self.stream, self.in_buffer.into_vec()) + (self.stream, self.codec.in_buffer.into_vec()) } /// Returns a shared reference to the inner stream. @@ -68,6 +60,67 @@ impl FrameSocket { /// Read a frame from stream. pub fn read_frame(&mut self, max_size: Option) -> Result> { + self.codec.read_frame(&mut self.stream, max_size) + } +} + +impl FrameSocket + where Stream: Write +{ + /// Write a frame to stream. + /// + /// This function guarantees that the frame is queued regardless of any errors. + /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, + /// call write_pending() afterwards. + pub fn write_frame(&mut self, frame: Frame) -> Result<()> { + self.codec.write_frame(&mut self.stream, frame) + } + + /// Complete pending write, if any. + pub fn write_pending(&mut self) -> Result<()> { + self.codec.write_pending(&mut self.stream) + } +} + +/// A codec for WebSocket frames. +#[derive(Debug)] +pub(super) struct FrameCodec { + /// Buffer to read data from the stream. + in_buffer: InputBuffer, + /// Buffer to send packets to the network. + out_buffer: Vec, + /// Header and remaining size of the incoming packet being processed. + header: Option<(FrameHeader, u64)>, +} + +impl FrameCodec { + /// Create a new frame codec. + pub(super) fn new() -> Self { + Self { + in_buffer: InputBuffer::with_capacity(MIN_READ), + out_buffer: Vec::new(), + header: None, + } + } + + /// Create a new frame codec from partially read data. + pub(super) fn from_partially_read(part: Vec) -> Self { + Self { + in_buffer: InputBuffer::from_partially_read(part), + out_buffer: Vec::new(), + header: None, + } + } + + /// Read a frame from the provided stream. + pub(super) fn read_frame( + &mut self, + stream: &mut Stream, + max_size: Option, + ) -> Result> + where + Stream: Read, + { let max_size = max_size.unwrap_or_else(usize::max_value); let payload = loop { @@ -81,7 +134,7 @@ impl FrameSocket if let Some((_, ref length)) = self.header { let length = *length; - // Enforce frame size limit early and make sure `length` + // Enforce frame size limit early and make sure `length` // is not too big (fits into `usize`). if length > max_size as u64 { return Err(Error::Capacity( @@ -105,7 +158,7 @@ impl FrameSocket let size = self.in_buffer.prepare_reserve(MIN_READ) .with_limit(usize::max_value()) .map_err(|_| Error::Capacity("Incoming TCP buffer is full".into()))? - .read_from(&mut self.stream)?; + .read_from(stream)?; if size == 0 { trace!("no frame received"); return Ok(None) @@ -119,34 +172,35 @@ impl FrameSocket Ok(Some(frame)) } -} - -impl FrameSocket - where Stream: Write -{ - /// Write a frame to stream. - /// - /// This function guarantees that the frame is queued regardless of any errors. - /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, - /// call write_pending() afterwards. - pub fn write_frame(&mut self, frame: Frame) -> Result<()> { + /// Write a frame to the provided stream. + pub(super) fn write_frame( + &mut self, + stream: &mut Stream, + frame: Frame, + ) -> Result<()> + where + Stream: Write, + { trace!("writing frame {}", frame); self.out_buffer.reserve(frame.len()); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); - self.write_pending() + self.write_pending(stream) } + /// Complete pending write, if any. - pub fn write_pending(&mut self) -> Result<()> { + pub(super) fn write_pending(&mut self, stream: &mut Stream) -> Result<()> + where + Stream: Write, + { while !self.out_buffer.is_empty() { - let len = self.stream.write(&self.out_buffer)?; + let len = stream.write(&self.out_buffer)?; self.out_buffer.drain(0..len); } - self.stream.flush()?; + stream.flush()?; Ok(()) } } - #[cfg(test)] mod tests { diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index d82e261..f794859 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -13,7 +13,7 @@ use std::mem::replace; use error::{Error, Result}; use self::message::{IncompleteMessage, IncompleteMessageType}; -use self::frame::{Frame, FrameSocket}; +use self::frame::{Frame, FrameCodec}; use self::frame::coding::{OpCode, Data as OpData, Control as OpCtl, CloseCode}; use util::NonBlockingResult; @@ -60,20 +60,10 @@ impl Default for WebSocketConfig { /// It may be created by calling `connect`, `accept` or `client` functions. #[derive(Debug)] pub struct WebSocket { - /// Server or client? - role: Role, /// The underlying socket. - socket: FrameSocket, - /// The state of processing, either "active" or "closing". - state: WebSocketState, - /// Receive: an incomplete message being processed. - incomplete: Option, - /// Send: a data send queue. - send_queue: VecDeque, - /// Send: an OOB pong message. - pong: Option, - /// The configuration for the websocket session. - config: WebSocketConfig, + socket: Stream, + /// The context for managing a WebSocket. + context: WebSocketContext, } impl WebSocket { @@ -83,7 +73,10 @@ impl WebSocket { /// or together with an existing one. If you need an initial handshake, use /// `connect()` or `accept()` functions of the crate to construct a websocket. pub fn from_raw_socket(stream: Stream, role: Role, config: Option) -> Self { - WebSocket::from_frame_socket(FrameSocket::new(stream), role, config) + WebSocket { + socket: stream, + context: WebSocketContext::new(role, config), + } } /// Convert a raw socket into a WebSocket without performing a handshake. @@ -97,34 +90,89 @@ impl WebSocket { role: Role, config: Option, ) -> Self { - WebSocket::from_frame_socket(FrameSocket::from_partially_read(stream, part), role, config) + WebSocket { + socket: stream, + context: WebSocketContext::from_partially_read(part, role, config), + } } /// Returns a shared reference to the inner stream. pub fn get_ref(&self) -> &Stream { - self.socket.get_ref() + &self.socket } /// Returns a mutable reference to the inner stream. pub fn get_mut(&mut self) -> &mut Stream { - self.socket.get_mut() + &mut self.socket } /// Change the configuration. pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { - set_func(&mut self.config) + self.context.set_config(set_func) } } -impl WebSocket { - /// Convert a frame socket into a WebSocket. - fn from_frame_socket( - socket: FrameSocket, - role: Role, - config: Option - ) -> Self { - WebSocket { +impl WebSocket { + /// Read a message from stream, if possible. + /// + /// This function sends pong and close responses automatically. + /// However, it never blocks on write. + pub fn read_message(&mut self) -> Result { + self.context.read_message(&mut self.socket) + } + + /// Send a message to stream, if possible. + /// + /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping + /// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned + /// along with the passed message. Otherwise, the message is queued and Ok(()) is returned. + /// + /// Note that only the last pong frame is stored to be sent, and only the + /// most recent pong frame is sent if multiple pong frames are queued. + pub fn write_message(&mut self, message: Message) -> Result<()> { + self.context.write_message(&mut self.socket, message) + } + + /// Flush the pending send queue. + pub fn write_pending(&mut self) -> Result<()> { + self.context.write_pending(&mut self.socket) + } + + /// Close the connection. + /// + /// This function guarantees that the close frame will be queued. + /// There is no need to call it again. Calling this function is + /// the same as calling `write(Message::Close(..))`. + pub fn close(&mut self, code: Option) -> Result<()> { + self.context.close(&mut self.socket, code) + } +} + + +/// A context for managing WebSocket stream. +#[derive(Debug)] +pub struct WebSocketContext { + /// Server or client? + role: Role, + /// encoder/decoder of frame. + frame: FrameCodec, + /// The state of processing, either "active" or "closing". + state: WebSocketState, + /// Receive: an incomplete message being processed. + incomplete: Option, + /// Send: a data send queue. + send_queue: VecDeque, + /// Send: an OOB pong message. + pong: Option, + /// The configuration for the websocket session. + config: WebSocketConfig, +} + +impl WebSocketContext { + /// Create a WebSocket context that manages a post-handshake stream. + pub fn new(role: Role, config: Option) -> Self { + WebSocketContext { role, - socket, + frame: FrameCodec::new(), state: WebSocketState::Active, incomplete: None, send_queue: VecDeque::new(), @@ -132,28 +180,46 @@ impl WebSocket { config: config.unwrap_or_else(WebSocketConfig::default), } } -} -impl WebSocket { - /// Read a message from stream, if possible. + /// Create a WebSocket context that manages an post-handshake stream. + pub fn from_partially_read( + part: Vec, + role: Role, + config: Option, + ) -> Self { + WebSocketContext { + frame: FrameCodec::from_partially_read(part), + ..WebSocketContext::new(role, config) + } + } + + /// Change the configuration. + pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { + set_func(&mut self.config) + } + + /// Read a message from the provided stream, if possible. /// /// This function sends pong and close responses automatically. /// However, it never blocks on write. - pub fn read_message(&mut self) -> Result { + pub fn read_message(&mut self, stream: &mut Stream) -> Result + where + Stream: Read + Write, + { loop { // Since we may get ping or close, we need to reply to the messages even during read. // Thus we call write_pending() but ignore its blocking. - self.write_pending().no_block()?; + self.write_pending(stream).no_block()?; // If we get here, either write blocks or we have nothing to write. // Thus if read blocks, just let it return WouldBlock. - if let Some(message) = self.read_message_frame()? { + if let Some(message) = self.read_message_frame(stream)? { trace!("Received message {}", message); return Ok(message) } } } - /// Send a message to stream, if possible. + /// Send a message to the provided stream, if possible. /// /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping /// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned @@ -161,13 +227,16 @@ impl WebSocket { /// /// Note that only the last pong frame is stored to be sent, and only the /// most recent pong frame is sent if multiple pong frames are queued. - pub fn write_message(&mut self, message: Message) -> Result<()> { + pub fn write_message(&mut self, stream: &mut Stream, message: Message) -> Result<()> + where + Stream: Read + Write, + { if let Some(max_send_queue) = self.config.max_send_queue { if self.send_queue.len() >= max_send_queue { // Try to make some room for the new message. // Do not return here if write would block, ignore WouldBlock silently // since we must queue the message anyway. - self.write_pending().no_block()?; + self.write_pending(stream).no_block()?; } if self.send_queue.len() >= max_send_queue { @@ -185,31 +254,34 @@ impl WebSocket { Message::Ping(data) => Frame::ping(data), Message::Pong(data) => { self.pong = Some(Frame::pong(data)); - return self.write_pending() + return self.write_pending(stream) } Message::Close(code) => { - return self.close(code) + return self.close(stream, code) } }; self.send_queue.push_back(frame); - self.write_pending() + self.write_pending(stream) } /// Flush the pending send queue. - pub fn write_pending(&mut self) -> Result<()> { + pub fn write_pending(&mut self, stream: &mut Stream) -> Result<()> + where + Stream: Read + Write, + { // First, make sure we have no pending frame sending. - self.socket.write_pending()?; + self.frame.write_pending(stream)?; // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // response, unless it already received a Close frame. It SHOULD // respond with Pong frame as soon as is practical. (RFC 6455) if let Some(pong) = self.pong.take() { - self.send_one_frame(pong)?; + self.send_one_frame(stream, pong)?; } // If we have any unsent frames, send them. while let Some(data) = self.send_queue.pop_front() { - self.send_one_frame(data)?; + self.send_one_frame(stream, data)?; } // If we get to this point, the send queue is empty and the underlying socket is still @@ -237,7 +309,10 @@ impl WebSocket { /// This function guarantees that the close frame will be queued. /// There is no need to call it again. Calling this function is /// the same as calling `write(Message::Close(..))`. - pub fn close(&mut self, code: Option) -> Result<()> { + pub fn close(&mut self, stream: &mut Stream, code: Option) -> Result<()> + where + Stream: Read + Write, + { if let WebSocketState::Active = self.state { self.state = WebSocketState::ClosedByUs; let frame = Frame::close(code); @@ -245,14 +320,17 @@ impl WebSocket { } else { // Already closed, nothing to do. } - self.write_pending() + self.write_pending(stream) } } -impl WebSocket { +impl WebSocketContext { /// Try to decode one message frame. May return None. - fn read_message_frame(&mut self) -> Result> { - if let Some(mut frame) = self.socket.read_frame(self.config.max_frame_size)? { + fn read_message_frame(&mut self, stream: &mut Stream) -> Result> + where + Stream: Read + Write, + { + if let Some(mut frame) = self.frame.read_frame(stream, self.config.max_frame_size)? { // MUST be 0 unless an extension is negotiated that defines meanings // for non-zero values. If a nonzero value is received and none of @@ -434,7 +512,10 @@ impl WebSocket { } /// Send a single pending frame. - fn send_one_frame(&mut self, mut frame: Frame) -> Result<()> { + fn send_one_frame(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> + where + Stream: Read + Write, + { match self.role { Role::Server => { } @@ -444,10 +525,11 @@ impl WebSocket { frame.set_random_mask(); } } - self.socket.write_frame(frame) + self.frame.write_frame(stream, frame) } } + /// The current connection state. #[derive(Debug)] enum WebSocketState {