diff --git a/Cargo.toml b/Cargo.toml index 186a875..beddedc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT" homepage = "https://github.com/snapview/tokio-tungstenite" documentation = "https://docs.rs/tokio-tungstenite/0.6.0" repository = "https://github.com/snapview/tokio-tungstenite" -version = "0.6.0" +version = "0.7.0" [features] default = ["connect", "tls"] @@ -21,7 +21,10 @@ futures = "0.1.23" tokio-io = "0.1.7" [dependencies.tungstenite] -version = "0.6.0" +# Uncomment when `tungstenite-rs` is released. +# version = "0.7.0" +git = "https://github.com/snapview/tungstenite-rs" +branch = "close-as-message" default-features = false [dependencies.bytes] diff --git a/src/lib.rs b/src/lib.rs index 7cfe2c0..a55d696 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -161,6 +161,7 @@ where /// and unit tests for this crate. pub struct WebSocketStream { inner: WebSocket, + stream_ended: bool, } impl WebSocketStream { @@ -171,8 +172,7 @@ impl WebSocketStream { role: Role, config: Option, ) -> Self { - let ws = WebSocket::from_raw_socket(stream, role, config); - WebSocketStream { inner: ws } + Self::new(WebSocket::from_raw_socket(stream, role, config)) } /// Convert a raw socket into a WebSocketStream without performing a @@ -183,8 +183,14 @@ impl WebSocketStream { role: Role, config: Option, ) -> Self { - let ws = WebSocket::from_partially_read(stream, part, role, config); - WebSocketStream { inner: ws } + Self::new(WebSocket::from_partially_read(stream, part, role, config)) + } + + fn new(ws: WebSocket) -> Self { + WebSocketStream { + inner: ws, + stream_ended: false, + } } } @@ -193,7 +199,17 @@ impl Stream for WebSocketStream where T: AsyncRead + AsyncWrite { type Error = WsError; fn poll(&mut self) -> Poll, WsError> { - self.inner.read_message().map(|m| Some(m)).to_async() + if self.stream_ended { + self.stream_ended = false; + return Ok(Async::Ready(None)) + } + + self.inner.read_message().map(|m| { + if m.is_close() { + self.stream_ended = true; + } + Some(m) + }).to_async() } } @@ -227,7 +243,7 @@ impl Future for ConnectAsync { fn poll(&mut self) -> Poll { match self.inner.poll()? { Async::NotReady => Ok(Async::NotReady), - Async::Ready((ws, resp)) => Ok(Async::Ready((WebSocketStream { inner: ws }, resp))), + Async::Ready((ws, resp)) => Ok(Async::Ready((WebSocketStream::new(ws), resp))), } } } @@ -245,7 +261,7 @@ impl Future for AcceptAsync { fn poll(&mut self) -> Poll { match self.inner.poll()? { Async::NotReady => Ok(Async::NotReady), - Async::Ready(ws) => Ok(Async::Ready(WebSocketStream { inner: ws })), + Async::Ready(ws) => Ok(Async::Ready(WebSocketStream::new(ws))), } } }