From e2012633b43098430274cd490316d6f158e0d437 Mon Sep 17 00:00:00 2001 From: Euan Rochester Date: Sun, 20 May 2018 16:44:49 +0100 Subject: [PATCH] Port to tokio reform --- Cargo.toml | 9 ++++----- examples/client.rs | 13 ++++--------- examples/server.rs | 29 +++++++++++++---------------- src/connect.rs | 23 ++++++++++------------- src/lib.rs | 4 ++-- src/stream.rs | 2 +- 6 files changed, 34 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9dd18e6..b51f02b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,13 +12,12 @@ version = "0.5.1" [features] default = ["connect", "tls"] -connect = ["tokio-dns-unofficial", "tokio-core", "stream"] +connect = ["tokio-dns-unofficial", "tokio", "stream"] tls = ["tokio-tls", "native-tls", "stream", "tungstenite/tls"] stream = ["bytes"] [dependencies] futures = "0.1.17" -tokio-io = "0.1.2" [dependencies.tungstenite] version = "0.5.3" @@ -34,11 +33,11 @@ version = "0.1.5" [dependencies.tokio-dns-unofficial] optional = true -version = "0.1.1" +version = "0.3.0" -[dependencies.tokio-core] +[dependencies.tokio] optional = true -version = "0.1.9" +version = "0.1.6" [dependencies.tokio-tls] optional = true diff --git a/examples/client.rs b/examples/client.rs index 62dbe57..6ceb9da 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -11,7 +11,7 @@ //! You can use this example together with the `server` example. extern crate futures; -extern crate tokio_core; +extern crate tokio; extern crate tokio_tungstenite; extern crate tungstenite; extern crate url; @@ -22,7 +22,6 @@ use std::thread; use futures::sync::mpsc; use futures::{Future, Sink, Stream}; -use tokio_core::reactor::Core; use tungstenite::protocol::Message; use tokio_tungstenite::connect_async; @@ -35,10 +34,6 @@ fn main() { let url = url::Url::parse(&connect_addr).unwrap(); - // Create the event loop and initiate the connection to the remote server. - let mut core = Core::new().unwrap(); - let handle = core.handle(); - // Right now Tokio doesn't support a handle to stdin running on the event // loop, so we farm out that work to a separate thread. This thread will // read data from stdin and then send it to the event loop over a standard @@ -63,7 +58,7 @@ fn main() { // finishes. If we don't have any more data to read or we won't receive any // more work from the remote then we can exit. let mut stdout = io::stdout(); - let client = connect_async(url, handle.remote().clone()).and_then(|(ws_stream, _)| { + let client = connect_async(url).and_then(move |(ws_stream, _)| { println!("WebSocket handshake has been successfully completed"); // `sink` is the stream of messages going out. @@ -73,7 +68,7 @@ fn main() { // We forward all messages, composed out of the data, entered to // the stdin, to the `sink`. let send_stdin = stdin_rx.forward(sink); - let write_stdout = stream.for_each(|message| { + let write_stdout = stream.for_each(move |message| { stdout.write_all(&message.into_data()).unwrap(); Ok(()) }); @@ -88,7 +83,7 @@ fn main() { }); // And now that we've got our client, we execute it in the event loop! - core.run(client).unwrap(); + tokio::runtime::run(client.map_err(|_e| ())); } // Our helper method which will read data from stdin and send it along the diff --git a/examples/server.rs b/examples/server.rs index d6dde95..713c1ae 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -18,20 +18,18 @@ //! messages. extern crate futures; -extern crate tokio_core; +extern crate tokio; extern crate tokio_tungstenite; extern crate tungstenite; -use std::cell::RefCell; use std::collections::HashMap; use std::env; use std::io::{Error, ErrorKind}; -use std::rc::Rc; +use std::sync::{Arc,Mutex}; use futures::stream::Stream; use futures::Future; -use tokio_core::net::TcpListener; -use tokio_core::reactor::Core; +use tokio::net::TcpListener; use tungstenite::protocol::Message; use tokio_tungstenite::accept_async; @@ -41,16 +39,16 @@ fn main() { let addr = addr.parse().unwrap(); // Create the event loop and TCP listener we'll accept connections on. - let mut core = Core::new().unwrap(); - let handle = core.handle(); - let socket = TcpListener::bind(&addr, &handle).unwrap(); + let socket = TcpListener::bind(&addr).unwrap(); println!("Listening on: {}", addr); // This is a single-threaded server, so we can just use Rc and RefCell to // store the map of all connections we know about. - let connections = Rc::new(RefCell::new(HashMap::new())); + let connections = Arc::new(Mutex::new(HashMap::new())); - let srv = socket.incoming().for_each(|(stream, addr)| { + let srv = socket.incoming().for_each(move |stream| { + + let addr = stream.peer_addr().expect("connected streams should have a peer address"); // We have to clone both of these values, because the `and_then` // function below constructs a new future, `and_then` requires @@ -58,7 +56,6 @@ fn main() { // environment inside the future (AndThen future may overlive our // `for_each` future). let connections_inner = connections.clone(); - let handle_inner = handle.clone(); accept_async(stream).and_then(move |ws_stream| { println!("New WebSocket connection: {}", addr); @@ -67,7 +64,7 @@ fn main() { // send us messages. Then register our address with the stream to send // data to us. let (tx, rx) = futures::sync::mpsc::unbounded(); - connections_inner.borrow_mut().insert(addr, tx); + connections_inner.lock().unwrap().insert(addr, tx); // Let's split the WebSocket stream, so we can work with the // reading and writing halves separately. @@ -81,7 +78,7 @@ fn main() { // For each open connection except the sender, send the // string via the channel. - let mut conns = connections.borrow_mut(); + let mut conns = connections.lock().unwrap(); let iter = conns.iter_mut() .filter(|&(&k, _)| k != addr) .map(|(_, v)| v); @@ -105,8 +102,8 @@ fn main() { let connection = ws_reader.map(|_| ()).map_err(|_| ()) .select(ws_writer.map(|_| ()).map_err(|_| ())); - handle_inner.spawn(connection.then(move |_| { - connections_inner.borrow_mut().remove(&addr); + tokio::spawn(connection.then(move |_| { + connections_inner.lock().unwrap().remove(&addr); println!("Connection {} closed.", addr); Ok(()) })); @@ -119,5 +116,5 @@ fn main() { }); // Execute server. - core.run(srv).unwrap(); + tokio::runtime::run(srv.map_err(|_e| ())); } diff --git a/src/connect.rs b/src/connect.rs index 3136d82..0aab23e 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,16 +1,13 @@ //! Connection helper. extern crate tokio_dns; -extern crate tokio_core; use std::io::Result as IoResult; -use self::tokio_core::net::TcpStream; -use self::tokio_core::reactor::Remote; -use self::tokio_dns::tcp_connect; +use tokio::net::TcpStream; use futures::{future, Future}; -use tokio_io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite}; use tungstenite::Error; use tungstenite::client::url_mode; @@ -36,7 +33,7 @@ mod encryption { use std::io::{Read, Write, Result as IoResult}; use futures::{future, Future}; - use tokio_io::{AsyncRead, AsyncWrite}; + use tokio::io::{AsyncRead, AsyncWrite}; use tungstenite::Error; use tungstenite::stream::Mode; @@ -53,9 +50,9 @@ mod encryption { } pub fn wrap_stream(socket: S, domain: String, mode: Mode) - -> Box, Error=Error>> + -> Box, Error=Error> + Send> where - S: 'static + AsyncRead + AsyncWrite, + S: 'static + AsyncRead + AsyncWrite + Send, { match mode { Mode::Plain => Box::new(future::ok(StreamSwitcher::Plain(socket))), @@ -106,10 +103,10 @@ fn domain(request: &Request) -> Result { /// Creates a WebSocket handshake from a request and a stream, /// upgrading the stream to TLS if required. pub fn client_async_tls(request: R, stream: S) - -> Box>, Response), Error=Error>> + -> Box>, Response), Error=Error> + Send> where R: Into>, - S: 'static + AsyncRead + AsyncWrite + NoDelay, + S: 'static + AsyncRead + AsyncWrite + NoDelay + Send, { let request: Request = request.into(); @@ -134,8 +131,8 @@ where } /// Connect to a given URL. -pub fn connect_async(request: R, handle: Remote) - -> Box>, Response), Error=Error>> +pub fn connect_async(request: R) + -> Box>, Response), Error=Error> + Send> where R: Into> { @@ -147,6 +144,6 @@ where }; let port = request.url.port_or_known_default().expect("Bug: port unknown"); - Box::new(tcp_connect((domain.as_str(), port), handle).map_err(|e| e.into()) + Box::new(tokio_dns::TcpStream::connect((domain.as_str(), port)).map_err(|e| e.into()) .and_then(move |socket| client_async_tls(request, socket))) } diff --git a/src/lib.rs b/src/lib.rs index 2ac4528..e2def55 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ unused_import_braces)] extern crate futures; -extern crate tokio_io; +extern crate tokio; pub extern crate tungstenite; @@ -29,7 +29,7 @@ pub mod stream; use std::io::ErrorKind; use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; -use tokio_io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite}; use tungstenite::handshake::client::{ClientHandshake, Response, Request}; use tungstenite::handshake::server::{ServerHandshake, Callback, NoCallback}; diff --git a/src/stream.rs b/src/stream.rs index d06cb1e..29476e5 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -10,7 +10,7 @@ use std::io::{Read, Write, Result as IoResult, Error as IoError}; use self::bytes::{Buf, BufMut}; use futures::Poll; -use tokio_io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite}; /// Trait to switch TCP_NODELAY. pub trait NoDelay {