From 46bfd815395ceb23974eb1160a06141ea6a5abc5 Mon Sep 17 00:00:00 2001 From: Alexey Galakhov Date: Fri, 3 Feb 2017 22:14:15 +0100 Subject: [PATCH] protocol: correct non-blocking handling --- src/protocol/mod.rs | 63 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 8d12f9f..5a22e14 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -5,12 +5,13 @@ mod message; pub use self::message::Message; -use self::message::{IncompleteMessage, IncompleteMessageType}; use std::collections::VecDeque; -use std::io::{Read, Write}; +use std::io::{Read, Write, Error as IoError, ErrorKind as IoErrorKind}; use std::mem::replace; +use std::result::Result as StdResult; use error::{Error, Result}; +use self::message::{IncompleteMessage, IncompleteMessageType}; use self::frame::{Frame, FrameSocket}; use self::frame::coding::{OpCode, Data as OpData, Control as OpCtl, CloseCode}; @@ -56,8 +57,14 @@ impl WebSocket /// Read a message from stream, if possible. pub fn read_message(&mut self) -> Result { loop { - self.send_pending()?; // FIXME - if let Some(message) = self.read_message_frame()? { + let write_blocks = self.send_pending().no_block()?.is_none(); + let read = self.read_message_frame(); + let frame = if write_blocks { + Some(read?) + } else { + read.no_block()? + }; + if let Some(Some(message)) = frame { debug!("Received message {}", message); return Ok(message) } @@ -74,7 +81,8 @@ impl WebSocket Frame::message(message.into_data(), OpCode::Data(opcode), true) }; self.send_queue.push_back(frame); - self.send_pending() + self.send_pending().no_block()?; + Ok(()) } /// Close the connection. @@ -89,6 +97,7 @@ impl WebSocket // already closed, nothing to do } } + self.send_pending().no_block()?; Ok(()) } @@ -348,6 +357,50 @@ impl WebSocketState { } } +/// Non-blocking IO handling. +trait NonBlockingError: Sized { + fn into_non_blocking(self) -> Option; +} + +impl NonBlockingError for IoError { + fn into_non_blocking(self) -> Option { + match self.kind() { + IoErrorKind::Interrupted => None, + _ => Some(self), + } + } +} + +impl NonBlockingError for Error { + fn into_non_blocking(self) -> Option { + match self { + Error::Io(e) => e.into_non_blocking().map(|e| e.into()), + x => Some(x), + } + } +} + +/// Non-blocking IO wrapper. +trait NonBlockingResult { + type Result; + fn no_block(self) -> Self::Result; +} + +impl NonBlockingResult for StdResult +where E : NonBlockingError +{ + type Result = StdResult, E>; + fn no_block(self) -> Self::Result { + match self { + Ok(x) => Ok(Some(x)), + Err(e) => match e.into_non_blocking() { + Some(e) => Err(e), + None => Ok(None), + } + } + } +} + #[cfg(test)] mod tests { use super::{WebSocket, Role, Message};