protocol: add write_pending() functions

The semantics of write_message() and write_frame()
is changed accordingly.
pull/7/head
Alexey Galakhov 8 years ago
parent e6b658d94f
commit 604e2021ce
  1. 12
      src/protocol/frame/mod.rs
  2. 88
      src/protocol/mod.rs

@ -66,12 +66,22 @@ impl<Stream> FrameSocket<Stream>
where Stream: Write
{
/// Write a frame to stream.
///
/// This function guarantees that the frame is queued regardless of any errors.
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
/// call write_pending() afterwards.
pub fn write_frame(&mut self, frame: Frame) -> Result<()> {
debug!("writing frame {}", frame);
self.out_buffer.reserve(frame.len());
frame.format(&mut self.out_buffer)?;
frame.format(&mut self.out_buffer).expect("Bug: can't write to vector");
self.write_pending()
}
/// Complete pending write, if any.
pub fn write_pending(&mut self) -> Result<()> {
while !self.out_buffer.is_empty() {
let len = self.stream.write(&self.out_buffer)?;
self.out_buffer.drain(0..len);
}
Ok(())
}
}

@ -55,9 +55,14 @@ impl<Stream> WebSocket<Stream>
}
/// Read a message from stream, if possible.
///
/// This function sends pong and close responses automatically.
/// However, it never blocks on write.
pub fn read_message(&mut self) -> Result<Message> {
loop {
self.send_pending().no_block()?;
// Since we may get ping or close, we need to reply to the messages even during read.
// Thus we call write_pending() but ignore its blocking.
self.write_pending().no_block()?;
// If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock.
if let Some(message) = self.read_message_frame()? {
@ -68,6 +73,10 @@ impl<Stream> WebSocket<Stream>
}
/// Send a message to stream, if possible.
///
/// This function guarantees that the frame is queued regardless of any errors.
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
/// call write_pending() afterwards.
pub fn write_message(&mut self, message: Message) -> Result<()> {
let frame = {
let opcode = match message {
@ -77,11 +86,13 @@ impl<Stream> WebSocket<Stream>
Frame::message(message.into_data(), OpCode::Data(opcode), true)
};
self.send_queue.push_back(frame);
self.send_pending().no_block()?;
Ok(())
self.write_pending()
}
/// Close the connection.
///
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again, just like write_message().
pub fn close(&mut self) -> Result<()> {
match self.state {
WebSocketState::Active => {
@ -93,8 +104,41 @@ impl<Stream> WebSocket<Stream>
// already closed, nothing to do
}
}
self.send_pending().no_block()?;
Ok(())
self.write_pending()
}
/// Flush the pending send queue.
pub fn write_pending(&mut self) -> Result<()> {
// First, make sure we have no pending frame sending.
self.socket.write_pending()?;
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455)
if let Some(pong) = replace(&mut self.pong, None) {
self.send_one_frame(pong)?;
}
// If we have any unsent frames, send them.
while let Some(data) = self.send_queue.pop_front() {
self.send_one_frame(data)?;
}
// If we're closing and there is nothing to send anymore, we should close the connection.
match self.state {
WebSocketState::ClosedByPeer if self.send_queue.is_empty() => {
// The underlying TCP connection, in most normal cases, SHOULD be closed
// first by the server, so that it holds the TIME_WAIT state and not the
// client (as this would prevent it from re-opening the connection for 2
// maximum segment lifetimes (2MSL), while there is no corresponding
// server impact as a TIME_WAIT connection is immediately reopened upon
// a new SYN with a higher seq number). (RFC 6455)
match self.role {
Role::Client => Ok(()),
Role::Server => Err(Error::ConnectionClosed),
}
}
_ => Ok(()),
}
}
/// Convert a frame socket into a WebSocket.
@ -288,37 +332,6 @@ impl<Stream> WebSocket<Stream>
Ok(())
}
/// Flush the pending send queue.
fn send_pending(&mut self) -> Result<()> {
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455)
if let Some(pong) = replace(&mut self.pong, None) {
self.send_one_frame(pong)?;
}
// If we have any unsent frames, send them.
while let Some(data) = self.send_queue.pop_front() {
self.send_one_frame(data)?;
}
// If we're closing and there is nothing to send anymore, we should close the connection.
match self.state {
WebSocketState::ClosedByPeer if self.send_queue.is_empty() => {
// The underlying TCP connection, in most normal cases, SHOULD be closed
// first by the server, so that it holds the TIME_WAIT state and not the
// client (as this would prevent it from re-opening the connection for 2
// maximum segment lifetimes (2MSL), while there is no corresponding
// server impact as a TIME_WAIT connection is immediately reopened upon
// a new SYN with a higher seq number). (RFC 6455)
match self.role {
Role::Client => Ok(()),
Role::Server => Err(Error::ConnectionClosed),
}
}
_ => Ok(()),
}
}
/// Send a single pending frame.
fn send_one_frame(&mut self, mut frame: Frame) -> Result<()> {
match self.role {
@ -330,8 +343,7 @@ impl<Stream> WebSocket<Stream>
frame.set_mask();
}
}
self.socket.write_frame(frame)?;
Ok(())
self.socket.write_frame(frame)
}
}

Loading…
Cancel
Save