diff --git a/Cargo.toml b/Cargo.toml index 79e13ea..ebc0ee0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,5 @@ futures = "*" tokio-core = "*" url = "*" -tungstenite = { git = "ssh://git@bitbucket.org/mikogo/tungstenite-rs.git" } +tungstenite = { git = "ssh://git@bitbucket.org/mikogo/tungstenite-rs.git", branch = "handshake-refactor" } diff --git a/examples/client.rs b/examples/client.rs index 16324e4..d432849 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -25,10 +25,10 @@ use futures::sync::mpsc; use futures::{Future, Sink, Stream}; use tokio_core::net::TcpStream; use tokio_core::reactor::Core; -use tokio_tungstenite::ClientHandshakeExt; -use tungstenite::handshake::client::{ClientHandshake, Request}; use tungstenite::protocol::Message; +use tokio_tungstenite::client_async; + fn main() { // Specify the server address to which the client will be connecting. let connect_addr = env::args().nth(1).unwrap_or_else(|| { @@ -69,8 +69,7 @@ fn main() { // more work from the remote then we can exit. let mut stdout = io::stdout(); let client = tcp.and_then(|stream| { - let req = Request { url: url }; - ClientHandshake::::new_async(stream, req).and_then(|ws_stream| { + client_async(url, stream).and_then(|ws_stream| { println!("WebSocket handshake has been successfully completed"); // `sink` is the stream of messages going out. diff --git a/examples/server.rs b/examples/server.rs index e23de7b..73303bb 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -29,13 +29,13 @@ use std::io::{Error, ErrorKind}; use std::rc::Rc; use futures::stream::Stream; -use futures::{Future}; -use tokio_core::net::{TcpListener, TcpStream}; +use futures::Future; +use tokio_core::net::TcpListener; use tokio_core::reactor::Core; -use tokio_tungstenite::ServerHandshakeExt; -use tungstenite::handshake::server::ServerHandshake; use tungstenite::protocol::Message; +use tokio_tungstenite::accept_async; + fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = addr.parse().unwrap(); @@ -60,7 +60,7 @@ fn main() { let connections_inner = connections.clone(); let handle_inner = handle.clone(); - ServerHandshake::::new_async(stream).and_then(move |ws_stream| { + accept_async(stream).and_then(move |ws_stream| { println!("New WebSocket connection: {}", addr); // Create a channel for our stream, which other sockets will use to diff --git a/src/lib.rs b/src/lib.rs index a458cb2..52a8fb6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,12 @@ //! functionality provided by the `tungestenite` crate, on which this crate is //! built. Configuration is done through `tungestenite` crate as well. -#![deny(missing_docs)] +#![deny( + missing_docs, + unused_must_use, + unused_mut, + unused_imports, + unused_import_braces)] #[macro_use] extern crate futures; @@ -23,14 +28,55 @@ extern crate url; use std::io::ErrorKind; -use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend, task}; +use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; use tokio_core::io::Io; -use tungstenite::handshake::client::{ClientHandshake, Request}; +use url::Url; + +use tungstenite::handshake::client::ClientHandshake; use tungstenite::handshake::server::ServerHandshake; -use tungstenite::handshake::{Handshake, HandshakeResult}; +use tungstenite::handshake::{HandshakeRole, HandshakeError}; use tungstenite::protocol::{WebSocket, Message}; use tungstenite::error::Error as WsError; +use tungstenite::client; +use tungstenite::server; + +/// Create a handshake provided stream and assuming the provided request. +/// +/// This function will internally call `client::client` to create a +/// handshake representation and returns a future representing the +/// resolution of the WebSocket handshake. The returned future will resolve +/// to either `WebSocketStream` or `Error` depending if it's successful +/// or not. +/// +/// This is typically used for clients who have already established, for +/// example, a TCP connection to the remove server. +pub fn client_async(url: Url, stream: S) -> ConnectAsync { + ConnectAsync { + inner: MidHandshake { + inner: Some(client::client(url, stream)) + } + } +} + +/// Accepts a new WebSocket connection with the provided stream. +/// +/// This function will internally call `server::accept` to create a +/// handshake representation and returns a future representing the +/// resolution of the WebSocket handshake. The returned future will resolve +/// to either `WebSocketStream` or `Error` depending if it's successful +/// or not. +/// +/// This is typically used after a socket has been accepted from a +/// `TcpListener`. That socket is then passed to this function to perform +/// the server half of the accepting a client's websocket connection. +pub fn accept_async(stream: S) -> AcceptAsync { + AcceptAsync { + inner: MidHandshake { + inner: Some(server::accept(stream)) + } + } +} /// A wrapper around an underlying raw stream which implements the WebSocket /// protocol. @@ -45,113 +91,81 @@ pub struct WebSocketStream { inner: WebSocket, } -/// Future returned from `ClientHandshakeExt::new_async` which will resolve -/// once the connection handshake has finished. -pub struct ClientHandshakeAsync { - inner: Option>, -} +impl Stream for WebSocketStream where T: Io { + type Item = Message; + type Error = WsError; -/// Future returned from `ServerHandshakeExt::new_async` which will resolve -/// once the connection handshake has finished. -pub struct ServerHandshakeAsync { - inner: Option>, + fn poll(&mut self) -> Poll, WsError> { + self.inner.read_message().map(|m| Some(m)).to_async() + } } -/// Extension trait for the `ClientHandshake` type in the `tungstenite` crate. -pub trait ClientHandshakeExt { - /// Create a handshake provided stream and assuming the provided request. - /// - /// This function will internally call `ClientHandshake::new` to create a - /// handshake representation and returns a future representing the - /// resolution of the WebSocket handshake. The returned future will resolve - /// to either `WebSocketStream` or `Error` depending if it's successful - /// or not. - /// - /// This is typically used for clients who have already established, for - /// example, a TCP connection to the remove server. - fn new_async(stream: S, request: Request) -> ClientHandshakeAsync; +impl Sink for WebSocketStream where T: Io { + type SinkItem = Message; + type SinkError = WsError; + + fn start_send(&mut self, item: Message) -> StartSend { + try!(self.inner.write_message(item).to_async()); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), WsError> { + self.inner.write_pending().to_async() + } } -/// Extension trait for the `ServerHandshake` type in the `tungstenite` crate. -pub trait ServerHandshakeExt { - /// Accepts a new WebSocket connection with the provided stream. - /// - /// This function will internally call `ServerHandshake::new` to create a - /// handshake representation and returns a future representing the - /// resolution of the WebSocket handshake. The returned future will resolve - /// to either `WebSocketStream` or `Error` depending if it's successful - /// or not. - /// - /// This is typically used after a socket has been accepted from a - /// `TcpListener`. That socket is then passed to this function to perform - /// the server half of the accepting a client's websocket connection. - fn new_async(stream: S) -> ServerHandshakeAsync; +/// Future returned from connect_async() which will resolve +/// once the connection handshake has finished. +pub struct ConnectAsync { + inner: MidHandshake, } -impl ClientHandshakeExt for ClientHandshake { - fn new_async(stream: Stream, request: Request) -> ClientHandshakeAsync { - ClientHandshakeAsync { - inner: Some(ClientHandshake::new(stream, request)), - } +impl Future for ConnectAsync { + type Item = WebSocketStream; + type Error = WsError; + + fn poll(&mut self) -> Poll, WsError> { + self.inner.poll() } } -impl ServerHandshakeExt for ServerHandshake { - fn new_async(stream: Stream) -> ServerHandshakeAsync { - ServerHandshakeAsync { - inner: Some(ServerHandshake::new(stream)), - } - } +/// Future returned from accept_async() which will resolve +/// once the connection handshake has finished. +pub struct AcceptAsync { + inner: MidHandshake, } -// FIXME: `ClientHandshakeAsync` and `ServerHandshakeAsync` have the same implementation, we -// have to get rid of this copy-pasting one day. But currently I don't see an elegant way to write -// it. -impl Future for ClientHandshakeAsync { +impl Future for AcceptAsync { type Item = WebSocketStream; type Error = WsError; fn poll(&mut self) -> Poll, WsError> { - let hs = self.inner.take().expect("Cannot poll a handshake twice"); - match hs.handshake()? { - HandshakeResult::Done(stream) => { - Ok(WebSocketStream { inner: stream }.into()) - }, - HandshakeResult::Incomplete(handshake) => { - // FIXME: Remove this line after we have a guarantee that the underlying handshake - // calls to both `read()`/`write()`. Or replace it by `poll_read()` and - // `poll_write()` (this requires making the handshake's stream public). - task::park().unpark(); - - self.inner = Some(handshake); - Ok(Async::NotReady) - }, - } + self.inner.poll() } } -// FIXME: `ClientHandshakeAsync` and `ServerHandshakeAsync` have the same implementation, we -// have to get rid of this copy-pasting one day. But currently I don't see an elegant way to write -// it. -impl Future for ServerHandshakeAsync { +struct MidHandshake { + inner: Option, HandshakeError>>, +} + +impl Future for MidHandshake { type Item = WebSocketStream; type Error = WsError; fn poll(&mut self) -> Poll, WsError> { - let hs = self.inner.take().expect("Cannot poll a handshake twice"); - match hs.handshake()? { - HandshakeResult::Done(stream) => { - Ok(WebSocketStream { inner: stream }.into()) - }, - HandshakeResult::Incomplete(handshake) => { - // FIXME: Remove this line after we have a guarantee that the underlying handshake - // calls to both `read()`/`write()`. Or replace it by `poll_read()` and - // `poll_write()` (this requires making the handshake's stream public). - task::park().unpark(); - - self.inner = Some(handshake); - Ok(Async::NotReady) - }, + match self.inner.take().expect("cannot poll MidHandshake twice") { + Ok(stream) => Ok(WebSocketStream { inner: stream }.into()), + Err(HandshakeError::Failure(e)) => Err(e), + Err(HandshakeError::Interrupted(s)) => { + match s.handshake() { + Ok(stream) => Ok(WebSocketStream { inner: stream }.into()), + Err(HandshakeError::Failure(e)) => Err(e), + Err(HandshakeError::Interrupted(s)) => { + self.inner = Some(Err(HandshakeError::Interrupted(s))); + Ok(Async::NotReady) + } + } + } } } } @@ -176,29 +190,6 @@ impl ToAsync for Result { } } -impl Stream for WebSocketStream where T: Io { - type Item = Message; - type Error = WsError; - - fn poll(&mut self) -> Poll, WsError> { - self.inner.read_message().map(|m| Some(m)).to_async() - } -} - -impl Sink for WebSocketStream where T: Io { - type SinkItem = Message; - type SinkError = WsError; - - fn start_send(&mut self, item: Message) -> StartSend { - try!(self.inner.write_message(item).to_async()); - Ok(AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> Poll<(), WsError> { - self.inner.write_pending().to_async() - } -} - #[cfg(test)] mod tests { use super::*; @@ -209,8 +200,6 @@ mod tests { use futures::{Future, Stream}; use tokio_core::net::{TcpStream, TcpListener}; use tokio_core::reactor::Core; - use tungstenite::handshake::server::ServerHandshake; - use tungstenite::handshake::client::{ClientHandshake, Request}; #[test] fn handshakes() { @@ -227,7 +216,7 @@ mod tests { let connections = listener.incoming(); tx.send(()).unwrap(); let handshakes = connections.and_then(|(connection, _)| { - ServerHandshake::::new_async(connection) + accept_async(connection) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) }); let server = handshakes.for_each(|_| { @@ -244,7 +233,7 @@ mod tests { let tcp = TcpStream::connect(&address, &handle); let handshake = tcp.and_then(|stream| { let url = url::Url::parse("ws://localhost:12345/").unwrap(); - ClientHandshake::::new_async(stream, Request { url: url }) + client_async(url, stream) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) }); let client = handshake.and_then(|_| {