From 3c150d1174d3752dcd3d9eaa632dd967f6ca2811 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Tue, 22 Mar 2022 18:03:02 +0100 Subject: [PATCH] Make the semantics of `Stream` more reasonable Closes https://github.com/snapview/tokio-tungstenite/issues/200 --- src/lib.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7af3e08..f6919ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -218,6 +218,7 @@ where pub struct WebSocketStream { inner: WebSocket>, closing: bool, + ended: bool, } impl WebSocketStream { @@ -254,6 +255,7 @@ impl WebSocketStream { WebSocketStream { inner: ws, closing: false, + ended: false, } } @@ -311,6 +313,14 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { #[cfg(feature = "verbose-logging")] trace!("{}:{} Stream.poll_next", file!(), line!()); + + // The connection has been closed or a critical error has occurred. + // We have already returned the error to the user, the `Stream` is unusable, + // so we assume that the stream has been "fused". + if self.ended { + return Poll::Ready(None); + } + match futures_util::ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| { #[cfg(feature = "verbose-logging")] trace!( @@ -321,8 +331,14 @@ where cvt(s.read_message()) })) { Ok(v) => Poll::Ready(Some(Ok(v))), - Err(WsError::AlreadyClosed) | Err(WsError::ConnectionClosed) => Poll::Ready(None), - Err(e) => Poll::Ready(Some(Err(e))), + Err(e) => { + self.ended = true; + if matches!(e, WsError::AlreadyClosed | WsError::ConnectionClosed) { + Poll::Ready(None) + } else { + Poll::Ready(Some(Err(e))) + } + } } } }