Migrate to the tokio-io (fixes #1)

pull/1/head
Daniel Abramov 8 years ago
parent b79698b7ce
commit b2b77893c2
  1. 6
      Cargo.toml
  2. 75
      src/lib.rs
  3. 53
      tests/handshakes.rs

@ -10,7 +10,9 @@ version = "0.1.0"
[dependencies] [dependencies]
futures = "0.1.10" futures = "0.1.10"
tokio-core = "0.1.4" tokio-io = "0.1.1"
tungstenite = "0.1.0"
url = "1.4.0" url = "1.4.0"
tungstenite = "0.1.0" [dev-dependencies]
tokio-core = "0.1.4"

@ -21,14 +21,14 @@
unused_import_braces)] unused_import_braces)]
extern crate futures; extern crate futures;
extern crate tokio_core; extern crate tokio_io;
extern crate tungstenite; extern crate tungstenite;
extern crate url; extern crate url;
use std::io::ErrorKind; use std::io::ErrorKind;
use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend};
use tokio_core::io::Io; use tokio_io::{AsyncRead, AsyncWrite};
use url::Url; use url::Url;
@ -37,8 +37,7 @@ use tungstenite::handshake::server::ServerHandshake;
use tungstenite::handshake::{HandshakeRole, HandshakeError}; use tungstenite::handshake::{HandshakeRole, HandshakeError};
use tungstenite::protocol::{WebSocket, Message}; use tungstenite::protocol::{WebSocket, Message};
use tungstenite::error::Error as WsError; use tungstenite::error::Error as WsError;
use tungstenite::client; use tungstenite::{client, server};
use tungstenite::server;
/// Create a handshake provided stream and assuming the provided request. /// Create a handshake provided stream and assuming the provided request.
/// ///
@ -50,7 +49,7 @@ use tungstenite::server;
/// ///
/// This is typically used for clients who have already established, for /// This is typically used for clients who have already established, for
/// example, a TCP connection to the remove server. /// example, a TCP connection to the remove server.
pub fn client_async<S: Io>(url: Url, stream: S) -> ConnectAsync<S> { pub fn client_async<S: AsyncRead + AsyncWrite>(url: Url, stream: S) -> ConnectAsync<S> {
ConnectAsync { ConnectAsync {
inner: MidHandshake { inner: MidHandshake {
inner: Some(client::client(url, stream)) inner: Some(client::client(url, stream))
@ -69,7 +68,7 @@ pub fn client_async<S: Io>(url: Url, stream: S) -> ConnectAsync<S> {
/// This is typically used after a socket has been accepted from a /// This is typically used after a socket has been accepted from a
/// `TcpListener`. That socket is then passed to this function to perform /// `TcpListener`. That socket is then passed to this function to perform
/// the server half of the accepting a client's websocket connection. /// the server half of the accepting a client's websocket connection.
pub fn accept_async<S: Io>(stream: S) -> AcceptAsync<S> { pub fn accept_async<S: AsyncRead + AsyncWrite>(stream: S) -> AcceptAsync<S> {
AcceptAsync { AcceptAsync {
inner: MidHandshake { inner: MidHandshake {
inner: Some(server::accept(stream)) inner: Some(server::accept(stream))
@ -90,7 +89,7 @@ pub struct WebSocketStream<S> {
inner: WebSocket<S>, inner: WebSocket<S>,
} }
impl<T> Stream for WebSocketStream<T> where T: Io { impl<T> Stream for WebSocketStream<T> where T: AsyncRead + AsyncWrite {
type Item = Message; type Item = Message;
type Error = WsError; type Error = WsError;
@ -99,7 +98,7 @@ impl<T> Stream for WebSocketStream<T> where T: Io {
} }
} }
impl<T> Sink for WebSocketStream<T> where T: Io { impl<T> Sink for WebSocketStream<T> where T: AsyncRead + AsyncWrite {
type SinkItem = Message; type SinkItem = Message;
type SinkError = WsError; type SinkError = WsError;
@ -113,13 +112,13 @@ impl<T> Sink for WebSocketStream<T> where T: Io {
} }
} }
/// Future returned from connect_async() which will resolve /// Future returned from client_async() which will resolve
/// once the connection handshake has finished. /// once the connection handshake has finished.
pub struct ConnectAsync<S> { pub struct ConnectAsync<S> {
inner: MidHandshake<S, ClientHandshake>, inner: MidHandshake<S, ClientHandshake>,
} }
impl<S: Io> Future for ConnectAsync<S> { impl<S: AsyncRead + AsyncWrite> Future for ConnectAsync<S> {
type Item = WebSocketStream<S>; type Item = WebSocketStream<S>;
type Error = WsError; type Error = WsError;
@ -134,7 +133,7 @@ pub struct AcceptAsync<S> {
inner: MidHandshake<S, ServerHandshake>, inner: MidHandshake<S, ServerHandshake>,
} }
impl<S: Io> Future for AcceptAsync<S> { impl<S: AsyncRead + AsyncWrite> Future for AcceptAsync<S> {
type Item = WebSocketStream<S>; type Item = WebSocketStream<S>;
type Error = WsError; type Error = WsError;
@ -147,7 +146,7 @@ struct MidHandshake<S, R> {
inner: Option<Result<WebSocket<S>, HandshakeError<S, R>>>, inner: Option<Result<WebSocket<S>, HandshakeError<S, R>>>,
} }
impl<S: Io, R: HandshakeRole> Future for MidHandshake<S, R> { impl<S: AsyncRead + AsyncWrite, R: HandshakeRole> Future for MidHandshake<S, R> {
type Item = WebSocketStream<S>; type Item = WebSocketStream<S>;
type Error = WsError; type Error = WsError;
@ -189,55 +188,3 @@ impl<T> ToAsync for Result<T, WsError> {
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use url;
use std::io;
use futures::{Future, Stream};
use tokio_core::net::{TcpStream, TcpListener};
use tokio_core::reactor::Core;
#[test]
fn handshakes() {
use std::sync::mpsc::channel;
use std::thread;
let (tx, rx) = channel();
thread::spawn(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();
let address = "0.0.0.0:12345".parse().unwrap();
let listener = TcpListener::bind(&address, &handle).unwrap();
let connections = listener.incoming();
tx.send(()).unwrap();
let handshakes = connections.and_then(|(connection, _)| {
accept_async(connection)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
});
let server = handshakes.for_each(|_| {
Ok(())
});
core.run(server).unwrap();
});
rx.recv().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let address = "0.0.0.0:12345".parse().unwrap();
let tcp = TcpStream::connect(&address, &handle);
let handshake = tcp.and_then(|stream| {
let url = url::Url::parse("ws://localhost:12345/").unwrap();
client_async(url, stream)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
});
let client = handshake.and_then(|_| {
Ok(())
});
core.run(client).unwrap();
}
}

@ -0,0 +1,53 @@
extern crate futures;
extern crate tokio_core;
extern crate tokio_tungstenite;
extern crate url;
use std::io;
use futures::{Future, Stream};
use tokio_core::net::{TcpStream, TcpListener};
use tokio_core::reactor::Core;
use tokio_tungstenite::{client_async, accept_async};
#[test]
fn handshakes() {
use std::sync::mpsc::channel;
use std::thread;
let (tx, rx) = channel();
thread::spawn(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();
let address = "0.0.0.0:12345".parse().unwrap();
let listener = TcpListener::bind(&address, &handle).unwrap();
let connections = listener.incoming();
tx.send(()).unwrap();
let handshakes = connections.and_then(|(connection, _)| {
accept_async(connection)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
});
let server = handshakes.for_each(|_| {
Ok(())
});
core.run(server).unwrap();
});
rx.recv().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let address = "0.0.0.0:12345".parse().unwrap();
let tcp = TcpStream::connect(&address, &handle);
let handshake = tcp.and_then(|stream| {
let url = url::Url::parse("ws://localhost:12345/").unwrap();
client_async(url, stream)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
});
let client = handshake.and_then(|_| {
Ok(())
});
core.run(client).unwrap();
}
Loading…
Cancel
Save