diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index 39066be..c25837e 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -72,12 +72,13 @@ where /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, /// call write_pending() afterwards. pub fn write_frame(&mut self, frame: Frame) -> Result<()> { - self.codec.write_frame(&mut self.stream, frame) + self.codec.write_frame(&mut self.stream, frame)?; + Ok(self.stream.flush()?) } /// Complete pending write, if any. pub fn write_pending(&mut self) -> Result<()> { - self.codec.write_pending(&mut self.stream) + Ok(self.stream.flush()?) } } @@ -166,6 +167,8 @@ impl FrameCodec { } /// Write a frame to the provided stream. + /// + /// Does **not** flush. pub(super) fn write_frame(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> where Stream: Write, @@ -173,14 +176,7 @@ impl FrameCodec { trace!("writing frame {}", frame); self.out_buffer.reserve(frame.len()); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); - self.write_pending(stream) - } - /// Complete pending write, if any. - pub(super) fn write_pending(&mut self, stream: &mut Stream) -> Result<()> - where - Stream: Write, - { while !self.out_buffer.is_empty() { let len = stream.write(&self.out_buffer)?; if len == 0 { @@ -193,7 +189,7 @@ impl FrameCodec { } self.out_buffer.drain(0..len); } - stream.flush()?; + Ok(()) } } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 94397e9..00f0b63 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -353,24 +353,33 @@ impl WebSocketContext { Message::Ping(data) => Frame::ping(data), Message::Pong(data) => { self.pong = Some(Frame::pong(data)); - return self.write_pending(stream); + return self.write_queue(stream); } Message::Close(code) => return self.close(stream, code), Message::Frame(f) => f, }; self.send_queue.push_back(frame); - self.write_pending(stream) + self.write_queue(stream) } /// Flush the pending send queue. + #[inline] pub fn write_pending(&mut self, stream: &mut Stream) -> Result<()> where Stream: Read + Write, { - // First, make sure we have no pending frame sending. - self.frame.write_pending(stream)?; + self.write_queue(stream)?; + Ok(stream.flush()?) + } + /// Write send queue & pongs. + /// + /// Does **not** flush. + fn write_queue(&mut self, stream: &mut Stream) -> Result<()> + where + Stream: Read + Write, + { // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // response, unless it already received a Close frame. It SHOULD // respond with Pong frame as soon as is practical. (RFC 6455)