diff --git a/examples/autobahn-client.rs b/examples/autobahn-client.rs index 3e641c4..a536f0e 100644 --- a/examples/autobahn-client.rs +++ b/examples/autobahn-client.rs @@ -1,6 +1,6 @@ +use async_tungstenite::{connect_async, tungstenite::Result}; use futures::StreamExt; use log::*; -use async_tungstenite::{connect_async, tungstenite::Result}; use url::Url; const AGENT: &'static str = "Tungstenite"; diff --git a/examples/autobahn-server.rs b/examples/autobahn-server.rs index e2f2169..8ccbf10 100644 --- a/examples/autobahn-server.rs +++ b/examples/autobahn-server.rs @@ -1,7 +1,7 @@ +use async_std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; +use async_tungstenite::accept_async; use futures::StreamExt; use log::*; -use async_std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs}; -use async_tungstenite::accept_async; async fn accept_connection(peer: SocketAddr, stream: TcpStream) { let mut ws_stream = accept_async(stream).await.expect("Failed to accept"); diff --git a/examples/client.rs b/examples/client.rs index 5700dbf..f0351ee 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -16,8 +16,8 @@ use futures::StreamExt; use log::*; use tungstenite::protocol::Message; -use async_std::prelude::*; use async_std::io; +use async_std::prelude::*; use async_std::task; use async_tungstenite::connect_async; diff --git a/examples/server.rs b/examples/server.rs index 983c648..c38e141 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -20,12 +20,12 @@ use std::env; use std::io::Error; +use async_std::net::{SocketAddr, ToSocketAddrs}; +use async_std::net::{TcpListener, TcpStream}; +use async_std::task; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::StreamExt; use log::*; -use async_std::task; -use async_std::net::{SocketAddr, ToSocketAddrs}; -use async_std::net::{TcpListener, TcpStream}; use tungstenite::protocol::Message; struct Connection { diff --git a/examples/split-client.rs b/examples/split-client.rs index 969e111..91a27f6 100644 --- a/examples/split-client.rs +++ b/examples/split-client.rs @@ -12,13 +12,13 @@ use std::env; -use futures::{SinkExt, StreamExt}; -use log::*; -use tungstenite::protocol::Message; -use async_std::prelude::*; use async_std::io; +use async_std::prelude::*; use async_std::task; use async_tungstenite::connect_async; +use futures::{SinkExt, StreamExt}; +use log::*; +use tungstenite::protocol::Message; async fn run() { let _ = env_logger::try_init(); diff --git a/src/compat.rs b/src/compat.rs index 7cc5574..e6ad9cc 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -46,7 +46,7 @@ where unsafe { if !self.context.0 { //was called by start_send without context - return Poll::Pending + return Poll::Pending; } assert!(!self.context.1.is_null()); let waker = &mut *(self.context.1 as *mut _); diff --git a/src/connect.rs b/src/connect.rs index e339902..56187d9 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -9,8 +9,8 @@ use super::{client_async, Request, WebSocketStream}; #[cfg(feature = "tls")] pub(crate) mod encryption { - use async_tls::TlsConnector as AsyncTlsConnector; use async_tls::client::TlsStream; + use async_tls::TlsConnector as AsyncTlsConnector; use tungstenite::stream::Mode; use tungstenite::Error; @@ -51,8 +51,8 @@ pub use self::encryption::MaybeTlsStream; #[cfg(not(feature = "tls"))] pub(crate) mod encryption { - use futures::{future, Future}; use futures::io::{AsyncRead, AsyncWrite}; + use futures::{future, Future}; use tungstenite::stream::Mode; use tungstenite::Error; diff --git a/src/handshake.rs b/src/handshake.rs index 05d0f72..7d517ab 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -1,12 +1,12 @@ use crate::compat::{AllowStd, HasContext}; use crate::WebSocketStream; +use futures::io::{AsyncRead, AsyncWrite}; use log::*; use pin_project::pin_project; use std::future::Future; use std::io::{Read, Write}; use std::pin::Pin; use std::task::{Context, Poll}; -use futures::io::{AsyncRead, AsyncWrite}; use tungstenite::handshake::client::Response; use tungstenite::handshake::server::Callback; use tungstenite::handshake::{HandshakeError as Error, HandshakeRole, MidHandshake as WsHandshake}; @@ -165,7 +165,9 @@ where let machine = s.get_mut(); trace!("Setting context in handshake"); - machine.get_mut().set_context((true, cx as *mut _ as *mut ())); + machine + .get_mut() + .set_context((true, cx as *mut _ as *mut ())); match s.handshake() { Ok(stream) => Poll::Ready(Ok(stream)), diff --git a/src/lib.rs b/src/lib.rs index 07cdf85..2cbb5b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,13 +28,13 @@ pub mod stream; use std::io::{Read, Write}; use compat::{cvt, AllowStd}; -use futures::{Stream, Sink}; +use futures::io::{AsyncRead, AsyncWrite}; +use futures::{Sink, Stream}; use log::*; use pin_project::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use futures::io::{AsyncRead, AsyncWrite}; use tungstenite::{ error::Error as WsError, @@ -297,9 +297,9 @@ where } impl Sink for WebSocketStream - where - T: AsyncRead + AsyncWrite + Unpin, - AllowStd: Read + Write, +where + T: AsyncRead + AsyncWrite + Unpin, + AllowStd: Read + Write, { type Error = WsError; @@ -310,7 +310,9 @@ impl Sink for WebSocketStream fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { match (*self).with_context(None, |s| s.write_message(item)) { Ok(()) => Ok(()), - Err(::tungstenite::Error::Io(ref err)) if err.kind() == std::io::ErrorKind::WouldBlock => { + Err(::tungstenite::Error::Io(ref err)) + if err.kind() == std::io::ErrorKind::WouldBlock => + { // the message was accepted and queued // isn't an error. Ok(()) @@ -354,7 +356,10 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let message = this.message.take().expect("Cannot poll twice"); - Poll::Ready(this.stream.with_context(Some(cx), |s| s.write_message(message))) + Poll::Ready( + this.stream + .with_context(Some(cx), |s| s.write_message(message)), + ) } } @@ -381,11 +386,11 @@ where #[cfg(test)] mod tests { use crate::compat::AllowStd; - #[cfg(feature="connect")] + #[cfg(feature = "connect")] use crate::connect::encryption::AutoStream; use crate::WebSocketStream; - use std::io::{Read, Write}; use futures::io::{AsyncReadExt, AsyncWriteExt}; + use std::io::{Read, Write}; fn is_read() {} fn is_write() {} @@ -398,13 +403,13 @@ mod tests { is_read::>(); is_write::>(); - #[cfg(feature="connect")] + #[cfg(feature = "connect")] is_async_read::>(); - #[cfg(feature="connect")] + #[cfg(feature = "connect")] is_async_write::>(); is_unpin::>(); - #[cfg(feature="connect")] + #[cfg(feature = "connect")] is_unpin::>>(); } } diff --git a/tests/communication.rs b/tests/communication.rs index 5708438..cd1eebf 100644 --- a/tests/communication.rs +++ b/tests/communication.rs @@ -1,8 +1,8 @@ -use futures::{SinkExt, StreamExt, AsyncRead, AsyncWrite}; -use log::*; -use async_std::task; use async_std::net::{TcpListener, TcpStream, ToSocketAddrs}; +use async_std::task; use async_tungstenite::{accept_async, client_async, WebSocketStream}; +use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt}; +use log::*; use tungstenite::Message; async fn run_connection( @@ -67,7 +67,10 @@ async fn communication() { for i in 1..10 { info!("Sending message"); - stream.send(Message::Text(format!("{}", i))).await.expect("Failed to send message"); + stream + .send(Message::Text(format!("{}", i))) + .await + .expect("Failed to send message"); } stream.close(None).await.expect("Failed to close"); @@ -123,7 +126,9 @@ async fn split_communication() { for i in 1..10 { info!("Sending message"); - tx.send(Message::Text(format!("{}", i))).await.expect("Failed to send message"); + tx.send(Message::Text(format!("{}", i))) + .await + .expect("Failed to send message"); } tx.close().await.expect("Failed to close"); diff --git a/tests/handshakes.rs b/tests/handshakes.rs index a06b789..e29b58c 100644 --- a/tests/handshakes.rs +++ b/tests/handshakes.rs @@ -1,5 +1,5 @@ -use async_std::task; use async_std::net::{TcpListener, TcpStream, ToSocketAddrs}; +use async_std::task; use async_tungstenite::{accept_async, client_async}; #[async_std::test]