From ef006f58f18c7cc19c3f23b45be0a0e7c65bff1b Mon Sep 17 00:00:00 2001 From: Alexey Galakhov Date: Tue, 4 Apr 2017 14:35:03 +0200 Subject: [PATCH 1/5] Bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f83efd5..1bc289f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT" homepage = "https://github.com/snapview/tokio-tungstenite" documentation = "https://docs.rs/tokio-tungstenite" repository = "https://github.com/snapview/tokio-tungstenite" -version = "0.1.2" +version = "0.1.3" [dependencies] futures = "*" From c8dc978dd54c75a2617c269e4b891f121bb74aef Mon Sep 17 00:00:00 2001 From: Alexey Galakhov Date: Fri, 19 May 2017 18:09:40 +0200 Subject: [PATCH 2/5] Bump version Signed-off-by: Alexey Galakhov --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c7ed320..5ba5ac0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT" homepage = "https://github.com/snapview/tokio-tungstenite" documentation = "https://docs.rs/tokio-tungstenite" repository = "https://github.com/snapview/tokio-tungstenite" -version = "0.1.3" +version = "0.2.1" [dependencies] futures = "*" From 5fbcd8d41325f5cc3326fba576949aa470d810d4 Mon Sep 17 00:00:00 2001 From: Alexey Galakhov Date: Mon, 22 May 2017 21:39:23 +0200 Subject: [PATCH 3/5] Add "connect" convenience function. Signed-off-by: Alexey Galakhov --- Cargo.toml | 25 ++++++++++++++ src/connect.rs | 93 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 8 +++++ src/stream.rs | 73 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 199 insertions(+) create mode 100644 src/connect.rs create mode 100644 src/stream.rs diff --git a/Cargo.toml b/Cargo.toml index 5ba5ac0..57dcaac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,11 +10,36 @@ documentation = "https://docs.rs/tokio-tungstenite" repository = "https://github.com/snapview/tokio-tungstenite" version = "0.2.1" +[features] +default = ["connect", "tls"] +connect = ["tokio-dns-unofficial", "tokio-core"] +tls = ["tokio-tls", "native-tls", "bytes"] + [dependencies] futures = "*" tokio-io = "*" tungstenite = "*" url = "*" +[dependencies.bytes] +optional = true +version = "*" + +[dependencies.native-tls] +optional = true +version = "*" + +[dependencies.tokio-dns-unofficial] +optional = true +version = "*" + +[dependencies.tokio-core] +optional = true +version = "*" + +[dependencies.tokio-tls] +optional = true +version = "*" + [dev-dependencies] tokio-core = "*" diff --git a/src/connect.rs b/src/connect.rs new file mode 100644 index 0000000..6743019 --- /dev/null +++ b/src/connect.rs @@ -0,0 +1,93 @@ +//! Connection helper. + +extern crate tokio_dns; +extern crate tokio_core; + +use self::tokio_dns::tcp_connect; +use self::tokio_core::reactor::Remote; + +use futures::{Future, BoxFuture}; +use futures::future; + +use super::{WebSocketStream, Request, client_async}; +use tungstenite::Error; +use tungstenite::client::url_mode; + +#[cfg(feature="tls")] +mod encryption { + extern crate native_tls; + extern crate tokio_tls; + + use super::tokio_core::net::TcpStream; + + use self::native_tls::TlsConnector; + use self::tokio_tls::{TlsConnectorExt, TlsStream}; + + use futures::{Future, BoxFuture}; + use futures::future; + + use tungstenite::Error; + use tungstenite::stream::Mode; + + pub use stream::Stream as StreamSwitcher; + pub type AutoStream = StreamSwitcher>; + + pub fn wrap_stream(socket: TcpStream, domain: String, mode: Mode) -> BoxFuture { + match mode { + Mode::Plain => future::ok(StreamSwitcher::Plain(socket)).boxed(), + Mode::Tls => { + future::result(TlsConnector::builder()) + .and_then(move |builder| future::result(builder.build())) + .and_then(move |connector| connector.connect_async(&domain, socket)) + .map(|s| StreamSwitcher::Tls(s)) + .map_err(|e| Error::Tls(e)) + .boxed() + } + } + } +} + +#[cfg(not(feature="tls"))] +mod encryption { + use super::tokio_core::net::TcpStream; + + use futures::{Future, BoxFuture}; + use futures::future; + + use tungstenite::Error; + use tungstenite::stream::Mode; + + pub type AutoStream = TcpStream; + + pub fn wrap_stream(socket: TcpStream, _domain: String, mode: Mode) -> BoxFuture { + match mode { + Mode::Plain => future::ok(socket).boxed(), + Mode::Tls => future::err(Error::Url("TLS support not compiled in.".into())).boxed(), + } + } +} + +use self::encryption::{AutoStream, wrap_stream}; + +/// Connect to a given URL. +pub fn connect_async(request: R, handle: Remote) -> BoxFuture, Error> +where R: Into> +{ + let request: Request = request.into(); + + // Make sure we check domain and mode first. URL must be valid. + let mode = match url_mode(&request.url) { + Ok(m) => m, + Err(e) => return future::err(e.into()).boxed(), + }; + let domain = match request.url.host_str() { + Some(d) => d.to_string(), + None => return future::err(Error::Url("No host name in the URL".into())).boxed(), + }; + let port = request.url.port_or_known_default().expect("Bug: port unknown"); + + tcp_connect((domain.as_str(), port), handle).map_err(|e| e.into()) + .and_then(move |socket| wrap_stream(socket, domain, mode)) + .and_then(move |stream| client_async(request, stream)) + .boxed() +} diff --git a/src/lib.rs b/src/lib.rs index 0d40efd..6076272 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,11 @@ extern crate tokio_io; extern crate tungstenite; extern crate url; +#[cfg(feature="connect")] +mod connect; +#[cfg(all(feature="connect", feature="tls"))] +mod stream; + use std::io::ErrorKind; use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; @@ -39,6 +44,9 @@ use tungstenite::protocol::{WebSocket, Message}; use tungstenite::error::Error as WsError; use tungstenite::server; +#[cfg(feature="connect")] +pub use connect::connect_async; + /// A WebSocket request pub struct Request<'a> { /// URL of the request. diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..51992e3 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,73 @@ +//! Convenience wrapper for streams to switch between plain TCP and TLS at runtime. +//! +//! There is no dependency on actual TLS implementations. Everything like +//! `native_tls` or `openssl` will work as long as there is a TLS stream supporting standard +//! `Read + Write` traits. + +extern crate bytes; + +use std::io::{Read, Write, Result as IoResult, Error as IoError}; + +use self::bytes::{Buf, BufMut}; +use futures::Poll; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Stream, either plain TCP or TLS. +pub enum Stream { + Plain(S), + Tls(T), +} + +impl Read for Stream { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + match *self { + Stream::Plain(ref mut s) => s.read(buf), + Stream::Tls(ref mut s) => s.read(buf), + } + } +} + +impl Write for Stream { + fn write(&mut self, buf: &[u8]) -> IoResult { + match *self { + Stream::Plain(ref mut s) => s.write(buf), + Stream::Tls(ref mut s) => s.write(buf), + } + } + fn flush(&mut self) -> IoResult<()> { + match *self { + Stream::Plain(ref mut s) => s.flush(), + Stream::Tls(ref mut s) => s.flush(), + } + } +} + +impl AsyncRead for Stream { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + match *self { + Stream::Plain(ref s) => s.prepare_uninitialized_buffer(buf), + Stream::Tls(ref s) => s.prepare_uninitialized_buffer(buf), + } + } + fn read_buf(&mut self, buf: &mut B) -> Poll { + match *self { + Stream::Plain(ref mut s) => s.read_buf(buf), + Stream::Tls(ref mut s) => s.read_buf(buf), + } + } +} + +impl AsyncWrite for Stream { + fn shutdown(&mut self) -> Poll<(), IoError> { + match *self { + Stream::Plain(ref mut s) => s.shutdown(), + Stream::Tls(ref mut s) => s.shutdown(), + } + } + fn write_buf(&mut self, buf: &mut B) -> Poll { + match *self { + Stream::Plain(ref mut s) => s.write_buf(buf), + Stream::Tls(ref mut s) => s.write_buf(buf), + } + } +} From 7fbf836851076ca5cea6849c782e7329636522d1 Mon Sep 17 00:00:00 2001 From: Alexey Galakhov Date: Mon, 22 May 2017 22:55:46 +0200 Subject: [PATCH 4/5] Update example for new API. Signed-off-by: Alexey Galakhov --- examples/client.rs | 47 ++++++++++++++++++++-------------------------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/examples/client.rs b/examples/client.rs index d432849..f02afd6 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -18,16 +18,14 @@ extern crate url; use std::env; use std::io::{self, Read, Write}; -use std::net::ToSocketAddrs; use std::thread; use futures::sync::mpsc; use futures::{Future, Sink, Stream}; -use tokio_core::net::TcpStream; use tokio_core::reactor::Core; use tungstenite::protocol::Message; -use tokio_tungstenite::client_async; +use tokio_tungstenite::connect_async; fn main() { // Specify the server address to which the client will be connecting. @@ -35,14 +33,11 @@ fn main() { panic!("this program requires at least one argument") }); - // Get a first IP address of the server from the server URL. let url = url::Url::parse(&connect_addr).unwrap(); - let addr = url.to_socket_addrs().unwrap().next().unwrap(); // Create the event loop and initiate the connection to the remote server. let mut core = Core::new().unwrap(); let handle = core.handle(); - let tcp = TcpStream::connect(&addr, &handle); // Right now Tokio doesn't support a handle to stdin running on the event // loop, so we farm out that work to a separate thread. This thread will @@ -68,30 +63,28 @@ fn main() { // finishes. If we don't have any more data to read or we won't receive any // more work from the remote then we can exit. let mut stdout = io::stdout(); - let client = tcp.and_then(|stream| { - client_async(url, stream).and_then(|ws_stream| { - println!("WebSocket handshake has been successfully completed"); + let client = connect_async(url, handle.remote().clone()).and_then(|ws_stream| { + println!("WebSocket handshake has been successfully completed"); - // `sink` is the stream of messages going out. - // `stream` is the stream of incoming messages. - let (sink, stream) = ws_stream.split(); + // `sink` is the stream of messages going out. + // `stream` is the stream of incoming messages. + let (sink, stream) = ws_stream.split(); - // We forward all messages, composed out of the data, entered to - // the stdin, to the `sink`. - let send_stdin = stdin_rx.forward(sink); - let write_stdout = stream.for_each(|message| { - stdout.write_all(&message.into_data()).unwrap(); - Ok(()) - }); + // We forward all messages, composed out of the data, entered to + // the stdin, to the `sink`. + let send_stdin = stdin_rx.forward(sink); + let write_stdout = stream.for_each(|message| { + stdout.write_all(&message.into_data()).unwrap(); + Ok(()) + }); - // Wait for either of futures to complete. - send_stdin.map(|_| ()) - .select(write_stdout.map(|_| ())) - .then(|_| Ok(())) - }).map_err(|e| { - println!("Error during the websocket handshake occurred: {}", e); - io::Error::new(io::ErrorKind::Other, e) - }) + // Wait for either of futures to complete. + send_stdin.map(|_| ()) + .select(write_stdout.map(|_| ())) + .then(|_| Ok(())) + }).map_err(|e| { + println!("Error during the websocket handshake occurred: {}", e); + io::Error::new(io::ErrorKind::Other, e) }); // And now that we've got our client, we execute it in the event loop! From e7c3ef525086eec00e41ec84a7678e7d92f527b9 Mon Sep 17 00:00:00 2001 From: Alexey Galakhov Date: Tue, 23 May 2017 03:11:34 +0200 Subject: [PATCH 5/5] Set TCP_NODELAY on connect. Signed-off-by: Alexey Galakhov --- Cargo.toml | 5 +++-- src/connect.rs | 26 ++++++++++++++++++++++++++ src/lib.rs | 5 +++-- src/stream.rs | 17 +++++++++++++++++ 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 57dcaac..1c9c97e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,9 @@ version = "0.2.1" [features] default = ["connect", "tls"] -connect = ["tokio-dns-unofficial", "tokio-core"] -tls = ["tokio-tls", "native-tls", "bytes"] +connect = ["tokio-dns-unofficial", "tokio-core", "stream"] +tls = ["tokio-tls", "native-tls", "stream"] +stream = ["bytes"] [dependencies] futures = "*" diff --git a/src/connect.rs b/src/connect.rs index 6743019..4d43694 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -6,12 +6,23 @@ extern crate tokio_core; use self::tokio_dns::tcp_connect; use self::tokio_core::reactor::Remote; +use std::io::Result as IoResult; + use futures::{Future, BoxFuture}; use futures::future; use super::{WebSocketStream, Request, client_async}; use tungstenite::Error; use tungstenite::client::url_mode; +use stream::NoDelay; + +use self::tokio_core::net::TcpStream; + +impl NoDelay for TcpStream { + fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { + TcpStream::set_nodelay(self, nodelay) + } +} #[cfg(feature="tls")] mod encryption { @@ -23,15 +34,25 @@ mod encryption { use self::native_tls::TlsConnector; use self::tokio_tls::{TlsConnectorExt, TlsStream}; + use std::io::{Read, Write, Result as IoResult}; + use futures::{Future, BoxFuture}; use futures::future; use tungstenite::Error; use tungstenite::stream::Mode; + use stream::NoDelay; + pub use stream::Stream as StreamSwitcher; pub type AutoStream = StreamSwitcher>; + impl NoDelay for TlsStream { + fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { + self.get_mut().get_mut().set_nodelay(nodelay) + } + } + pub fn wrap_stream(socket: TcpStream, domain: String, mode: Mode) -> BoxFuture { match mode { Mode::Plain => future::ok(StreamSwitcher::Plain(socket)).boxed(), @@ -88,6 +109,11 @@ where R: Into> tcp_connect((domain.as_str(), port), handle).map_err(|e| e.into()) .and_then(move |socket| wrap_stream(socket, domain, mode)) + .and_then(|mut stream| { + NoDelay::set_nodelay(&mut stream, true) + .map(move |()| stream) + .map_err(|e| e.into()) + }) .and_then(move |stream| client_async(request, stream)) .boxed() } diff --git a/src/lib.rs b/src/lib.rs index 6076272..203e1a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,8 +27,9 @@ extern crate url; #[cfg(feature="connect")] mod connect; -#[cfg(all(feature="connect", feature="tls"))] -mod stream; + +#[cfg(feature="stream")] +pub mod stream; use std::io::ErrorKind; diff --git a/src/stream.rs b/src/stream.rs index 51992e3..d06cb1e 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -12,9 +12,17 @@ use self::bytes::{Buf, BufMut}; use futures::Poll; use tokio_io::{AsyncRead, AsyncWrite}; +/// Trait to switch TCP_NODELAY. +pub trait NoDelay { + /// Set the TCP_NODELAY option to the given value. + fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()>; +} + /// Stream, either plain TCP or TLS. pub enum Stream { + /// Unencrypted socket stream. Plain(S), + /// Encrypted socket stream. Tls(T), } @@ -42,6 +50,15 @@ impl Write for Stream { } } +impl NoDelay for Stream { + fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { + match *self { + Stream::Plain(ref mut s) => s.set_nodelay(nodelay), + Stream::Tls(ref mut s) => s.set_nodelay(nodelay), + } + } +} + impl AsyncRead for Stream { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { match *self {