Adapts tokio-tungstenite to tungstenite/headers

Basically changes the code of tokio-tungstenite to match the latest
(current) status of tungstenite-rs. Fixes #13, fixes #9, fixes #6.
pull/1/head
Daniel Abramov 7 years ago
parent 035630ff23
commit b0ad00230b
  1. 3
      Cargo.toml
  2. 2
      examples/client.rs
  3. 2
      examples/server.rs
  4. 22
      src/connect.rs
  5. 70
      src/lib.rs
  6. 6
      tests/handshakes.rs

@ -22,7 +22,8 @@ tokio-io = "0.1.2"
url = "1.4.0" url = "1.4.0"
[dependencies.tungstenite] [dependencies.tungstenite]
version = "0.2.4" git = "https://github.com/snapview/tungstenite-rs"
branch = "headers"
default-features = false default-features = false
[dependencies.bytes] [dependencies.bytes]

@ -63,7 +63,7 @@ fn main() {
// finishes. If we don't have any more data to read or we won't receive any // finishes. If we don't have any more data to read or we won't receive any
// more work from the remote then we can exit. // more work from the remote then we can exit.
let mut stdout = io::stdout(); let mut stdout = io::stdout();
let client = connect_async(url, handle.remote().clone()).and_then(|ws_stream| { let client = connect_async(url, handle.remote().clone()).and_then(|(ws_stream, _)| {
println!("WebSocket handshake has been successfully completed"); println!("WebSocket handshake has been successfully completed");
// `sink` is the stream of messages going out. // `sink` is the stream of messages going out.

@ -60,7 +60,7 @@ fn main() {
let connections_inner = connections.clone(); let connections_inner = connections.clone();
let handle_inner = handle.clone(); let handle_inner = handle.clone();
accept_async(stream).and_then(move |ws_stream| { accept_async(stream, None).and_then(move |ws_stream| {
println!("New WebSocket connection: {}", addr); println!("New WebSocket connection: {}", addr);
// Create a channel for our stream, which other sockets will use to // Create a channel for our stream, which other sockets will use to

@ -3,20 +3,20 @@
extern crate tokio_dns; extern crate tokio_dns;
extern crate tokio_core; extern crate tokio_core;
use self::tokio_dns::tcp_connect;
use self::tokio_core::reactor::Remote;
use std::io::Result as IoResult; use std::io::Result as IoResult;
use futures::{Future, BoxFuture}; use self::tokio_core::net::TcpStream;
use futures::future; use self::tokio_core::reactor::Remote;
use self::tokio_dns::tcp_connect;
use super::{WebSocketStream, Request, client_async}; use futures::future;
use futures::{Future, BoxFuture};
use tungstenite::Error; use tungstenite::Error;
use tungstenite::client::url_mode; use tungstenite::client::url_mode;
use stream::NoDelay; use tungstenite::handshake::client::Response;
use self::tokio_core::net::TcpStream; use stream::NoDelay;
use super::{WebSocketStream, Request, client_async};
impl NoDelay for TcpStream { impl NoDelay for TcpStream {
fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
@ -91,8 +91,10 @@ mod encryption {
use self::encryption::{AutoStream, wrap_stream}; use self::encryption::{AutoStream, wrap_stream};
/// Connect to a given URL. /// Connect to a given URL.
pub fn connect_async<R>(request: R, handle: Remote) -> BoxFuture<WebSocketStream<AutoStream>, Error> pub fn connect_async<R>(request: R, handle: Remote)
where R: Into<Request<'static>> -> BoxFuture<(WebSocketStream<AutoStream>, Response), Error>
where
R: Into<Request<'static>>
{ {
let request: Request = request.into(); let request: Request = request.into();

@ -7,11 +7,6 @@
//! //!
//! Each WebSocket stream implements the required `Stream` and `Sink` traits, //! Each WebSocket stream implements the required `Stream` and `Sink` traits,
//! so the socket is just a stream of messages coming in and going out. //! so the socket is just a stream of messages coming in and going out.
//!
//! This crate primarily exports this ability through two extension traits,
//! `ClientHandshakeExt` and `ServerHandshakeExt`. These traits augment the
//! functionality provided by the `tungestenite` crate, on which this crate is
//! built. Configuration is done through `tungestenite` crate as well.
#![deny( #![deny(
missing_docs, missing_docs,
@ -38,8 +33,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
use url::Url; use url::Url;
use tungstenite::handshake::client::ClientHandshake; use tungstenite::handshake::client::{ClientHandshake, Response};
use tungstenite::handshake::server::ServerHandshake; use tungstenite::handshake::server::{ServerHandshake, Callback};
use tungstenite::handshake::{HandshakeRole, HandshakeError}; use tungstenite::handshake::{HandshakeRole, HandshakeError};
use tungstenite::protocol::{WebSocket, Message}; use tungstenite::protocol::{WebSocket, Message};
use tungstenite::error::Error as WsError; use tungstenite::error::Error as WsError;
@ -90,11 +85,16 @@ impl<'a, U: Into<Url>> From<U> for Request<'a> {
/// depending on whether the handshake is successful. /// depending on whether the handshake is successful.
/// ///
/// This is typically used for clients who have already established, for /// This is typically used for clients who have already established, for
/// example, a TCP connection to the remove server. /// example, a TCP connection to the remote server.
pub fn client_async<'a, R, S>(request: R, stream: S) -> ConnectAsync<S> pub fn client_async<'a, R, S>(request: R, stream: S) -> ConnectAsync<S>
where R: Into<Request<'a>>, S: AsyncRead + AsyncWrite { where
let Request{url, headers} = request.into(); R: Into<Request<'a>>,
let tungstenite_request = tungstenite::handshake::client::Request{url: url, extra_headers: Some(&headers)}; S: AsyncRead + AsyncWrite
{
let Request{ url, headers } = request.into();
let tungstenite_request = {
tungstenite::handshake::client::Request { url, extra_headers: Some(&headers) }
};
let handshake = ClientHandshake::start(stream, tungstenite_request).handshake(); let handshake = ClientHandshake::start(stream, tungstenite_request).handshake();
ConnectAsync { ConnectAsync {
@ -115,10 +115,16 @@ pub fn client_async<'a, R, S>(request: R, stream: S) -> ConnectAsync<S>
/// This is typically used after a socket has been accepted from a /// This is typically used after a socket has been accepted from a
/// `TcpListener`. That socket is then passed to this function to perform /// `TcpListener`. That socket is then passed to this function to perform
/// the server half of the accepting a client's websocket connection. /// the server half of the accepting a client's websocket connection.
pub fn accept_async<S: AsyncRead + AsyncWrite>(stream: S) -> AcceptAsync<S> { ///
/// You can also pass an optional `callback` which will
/// be called when the websocket request is received from an incoming client.
pub fn accept_async<S>(stream: S, callback: Option<Callback>) -> AcceptAsync<S>
where
S: AsyncRead + AsyncWrite,
{
AcceptAsync { AcceptAsync {
inner: MidHandshake { inner: MidHandshake {
inner: Some(server::accept(stream)) inner: Some(server::accept(stream, callback))
} }
} }
} }
@ -161,49 +167,55 @@ impl<T> Sink for WebSocketStream<T> where T: AsyncRead + AsyncWrite {
/// Future returned from client_async() which will resolve /// Future returned from client_async() which will resolve
/// once the connection handshake has finished. /// once the connection handshake has finished.
pub struct ConnectAsync<S> { pub struct ConnectAsync<S: AsyncRead + AsyncWrite> {
inner: MidHandshake<S, ClientHandshake>, inner: MidHandshake<ClientHandshake<S>>,
} }
impl<S: AsyncRead + AsyncWrite> Future for ConnectAsync<S> { impl<S: AsyncRead + AsyncWrite> Future for ConnectAsync<S> {
type Item = WebSocketStream<S>; type Item = (WebSocketStream<S>, Response);
type Error = WsError; type Error = WsError;
fn poll(&mut self) -> Poll<WebSocketStream<S>, WsError> { fn poll(&mut self) -> Poll<Self::Item, WsError> {
self.inner.poll() match self.inner.poll()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready((ws, resp)) => Ok(Async::Ready((WebSocketStream { inner: ws }, resp))),
}
} }
} }
/// Future returned from accept_async() which will resolve /// Future returned from accept_async() which will resolve
/// once the connection handshake has finished. /// once the connection handshake has finished.
pub struct AcceptAsync<S> { pub struct AcceptAsync<S: AsyncRead + AsyncWrite> {
inner: MidHandshake<S, ServerHandshake>, inner: MidHandshake<ServerHandshake<S>>,
} }
impl<S: AsyncRead + AsyncWrite> Future for AcceptAsync<S> { impl<S: AsyncRead + AsyncWrite> Future for AcceptAsync<S> {
type Item = WebSocketStream<S>; type Item = WebSocketStream<S>;
type Error = WsError; type Error = WsError;
fn poll(&mut self) -> Poll<WebSocketStream<S>, WsError> { fn poll(&mut self) -> Poll<Self::Item, WsError> {
self.inner.poll() match self.inner.poll()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(ws) => Ok(Async::Ready(WebSocketStream { inner: ws })),
}
} }
} }
struct MidHandshake<S, R> { struct MidHandshake<H: HandshakeRole> {
inner: Option<Result<WebSocket<S>, HandshakeError<S, R>>>, inner: Option<Result<<H as HandshakeRole>::FinalResult, HandshakeError<H>>>,
} }
impl<S: AsyncRead + AsyncWrite, R: HandshakeRole> Future for MidHandshake<S, R> { impl<H: HandshakeRole> Future for MidHandshake<H> {
type Item = WebSocketStream<S>; type Item = <H as HandshakeRole>::FinalResult;
type Error = WsError; type Error = WsError;
fn poll(&mut self) -> Poll<WebSocketStream<S>, WsError> { fn poll(&mut self) -> Poll<Self::Item, WsError> {
match self.inner.take().expect("cannot poll MidHandshake twice") { match self.inner.take().expect("cannot poll MidHandshake twice") {
Ok(stream) => Ok(WebSocketStream { inner: stream }.into()), Ok(result) => Ok(Async::Ready(result)),
Err(HandshakeError::Failure(e)) => Err(e), Err(HandshakeError::Failure(e)) => Err(e),
Err(HandshakeError::Interrupted(s)) => { Err(HandshakeError::Interrupted(s)) => {
match s.handshake() { match s.handshake() {
Ok(stream) => Ok(WebSocketStream { inner: stream }.into()), Ok(result) => Ok(Async::Ready(result)),
Err(HandshakeError::Failure(e)) => Err(e), Err(HandshakeError::Failure(e)) => Err(e),
Err(HandshakeError::Interrupted(s)) => { Err(HandshakeError::Interrupted(s)) => {
self.inner = Some(Err(HandshakeError::Interrupted(s))); self.inner = Some(Err(HandshakeError::Interrupted(s)));

@ -25,8 +25,7 @@ fn handshakes() {
let connections = listener.incoming(); let connections = listener.incoming();
tx.send(()).unwrap(); tx.send(()).unwrap();
let handshakes = connections.and_then(|(connection, _)| { let handshakes = connections.and_then(|(connection, _)| {
accept_async(connection) accept_async(connection, None).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}); });
let server = handshakes.for_each(|_| { let server = handshakes.for_each(|_| {
Ok(()) Ok(())
@ -42,8 +41,7 @@ fn handshakes() {
let tcp = TcpStream::connect(&address, &handle); let tcp = TcpStream::connect(&address, &handle);
let handshake = tcp.and_then(|stream| { let handshake = tcp.and_then(|stream| {
let url = url::Url::parse("ws://localhost:12345/").unwrap(); let url = url::Url::parse("ws://localhost:12345/").unwrap();
client_async(url, stream) client_async(url, stream).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}); });
let client = handshake.and_then(|_| { let client = handshake.and_then(|_| {
Ok(()) Ok(())

Loading…
Cancel
Save