diff --git a/Cargo.toml b/Cargo.toml index e504ed9..b787b41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ homepage = "https://github.com/snapview/tokio-tungstenite" documentation = "https://docs.rs/tokio-tungstenite/0.9.0" repository = "https://github.com/snapview/tokio-tungstenite" version = "0.9.0" +edition = "2018" [features] default = ["connect", "tls"] diff --git a/README.md b/README.md index ca1e1f7..9aa6e65 100644 --- a/README.md +++ b/README.md @@ -10,19 +10,13 @@ Asynchronous WebSockets for Tokio stack. ## Usage -First, you need to add this in your `Cargo.toml`: +Add this in your `Cargo.toml`: ```toml [dependencies] tokio-tungstenite = "*" ``` -Next, add this to your crate: - -```rust -extern crate tokio_tungstenite; -``` - Take a look at the `examples/` directory for client and server examples. You may also want to get familiar with [tokio](https://tokio.rs/) if you don't have any experience with it. diff --git a/examples/autobahn-client.rs b/examples/autobahn-client.rs index 9dc8125..172cd5a 100644 --- a/examples/autobahn-client.rs +++ b/examples/autobahn-client.rs @@ -1,27 +1,15 @@ -#[macro_use] extern crate log; -extern crate env_logger; -extern crate futures; -extern crate tokio; -extern crate tokio_tungstenite; -extern crate url; - -use url::Url; use futures::{Future, Stream}; +use log::*; use tokio_tungstenite::{ connect_async, - tungstenite::{ - connect, - Result, - Error as WsError, - }, + tungstenite::{connect, Error as WsError, Result}, }; +use url::Url; const AGENT: &'static str = "Tungstenite"; fn get_case_count() -> Result { - let (mut socket, _) = connect( - Url::parse("ws://localhost:9001/getCaseCount").unwrap(), - )?; + let (mut socket, _) = connect(Url::parse("ws://localhost:9001/getCaseCount").unwrap())?; let msg = socket.read_message()?; socket.close(None)?; Ok(msg.into_text()?.parse::().unwrap()) @@ -29,7 +17,11 @@ fn get_case_count() -> Result { fn update_reports() -> Result<()> { let (mut socket, _) = connect( - Url::parse(&format!("ws://localhost:9001/updateReports?agent={}", AGENT)).unwrap(), + Url::parse(&format!( + "ws://localhost:9001/updateReports?agent={}", + AGENT + )) + .unwrap(), )?; socket.close(None)?; Ok(()) @@ -37,9 +29,11 @@ fn update_reports() -> Result<()> { fn run_test(case: u32) { info!("Running test case {}", case); - let case_url = Url::parse( - &format!("ws://localhost:9001/runCase?case={}&agent={}", case, AGENT) - ).unwrap(); + let case_url = Url::parse(&format!( + "ws://localhost:9001/runCase?case={}&agent={}", + case, AGENT + )) + .unwrap(); let job = connect_async(case_url) .map_err(|err| error!("Connect error: {}", err)) @@ -49,11 +43,9 @@ fn run_test(case: u32) { .filter(|msg| msg.is_text() || msg.is_binary()) .forward(sink) .and_then(|(_stream, _sink)| Ok(())) - .map_err(|err| { - match err { - WsError::ConnectionClosed => (), - err => info!("WS error {}", err), - } + .map_err(|err| match err { + WsError::ConnectionClosed => (), + err => info!("WS error {}", err), }) }); diff --git a/examples/autobahn-server.rs b/examples/autobahn-server.rs index ef51241..23505d2 100644 --- a/examples/autobahn-server.rs +++ b/examples/autobahn-server.rs @@ -1,15 +1,7 @@ -#[macro_use] extern crate log; -extern crate env_logger; -extern crate futures; -extern crate tokio; -extern crate tokio_tungstenite; - use futures::{Future, Stream}; +use log::*; use tokio::net::TcpListener; -use tokio_tungstenite::{ - accept_async, - tungstenite::Error as WsError, -}; +use tokio_tungstenite::{accept_async, tungstenite::Error as WsError}; fn main() { env_logger::init(); @@ -20,30 +12,31 @@ fn main() { let socket = TcpListener::bind(&addr).unwrap(); info!("Listening on: {}", addr); - let srv = socket.incoming().map_err(Into::into).for_each(move |stream| { - - let peer = stream.peer_addr().expect("connected streams should have a peer address"); - info!("Peer address: {}", peer); - - accept_async(stream).and_then(move |ws_stream| { - info!("New WebSocket connection: {}", peer); - let (sink, stream) = ws_stream.split(); - let job = stream - .filter(|msg| msg.is_text() || msg.is_binary()) - .forward(sink) - .and_then(|(_stream, _sink)| Ok(())) - .map_err(|err| { - match err { + let srv = socket + .incoming() + .map_err(Into::into) + .for_each(move |stream| { + let peer = stream + .peer_addr() + .expect("connected streams should have a peer address"); + info!("Peer address: {}", peer); + + accept_async(stream).and_then(move |ws_stream| { + info!("New WebSocket connection: {}", peer); + let (sink, stream) = ws_stream.split(); + let job = stream + .filter(|msg| msg.is_text() || msg.is_binary()) + .forward(sink) + .and_then(|(_stream, _sink)| Ok(())) + .map_err(|err| match err { WsError::ConnectionClosed => (), err => info!("WS error: {}", err), - } - }); - - tokio::spawn(job); - Ok(()) - }) - }); + }); + tokio::spawn(job); + Ok(()) + }) + }); runtime.block_on(srv).unwrap(); } diff --git a/examples/client.rs b/examples/client.rs index 4d746b5..00eaa35 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -10,12 +10,6 @@ //! //! You can use this example together with the `server` example. -extern crate futures; -extern crate tokio; -extern crate tokio_tungstenite; -extern crate tungstenite; -extern crate url; - use std::env; use std::io::{self, Read, Write}; use std::thread; @@ -29,9 +23,9 @@ use tokio_tungstenite::stream::PeerAddr; fn main() { // Specify the server address to which the client will be connecting. - let connect_addr = env::args().nth(1).unwrap_or_else(|| { - panic!("this program requires at least one argument") - }); + let connect_addr = env::args() + .nth(1) + .unwrap_or_else(|| panic!("this program requires at least one argument")); let url = url::Url::parse(&connect_addr).unwrap(); @@ -59,32 +53,37 @@ fn main() { // finishes. If we don't have any more data to read or we won't receive any // more work from the remote then we can exit. let mut stdout = io::stdout(); - let client = connect_async(url).and_then(move |(ws_stream, _)| { - println!("WebSocket handshake has been successfully completed"); + let client = connect_async(url) + .and_then(move |(ws_stream, _)| { + println!("WebSocket handshake has been successfully completed"); - let addr = ws_stream.peer_addr().expect("connected streams should have a peer address"); - println!("Peer address: {}", addr); + let addr = ws_stream + .peer_addr() + .expect("connected streams should have a peer address"); + println!("Peer address: {}", addr); - // `sink` is the stream of messages going out. - // `stream` is the stream of incoming messages. - let (sink, stream) = ws_stream.split(); + // `sink` is the stream of messages going out. + // `stream` is the stream of incoming messages. + let (sink, stream) = ws_stream.split(); - // We forward all messages, composed out of the data, entered to - // the stdin, to the `sink`. - let send_stdin = stdin_rx.forward(sink); - let write_stdout = stream.for_each(move |message| { - stdout.write_all(&message.into_data()).unwrap(); - Ok(()) - }); + // We forward all messages, composed out of the data, entered to + // the stdin, to the `sink`. + let send_stdin = stdin_rx.forward(sink); + let write_stdout = stream.for_each(move |message| { + stdout.write_all(&message.into_data()).unwrap(); + Ok(()) + }); - // Wait for either of futures to complete. - send_stdin.map(|_| ()) - .select(write_stdout.map(|_| ())) - .then(|_| Ok(())) - }).map_err(|e| { - println!("Error during the websocket handshake occurred: {}", e); - io::Error::new(io::ErrorKind::Other, e) - }); + // Wait for either of futures to complete. + send_stdin + .map(|_| ()) + .select(write_stdout.map(|_| ())) + .then(|_| Ok(())) + }) + .map_err(|e| { + println!("Error during the websocket handshake occurred: {}", e); + io::Error::new(io::ErrorKind::Other, e) + }); // And now that we've got our client, we execute it in the event loop! tokio::runtime::run(client.map_err(|_e| ())); @@ -97,8 +96,7 @@ fn read_stdin(mut tx: mpsc::Sender) { loop { let mut buf = vec![0; 1024]; let n = match stdin.read(&mut buf) { - Err(_) | - Ok(0) => break, + Err(_) | Ok(0) => break, Ok(n) => n, }; buf.truncate(n); diff --git a/examples/server.rs b/examples/server.rs index f24a71c..82fe890 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -17,15 +17,10 @@ //! connected clients they'll all join the same room and see everyone else's //! messages. -extern crate futures; -extern crate tokio; -extern crate tokio_tungstenite; -extern crate tungstenite; - use std::collections::HashMap; use std::env; use std::io::{Error, ErrorKind}; -use std::sync::{Arc,Mutex}; +use std::sync::{Arc, Mutex}; use futures::stream::Stream; use futures::Future; @@ -47,8 +42,9 @@ fn main() { let connections = Arc::new(Mutex::new(HashMap::new())); let srv = socket.incoming().for_each(move |stream| { - - let addr = stream.peer_addr().expect("connected streams should have a peer address"); + let addr = stream + .peer_addr() + .expect("connected streams should have a peer address"); println!("Peer address: {}", addr); // We have to clone both of these values, because the `and_then` @@ -58,62 +54,67 @@ fn main() { // `for_each` future). let connections_inner = connections.clone(); - accept_async(stream).and_then(move |ws_stream| { - println!("New WebSocket connection: {}", addr); - - // Create a channel for our stream, which other sockets will use to - // send us messages. Then register our address with the stream to send - // data to us. - let (tx, rx) = futures::sync::mpsc::unbounded(); - connections_inner.lock().unwrap().insert(addr, tx); - - // Let's split the WebSocket stream, so we can work with the - // reading and writing halves separately. - let (sink, stream) = ws_stream.split(); - - // Whenever we receive a message from the client, we print it and - // send to other clients, excluding the sender. - let connections = connections_inner.clone(); - let ws_reader = stream.for_each(move |message: Message| { - println!("Received a message from {}: {}", addr, message); - - // For each open connection except the sender, send the - // string via the channel. - let mut conns = connections.lock().unwrap(); - let iter = conns.iter_mut() - .filter(|&(&k, _)| k != addr) - .map(|(_, v)| v); - for tx in iter { - tx.unbounded_send(message.clone()).unwrap(); - } - Ok(()) - }); - - // Whenever we receive a string on the Receiver, we write it to - // `WriteHalf`. - let ws_writer = rx.fold(sink, |mut sink, msg| { - use futures::Sink; - sink.start_send(msg).unwrap(); - Ok(sink) - }); - - // Now that we've got futures representing each half of the socket, we - // use the `select` combinator to wait for either half to be done to - // tear down the other. Then we spawn off the result. - let connection = ws_reader.map(|_| ()).map_err(|_| ()) - .select(ws_writer.map(|_| ()).map_err(|_| ())); - - tokio::spawn(connection.then(move |_| { - connections_inner.lock().unwrap().remove(&addr); - println!("Connection {} closed.", addr); - Ok(()) - })); + accept_async(stream) + .and_then(move |ws_stream| { + println!("New WebSocket connection: {}", addr); + + // Create a channel for our stream, which other sockets will use to + // send us messages. Then register our address with the stream to send + // data to us. + let (tx, rx) = futures::sync::mpsc::unbounded(); + connections_inner.lock().unwrap().insert(addr, tx); + + // Let's split the WebSocket stream, so we can work with the + // reading and writing halves separately. + let (sink, stream) = ws_stream.split(); + + // Whenever we receive a message from the client, we print it and + // send to other clients, excluding the sender. + let connections = connections_inner.clone(); + let ws_reader = stream.for_each(move |message: Message| { + println!("Received a message from {}: {}", addr, message); + + // For each open connection except the sender, send the + // string via the channel. + let mut conns = connections.lock().unwrap(); + let iter = conns + .iter_mut() + .filter(|&(&k, _)| k != addr) + .map(|(_, v)| v); + for tx in iter { + tx.unbounded_send(message.clone()).unwrap(); + } + Ok(()) + }); + + // Whenever we receive a string on the Receiver, we write it to + // `WriteHalf`. + let ws_writer = rx.fold(sink, |mut sink, msg| { + use futures::Sink; + sink.start_send(msg).unwrap(); + Ok(sink) + }); + + // Now that we've got futures representing each half of the socket, we + // use the `select` combinator to wait for either half to be done to + // tear down the other. Then we spawn off the result. + let connection = ws_reader + .map(|_| ()) + .map_err(|_| ()) + .select(ws_writer.map(|_| ()).map_err(|_| ())); + + tokio::spawn(connection.then(move |_| { + connections_inner.lock().unwrap().remove(&addr); + println!("Connection {} closed.", addr); + Ok(()) + })); - Ok(()) - }).map_err(|e| { - println!("Error during the websocket handshake occurred: {}", e); - Error::new(ErrorKind::Other, e) - }) + Ok(()) + }) + .map_err(|e| { + println!("Error during the websocket handshake occurred: {}", e); + Error::new(ErrorKind::Other, e) + }) }); // Execute server. diff --git a/src/connect.rs b/src/connect.rs index 2189eb4..00e4fb3 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,22 +1,19 @@ //! Connection helper. -extern crate tokio_dns; -extern crate tokio_tcp; - -use std::net::SocketAddr; use std::io::Result as IoResult; +use std::net::SocketAddr; -use self::tokio_tcp::TcpStream; +use tokio_tcp::TcpStream; use futures::{future, Future}; use tokio_io::{AsyncRead, AsyncWrite}; -use tungstenite::Error; use tungstenite::client::url_mode; use tungstenite::handshake::client::Response; +use tungstenite::Error; -use stream::{NoDelay, PeerAddr}; -use super::{WebSocketStream, Request, client_async}; +use super::{client_async, Request, WebSocketStream}; +use crate::stream::{NoDelay, PeerAddr}; impl NoDelay for TcpStream { fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { @@ -30,24 +27,21 @@ impl PeerAddr for TcpStream { } } -#[cfg(feature="tls")] +#[cfg(feature = "tls")] mod encryption { - extern crate native_tls; - extern crate tokio_tls; - - use self::native_tls::TlsConnector; - use self::tokio_tls::{TlsConnector as TokioTlsConnector, TlsStream}; + use native_tls::TlsConnector; + use tokio_tls::{TlsConnector as TokioTlsConnector, TlsStream}; + use std::io::{Read, Result as IoResult, Write}; use std::net::SocketAddr; - use std::io::{Read, Write, Result as IoResult}; use futures::{future, Future}; use tokio_io::{AsyncRead, AsyncWrite}; - use tungstenite::Error; use tungstenite::stream::Mode; + use tungstenite::Error; - use stream::{NoDelay, PeerAddr, Stream as StreamSwitcher}; + use crate::stream::{NoDelay, PeerAddr, Stream as StreamSwitcher}; /// A stream that might be protected with TLS. pub type MaybeTlsStream = StreamSwitcher>; @@ -66,50 +60,58 @@ mod encryption { } } - pub fn wrap_stream(socket: S, domain: String, mode: Mode) - -> Box, Error=Error> + Send> + pub fn wrap_stream( + socket: S, + domain: String, + mode: Mode, + ) -> Box, Error = Error> + Send> where S: 'static + AsyncRead + AsyncWrite + Send, { match mode { Mode::Plain => Box::new(future::ok(StreamSwitcher::Plain(socket))), - Mode::Tls => { - Box::new(future::result(TlsConnector::new()) - .map(TokioTlsConnector::from) - .and_then(move |connector| connector.connect(&domain, socket)) - .map(StreamSwitcher::Tls) - .map_err(Error::Tls)) - } + Mode::Tls => Box::new( + future::result(TlsConnector::new()) + .map(TokioTlsConnector::from) + .and_then(move |connector| connector.connect(&domain, socket)) + .map(StreamSwitcher::Tls) + .map_err(Error::Tls), + ), } } } -#[cfg(feature="tls")] +#[cfg(feature = "tls")] pub use self::encryption::MaybeTlsStream; -#[cfg(not(feature="tls"))] +#[cfg(not(feature = "tls"))] mod encryption { use futures::{future, Future}; use tokio_io::{AsyncRead, AsyncWrite}; - use tungstenite::Error; use tungstenite::stream::Mode; + use tungstenite::Error; pub type AutoStream = S; - pub fn wrap_stream(socket: S, _domain: String, mode: Mode) - -> Box, Error=Error> + Send> + pub fn wrap_stream( + socket: S, + _domain: String, + mode: Mode, + ) -> Box, Error = Error> + Send> where S: 'static + AsyncRead + AsyncWrite + Send, { match mode { Mode::Plain => Box::new(future::ok(socket)), - Mode::Tls => Box::new(future::err(Error::Url("TLS support not compiled in.".into()))), + Mode::Tls => Box::new(future::err(Error::Url( + "TLS support not compiled in.".into(), + ))), } } } -use self::encryption::{AutoStream, wrap_stream}; +use self::encryption::{wrap_stream, AutoStream}; /// Get a domain from an URL. #[inline] @@ -122,8 +124,10 @@ fn domain(request: &Request) -> Result { /// Creates a WebSocket handshake from a request and a stream, /// upgrading the stream to TLS if required. -pub fn client_async_tls(request: R, stream: S) - -> Box>, Response), Error=Error> + Send> +pub fn client_async_tls( + request: R, + stream: S, +) -> Box>, Response), Error = Error> + Send> where R: Into>, S: 'static + AsyncRead + AsyncWrite + NoDelay + Send, @@ -141,20 +145,23 @@ where Err(e) => return Box::new(future::err(e)), }; - Box::new(wrap_stream(stream, domain, mode) - .and_then(|mut stream| { - NoDelay::set_nodelay(&mut stream, true) - .map(move |()| stream) - .map_err(|e| e.into()) - }) - .and_then(move |stream| client_async(request, stream))) + Box::new( + wrap_stream(stream, domain, mode) + .and_then(|mut stream| { + NoDelay::set_nodelay(&mut stream, true) + .map(move |()| stream) + .map_err(|e| e.into()) + }) + .and_then(move |stream| client_async(request, stream)), + ) } /// Connect to a given URL. -pub fn connect_async(request: R) - -> Box>, Response), Error=Error> + Send> +pub fn connect_async( + request: R, +) -> Box>, Response), Error = Error> + Send> where - R: Into> + R: Into>, { let request: Request = request.into(); @@ -162,8 +169,14 @@ where Ok(domain) => domain, Err(err) => return Box::new(future::err(err)), }; - let port = request.url.port_or_known_default().expect("Bug: port unknown"); - - Box::new(tokio_dns::TcpStream::connect((domain.as_str(), port)).map_err(|e| e.into()) - .and_then(move |socket| client_async_tls(request, socket))) + let port = request + .url + .port_or_known_default() + .expect("Bug: port unknown"); + + Box::new( + tokio_dns::TcpStream::connect((domain.as_str(), port)) + .map_err(|e| e.into()) + .and_then(move |socket| client_async_tls(request, socket)), + ) } diff --git a/src/lib.rs b/src/lib.rs index fb70c47..469920b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,48 +13,43 @@ unused_must_use, unused_mut, unused_imports, - unused_import_braces)] + unused_import_braces +)] -extern crate futures; -extern crate tokio_io; +pub use tungstenite; -pub extern crate tungstenite; - -#[cfg(feature="connect")] +#[cfg(feature = "connect")] mod connect; -#[cfg(feature="stream")] +#[cfg(feature = "stream")] pub mod stream; use std::io::ErrorKind; -#[cfg(feature="stream")] -use std::{ - net::SocketAddr, - io::Result as IoResult, -}; +#[cfg(feature = "stream")] +use std::{io::Result as IoResult, net::SocketAddr}; -use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; +use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use tungstenite::{ error::Error as WsError, handshake::{ - HandshakeRole, HandshakeError, - client::{ClientHandshake, Response, Request}, - server::{ServerHandshake, Callback, NoCallback}, + client::{ClientHandshake, Request, Response}, + server::{Callback, NoCallback, ServerHandshake}, + HandshakeError, HandshakeRole, }, - protocol::{WebSocket, Message, Role, WebSocketConfig}, + protocol::{Message, Role, WebSocket, WebSocketConfig}, server, }; -#[cfg(feature="connect")] -pub use connect::{connect_async, client_async_tls}; +#[cfg(feature = "connect")] +pub use connect::{client_async_tls, connect_async}; -#[cfg(feature="stream")] +#[cfg(feature = "stream")] pub use stream::PeerAddr; -#[cfg(all(feature="connect", feature="tls"))] +#[cfg(all(feature = "connect", feature = "tls"))] pub use connect::MaybeTlsStream; /// Creates a WebSocket handshake from a request and a stream. @@ -69,13 +64,10 @@ pub use connect::MaybeTlsStream; /// /// This is typically used for clients who have already established, for /// example, a TCP connection to the remote server. -pub fn client_async<'a, R, S>( - request: R, - stream: S, -) -> ConnectAsync +pub fn client_async<'a, R, S>(request: R, stream: S) -> ConnectAsync where R: Into>, - S: AsyncRead + AsyncWrite + S: AsyncRead + AsyncWrite, { client_async_with_config(request, stream, None) } @@ -89,12 +81,12 @@ pub fn client_async_with_config<'a, R, S>( ) -> ConnectAsync where R: Into>, - S: AsyncRead + AsyncWrite + S: AsyncRead + AsyncWrite, { ConnectAsync { inner: MidHandshake { - inner: Some(ClientHandshake::start(stream, request.into(), config).handshake()) - } + inner: Some(ClientHandshake::start(stream, request.into(), config).handshake()), + }, } } @@ -154,8 +146,8 @@ where { AcceptAsync { inner: MidHandshake { - inner: Some(server::accept_hdr_with_config(stream, callback, config)) - } + inner: Some(server::accept_hdr_with_config(stream, callback, config)), + }, } } @@ -175,11 +167,7 @@ pub struct WebSocketStream { impl WebSocketStream { /// Convert a raw socket into a WebSocketStream without performing a /// handshake. - pub fn from_raw_socket( - stream: S, - role: Role, - config: Option, - ) -> Self { + pub fn from_raw_socket(stream: S, role: Role, config: Option) -> Self { Self::new(WebSocket::from_raw_socket(stream, role, config)) } @@ -195,20 +183,21 @@ impl WebSocketStream { } fn new(ws: WebSocket) -> Self { - WebSocketStream { - inner: ws, - } + WebSocketStream { inner: ws } } } -#[cfg(feature="stream")] +#[cfg(feature = "stream")] impl PeerAddr for WebSocketStream { fn peer_addr(&self) -> IoResult { self.inner.get_ref().peer_addr() } } -impl Stream for WebSocketStream where T: AsyncRead + AsyncWrite { +impl Stream for WebSocketStream +where + T: AsyncRead + AsyncWrite, +{ type Item = Message; type Error = WsError; @@ -219,12 +208,15 @@ impl Stream for WebSocketStream where T: AsyncRead + AsyncWrite { .to_async() .or_else(|err| match err { WsError::ConnectionClosed => Ok(Async::Ready(None)), - err => Err(err) + err => Err(err), }) } } -impl Sink for WebSocketStream where T: AsyncRead + AsyncWrite { +impl Sink for WebSocketStream +where + T: AsyncRead + AsyncWrite, +{ type SinkItem = Message; type SinkError = WsError; @@ -289,16 +281,14 @@ impl Future for MidHandshake { match self.inner.take().expect("cannot poll MidHandshake twice") { Ok(result) => Ok(Async::Ready(result)), Err(HandshakeError::Failure(e)) => Err(e), - Err(HandshakeError::Interrupted(s)) => { - match s.handshake() { - Ok(result) => Ok(Async::Ready(result)), - Err(HandshakeError::Failure(e)) => Err(e), - Err(HandshakeError::Interrupted(s)) => { - self.inner = Some(Err(HandshakeError::Interrupted(s))); - Ok(Async::NotReady) - } + Err(HandshakeError::Interrupted(s)) => match s.handshake() { + Ok(result) => Ok(Async::Ready(result)), + Err(HandshakeError::Failure(e)) => Err(e), + Err(HandshakeError::Interrupted(s)) => { + self.inner = Some(Err(HandshakeError::Interrupted(s))); + Ok(Async::NotReady) } - } + }, } } } @@ -339,8 +329,7 @@ impl ToStartSend for Result<(), WsError> { WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::Ready), WsError::SendQueueFull(msg) => Ok(AsyncSink::NotReady(msg)), err => Err(err), - } + }, } } } - diff --git a/src/stream.rs b/src/stream.rs index 944972e..d961abe 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -4,12 +4,10 @@ //! `native_tls` or `openssl` will work as long as there is a TLS stream supporting standard //! `Read + Write` traits. -extern crate bytes; - +use std::io::{Error as IoError, Read, Result as IoResult, Write}; use std::net::SocketAddr; -use std::io::{Read, Write, Result as IoResult, Error as IoError}; -use self::bytes::{Buf, BufMut}; +use bytes::{Buf, BufMut}; use futures::Poll; use tokio_io::{AsyncRead, AsyncWrite}; diff --git a/tests/handshakes.rs b/tests/handshakes.rs index e721994..530e7f6 100644 --- a/tests/handshakes.rs +++ b/tests/handshakes.rs @@ -1,13 +1,8 @@ -extern crate futures; -extern crate tokio_tcp; -extern crate tokio_tungstenite; -extern crate url; - use std::io; use futures::{Future, Stream}; -use tokio_tcp::{TcpStream, TcpListener}; -use tokio_tungstenite::{client_async, accept_async}; +use tokio_tcp::{TcpListener, TcpStream}; +use tokio_tungstenite::{accept_async, client_async}; #[test] fn handshakes() { @@ -24,9 +19,7 @@ fn handshakes() { let handshakes = connections.and_then(|connection| { accept_async(connection).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) }); - let server = handshakes.for_each(|_| { - Ok(()) - }); + let server = handshakes.for_each(|_| Ok(())); server.wait().unwrap(); }); @@ -38,8 +31,6 @@ fn handshakes() { let url = url::Url::parse("ws://localhost:12345/").unwrap(); client_async(url, stream).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) }); - let client = handshake.and_then(|_| { - Ok(()) - }); + let client = handshake.and_then(|_| Ok(())); client.wait().unwrap(); }