Remove implicit write flushing

pull/357/head
Alex Butler 1 year ago
parent d298089bf3
commit 483d229707
  1. 16
      src/protocol/frame/mod.rs
  2. 17
      src/protocol/mod.rs

@ -72,12 +72,13 @@ where
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
/// call write_pending() afterwards. /// call write_pending() afterwards.
pub fn write_frame(&mut self, frame: Frame) -> Result<()> { pub fn write_frame(&mut self, frame: Frame) -> Result<()> {
self.codec.write_frame(&mut self.stream, frame) self.codec.write_frame(&mut self.stream, frame)?;
Ok(self.stream.flush()?)
} }
/// Complete pending write, if any. /// Complete pending write, if any.
pub fn write_pending(&mut self) -> Result<()> { pub fn write_pending(&mut self) -> Result<()> {
self.codec.write_pending(&mut self.stream) Ok(self.stream.flush()?)
} }
} }
@ -166,6 +167,8 @@ impl FrameCodec {
} }
/// Write a frame to the provided stream. /// Write a frame to the provided stream.
///
/// Does **not** flush.
pub(super) fn write_frame<Stream>(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> pub(super) fn write_frame<Stream>(&mut self, stream: &mut Stream, frame: Frame) -> Result<()>
where where
Stream: Write, Stream: Write,
@ -173,14 +176,7 @@ impl FrameCodec {
trace!("writing frame {}", frame); trace!("writing frame {}", frame);
self.out_buffer.reserve(frame.len()); self.out_buffer.reserve(frame.len());
frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector");
self.write_pending(stream)
}
/// Complete pending write, if any.
pub(super) fn write_pending<Stream>(&mut self, stream: &mut Stream) -> Result<()>
where
Stream: Write,
{
while !self.out_buffer.is_empty() { while !self.out_buffer.is_empty() {
let len = stream.write(&self.out_buffer)?; let len = stream.write(&self.out_buffer)?;
if len == 0 { if len == 0 {
@ -193,7 +189,7 @@ impl FrameCodec {
} }
self.out_buffer.drain(0..len); self.out_buffer.drain(0..len);
} }
stream.flush()?;
Ok(()) Ok(())
} }
} }

@ -353,24 +353,33 @@ impl WebSocketContext {
Message::Ping(data) => Frame::ping(data), Message::Ping(data) => Frame::ping(data),
Message::Pong(data) => { Message::Pong(data) => {
self.pong = Some(Frame::pong(data)); self.pong = Some(Frame::pong(data));
return self.write_pending(stream); return self.write_queue(stream);
} }
Message::Close(code) => return self.close(stream, code), Message::Close(code) => return self.close(stream, code),
Message::Frame(f) => f, Message::Frame(f) => f,
}; };
self.send_queue.push_back(frame); self.send_queue.push_back(frame);
self.write_pending(stream) self.write_queue(stream)
} }
/// Flush the pending send queue. /// Flush the pending send queue.
#[inline]
pub fn write_pending<Stream>(&mut self, stream: &mut Stream) -> Result<()> pub fn write_pending<Stream>(&mut self, stream: &mut Stream) -> Result<()>
where where
Stream: Read + Write, Stream: Read + Write,
{ {
// First, make sure we have no pending frame sending. self.write_queue(stream)?;
self.frame.write_pending(stream)?; Ok(stream.flush()?)
}
/// Write send queue & pongs.
///
/// Does **not** flush.
fn write_queue<Stream>(&mut self, stream: &mut Stream) -> Result<()>
where
Stream: Read + Write,
{
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD // response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455) // respond with Pong frame as soon as is practical. (RFC 6455)

Loading…
Cancel
Save