connect support timout

pull/117/head
error.d 5 years ago
parent 1ef18d54ad
commit 770b8f3582
  1. 27
      src/client.rs
  2. 2
      tests/connection_reset.rs
  3. 2
      tests/no_send_after_close.rs
  4. 2
      tests/receive_after_init_close.rs

@ -9,6 +9,8 @@ use log::*;
use url::Url; use url::Url;
use std::time::Duration;
use crate::handshake::client::{Request, Response}; use crate::handshake::client::{Request, Response};
use crate::protocol::WebSocketConfig; use crate::protocol::WebSocketConfig;
@ -88,6 +90,7 @@ use crate::stream::{Mode, NoDelay};
/// `connect` since it's the only function that uses native_tls. /// `connect` since it's the only function that uses native_tls.
pub fn connect_with_config<Req: IntoClientRequest>( pub fn connect_with_config<Req: IntoClientRequest>(
request: Req, request: Req,
timeout: Option<Duration>,
config: Option<WebSocketConfig>, config: Option<WebSocketConfig>,
) -> Result<(WebSocket<AutoStream>, Response)> { ) -> Result<(WebSocket<AutoStream>, Response)> {
let request: Request = request.into_client_request()?; let request: Request = request.into_client_request()?;
@ -102,7 +105,7 @@ pub fn connect_with_config<Req: IntoClientRequest>(
Mode::Tls => 443, Mode::Tls => 443,
}); });
let addrs = (host, port).to_socket_addrs()?; let addrs = (host, port).to_socket_addrs()?;
let mut stream = connect_to_some(addrs.as_slice(), &request.uri(), mode)?; let mut stream = connect_to_some(addrs.as_slice(), &request.uri(), mode, timeout)?;
NoDelay::set_nodelay(&mut stream, true)?; NoDelay::set_nodelay(&mut stream, true)?;
client_with_config(request, stream, config).map_err(|e| match e { client_with_config(request, stream, config).map_err(|e| match e {
HandshakeError::Failure(f) => f, HandshakeError::Failure(f) => f,
@ -122,19 +125,29 @@ pub fn connect_with_config<Req: IntoClientRequest>(
/// This function uses `native_tls` to do TLS. If you want to use other TLS libraries, /// This function uses `native_tls` to do TLS. If you want to use other TLS libraries,
/// use `client` instead. There is no need to enable the "tls" feature if you don't call /// use `client` instead. There is no need to enable the "tls" feature if you don't call
/// `connect` since it's the only function that uses native_tls. /// `connect` since it's the only function that uses native_tls.
pub fn connect<Req: IntoClientRequest>(request: Req) -> Result<(WebSocket<AutoStream>, Response)> { pub fn connect<Req: IntoClientRequest>(request: Req, timeout: Option<Duration>) -> Result<(WebSocket<AutoStream>, Response)> {
connect_with_config(request, None) connect_with_config(request, timeout, None)
} }
fn connect_to_some(addrs: &[SocketAddr], uri: &Uri, mode: Mode) -> Result<AutoStream> { fn connect_to_some(addrs: &[SocketAddr], uri: &Uri, mode: Mode, timeout: Option<Duration>) -> Result<AutoStream> {
let domain = uri let domain = uri
.host() .host()
.ok_or_else(|| Error::Url("No host name in the URL".into()))?; .ok_or_else(|| Error::Url("No host name in the URL".into()))?;
for addr in addrs { for addr in addrs {
debug!("Trying to contact {} at {}...", uri, addr); debug!("Trying to contact {} at {}...", uri, addr);
if let Ok(raw_stream) = TcpStream::connect(addr) { let raw_stream = if let Some(timeout) = timeout {
if let Ok(stream) = wrap_stream(raw_stream, domain, mode) { TcpStream::connect_timeout(addr, timeout)
return Ok(stream); } else {
TcpStream::connect(addr)
};
if let Err(err) = raw_stream {
debug!("connect {} at {} error: {:?}", uri, addr, err);
} else {
let stream = wrap_stream(raw_stream.unwrap(), domain, mode);
if let Err(err) = stream {
debug!("warp_stream error: {:?}", err);
} else {
return Ok(stream.unwrap());
} }
} }
} }

@ -30,7 +30,7 @@ where
.expect("Can't listen, is port already in use?"); .expect("Can't listen, is port already in use?");
let client_thread = spawn(move || { let client_thread = spawn(move || {
let (client, _) = connect(Url::parse(&format!("ws://localhost:{}/socket", port)).unwrap()) let (client, _) = connect(Url::parse(&format!("ws://localhost:{}/socket", port)).unwrap(), None)
.expect("Can't connect to port"); .expect("Can't connect to port");
client_task(client); client_task(client);

@ -22,7 +22,7 @@ fn test_no_send_after_close() {
let server = TcpListener::bind("127.0.0.1:3013").unwrap(); let server = TcpListener::bind("127.0.0.1:3013").unwrap();
let client_thread = spawn(move || { let client_thread = spawn(move || {
let (mut client, _) = connect(Url::parse("ws://localhost:3013/socket").unwrap()).unwrap(); let (mut client, _) = connect(Url::parse("ws://localhost:3013/socket").unwrap(), None).unwrap();
let message = client.read_message().unwrap(); // receive close from server let message = client.read_message().unwrap(); // receive close from server
assert!(message.is_close()); assert!(message.is_close());

@ -22,7 +22,7 @@ fn test_receive_after_init_close() {
let server = TcpListener::bind("127.0.0.1:3013").unwrap(); let server = TcpListener::bind("127.0.0.1:3013").unwrap();
let client_thread = spawn(move || { let client_thread = spawn(move || {
let (mut client, _) = connect(Url::parse("ws://localhost:3013/socket").unwrap()).unwrap(); let (mut client, _) = connect(Url::parse("ws://localhost:3013/socket").unwrap(), None).unwrap();
client client
.write_message(Message::Text("Hello WebSocket".into())) .write_message(Message::Text("Hello WebSocket".into()))

Loading…
Cancel
Save