Nonblocking/timeout-compatible reads.

Adds a new method to a `WebSocket`: `try_read_message`.
Calling `read_message` would always enter a loop until receipt of a
message, invalidating any modifications to the underlying stream's
timeout or blocking mode. The new method allows these to be respected.

This is structured so that the semantics/behaviour of `read_message`
remain unchanged, exposing the inner part of its loop publicly.
pull/52/head
Kyle Simpson 6 years ago
parent 3ce4c01b92
commit 70dfc4f6f0
  1. 39
      src/protocol/mod.rs

@ -141,19 +141,40 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// However, it never blocks on write.
pub fn read_message(&mut self) -> Result<Message> {
loop {
// Since we may get ping or close, we need to reply to the messages even during read.
// Thus we call write_pending() but ignore its blocking.
self.write_pending().no_block()?;
// If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock.
let res = self.read_message_frame();
if let Some(message) = self.translate_close(res)? {
trace!("Received message {}", message);
return Ok(message)
if let Some(msg) = self.try_read_message()? {
return Ok(msg);
}
}
}
/// Read a message from a stream (without blocking), if possible.
///
/// As in [`read_message`], this function sends pong and close
/// responses automatically in a non-blocking fashion. Unlike
/// [`read_message`], this allows early exit if the underlying
/// stream is set to nonblocking mode or has a read timeout.
///
/// [`read_message`]: struct.WebSocket.html#read_message
pub fn try_read_message(&mut self) -> Result<Option<Message>> {
// Since we may get ping or close, we need to reply to the messages even during read.
// Thus we call write_pending() but ignore its blocking.
self.write_pending().no_block()?;
// Try to get a frame, convert to None if we get a WouldBlock,
// strip off any unnecessary wrapping. Additionally, wrap up any
// closing logic.
let frame = self.read_message_frame();
let out = self.translate_close(frame)
.no_block()?
.and_then(|x| x);
if log_enabled!(log::Level::Trace) && out.is_some() {
trace!("Received message {}", out.as_ref().unwrap());
}
Ok(out)
}
/// Send a message to stream, if possible.
///
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping

Loading…
Cancel
Save