Merged in handshake-refactor (pull request #1)

refactor: use asyncronous handshakes

Approved-by: Daniel Abramov
pull/1/head
Alexey Galakhov 8 years ago committed by Daniel Abramov
commit 0b656bf24c
  1. 2
      Cargo.toml
  2. 7
      examples/client.rs
  3. 10
      examples/server.rs
  4. 201
      src/lib.rs

@ -8,5 +8,5 @@ futures = "*"
tokio-core = "*" tokio-core = "*"
url = "*" url = "*"
tungstenite = { git = "ssh://git@bitbucket.org/mikogo/tungstenite-rs.git" } tungstenite = { git = "ssh://git@bitbucket.org/mikogo/tungstenite-rs.git", branch = "handshake-refactor" }

@ -25,10 +25,10 @@ use futures::sync::mpsc;
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_tungstenite::ClientHandshakeExt;
use tungstenite::handshake::client::{ClientHandshake, Request};
use tungstenite::protocol::Message; use tungstenite::protocol::Message;
use tokio_tungstenite::client_async;
fn main() { fn main() {
// Specify the server address to which the client will be connecting. // Specify the server address to which the client will be connecting.
let connect_addr = env::args().nth(1).unwrap_or_else(|| { let connect_addr = env::args().nth(1).unwrap_or_else(|| {
@ -69,8 +69,7 @@ fn main() {
// more work from the remote then we can exit. // more work from the remote then we can exit.
let mut stdout = io::stdout(); let mut stdout = io::stdout();
let client = tcp.and_then(|stream| { let client = tcp.and_then(|stream| {
let req = Request { url: url }; client_async(url, stream).and_then(|ws_stream| {
ClientHandshake::<TcpStream>::new_async(stream, req).and_then(|ws_stream| {
println!("WebSocket handshake has been successfully completed"); println!("WebSocket handshake has been successfully completed");
// `sink` is the stream of messages going out. // `sink` is the stream of messages going out.

@ -29,13 +29,13 @@ use std::io::{Error, ErrorKind};
use std::rc::Rc; use std::rc::Rc;
use futures::stream::Stream; use futures::stream::Stream;
use futures::{Future}; use futures::Future;
use tokio_core::net::{TcpListener, TcpStream}; use tokio_core::net::TcpListener;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_tungstenite::ServerHandshakeExt;
use tungstenite::handshake::server::ServerHandshake;
use tungstenite::protocol::Message; use tungstenite::protocol::Message;
use tokio_tungstenite::accept_async;
fn main() { fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse().unwrap(); let addr = addr.parse().unwrap();
@ -60,7 +60,7 @@ fn main() {
let connections_inner = connections.clone(); let connections_inner = connections.clone();
let handle_inner = handle.clone(); let handle_inner = handle.clone();
ServerHandshake::<TcpStream>::new_async(stream).and_then(move |ws_stream| { accept_async(stream).and_then(move |ws_stream| {
println!("New WebSocket connection: {}", addr); println!("New WebSocket connection: {}", addr);
// Create a channel for our stream, which other sockets will use to // Create a channel for our stream, which other sockets will use to

@ -13,7 +13,12 @@
//! functionality provided by the `tungestenite` crate, on which this crate is //! functionality provided by the `tungestenite` crate, on which this crate is
//! built. Configuration is done through `tungestenite` crate as well. //! built. Configuration is done through `tungestenite` crate as well.
#![deny(missing_docs)] #![deny(
missing_docs,
unused_must_use,
unused_mut,
unused_imports,
unused_import_braces)]
#[macro_use] #[macro_use]
extern crate futures; extern crate futures;
@ -23,45 +28,22 @@ extern crate url;
use std::io::ErrorKind; use std::io::ErrorKind;
use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend, task}; use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend};
use tokio_core::io::Io; use tokio_core::io::Io;
use tungstenite::handshake::client::{ClientHandshake, Request}; use url::Url;
use tungstenite::handshake::client::ClientHandshake;
use tungstenite::handshake::server::ServerHandshake; use tungstenite::handshake::server::ServerHandshake;
use tungstenite::handshake::{Handshake, HandshakeResult}; use tungstenite::handshake::{HandshakeRole, HandshakeError};
use tungstenite::protocol::{WebSocket, Message}; use tungstenite::protocol::{WebSocket, Message};
use tungstenite::error::Error as WsError; use tungstenite::error::Error as WsError;
use tungstenite::client;
use tungstenite::server;
/// A wrapper around an underlying raw stream which implements the WebSocket
/// protocol.
///
/// A `WebSocketStream<S>` represents a handshake that has been completed
/// successfully and both the server and the client are ready for receiving
/// and sending data. Message from a `WebSocketStream<S>` are accessible
/// through the respective `Stream` and `Sink`. Check more information about
/// them in `futures-rs` crate documentation or have a look on the examples
/// and unit tests for this crate.
pub struct WebSocketStream<S> {
inner: WebSocket<S>,
}
/// Future returned from `ClientHandshakeExt::new_async` which will resolve
/// once the connection handshake has finished.
pub struct ClientHandshakeAsync<S> {
inner: Option<ClientHandshake<S>>,
}
/// Future returned from `ServerHandshakeExt::new_async` which will resolve
/// once the connection handshake has finished.
pub struct ServerHandshakeAsync<S: Io> {
inner: Option<ServerHandshake<S>>,
}
/// Extension trait for the `ClientHandshake` type in the `tungstenite` crate.
pub trait ClientHandshakeExt {
/// Create a handshake provided stream and assuming the provided request. /// Create a handshake provided stream and assuming the provided request.
/// ///
/// This function will internally call `ClientHandshake::new` to create a /// This function will internally call `client::client` to create a
/// handshake representation and returns a future representing the /// handshake representation and returns a future representing the
/// resolution of the WebSocket handshake. The returned future will resolve /// resolution of the WebSocket handshake. The returned future will resolve
/// to either `WebSocketStream<S>` or `Error` depending if it's successful /// to either `WebSocketStream<S>` or `Error` depending if it's successful
@ -69,14 +51,17 @@ pub trait ClientHandshakeExt {
/// ///
/// This is typically used for clients who have already established, for /// This is typically used for clients who have already established, for
/// example, a TCP connection to the remove server. /// example, a TCP connection to the remove server.
fn new_async<S: Io>(stream: S, request: Request) -> ClientHandshakeAsync<S>; pub fn client_async<S: Io>(url: Url, stream: S) -> ConnectAsync<S> {
ConnectAsync {
inner: MidHandshake {
inner: Some(client::client(url, stream))
}
}
} }
/// Extension trait for the `ServerHandshake` type in the `tungstenite` crate.
pub trait ServerHandshakeExt {
/// Accepts a new WebSocket connection with the provided stream. /// Accepts a new WebSocket connection with the provided stream.
/// ///
/// This function will internally call `ServerHandshake::new` to create a /// This function will internally call `server::accept` to create a
/// handshake representation and returns a future representing the /// handshake representation and returns a future representing the
/// resolution of the WebSocket handshake. The returned future will resolve /// resolution of the WebSocket handshake. The returned future will resolve
/// to either `WebSocketStream<S>` or `Error` depending if it's successful /// to either `WebSocketStream<S>` or `Error` depending if it's successful
@ -85,73 +70,102 @@ pub trait ServerHandshakeExt {
/// This is typically used after a socket has been accepted from a /// This is typically used after a socket has been accepted from a
/// `TcpListener`. That socket is then passed to this function to perform /// `TcpListener`. That socket is then passed to this function to perform
/// the server half of the accepting a client's websocket connection. /// the server half of the accepting a client's websocket connection.
fn new_async<S: Io>(stream: S) -> ServerHandshakeAsync<S>; pub fn accept_async<S: Io>(stream: S) -> AcceptAsync<S> {
AcceptAsync {
inner: MidHandshake {
inner: Some(server::accept(stream))
}
}
} }
impl<S: Io> ClientHandshakeExt for ClientHandshake<S> { /// A wrapper around an underlying raw stream which implements the WebSocket
fn new_async<Stream: Io>(stream: Stream, request: Request) -> ClientHandshakeAsync<Stream> { /// protocol.
ClientHandshakeAsync { ///
inner: Some(ClientHandshake::new(stream, request)), /// A `WebSocketStream<S>` represents a handshake that has been completed
/// successfully and both the server and the client are ready for receiving
/// and sending data. Message from a `WebSocketStream<S>` are accessible
/// through the respective `Stream` and `Sink`. Check more information about
/// them in `futures-rs` crate documentation or have a look on the examples
/// and unit tests for this crate.
pub struct WebSocketStream<S> {
inner: WebSocket<S>,
}
impl<T> Stream for WebSocketStream<T> where T: Io {
type Item = Message;
type Error = WsError;
fn poll(&mut self) -> Poll<Option<Message>, WsError> {
self.inner.read_message().map(|m| Some(m)).to_async()
} }
} }
impl<T> Sink for WebSocketStream<T> where T: Io {
type SinkItem = Message;
type SinkError = WsError;
fn start_send(&mut self, item: Message) -> StartSend<Message, WsError> {
try!(self.inner.write_message(item).to_async());
Ok(AsyncSink::Ready)
} }
impl<S: Io> ServerHandshakeExt for ServerHandshake<S> { fn poll_complete(&mut self) -> Poll<(), WsError> {
fn new_async<Stream: Io>(stream: Stream) -> ServerHandshakeAsync<Stream> { self.inner.write_pending().to_async()
ServerHandshakeAsync {
inner: Some(ServerHandshake::new(stream)),
} }
} }
/// Future returned from connect_async() which will resolve
/// once the connection handshake has finished.
pub struct ConnectAsync<S> {
inner: MidHandshake<S, ClientHandshake>,
} }
// FIXME: `ClientHandshakeAsync<S>` and `ServerHandshakeAsync<S>` have the same implementation, we impl<S: Io> Future for ConnectAsync<S> {
// have to get rid of this copy-pasting one day. But currently I don't see an elegant way to write
// it.
impl<S: Io> Future for ClientHandshakeAsync<S> {
type Item = WebSocketStream<S>; type Item = WebSocketStream<S>;
type Error = WsError; type Error = WsError;
fn poll(&mut self) -> Poll<WebSocketStream<S>, WsError> { fn poll(&mut self) -> Poll<WebSocketStream<S>, WsError> {
let hs = self.inner.take().expect("Cannot poll a handshake twice"); self.inner.poll()
match hs.handshake()? {
HandshakeResult::Done(stream) => {
Ok(WebSocketStream { inner: stream }.into())
},
HandshakeResult::Incomplete(handshake) => {
// FIXME: Remove this line after we have a guarantee that the underlying handshake
// calls to both `read()`/`write()`. Or replace it by `poll_read()` and
// `poll_write()` (this requires making the handshake's stream public).
task::park().unpark();
self.inner = Some(handshake);
Ok(Async::NotReady)
},
} }
} }
/// Future returned from accept_async() which will resolve
/// once the connection handshake has finished.
pub struct AcceptAsync<S> {
inner: MidHandshake<S, ServerHandshake>,
} }
// FIXME: `ClientHandshakeAsync<S>` and `ServerHandshakeAsync<S>` have the same implementation, we impl<S: Io> Future for AcceptAsync<S> {
// have to get rid of this copy-pasting one day. But currently I don't see an elegant way to write
// it.
impl<S: Io> Future for ServerHandshakeAsync<S> {
type Item = WebSocketStream<S>; type Item = WebSocketStream<S>;
type Error = WsError; type Error = WsError;
fn poll(&mut self) -> Poll<WebSocketStream<S>, WsError> { fn poll(&mut self) -> Poll<WebSocketStream<S>, WsError> {
let hs = self.inner.take().expect("Cannot poll a handshake twice"); self.inner.poll()
match hs.handshake()? { }
HandshakeResult::Done(stream) => { }
Ok(WebSocketStream { inner: stream }.into())
}, struct MidHandshake<S, R> {
HandshakeResult::Incomplete(handshake) => { inner: Option<Result<WebSocket<S>, HandshakeError<S, R>>>,
// FIXME: Remove this line after we have a guarantee that the underlying handshake }
// calls to both `read()`/`write()`. Or replace it by `poll_read()` and
// `poll_write()` (this requires making the handshake's stream public). impl<S: Io, R: HandshakeRole> Future for MidHandshake<S, R> {
task::park().unpark(); type Item = WebSocketStream<S>;
type Error = WsError;
self.inner = Some(handshake); fn poll(&mut self) -> Poll<WebSocketStream<S>, WsError> {
match self.inner.take().expect("cannot poll MidHandshake twice") {
Ok(stream) => Ok(WebSocketStream { inner: stream }.into()),
Err(HandshakeError::Failure(e)) => Err(e),
Err(HandshakeError::Interrupted(s)) => {
match s.handshake() {
Ok(stream) => Ok(WebSocketStream { inner: stream }.into()),
Err(HandshakeError::Failure(e)) => Err(e),
Err(HandshakeError::Interrupted(s)) => {
self.inner = Some(Err(HandshakeError::Interrupted(s)));
Ok(Async::NotReady) Ok(Async::NotReady)
}, }
}
}
} }
} }
} }
@ -176,29 +190,6 @@ impl<T> ToAsync for Result<T, WsError> {
} }
} }
impl<T> Stream for WebSocketStream<T> where T: Io {
type Item = Message;
type Error = WsError;
fn poll(&mut self) -> Poll<Option<Message>, WsError> {
self.inner.read_message().map(|m| Some(m)).to_async()
}
}
impl<T> Sink for WebSocketStream<T> where T: Io {
type SinkItem = Message;
type SinkError = WsError;
fn start_send(&mut self, item: Message) -> StartSend<Message, WsError> {
try!(self.inner.write_message(item).to_async());
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), WsError> {
self.inner.write_pending().to_async()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -209,8 +200,6 @@ mod tests {
use futures::{Future, Stream}; use futures::{Future, Stream};
use tokio_core::net::{TcpStream, TcpListener}; use tokio_core::net::{TcpStream, TcpListener};
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tungstenite::handshake::server::ServerHandshake;
use tungstenite::handshake::client::{ClientHandshake, Request};
#[test] #[test]
fn handshakes() { fn handshakes() {
@ -227,7 +216,7 @@ mod tests {
let connections = listener.incoming(); let connections = listener.incoming();
tx.send(()).unwrap(); tx.send(()).unwrap();
let handshakes = connections.and_then(|(connection, _)| { let handshakes = connections.and_then(|(connection, _)| {
ServerHandshake::<TcpStream>::new_async(connection) accept_async(connection)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}); });
let server = handshakes.for_each(|_| { let server = handshakes.for_each(|_| {
@ -244,7 +233,7 @@ mod tests {
let tcp = TcpStream::connect(&address, &handle); let tcp = TcpStream::connect(&address, &handle);
let handshake = tcp.and_then(|stream| { let handshake = tcp.and_then(|stream| {
let url = url::Url::parse("ws://localhost:12345/").unwrap(); let url = url::Url::parse("ws://localhost:12345/").unwrap();
ClientHandshake::<TcpStream>::new_async(stream, Request { url: url }) client_async(url, stream)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}); });
let client = handshake.and_then(|_| { let client = handshake.and_then(|_| {

Loading…
Cancel
Save