From 604e2021ce29db32221b7719803fe8511dc35fcb Mon Sep 17 00:00:00 2001 From: Alexey Galakhov Date: Fri, 17 Feb 2017 18:13:30 +0100 Subject: [PATCH] protocol: add write_pending() functions The semantics of write_message() and write_frame() is changed accordingly. --- src/protocol/frame/mod.rs | 16 +++++-- src/protocol/mod.rs | 88 ++++++++++++++++++++++----------------- 2 files changed, 63 insertions(+), 41 deletions(-) diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index 9c9b955..39ed5db 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -66,12 +66,22 @@ impl FrameSocket where Stream: Write { /// Write a frame to stream. + /// + /// This function guarantees that the frame is queued regardless of any errors. + /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, + /// call write_pending() afterwards. pub fn write_frame(&mut self, frame: Frame) -> Result<()> { debug!("writing frame {}", frame); self.out_buffer.reserve(frame.len()); - frame.format(&mut self.out_buffer)?; - let len = self.stream.write(&self.out_buffer)?; - self.out_buffer.drain(0..len); + frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); + self.write_pending() + } + /// Complete pending write, if any. + pub fn write_pending(&mut self) -> Result<()> { + while !self.out_buffer.is_empty() { + let len = self.stream.write(&self.out_buffer)?; + self.out_buffer.drain(0..len); + } Ok(()) } } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index de01fdd..c8ddccf 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -55,9 +55,14 @@ impl WebSocket } /// Read a message from stream, if possible. + /// + /// This function sends pong and close responses automatically. + /// However, it never blocks on write. pub fn read_message(&mut self) -> Result { loop { - self.send_pending().no_block()?; + // Since we may get ping or close, we need to reply to the messages even during read. + // Thus we call write_pending() but ignore its blocking. + self.write_pending().no_block()?; // If we get here, either write blocks or we have nothing to write. // Thus if read blocks, just let it return WouldBlock. if let Some(message) = self.read_message_frame()? { @@ -68,6 +73,10 @@ impl WebSocket } /// Send a message to stream, if possible. + /// + /// This function guarantees that the frame is queued regardless of any errors. + /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, + /// call write_pending() afterwards. pub fn write_message(&mut self, message: Message) -> Result<()> { let frame = { let opcode = match message { @@ -77,11 +86,13 @@ impl WebSocket Frame::message(message.into_data(), OpCode::Data(opcode), true) }; self.send_queue.push_back(frame); - self.send_pending().no_block()?; - Ok(()) + self.write_pending() } /// Close the connection. + /// + /// This function guarantees that the close frame will be queued. + /// There is no need to call it again, just like write_message(). pub fn close(&mut self) -> Result<()> { match self.state { WebSocketState::Active => { @@ -93,8 +104,41 @@ impl WebSocket // already closed, nothing to do } } - self.send_pending().no_block()?; - Ok(()) + self.write_pending() + } + + /// Flush the pending send queue. + pub fn write_pending(&mut self) -> Result<()> { + // First, make sure we have no pending frame sending. + self.socket.write_pending()?; + + // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in + // response, unless it already received a Close frame. It SHOULD + // respond with Pong frame as soon as is practical. (RFC 6455) + if let Some(pong) = replace(&mut self.pong, None) { + self.send_one_frame(pong)?; + } + // If we have any unsent frames, send them. + while let Some(data) = self.send_queue.pop_front() { + self.send_one_frame(data)?; + } + + // If we're closing and there is nothing to send anymore, we should close the connection. + match self.state { + WebSocketState::ClosedByPeer if self.send_queue.is_empty() => { + // The underlying TCP connection, in most normal cases, SHOULD be closed + // first by the server, so that it holds the TIME_WAIT state and not the + // client (as this would prevent it from re-opening the connection for 2 + // maximum segment lifetimes (2MSL), while there is no corresponding + // server impact as a TIME_WAIT connection is immediately reopened upon + // a new SYN with a higher seq number). (RFC 6455) + match self.role { + Role::Client => Ok(()), + Role::Server => Err(Error::ConnectionClosed), + } + } + _ => Ok(()), + } } /// Convert a frame socket into a WebSocket. @@ -288,37 +332,6 @@ impl WebSocket Ok(()) } - /// Flush the pending send queue. - fn send_pending(&mut self) -> Result<()> { - // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in - // response, unless it already received a Close frame. It SHOULD - // respond with Pong frame as soon as is practical. (RFC 6455) - if let Some(pong) = replace(&mut self.pong, None) { - self.send_one_frame(pong)?; - } - // If we have any unsent frames, send them. - while let Some(data) = self.send_queue.pop_front() { - self.send_one_frame(data)?; - } - - // If we're closing and there is nothing to send anymore, we should close the connection. - match self.state { - WebSocketState::ClosedByPeer if self.send_queue.is_empty() => { - // The underlying TCP connection, in most normal cases, SHOULD be closed - // first by the server, so that it holds the TIME_WAIT state and not the - // client (as this would prevent it from re-opening the connection for 2 - // maximum segment lifetimes (2MSL), while there is no corresponding - // server impact as a TIME_WAIT connection is immediately reopened upon - // a new SYN with a higher seq number). (RFC 6455) - match self.role { - Role::Client => Ok(()), - Role::Server => Err(Error::ConnectionClosed), - } - } - _ => Ok(()), - } - } - /// Send a single pending frame. fn send_one_frame(&mut self, mut frame: Frame) -> Result<()> { match self.role { @@ -330,8 +343,7 @@ impl WebSocket frame.set_mask(); } } - self.socket.write_frame(frame)?; - Ok(()) + self.socket.write_frame(frame) } }