extern crate futures; extern crate tokio_core; extern crate tungstenite; use std::io::ErrorKind; use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; use tokio_core::io::Io; use tungstenite::handshake::client::{ClientHandshake, Request}; use tungstenite::handshake::server::ServerHandshake; use tungstenite::handshake::{Handshake, HandshakeResult}; use tungstenite::protocol::{WebSocket, Message}; use tungstenite::error::Error as WsError; pub struct WebSocketStream { inner: WebSocket, } pub struct ClientHandshakeAsync { inner: Option>, } pub struct ServerHandshakeAsync { inner: Option>, } pub trait ClientHandshakeExt { fn new_async(stream: S, request: Request) -> ClientHandshakeAsync; } pub trait ServerHandshakeExt { fn new_async(stream: S) -> ServerHandshakeAsync; } impl ClientHandshakeExt for ClientHandshake { fn new_async(stream: Stream, request: Request) -> ClientHandshakeAsync { ClientHandshakeAsync { inner: Some(ClientHandshake::new(stream, request)), } } } impl ServerHandshakeExt for ServerHandshake { fn new_async(stream: Stream) -> ServerHandshakeAsync { ServerHandshakeAsync { inner: Some(ServerHandshake::new(stream)), } } } impl Future for ClientHandshakeAsync { type Item = WebSocketStream; type Error = WsError; fn poll(&mut self) -> Poll, WsError> { let hs = self.inner.take().expect("Cannot poll a handshake twice"); match hs.handshake()? { HandshakeResult::Done(stream) => { Ok(WebSocketStream { inner: stream }.into()) }, HandshakeResult::Incomplete(handshake) => { self.inner= Some(handshake); Ok(Async::NotReady) }, } } } impl Future for ServerHandshakeAsync { type Item = WebSocketStream; type Error = WsError; fn poll(&mut self) -> Poll, WsError> { let hs = self.inner.take().expect("Cannot poll a handshake twice"); match hs.handshake()? { HandshakeResult::Done(stream) => { Ok(WebSocketStream { inner: stream }.into()) }, HandshakeResult::Incomplete(handshake) => { self.inner= Some(handshake); Ok(Async::NotReady) }, } } } impl Stream for WebSocketStream where T: Io { type Item = Message; type Error = WsError; fn poll(&mut self) -> Poll, WsError> { match self.inner.read_message() { Ok(message) => Ok(Async::Ready(Some(message))), Err(error) => { match error { WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => Ok(Async::NotReady), _ => Err(error), } }, } } } impl Sink for WebSocketStream where T: Io { type SinkItem = Message; type SinkError = WsError; fn start_send(&mut self, item: Message) -> StartSend { try!(self.inner.write_message(item)); Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), WsError> { Ok(Async::Ready(())) } } #[cfg(test)] mod tests { #[test] fn it_works() { } }