From e1f65c4a20402954abf8dc2cf2edf94b2f8e1838 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Fri, 3 Feb 2017 20:25:01 +0100 Subject: [PATCH] Add tokio bindings for the library Needs some improvements, for instance the copy-paste can be removed. --- Cargo.toml | 1 - src/lib.rs | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 22eb3fd..fbbc457 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,5 @@ authors = ["Daniel Abramov "] [dependencies] futures = "*" tokio-core = "*" -url = "*" tungstenite = { path = "../tungstenite-rs" } diff --git a/src/lib.rs b/src/lib.rs index cdfbe1a..0096fde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,121 @@ +extern crate futures; +extern crate tokio_core; +extern crate tungstenite; + +use std::io::ErrorKind; + +use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; +use tokio_core::io::Io; + +use tungstenite::handshake::client::{ClientHandshake, Request}; +use tungstenite::handshake::server::ServerHandshake; +use tungstenite::handshake::{Handshake, HandshakeResult}; +use tungstenite::protocol::{WebSocket, Message}; +use tungstenite::error::Error as WsError; + +pub struct WebSocketStream { + inner: WebSocket, +} + +pub struct ClientHandshakeAsync { + inner: Option>, +} + +pub struct ServerHandshakeAsync { + inner: Option>, +} + +pub trait ClientHandshakeExt { + fn new_async(stream: S, request: Request) -> ClientHandshakeAsync; +} + +pub trait ServerHandshakeExt { + fn new_async(stream: S) -> ServerHandshakeAsync; +} + +impl ClientHandshakeExt for ClientHandshake { + fn new_async(stream: Stream, request: Request) -> ClientHandshakeAsync { + ClientHandshakeAsync { + inner: Some(ClientHandshake::new(stream, request)), + } + } +} + +impl ServerHandshakeExt for ServerHandshake { + fn new_async(stream: Stream) -> ServerHandshakeAsync { + ServerHandshakeAsync { + inner: Some(ServerHandshake::new(stream)), + } + } +} + +impl Future for ClientHandshakeAsync { + type Item = WebSocketStream; + type Error = WsError; + + fn poll(&mut self) -> Poll, WsError> { + let hs = self.inner.take().expect("Cannot poll a handshake twice"); + match hs.handshake()? { + HandshakeResult::Done(stream) => { + Ok(WebSocketStream { inner: stream }.into()) + }, + HandshakeResult::Incomplete(handshake) => { + self.inner= Some(handshake); + Ok(Async::NotReady) + }, + } + } +} + +impl Future for ServerHandshakeAsync { + type Item = WebSocketStream; + type Error = WsError; + + fn poll(&mut self) -> Poll, WsError> { + let hs = self.inner.take().expect("Cannot poll a handshake twice"); + match hs.handshake()? { + HandshakeResult::Done(stream) => { + Ok(WebSocketStream { inner: stream }.into()) + }, + HandshakeResult::Incomplete(handshake) => { + self.inner= Some(handshake); + Ok(Async::NotReady) + }, + } + } +} + +impl Stream for WebSocketStream where T: Io { + type Item = Message; + type Error = WsError; + + fn poll(&mut self) -> Poll, WsError> { + match self.inner.read_message() { + Ok(message) => Ok(Async::Ready(Some(message))), + Err(error) => { + match error { + WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => Ok(Async::NotReady), + _ => Err(error), + } + }, + } + } +} + +impl Sink for WebSocketStream where T: Io { + type SinkItem = Message; + type SinkError = WsError; + + fn start_send(&mut self, item: Message) -> StartSend { + try!(self.inner.write_message(item)); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), WsError> { + Ok(Async::Ready(())) + } +} + #[cfg(test)] mod tests { #[test]