//! Async WebSocket usage. //! //! This library is an implementation of WebSocket handshakes and streams. It //! is based on the crate which implements all required WebSocket protocol //! logic. So this crate basically just brings tokio support / tokio integration //! to it. //! //! Each WebSocket stream implements the required `Stream` and `Sink` traits, //! so the socket is just a stream of messages coming in and going out. #![deny( missing_docs, unused_must_use, unused_mut, unused_imports, unused_import_braces)] extern crate futures; extern crate tokio_io; pub extern crate tungstenite; #[cfg(feature="connect")] mod connect; #[cfg(feature="stream")] pub mod stream; use std::io::ErrorKind; use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; use tokio_io::{AsyncRead, AsyncWrite}; use tungstenite::{ error::Error as WsError, handshake::{ HandshakeRole, HandshakeError, client::{ClientHandshake, Response, Request}, server::{ServerHandshake, Callback, NoCallback}, }, protocol::{WebSocket, Message, Role, WebSocketConfig}, server, }; #[cfg(feature="connect")] pub use connect::{connect_async, client_async_tls}; /// Creates a WebSocket handshake from a request and a stream. /// For convenience, the user may call this with a url string, a URL, /// or a `Request`. Calling with `Request` allows the user to add /// a WebSocket protocol or other custom headers. /// /// Internally, this custom creates a handshake representation and returns /// a future representing the resolution of the WebSocket handshake. The /// returned future will resolve to either `WebSocketStream` or `Error` /// depending on whether the handshake is successful. /// /// This is typically used for clients who have already established, for /// example, a TCP connection to the remote server. pub fn client_async<'a, R, S>( request: R, stream: S, ) -> ConnectAsync where R: Into>, S: AsyncRead + AsyncWrite { client_async_with_config(request, stream, None) } /// The same as `client_async()` but the one can specify a websocket configuration. /// Please refer to `client_async()` for more details. pub fn client_async_with_config<'a, R, S>( request: R, stream: S, config: Option, ) -> ConnectAsync where R: Into>, S: AsyncRead + AsyncWrite { ConnectAsync { inner: MidHandshake { inner: Some(ClientHandshake::start(stream, request.into(), config).handshake()) } } } /// Accepts a new WebSocket connection with the provided stream. /// /// This function will internally call `server::accept` to create a /// handshake representation and returns a future representing the /// resolution of the WebSocket handshake. The returned future will resolve /// to either `WebSocketStream` or `Error` depending if it's successful /// or not. /// /// This is typically used after a socket has been accepted from a /// `TcpListener`. That socket is then passed to this function to perform /// the server half of the accepting a client's websocket connection. pub fn accept_async(stream: S) -> AcceptAsync where S: AsyncRead + AsyncWrite, { accept_hdr_async(stream, NoCallback) } /// The same as `accept_async()` but the one can specify a websocket configuration. /// Please refer to `accept_async()` for more details. pub fn accept_async_with_config( stream: S, config: Option, ) -> AcceptAsync where S: AsyncRead + AsyncWrite, { accept_hdr_async_with_config(stream, NoCallback, config) } /// Accepts a new WebSocket connection with the provided stream. /// /// This function does the same as `accept_async()` but accepts an extra callback /// for header processing. The callback receives headers of the incoming /// requests and is able to add extra headers to the reply. pub fn accept_hdr_async(stream: S, callback: C) -> AcceptAsync where S: AsyncRead + AsyncWrite, C: Callback, { accept_hdr_async_with_config(stream, callback, None) } /// The same as `accept_hdr_async()` but the one can specify a websocket configuration. /// Please refer to `accept_hdr_async()` for more details. pub fn accept_hdr_async_with_config( stream: S, callback: C, config: Option, ) -> AcceptAsync where S: AsyncRead + AsyncWrite, C: Callback, { AcceptAsync { inner: MidHandshake { inner: Some(server::accept_hdr_with_config(stream, callback, config)) } } } /// A wrapper around an underlying raw stream which implements the WebSocket /// protocol. /// /// A `WebSocketStream` represents a handshake that has been completed /// successfully and both the server and the client are ready for receiving /// and sending data. Message from a `WebSocketStream` are accessible /// through the respective `Stream` and `Sink`. Check more information about /// them in `futures-rs` crate documentation or have a look on the examples /// and unit tests for this crate. pub struct WebSocketStream { inner: WebSocket, } impl WebSocketStream { /// Convert a raw socket into a WebSocketStream without performing a /// handshake. pub fn from_raw_socket( stream: S, role: Role, config: Option, ) -> Self { let ws = WebSocket::from_raw_socket(stream, role, config); WebSocketStream { inner: ws } } /// Convert a raw socket into a WebSocketStream without performing a /// handshake. pub fn from_partially_read( stream: S, part: Vec, role: Role, config: Option, ) -> Self { let ws = WebSocket::from_partially_read(stream, part, role, config); WebSocketStream { inner: ws } } } impl Stream for WebSocketStream where T: AsyncRead + AsyncWrite { type Item = Message; type Error = WsError; fn poll(&mut self) -> Poll, WsError> { self.inner.read_message().map(|m| Some(m)).to_async() } } impl Sink for WebSocketStream where T: AsyncRead + AsyncWrite { type SinkItem = Message; type SinkError = WsError; fn start_send(&mut self, item: Message) -> StartSend { self.inner.write_message(item).to_start_send() } fn poll_complete(&mut self) -> Poll<(), WsError> { self.inner.write_pending().to_async() } fn close(&mut self) -> Poll<(), WsError> { self.inner.close(None).to_async() } } /// Future returned from client_async() which will resolve /// once the connection handshake has finished. pub struct ConnectAsync { inner: MidHandshake>, } impl Future for ConnectAsync { type Item = (WebSocketStream, Response); type Error = WsError; fn poll(&mut self) -> Poll { match self.inner.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready((ws, resp)) => Ok(Async::Ready((WebSocketStream { inner: ws }, resp))), } } } /// Future returned from accept_async() which will resolve /// once the connection handshake has finished. pub struct AcceptAsync { inner: MidHandshake>, } impl Future for AcceptAsync { type Item = WebSocketStream; type Error = WsError; fn poll(&mut self) -> Poll { match self.inner.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(ws) => Ok(Async::Ready(WebSocketStream { inner: ws })), } } } struct MidHandshake { inner: Option::FinalResult, HandshakeError>>, } impl Future for MidHandshake { type Item = ::FinalResult; type Error = WsError; fn poll(&mut self) -> Poll { match self.inner.take().expect("cannot poll MidHandshake twice") { Ok(result) => Ok(Async::Ready(result)), Err(HandshakeError::Failure(e)) => Err(e), Err(HandshakeError::Interrupted(s)) => { match s.handshake() { Ok(result) => Ok(Async::Ready(result)), Err(HandshakeError::Failure(e)) => Err(e), Err(HandshakeError::Interrupted(s)) => { self.inner = Some(Err(HandshakeError::Interrupted(s))); Ok(Async::NotReady) } } } } } } trait ToAsync { type T; type E; fn to_async(self) -> Result, Self::E>; } impl ToAsync for Result { type T = T; type E = WsError; fn to_async(self) -> Result, Self::E> { match self { Ok(x) => Ok(Async::Ready(x)), Err(error) => match error { WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => Ok(Async::NotReady), err => Err(err), }, } } } trait ToStartSend { type T; type E; fn to_start_send(self) -> StartSend; } impl ToStartSend for Result<(), WsError> { type T = Message; type E = WsError; fn to_start_send(self) -> StartSend { match self { Ok(_) => Ok(AsyncSink::Ready), Err(error) => match error { WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::Ready), WsError::SendQueueFull(msg) => Ok(AsyncSink::NotReady(msg)), err => Err(err), } } } }