From 3a994e6e3b7f3641548a90278a14ed8c05703bdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 31 Jan 2020 17:17:13 +0200 Subject: [PATCH] Update to tungstenite 0.10 Partially based on tungstenite commits - 46dfd9ed3ee75b0261e9f5f71c8e70492407248b by Alexey Galakhov - 31010fd636b3edc683199e3182ea34d799118d5b by Alexey Galakhov --- Cargo.toml | 4 +-- examples/async-std-echo.rs | 4 +-- examples/autobahn-client.rs | 23 ++++----------- examples/client.rs | 14 ++++----- examples/gio-echo.rs | 2 +- examples/tokio-echo.rs | 4 +-- src/async_std.rs | 58 +++++++++++++++++------------------- src/async_tls.rs | 18 +++++------ src/gio.rs | 20 +++++-------- src/lib.rs | 38 ++++++++++++++++++++---- src/tokio.rs | 59 ++++++++++++++++--------------------- tests/communication.rs | 2 +- 12 files changed, 122 insertions(+), 124 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 72ce465..a3b7431 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT" homepage = "https://github.com/sdroege/async-tungstenite" repository = "https://github.com/sdroege/async-tungstenite" documentation = "https://docs.rs/async-tungstenite" -version = "0.3.1" +version = "0.4.0" edition = "2018" readme = "README.md" @@ -30,7 +30,7 @@ futures = "0.3" pin-project = "0.4" [dependencies.tungstenite] -version = "0.9.2" +version = "0.10.0" default-features = false [dependencies.async-std] diff --git a/examples/async-std-echo.rs b/examples/async-std-echo.rs index 7119900..e08e5cb 100644 --- a/examples/async-std-echo.rs +++ b/examples/async-std-echo.rs @@ -5,9 +5,9 @@ use async_std::task; async fn run() -> Result<(), Box> { #[cfg(any(feature = "async-tls", feature = "async-native-tls"))] - let url = url::Url::parse("wss://echo.websocket.org").unwrap(); + let url = "wss://echo.websocket.org"; #[cfg(not(any(feature = "async-tls", feature = "async-native-tls")))] - let url = url::Url::parse("ws://echo.websocket.org").unwrap(); + let url = "ws://echo.websocket.org"; let (mut ws_stream, _) = connect_async(url).await?; diff --git a/examples/autobahn-client.rs b/examples/autobahn-client.rs index 1110e1d..ed6c35b 100644 --- a/examples/autobahn-client.rs +++ b/examples/autobahn-client.rs @@ -1,15 +1,11 @@ use async_tungstenite::{async_std::connect_async, tungstenite::Error, tungstenite::Result}; use futures::{SinkExt, StreamExt}; use log::*; -use url::Url; const AGENT: &str = "Tungstenite"; async fn get_case_count() -> Result { - let (mut socket, _) = connect_async( - Url::parse("ws://localhost:9001/getCaseCount").expect("Can't connect to case count URL"), - ) - .await?; + let (mut socket, _) = connect_async("ws://localhost:9001/getCaseCount").await?; let msg = socket.next().await.expect("Can't fetch case count")?; socket.close(None).await?; Ok(msg @@ -19,13 +15,10 @@ async fn get_case_count() -> Result { } async fn update_reports() -> Result<()> { - let (mut socket, _) = connect_async( - Url::parse(&format!( - "ws://localhost:9001/updateReports?agent={}", - AGENT - )) - .expect("Can't update reports"), - ) + let (mut socket, _) = connect_async(&format!( + "ws://localhost:9001/updateReports?agent={}", + AGENT + )) .await?; socket.close(None).await?; Ok(()) @@ -33,11 +26,7 @@ async fn update_reports() -> Result<()> { async fn run_test(case: u32) -> Result<()> { info!("Running test case {}", case); - let case_url = Url::parse(&format!( - "ws://localhost:9001/runCase?case={}&agent={}", - case, AGENT - )) - .expect("Bad testcase URL"); + let case_url = &format!("ws://localhost:9001/runCase?case={}&agent={}", case, AGENT); let (mut ws_stream, _) = connect_async(case_url).await?; while let Some(msg) = ws_stream.next().await { diff --git a/examples/client.rs b/examples/client.rs index 737188b..2a28e42 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -25,23 +25,21 @@ async fn run() { .nth(1) .unwrap_or_else(|| panic!("this program requires at least one argument")); - let url = url::Url::parse(&connect_addr).unwrap(); - let (stdin_tx, stdin_rx) = futures::channel::mpsc::unbounded(); task::spawn(read_stdin(stdin_tx)); - let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); + let (ws_stream, _) = connect_async(&connect_addr) + .await + .expect("Failed to connect"); println!("WebSocket handshake has been successfully completed"); let (write, read) = ws_stream.split(); let stdin_to_ws = stdin_rx.map(Ok).forward(write); let ws_to_stdout = { - read.for_each(|message| { - async { - let data = message.unwrap().into_data(); - async_std::io::stdout().write_all(&data).await.unwrap(); - } + read.for_each(|message| async { + let data = message.unwrap().into_data(); + async_std::io::stdout().write_all(&data).await.unwrap(); }) }; diff --git a/examples/gio-echo.rs b/examples/gio-echo.rs index 7c9d5e9..bcc5399 100644 --- a/examples/gio-echo.rs +++ b/examples/gio-echo.rs @@ -2,7 +2,7 @@ use async_tungstenite::{gio::connect_async, tungstenite::Message}; use futures::prelude::*; async fn run() -> Result<(), Box> { - let url = url::Url::parse("wss://echo.websocket.org").unwrap(); + let url = "wss://echo.websocket.org"; let (mut ws_stream, _) = connect_async(url).await?; diff --git a/examples/tokio-echo.rs b/examples/tokio-echo.rs index 9ab022c..d3b7a43 100644 --- a/examples/tokio-echo.rs +++ b/examples/tokio-echo.rs @@ -3,9 +3,9 @@ use futures::prelude::*; async fn run() -> Result<(), Box> { #[cfg(any(feature = "async-tls", feature = "tokio-tls"))] - let url = url::Url::parse("wss://echo.websocket.org").unwrap(); + let url = "wss://echo.websocket.org"; #[cfg(not(any(feature = "async-tls", feature = "tokio-tls")))] - let url = url::Url::parse("ws://echo.websocket.org").unwrap(); + let url = "ws://echo.websocket.org"; let (mut ws_stream, _) = connect_async(url).await?; diff --git a/src/async_std.rs b/src/async_std.rs index 54b1cbd..39fef94 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -1,11 +1,12 @@ //! `async-std` integration. -use tungstenite::handshake::client::Response; +use tungstenite::client::IntoClientRequest; +use tungstenite::handshake::client::{Request, Response}; use tungstenite::protocol::WebSocketConfig; use tungstenite::Error; use async_std::net::TcpStream; -use super::{domain, Request, WebSocketStream}; +use super::{domain, port, WebSocketStream}; #[cfg(feature = "async-native-tls")] use futures::io::{AsyncRead, AsyncWrite}; @@ -16,7 +17,8 @@ pub(crate) mod async_native_tls { use async_native_tls::TlsStream; use real_async_native_tls as async_native_tls; - use tungstenite::client::url_mode; + use tungstenite::client::uri_mode; + use tungstenite::handshake::client::Request; use tungstenite::stream::Mode; use tungstenite::Error; @@ -24,7 +26,8 @@ pub(crate) mod async_native_tls { use crate::stream::Stream as StreamSwitcher; use crate::{ - client_async_with_config, domain, Request, Response, WebSocketConfig, WebSocketStream, + client_async_with_config, domain, IntoClientRequest, Response, WebSocketConfig, + WebSocketStream, }; /// A stream that might be protected with TLS. @@ -72,16 +75,16 @@ pub(crate) mod async_native_tls { config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; // Make sure we check domain and mode first. URL must be valid. - let mode = url_mode(&request.url)?; + let mode = uri_mode(request.uri())?; let stream = wrap_stream(stream, domain, connector, mode).await?; client_async_with_config(request, stream, config).await @@ -92,13 +95,12 @@ pub(crate) mod async_native_tls { pub(crate) mod dummy_tls { use futures::io::{AsyncRead, AsyncWrite}; - use tungstenite::client::url_mode; + use tungstenite::client::{uri_mode, IntoClientRequest}; + use tungstenite::handshake::client::Request; use tungstenite::stream::Mode; use tungstenite::Error; - use crate::{ - client_async_with_config, domain, Request, Response, WebSocketConfig, WebSocketStream, - }; + use crate::{client_async_with_config, domain, Response, WebSocketConfig, WebSocketStream}; pub type AutoStream = S; type Connector = (); @@ -125,16 +127,16 @@ pub(crate) mod dummy_tls { config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; // Make sure we check domain and mode first. URL must be valid. - let mode = url_mode(&request.url)?; + let mode = uri_mode(request.uri())?; let stream = wrap_stream(stream, domain, connector, mode).await?; client_async_with_config(request, stream, config).await @@ -160,7 +162,7 @@ pub async fn client_async_tls( stream: S, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { @@ -177,7 +179,7 @@ pub async fn client_async_tls_with_config( config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { @@ -194,7 +196,7 @@ pub async fn client_async_tls_with_connector( connector: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { @@ -206,7 +208,7 @@ pub async fn connect_async( request: R, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { connect_async_with_config(request, None).await } @@ -217,15 +219,12 @@ pub async fn connect_async_with_config( config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; - let port = request - .url - .port_or_known_default() - .expect("Bug: port unknown"); + let port = port(&request)?; let try_socket = TcpStream::connect((domain.as_str(), port)).await; let socket = try_socket.map_err(Error::Io)?; @@ -239,7 +238,7 @@ pub async fn connect_async_with_tls_connector( connector: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { connect_async_with_tls_connector_and_config(request, connector, None).await } @@ -252,15 +251,12 @@ pub async fn connect_async_with_tls_connector_and_config( config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; - let port = request - .url - .port_or_known_default() - .expect("Bug: port unknown"); + let port = port(&request)?; let try_socket = TcpStream::connect((domain.as_str(), port)).await; let socket = try_socket.map_err(Error::Io)?; diff --git a/src/async_tls.rs b/src/async_tls.rs index 8d35622..a81f52a 100644 --- a/src/async_tls.rs +++ b/src/async_tls.rs @@ -1,12 +1,12 @@ //! `async-tls` integration. -use tungstenite::client::url_mode; -use tungstenite::handshake::client::Response; +use tungstenite::client::{uri_mode, IntoClientRequest}; +use tungstenite::handshake::client::{Request, Response}; use tungstenite::protocol::WebSocketConfig; use tungstenite::Error; use futures::io::{AsyncRead, AsyncWrite}; -use super::{client_async_with_config, Request, WebSocketStream}; +use super::{client_async_with_config, WebSocketStream}; use async_tls::client::TlsStream; use async_tls::TlsConnector as AsyncTlsConnector; @@ -49,7 +49,7 @@ pub async fn client_async_tls( stream: S, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { @@ -65,7 +65,7 @@ pub async fn client_async_tls_with_config( config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { @@ -81,7 +81,7 @@ pub async fn client_async_tls_with_connector( connector: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { @@ -98,16 +98,16 @@ pub async fn client_async_tls_with_connector_and_config( config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; // Make sure we check domain and mode first. URL must be valid. - let mode = url_mode(&request.url)?; + let mode = uri_mode(request.uri())?; let stream = wrap_stream(stream, domain, connector, mode).await?; client_async_with_config(request, stream, config).await diff --git a/src/gio.rs b/src/gio.rs index 6bf6a27..cd1bb5d 100644 --- a/src/gio.rs +++ b/src/gio.rs @@ -7,12 +7,11 @@ use gio::prelude::*; use futures::io::{AsyncRead, AsyncWrite}; -use tungstenite::client::url_mode; +use tungstenite::client::{uri_mode, IntoClientRequest}; +use tungstenite::handshake::client::Request; use tungstenite::stream::Mode; -use crate::{ - client_async_with_config, domain, Request, Response, WebSocketConfig, WebSocketStream, -}; +use crate::{client_async_with_config, domain, port, Response, WebSocketConfig, WebSocketStream}; type MaybeTlsStream = IOStreamAsyncReadWrite; @@ -21,7 +20,7 @@ pub async fn connect_async( request: R, ) -> Result<(WebSocketStream, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { connect_async_with_config(request, None).await } @@ -32,20 +31,17 @@ pub async fn connect_async_with_config( config: Option, ) -> Result<(WebSocketStream, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; - let port = request - .url - .port_or_known_default() - .expect("Bug: port unknown"); + let port = port(&request)?; let client = gio::SocketClient::new(); // Make sure we check domain and mode first. URL must be valid. - let mode = url_mode(&request.url)?; + let mode = uri_mode(request.uri())?; if let Mode::Tls = mode { client.set_tls(true); } else { diff --git a/src/lib.rs b/src/lib.rs index 9e24792..e62a18e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,9 +54,10 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tungstenite::{ + client::IntoClientRequest, error::Error as WsError, handshake::{ - client::{ClientHandshake, Request, Response}, + client::{ClientHandshake, Response}, server::{Callback, NoCallback}, }, protocol::{Message, Role, WebSocket, WebSocketConfig}, @@ -92,7 +93,7 @@ pub async fn client_async<'a, R, S>( stream: S, ) -> Result<(WebSocketStream, Response), WsError> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: AsyncRead + AsyncWrite + Unpin, { client_async_with_config(request, stream, None).await @@ -106,11 +107,12 @@ pub async fn client_async_with_config<'a, R, S>( config: Option, ) -> Result<(WebSocketStream, Response), WsError> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: AsyncRead + AsyncWrite + Unpin, { let f = handshake::client_handshake(stream, move |allow_std| { - let cli_handshake = ClientHandshake::start(allow_std, request.into(), config); + let request = request.into_client_request()?; + let cli_handshake = ClientHandshake::start(allow_std, request, config)?; cli_handshake.handshake() }); f.await.map_err(|e| { @@ -346,9 +348,33 @@ where ))] /// Get a domain from an URL. #[inline] -pub(crate) fn domain(request: &Request) -> Result { - match request.url.host_str() { +pub(crate) fn domain( + request: &tungstenite::handshake::client::Request, +) -> Result { + match request.uri().host() { Some(d) => Ok(d.to_string()), None => Err(tungstenite::Error::Url("no host name in the url".into())), } } + +#[cfg(any( + feature = "async-tls", + feature = "async-std-runtime", + feature = "tokio-runtime", + feature = "gio-runtime" +))] +/// Get the port from an URL. +#[inline] +pub(crate) fn port( + request: &tungstenite::handshake::client::Request, +) -> Result { + request + .uri() + .port_u16() + .or_else(|| match request.uri().scheme_str() { + Some("wss") => Some(443), + Some("ws") => Some(80), + _ => None, + }) + .ok_or(tungstenite::Error::Url("Url scheme not supported".into())) +} diff --git a/src/tokio.rs b/src/tokio.rs index a6aa783..69bef39 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -1,11 +1,12 @@ //! `tokio` integration. -use tungstenite::handshake::client::Response; +use tungstenite::client::IntoClientRequest; +use tungstenite::handshake::client::{Request, Response}; use tungstenite::protocol::WebSocketConfig; use tungstenite::Error; use tokio::net::TcpStream; -use super::{domain, Request, WebSocketStream}; +use super::{domain, port, WebSocketStream}; use futures::io::{AsyncRead, AsyncWrite}; @@ -14,16 +15,15 @@ pub(crate) mod tokio_tls { use real_tokio_tls::TlsConnector as AsyncTlsConnector; use real_tokio_tls::TlsStream; - use tungstenite::client::url_mode; + use tungstenite::client::{uri_mode, IntoClientRequest}; + use tungstenite::handshake::client::Request; use tungstenite::stream::Mode; use tungstenite::Error; use futures::io::{AsyncRead, AsyncWrite}; use crate::stream::Stream as StreamSwitcher; - use crate::{ - client_async_with_config, domain, Request, Response, WebSocketConfig, WebSocketStream, - }; + use crate::{client_async_with_config, domain, Response, WebSocketConfig, WebSocketStream}; use super::TokioAdapter; @@ -73,16 +73,16 @@ pub(crate) mod tokio_tls { config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; // Make sure we check domain and mode first. URL must be valid. - let mode = url_mode(&request.url)?; + let mode = uri_mode(request.uri())?; let stream = wrap_stream(stream, domain, connector, mode).await?; client_async_with_config(request, stream, config).await @@ -93,13 +93,12 @@ pub(crate) mod tokio_tls { pub(crate) mod dummy_tls { use futures::io::{AsyncRead, AsyncWrite}; - use tungstenite::client::url_mode; + use tungstenite::client::{uri_mode, IntoClientRequest}; + use tungstenite::handshake::client::Request; use tungstenite::stream::Mode; use tungstenite::Error; - use crate::{ - client_async_with_config, domain, Request, Response, WebSocketConfig, WebSocketStream, - }; + use crate::{client_async_with_config, domain, Response, WebSocketConfig, WebSocketStream}; pub type AutoStream = S; type Connector = (); @@ -126,16 +125,16 @@ pub(crate) mod dummy_tls { config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; // Make sure we check domain and mode first. URL must be valid. - let mode = url_mode(&request.url)?; + let mode = uri_mode(request.uri())?; let stream = wrap_stream(stream, domain, connector, mode).await?; client_async_with_config(request, stream, config).await @@ -161,7 +160,7 @@ pub async fn client_async_tls( stream: S, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { @@ -178,7 +177,7 @@ pub async fn client_async_tls_with_config( config: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { @@ -195,7 +194,7 @@ pub async fn client_async_tls_with_connector( connector: Option, ) -> Result<(WebSocketStream>, Response), Error> where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, S: 'static + AsyncRead + AsyncWrite + Unpin, AutoStream: Unpin, { @@ -213,7 +212,7 @@ pub async fn connect_async( Error, > where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { connect_async_with_config(request, None).await } @@ -230,15 +229,12 @@ pub async fn connect_async_with_config( Error, > where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; - let port = request - .url - .port_or_known_default() - .expect("Bug: port unknown"); + let port = port(&request)?; let try_socket = TcpStream::connect((domain.as_str(), port)).await; let socket = try_socket.map_err(Error::Io)?; @@ -258,7 +254,7 @@ pub async fn connect_async_with_tls_connector( Error, > where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { connect_async_with_tls_connector_and_config(request, connector, None).await } @@ -277,15 +273,12 @@ pub async fn connect_async_with_tls_connector_and_config( Error, > where - R: Into> + Unpin, + R: IntoClientRequest + Unpin, { - let request: Request = request.into(); + let request: Request = request.into_client_request()?; let domain = domain(&request)?; - let port = request - .url - .port_or_known_default() - .expect("Bug: port unknown"); + let port = port(&request)?; let try_socket = TcpStream::connect((domain.as_str(), port)).await; let socket = try_socket.map_err(Error::Io)?; diff --git a/tests/communication.rs b/tests/communication.rs index 010f6cc..d0b05f8 100644 --- a/tests/communication.rs +++ b/tests/communication.rs @@ -48,7 +48,7 @@ async fn communication() { let tcp = TcpStream::connect("0.0.0.0:12345") .await .expect("Failed to connect"); - let url = url::Url::parse("ws://localhost:12345/").unwrap(); + let url = "ws://localhost:12345/"; let (mut stream, _) = client_async(url, tcp) .await .expect("Client failed to connect");