use crate::compat::AllowStd; #[cfg(feature = "handshake")] use crate::compat::SetWaker; use crate::WebSocketStream; use futures_io::{AsyncRead, AsyncWrite}; #[allow(unused_imports)] use log::*; use ng_tungstenite::WebSocket; #[cfg(feature = "handshake")] use ng_tungstenite::{ handshake::{ client::Response, server::Callback, HandshakeError as Error, HandshakeRole, MidHandshake as WsHandshake, }, ClientHandshake, ServerHandshake, }; use std::future::Future; use std::io::{Read, Write}; use std::pin::Pin; use std::task::{Context, Poll}; pub(crate) async fn without_handshake(stream: S, f: F) -> WebSocketStream where F: FnOnce(AllowStd) -> WebSocket> + Unpin, S: AsyncRead + AsyncWrite + Unpin, { let start = SkippedHandshakeFuture(Some(SkippedHandshakeFutureInner { f, stream })); let ws = start.await; WebSocketStream::new(ws) } struct SkippedHandshakeFuture(Option>); struct SkippedHandshakeFutureInner { f: F, stream: S, } impl Future for SkippedHandshakeFuture where F: FnOnce(AllowStd) -> WebSocket> + Unpin, S: Unpin, AllowStd: Read + Write, { type Output = WebSocket>; fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { let inner = self .get_mut() .0 .take() .expect("future polled after completion"); #[cfg(feature = "verbose-logging")] trace!("Setting context when skipping handshake"); let stream = AllowStd::new(inner.stream, ctx.waker()); Poll::Ready((inner.f)(stream)) } } #[cfg(feature = "handshake")] struct MidHandshake(Option>); #[cfg(feature = "handshake")] enum StartedHandshake { Done(Role::FinalResult), Mid(WsHandshake), } #[cfg(feature = "handshake")] struct StartedHandshakeFuture(Option>); #[cfg(feature = "handshake")] struct StartedHandshakeFutureInner { f: F, stream: S, } #[cfg(feature = "handshake")] async fn handshake(stream: S, f: F) -> Result> where Role: HandshakeRole + Unpin, Role::InternalStream: SetWaker + Unpin, F: FnOnce(AllowStd) -> Result> + Unpin, S: AsyncRead + AsyncWrite + Unpin, { let start = StartedHandshakeFuture(Some(StartedHandshakeFutureInner { f, stream })); match start.await? { StartedHandshake::Done(r) => Ok(r), StartedHandshake::Mid(s) => { let res: Result> = MidHandshake::(Some(s)).await; res } } } #[cfg(feature = "handshake")] pub(crate) async fn client_handshake( stream: S, f: F, ) -> Result<(WebSocketStream, Response), Error>>> where F: FnOnce( AllowStd, ) -> Result< > as HandshakeRole>::FinalResult, Error>>, > + Unpin, S: AsyncRead + AsyncWrite + Unpin, { let result = handshake(stream, f).await?; let (s, r) = result; Ok((WebSocketStream::new(s), r)) } #[cfg(feature = "handshake")] pub(crate) async fn server_handshake( stream: S, f: F, ) -> Result, Error, C>>> where C: Callback + Unpin, F: FnOnce( AllowStd, ) -> Result< , C> as HandshakeRole>::FinalResult, Error, C>>, > + Unpin, S: AsyncRead + AsyncWrite + Unpin, { let s: WebSocket> = handshake(stream, f).await?; Ok(WebSocketStream::new(s)) } #[cfg(feature = "handshake")] impl Future for StartedHandshakeFuture where Role: HandshakeRole, Role::InternalStream: SetWaker, F: FnOnce(AllowStd) -> Result> + Unpin, S: Unpin, AllowStd: Read + Write, { type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { let inner = self.0.take().expect("future polled after completion"); #[cfg(feature = "verbose-logging")] trace!("Setting ctx when starting handshake"); let stream = AllowStd::new(inner.stream, ctx.waker()); match (inner.f)(stream) { Ok(r) => Poll::Ready(Ok(StartedHandshake::Done(r))), Err(Error::Interrupted(mid)) => Poll::Ready(Ok(StartedHandshake::Mid(mid))), Err(Error::Failure(e)) => Poll::Ready(Err(Error::Failure(e))), } } } #[cfg(feature = "handshake")] impl Future for MidHandshake where Role: HandshakeRole + Unpin, Role::InternalStream: SetWaker + Unpin, { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut s = self .as_mut() .0 .take() .expect("future polled after completion"); let machine = s.get_mut(); #[cfg(feature = "verbose-logging")] trace!("Setting context in handshake"); machine.get_mut().set_waker(cx.waker()); match s.handshake() { Ok(stream) => Poll::Ready(Ok(stream)), Err(Error::Failure(e)) => Poll::Ready(Err(Error::Failure(e))), Err(Error::Interrupted(mid)) => { self.0 = Some(mid); Poll::Pending } } } }