Merge pull request #53 from snapview/close-as-message

Introduce `Message::Close` support and return `Async::Ready(None)` when the stream is ended
pull/1/head
Alexey Galakhov 5 years ago committed by GitHub
commit eb7b3df7c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      Cargo.toml
  2. 30
      src/lib.rs

@ -8,7 +8,7 @@ license = "MIT"
homepage = "https://github.com/snapview/tokio-tungstenite" homepage = "https://github.com/snapview/tokio-tungstenite"
documentation = "https://docs.rs/tokio-tungstenite/0.6.0" documentation = "https://docs.rs/tokio-tungstenite/0.6.0"
repository = "https://github.com/snapview/tokio-tungstenite" repository = "https://github.com/snapview/tokio-tungstenite"
version = "0.6.0" version = "0.7.0"
[features] [features]
default = ["connect", "tls"] default = ["connect", "tls"]
@ -21,7 +21,10 @@ futures = "0.1.23"
tokio-io = "0.1.7" tokio-io = "0.1.7"
[dependencies.tungstenite] [dependencies.tungstenite]
version = "0.6.0" # Uncomment when `tungstenite-rs` is released.
# version = "0.7.0"
git = "https://github.com/snapview/tungstenite-rs"
branch = "close-as-message"
default-features = false default-features = false
[dependencies.bytes] [dependencies.bytes]

@ -161,6 +161,7 @@ where
/// and unit tests for this crate. /// and unit tests for this crate.
pub struct WebSocketStream<S> { pub struct WebSocketStream<S> {
inner: WebSocket<S>, inner: WebSocket<S>,
stream_ended: bool,
} }
impl<S> WebSocketStream<S> { impl<S> WebSocketStream<S> {
@ -171,8 +172,7 @@ impl<S> WebSocketStream<S> {
role: Role, role: Role,
config: Option<WebSocketConfig>, config: Option<WebSocketConfig>,
) -> Self { ) -> Self {
let ws = WebSocket::from_raw_socket(stream, role, config); Self::new(WebSocket::from_raw_socket(stream, role, config))
WebSocketStream { inner: ws }
} }
/// Convert a raw socket into a WebSocketStream without performing a /// Convert a raw socket into a WebSocketStream without performing a
@ -183,8 +183,14 @@ impl<S> WebSocketStream<S> {
role: Role, role: Role,
config: Option<WebSocketConfig>, config: Option<WebSocketConfig>,
) -> Self { ) -> Self {
let ws = WebSocket::from_partially_read(stream, part, role, config); Self::new(WebSocket::from_partially_read(stream, part, role, config))
WebSocketStream { inner: ws } }
fn new(ws: WebSocket<S>) -> Self {
WebSocketStream {
inner: ws,
stream_ended: false,
}
} }
} }
@ -193,7 +199,17 @@ impl<T> Stream for WebSocketStream<T> where T: AsyncRead + AsyncWrite {
type Error = WsError; type Error = WsError;
fn poll(&mut self) -> Poll<Option<Message>, WsError> { fn poll(&mut self) -> Poll<Option<Message>, WsError> {
self.inner.read_message().map(|m| Some(m)).to_async() if self.stream_ended {
self.stream_ended = false;
return Ok(Async::Ready(None))
}
self.inner.read_message().map(|m| {
if m.is_close() {
self.stream_ended = true;
}
Some(m)
}).to_async()
} }
} }
@ -227,7 +243,7 @@ impl<S: AsyncRead + AsyncWrite> Future for ConnectAsync<S> {
fn poll(&mut self) -> Poll<Self::Item, WsError> { fn poll(&mut self) -> Poll<Self::Item, WsError> {
match self.inner.poll()? { match self.inner.poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready((ws, resp)) => Ok(Async::Ready((WebSocketStream { inner: ws }, resp))), Async::Ready((ws, resp)) => Ok(Async::Ready((WebSocketStream::new(ws), resp))),
} }
} }
} }
@ -245,7 +261,7 @@ impl<S: AsyncRead + AsyncWrite, C: Callback> Future for AcceptAsync<S, C> {
fn poll(&mut self) -> Poll<Self::Item, WsError> { fn poll(&mut self) -> Poll<Self::Item, WsError> {
match self.inner.poll()? { match self.inner.poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(ws) => Ok(Async::Ready(WebSocketStream { inner: ws })), Async::Ready(ws) => Ok(Async::Ready(WebSocketStream::new(ws))),
} }
} }
} }

Loading…
Cancel
Save