Make the semantics of `Stream` more reasonable

Closes https://github.com/snapview/tokio-tungstenite/issues/200
pull/110/head
Daniel Abramov 3 years ago committed by Sebastian Dröge
parent 683385b323
commit 3c150d1174
  1. 20
      src/lib.rs

@ -218,6 +218,7 @@ where
pub struct WebSocketStream<S> { pub struct WebSocketStream<S> {
inner: WebSocket<AllowStd<S>>, inner: WebSocket<AllowStd<S>>,
closing: bool, closing: bool,
ended: bool,
} }
impl<S> WebSocketStream<S> { impl<S> WebSocketStream<S> {
@ -254,6 +255,7 @@ impl<S> WebSocketStream<S> {
WebSocketStream { WebSocketStream {
inner: ws, inner: ws,
closing: false, closing: false,
ended: false,
} }
} }
@ -311,6 +313,14 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
#[cfg(feature = "verbose-logging")] #[cfg(feature = "verbose-logging")]
trace!("{}:{} Stream.poll_next", file!(), line!()); trace!("{}:{} Stream.poll_next", file!(), line!());
// The connection has been closed or a critical error has occurred.
// We have already returned the error to the user, the `Stream` is unusable,
// so we assume that the stream has been "fused".
if self.ended {
return Poll::Ready(None);
}
match futures_util::ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| { match futures_util::ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| {
#[cfg(feature = "verbose-logging")] #[cfg(feature = "verbose-logging")]
trace!( trace!(
@ -321,8 +331,14 @@ where
cvt(s.read_message()) cvt(s.read_message())
})) { })) {
Ok(v) => Poll::Ready(Some(Ok(v))), Ok(v) => Poll::Ready(Some(Ok(v))),
Err(WsError::AlreadyClosed) | Err(WsError::ConnectionClosed) => Poll::Ready(None), Err(e) => {
Err(e) => Poll::Ready(Some(Err(e))), self.ended = true;
if matches!(e, WsError::AlreadyClosed | WsError::ConnectionClosed) {
Poll::Ready(None)
} else {
Poll::Ready(Some(Err(e)))
}
}
} }
} }
} }

Loading…
Cancel
Save