diff --git a/src/lib.rs b/src/lib.rs index 78a414f..a458cb2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -156,22 +156,32 @@ impl Future for ServerHandshakeAsync { } } +trait ToAsync { + type T; + type E; + fn to_async(self) -> Result, Self::E>; +} + +impl ToAsync for Result { + type T = T; + type E = WsError; + fn to_async(self) -> Result, Self::E> { + match self { + Ok(x) => Ok(Async::Ready(x)), + Err(error) => match error { + WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => Ok(Async::NotReady), + err => Err(err), + }, + } + } +} + impl Stream for WebSocketStream where T: Io { type Item = Message; type Error = WsError; fn poll(&mut self) -> Poll, WsError> { - match self.inner.read_message() { - Ok(message) => Ok(Async::Ready(Some(message))), - Err(error) => { - match error { - WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => { - Ok(Async::NotReady) - }, - _ => Err(error), - } - }, - } + self.inner.read_message().map(|m| Some(m)).to_async() } } @@ -180,12 +190,12 @@ impl Sink for WebSocketStream where T: Io { type SinkError = WsError; fn start_send(&mut self, item: Message) -> StartSend { - try!(self.inner.write_message(item)); + try!(self.inner.write_message(item).to_async()); Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), WsError> { - Ok(Async::Ready(())) + self.inner.write_pending().to_async() } }