Merge pull request #38 from Eroc33/master

Port to tokio reform
pull/1/head
Daniel Abramov 6 years ago committed by GitHub
commit 336bb2573b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      Cargo.toml
  2. 13
      examples/client.rs
  3. 29
      examples/server.rs
  4. 23
      src/connect.rs
  5. 4
      src/lib.rs
  6. 2
      src/stream.rs

@ -12,13 +12,12 @@ version = "0.5.1"
[features] [features]
default = ["connect", "tls"] default = ["connect", "tls"]
connect = ["tokio-dns-unofficial", "tokio-core", "stream"] connect = ["tokio-dns-unofficial", "tokio", "stream"]
tls = ["tokio-tls", "native-tls", "stream", "tungstenite/tls"] tls = ["tokio-tls", "native-tls", "stream", "tungstenite/tls"]
stream = ["bytes"] stream = ["bytes"]
[dependencies] [dependencies]
futures = "0.1.17" futures = "0.1.17"
tokio-io = "0.1.2"
[dependencies.tungstenite] [dependencies.tungstenite]
version = "0.5.3" version = "0.5.3"
@ -34,11 +33,11 @@ version = "0.1.5"
[dependencies.tokio-dns-unofficial] [dependencies.tokio-dns-unofficial]
optional = true optional = true
version = "0.1.1" version = "0.3.0"
[dependencies.tokio-core] [dependencies.tokio]
optional = true optional = true
version = "0.1.9" version = "0.1.6"
[dependencies.tokio-tls] [dependencies.tokio-tls]
optional = true optional = true

@ -11,7 +11,7 @@
//! You can use this example together with the `server` example. //! You can use this example together with the `server` example.
extern crate futures; extern crate futures;
extern crate tokio_core; extern crate tokio;
extern crate tokio_tungstenite; extern crate tokio_tungstenite;
extern crate tungstenite; extern crate tungstenite;
extern crate url; extern crate url;
@ -22,7 +22,6 @@ use std::thread;
use futures::sync::mpsc; use futures::sync::mpsc;
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use tokio_core::reactor::Core;
use tungstenite::protocol::Message; use tungstenite::protocol::Message;
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
@ -35,10 +34,6 @@ fn main() {
let url = url::Url::parse(&connect_addr).unwrap(); let url = url::Url::parse(&connect_addr).unwrap();
// Create the event loop and initiate the connection to the remote server.
let mut core = Core::new().unwrap();
let handle = core.handle();
// Right now Tokio doesn't support a handle to stdin running on the event // Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will // loop, so we farm out that work to a separate thread. This thread will
// read data from stdin and then send it to the event loop over a standard // read data from stdin and then send it to the event loop over a standard
@ -63,7 +58,7 @@ fn main() {
// finishes. If we don't have any more data to read or we won't receive any // finishes. If we don't have any more data to read or we won't receive any
// more work from the remote then we can exit. // more work from the remote then we can exit.
let mut stdout = io::stdout(); let mut stdout = io::stdout();
let client = connect_async(url, handle.remote().clone()).and_then(|(ws_stream, _)| { let client = connect_async(url).and_then(move |(ws_stream, _)| {
println!("WebSocket handshake has been successfully completed"); println!("WebSocket handshake has been successfully completed");
// `sink` is the stream of messages going out. // `sink` is the stream of messages going out.
@ -73,7 +68,7 @@ fn main() {
// We forward all messages, composed out of the data, entered to // We forward all messages, composed out of the data, entered to
// the stdin, to the `sink`. // the stdin, to the `sink`.
let send_stdin = stdin_rx.forward(sink); let send_stdin = stdin_rx.forward(sink);
let write_stdout = stream.for_each(|message| { let write_stdout = stream.for_each(move |message| {
stdout.write_all(&message.into_data()).unwrap(); stdout.write_all(&message.into_data()).unwrap();
Ok(()) Ok(())
}); });
@ -88,7 +83,7 @@ fn main() {
}); });
// And now that we've got our client, we execute it in the event loop! // And now that we've got our client, we execute it in the event loop!
core.run(client).unwrap(); tokio::runtime::run(client.map_err(|_e| ()));
} }
// Our helper method which will read data from stdin and send it along the // Our helper method which will read data from stdin and send it along the

@ -18,20 +18,18 @@
//! messages. //! messages.
extern crate futures; extern crate futures;
extern crate tokio_core; extern crate tokio;
extern crate tokio_tungstenite; extern crate tokio_tungstenite;
extern crate tungstenite; extern crate tungstenite;
use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::io::{Error, ErrorKind}; use std::io::{Error, ErrorKind};
use std::rc::Rc; use std::sync::{Arc,Mutex};
use futures::stream::Stream; use futures::stream::Stream;
use futures::Future; use futures::Future;
use tokio_core::net::TcpListener; use tokio::net::TcpListener;
use tokio_core::reactor::Core;
use tungstenite::protocol::Message; use tungstenite::protocol::Message;
use tokio_tungstenite::accept_async; use tokio_tungstenite::accept_async;
@ -41,16 +39,16 @@ fn main() {
let addr = addr.parse().unwrap(); let addr = addr.parse().unwrap();
// Create the event loop and TCP listener we'll accept connections on. // Create the event loop and TCP listener we'll accept connections on.
let mut core = Core::new().unwrap(); let socket = TcpListener::bind(&addr).unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr); println!("Listening on: {}", addr);
// This is a single-threaded server, so we can just use Rc and RefCell to // This is a single-threaded server, so we can just use Rc and RefCell to
// store the map of all connections we know about. // store the map of all connections we know about.
let connections = Rc::new(RefCell::new(HashMap::new())); let connections = Arc::new(Mutex::new(HashMap::new()));
let srv = socket.incoming().for_each(|(stream, addr)| { let srv = socket.incoming().for_each(move |stream| {
let addr = stream.peer_addr().expect("connected streams should have a peer address");
// We have to clone both of these values, because the `and_then` // We have to clone both of these values, because the `and_then`
// function below constructs a new future, `and_then` requires // function below constructs a new future, `and_then` requires
@ -58,7 +56,6 @@ fn main() {
// environment inside the future (AndThen future may overlive our // environment inside the future (AndThen future may overlive our
// `for_each` future). // `for_each` future).
let connections_inner = connections.clone(); let connections_inner = connections.clone();
let handle_inner = handle.clone();
accept_async(stream).and_then(move |ws_stream| { accept_async(stream).and_then(move |ws_stream| {
println!("New WebSocket connection: {}", addr); println!("New WebSocket connection: {}", addr);
@ -67,7 +64,7 @@ fn main() {
// send us messages. Then register our address with the stream to send // send us messages. Then register our address with the stream to send
// data to us. // data to us.
let (tx, rx) = futures::sync::mpsc::unbounded(); let (tx, rx) = futures::sync::mpsc::unbounded();
connections_inner.borrow_mut().insert(addr, tx); connections_inner.lock().unwrap().insert(addr, tx);
// Let's split the WebSocket stream, so we can work with the // Let's split the WebSocket stream, so we can work with the
// reading and writing halves separately. // reading and writing halves separately.
@ -81,7 +78,7 @@ fn main() {
// For each open connection except the sender, send the // For each open connection except the sender, send the
// string via the channel. // string via the channel.
let mut conns = connections.borrow_mut(); let mut conns = connections.lock().unwrap();
let iter = conns.iter_mut() let iter = conns.iter_mut()
.filter(|&(&k, _)| k != addr) .filter(|&(&k, _)| k != addr)
.map(|(_, v)| v); .map(|(_, v)| v);
@ -105,8 +102,8 @@ fn main() {
let connection = ws_reader.map(|_| ()).map_err(|_| ()) let connection = ws_reader.map(|_| ()).map_err(|_| ())
.select(ws_writer.map(|_| ()).map_err(|_| ())); .select(ws_writer.map(|_| ()).map_err(|_| ()));
handle_inner.spawn(connection.then(move |_| { tokio::spawn(connection.then(move |_| {
connections_inner.borrow_mut().remove(&addr); connections_inner.lock().unwrap().remove(&addr);
println!("Connection {} closed.", addr); println!("Connection {} closed.", addr);
Ok(()) Ok(())
})); }));
@ -119,5 +116,5 @@ fn main() {
}); });
// Execute server. // Execute server.
core.run(srv).unwrap(); tokio::runtime::run(srv.map_err(|_e| ()));
} }

@ -1,16 +1,13 @@
//! Connection helper. //! Connection helper.
extern crate tokio_dns; extern crate tokio_dns;
extern crate tokio_core;
use std::io::Result as IoResult; use std::io::Result as IoResult;
use self::tokio_core::net::TcpStream; use tokio::net::TcpStream;
use self::tokio_core::reactor::Remote;
use self::tokio_dns::tcp_connect;
use futures::{future, Future}; use futures::{future, Future};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tungstenite::Error; use tungstenite::Error;
use tungstenite::client::url_mode; use tungstenite::client::url_mode;
@ -36,7 +33,7 @@ mod encryption {
use std::io::{Read, Write, Result as IoResult}; use std::io::{Read, Write, Result as IoResult};
use futures::{future, Future}; use futures::{future, Future};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tungstenite::Error; use tungstenite::Error;
use tungstenite::stream::Mode; use tungstenite::stream::Mode;
@ -53,9 +50,9 @@ mod encryption {
} }
pub fn wrap_stream<S>(socket: S, domain: String, mode: Mode) pub fn wrap_stream<S>(socket: S, domain: String, mode: Mode)
-> Box<Future<Item=AutoStream<S>, Error=Error>> -> Box<Future<Item=AutoStream<S>, Error=Error> + Send>
where where
S: 'static + AsyncRead + AsyncWrite, S: 'static + AsyncRead + AsyncWrite + Send,
{ {
match mode { match mode {
Mode::Plain => Box::new(future::ok(StreamSwitcher::Plain(socket))), Mode::Plain => Box::new(future::ok(StreamSwitcher::Plain(socket))),
@ -106,10 +103,10 @@ fn domain(request: &Request) -> Result<String, Error> {
/// Creates a WebSocket handshake from a request and a stream, /// Creates a WebSocket handshake from a request and a stream,
/// upgrading the stream to TLS if required. /// upgrading the stream to TLS if required.
pub fn client_async_tls<R, S>(request: R, stream: S) pub fn client_async_tls<R, S>(request: R, stream: S)
-> Box<Future<Item=(WebSocketStream<AutoStream<S>>, Response), Error=Error>> -> Box<Future<Item=(WebSocketStream<AutoStream<S>>, Response), Error=Error> + Send>
where where
R: Into<Request<'static>>, R: Into<Request<'static>>,
S: 'static + AsyncRead + AsyncWrite + NoDelay, S: 'static + AsyncRead + AsyncWrite + NoDelay + Send,
{ {
let request: Request = request.into(); let request: Request = request.into();
@ -134,8 +131,8 @@ where
} }
/// Connect to a given URL. /// Connect to a given URL.
pub fn connect_async<R>(request: R, handle: Remote) pub fn connect_async<R>(request: R)
-> Box<Future<Item=(WebSocketStream<AutoStream<TcpStream>>, Response), Error=Error>> -> Box<Future<Item=(WebSocketStream<AutoStream<TcpStream>>, Response), Error=Error> + Send>
where where
R: Into<Request<'static>> R: Into<Request<'static>>
{ {
@ -147,6 +144,6 @@ where
}; };
let port = request.url.port_or_known_default().expect("Bug: port unknown"); let port = request.url.port_or_known_default().expect("Bug: port unknown");
Box::new(tcp_connect((domain.as_str(), port), handle).map_err(|e| e.into()) Box::new(tokio_dns::TcpStream::connect((domain.as_str(), port)).map_err(|e| e.into())
.and_then(move |socket| client_async_tls(request, socket))) .and_then(move |socket| client_async_tls(request, socket)))
} }

@ -16,7 +16,7 @@
unused_import_braces)] unused_import_braces)]
extern crate futures; extern crate futures;
extern crate tokio_io; extern crate tokio;
pub extern crate tungstenite; pub extern crate tungstenite;
@ -29,7 +29,7 @@ pub mod stream;
use std::io::ErrorKind; use std::io::ErrorKind;
use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tungstenite::handshake::client::{ClientHandshake, Response, Request}; use tungstenite::handshake::client::{ClientHandshake, Response, Request};
use tungstenite::handshake::server::{ServerHandshake, Callback, NoCallback}; use tungstenite::handshake::server::{ServerHandshake, Callback, NoCallback};

@ -10,7 +10,7 @@ use std::io::{Read, Write, Result as IoResult, Error as IoError};
use self::bytes::{Buf, BufMut}; use self::bytes::{Buf, BufMut};
use futures::Poll; use futures::Poll;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
/// Trait to switch TCP_NODELAY. /// Trait to switch TCP_NODELAY.
pub trait NoDelay { pub trait NoDelay {

Loading…
Cancel
Save