From b61ed965615d74e33e1afd3ce0ee5fa571db1b07 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Thu, 19 Jul 2018 17:21:41 +0200 Subject: [PATCH] Update to the newest `tungstenite-rs` version * Fixes #35 * Replaces https://github.com/snapview/tokio-tungstenite/pull/40 * Gives a good background for https://github.com/snapview/tokio-tungstenite/pull/41 --- Cargo.toml | 7 +++- src/connect.rs | 3 +- src/lib.rs | 106 ++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 97 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 36ec764..32a31fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ stream = ["bytes"] futures = "0.1.17" [dependencies.tungstenite] -version = "0.5.3" +version = "0.6.0" default-features = false [dependencies.bytes] @@ -29,7 +29,7 @@ version = "0.4.6" [dependencies.native-tls] optional = true -version = "0.1.5" +version = "0.2.0" [dependencies.tokio-dns-unofficial] optional = true @@ -46,3 +46,6 @@ version = "0.1.4" [dev-dependencies] tokio-core = "0.1.12" url = "1.6.0" + +[patch.crates-io] +tokio-tls = { git = "https://github.com/aep/tokio-tls.git", rev = "7865734d2167160cabd4422aca76b8478e643b41" } diff --git a/src/connect.rs b/src/connect.rs index 4727019..aa47a70 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -57,8 +57,7 @@ mod encryption { match mode { Mode::Plain => Box::new(future::ok(StreamSwitcher::Plain(socket))), Mode::Tls => { - Box::new(future::result(TlsConnector::builder()) - .and_then(move |builder| future::result(builder.build())) + Box::new(future::result(TlsConnector::builder().build()) .and_then(move |connector| connector.connect_async(&domain, socket)) .map(|s| StreamSwitcher::Tls(s)) .map_err(|e| Error::Tls(e))) diff --git a/src/lib.rs b/src/lib.rs index e2def55..6f0993f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,12 +31,16 @@ use std::io::ErrorKind; use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; use tokio::io::{AsyncRead, AsyncWrite}; -use tungstenite::handshake::client::{ClientHandshake, Response, Request}; -use tungstenite::handshake::server::{ServerHandshake, Callback, NoCallback}; -use tungstenite::handshake::{HandshakeRole, HandshakeError}; -use tungstenite::protocol::{WebSocket, Message, Role}; -use tungstenite::error::Error as WsError; -use tungstenite::server; +use tungstenite::{ + error::Error as WsError, + handshake::{ + HandshakeRole, HandshakeError, + client::{ClientHandshake, Response, Request}, + server::{ServerHandshake, Callback, NoCallback}, + }, + protocol::{WebSocket, Message, Role, WebSocketConfig}, + server, +}; #[cfg(feature="connect")] pub use connect::{connect_async, client_async_tls}; @@ -53,14 +57,31 @@ pub use connect::{connect_async, client_async_tls}; /// /// This is typically used for clients who have already established, for /// example, a TCP connection to the remote server. -pub fn client_async<'a, R, S>(request: R, stream: S) -> ConnectAsync +pub fn client_async<'a, R, S>( + request: R, + stream: S, +) -> ConnectAsync +where + R: Into>, + S: AsyncRead + AsyncWrite +{ + client_async_with_config(request, stream, None) +} + +/// The same as `client_async()` but the one can specify a websocket configuration. +/// Please refer to `client_async()` for more details. +pub fn client_async_with_config<'a, R, S>( + request: R, + stream: S, + config: Option, +) -> ConnectAsync where R: Into>, S: AsyncRead + AsyncWrite { ConnectAsync { inner: MidHandshake { - inner: Some(ClientHandshake::start(stream, request.into()).handshake()) + inner: Some(ClientHandshake::start(stream, request.into(), config).handshake()) } } } @@ -83,19 +104,45 @@ where accept_hdr_async(stream, NoCallback) } +/// The same as `accept_async()` but the one can specify a websocket configuration. +/// Please refer to `accept_async()` for more details. +pub fn accept_async_with_config( + stream: S, + config: Option, +) -> AcceptAsync +where + S: AsyncRead + AsyncWrite, +{ + accept_hdr_async_with_config(stream, NoCallback, config) +} + /// Accepts a new WebSocket connection with the provided stream. /// /// This function does the same as `accept_async()` but accepts an extra callback /// for header processing. The callback receives headers of the incoming /// requests and is able to add extra headers to the reply. pub fn accept_hdr_async(stream: S, callback: C) -> AcceptAsync +where + S: AsyncRead + AsyncWrite, + C: Callback, +{ + accept_hdr_async_with_config(stream, callback, None) +} + +/// The same as `accept_hdr_async()` but the one can specify a websocket configuration. +/// Please refer to `accept_hdr_async()` for more details. +pub fn accept_hdr_async_with_config( + stream: S, + callback: C, + config: Option, +) -> AcceptAsync where S: AsyncRead + AsyncWrite, C: Callback, { AcceptAsync { inner: MidHandshake { - inner: Some(server::accept_hdr(stream, callback)) + inner: Some(server::accept_hdr_with_config(stream, callback, config)) } } } @@ -116,15 +163,24 @@ pub struct WebSocketStream { impl WebSocketStream { /// Convert a raw socket into a WebSocketStream without performing a /// handshake. - pub fn from_raw_socket(stream: S, role: Role) -> Self { - let ws = WebSocket::from_raw_socket(stream, role); + pub fn from_raw_socket( + stream: S, + role: Role, + config: Option, + ) -> Self { + let ws = WebSocket::from_raw_socket(stream, role, config); WebSocketStream { inner: ws } } /// Convert a raw socket into a WebSocketStream without performing a /// handshake. - pub fn from_partially_read(stream: S, part: Vec, role: Role) -> Self { - let ws = WebSocket::from_partially_read(stream, part, role); + pub fn from_partially_read( + stream: S, + part: Vec, + role: Role, + config: Option, + ) -> Self { + let ws = WebSocket::from_partially_read(stream, part, role, config); WebSocketStream { inner: ws } } } @@ -143,8 +199,7 @@ impl Sink for WebSocketStream where T: AsyncRead + AsyncWrite { type SinkError = WsError; fn start_send(&mut self, item: Message) -> StartSend { - try!(self.inner.write_message(item).to_async()); - Ok(AsyncSink::Ready) + self.inner.write_message(item).to_start_send() } fn poll_complete(&mut self) -> Poll<(), WsError> { @@ -238,3 +293,24 @@ impl ToAsync for Result { } } +trait ToStartSend { + type T; + type E; + fn to_start_send(self) -> StartSend; +} + +impl ToStartSend for Result<(), WsError> { + type T = Message; + type E = WsError; + fn to_start_send(self) -> StartSend { + match self { + Ok(_) => Ok(AsyncSink::Ready), + Err(error) => match error { + WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::Ready), + WsError::SendQueueFull(msg) => Ok(AsyncSink::NotReady(msg)), + err => Err(err), + } + } + } +} +