diff --git a/examples/client.rs b/examples/client.rs index ab5bf71..0ee800c 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -12,8 +12,8 @@ use std::env; -use futures::{SinkExt, StreamExt}; -use log::*; +use futures::{future, pin_mut, StreamExt}; +use log::info; use tungstenite::protocol::Message; use async_std::io; @@ -24,44 +24,32 @@ use async_tungstenite::connect_async; async fn run() { let _ = env_logger::try_init(); - // Specify the server address to which the client will be connecting. let connect_addr = env::args() .nth(1) .unwrap_or_else(|| panic!("this program requires at least one argument")); let url = url::Url::parse(&connect_addr).unwrap(); - // Spawn a new task that will will read data from stdin and then send it to the event loop over - // a standard futures channel. - let (stdin_tx, mut stdin_rx) = futures::channel::mpsc::unbounded(); + let (stdin_tx, stdin_rx) = futures::channel::mpsc::unbounded(); task::spawn(read_stdin(stdin_tx)); - // After the TCP connection has been established, we set up our client to - // start forwarding data. - // - // First we do a WebSocket handshake on a TCP stream, i.e. do the upgrade - // request. - // - // Half of the work we're going to do is to take all data we receive on - // stdin (`stdin_rx`) and send that along the WebSocket stream (`sink`). - // The second half is to take all the data we receive (`stream`) and then - // write that to stdout. Currently we just write to stdout in a synchronous - // fashion. - // - // Finally we set the client to terminate once either half of this work - // finishes. If we don't have any more data to read or we won't receive any - // more work from the remote then we can exit. - let mut stdout = io::stdout(); - let (mut ws_stream, _) = connect_async(url).await.expect("Failed to connect"); + let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); info!("WebSocket handshake has been successfully completed"); - while let Some(msg) = stdin_rx.next().await { - ws_stream.send(msg).await.expect("Failed to send request"); - if let Some(msg) = ws_stream.next().await { - let msg = msg.expect("Failed to get response"); - stdout.write_all(&msg.into_data()).await.unwrap(); - } - } + let (write, read) = ws_stream.split(); + + let stdin_to_ws = stdin_rx.map(|msg| Ok(msg)).forward(write); + let ws_to_stdout = { + read.for_each(|message| { + async { + let data = message.unwrap().into_data(); + async_std::io::stdout().write_all(&data).await.unwrap(); + } + }) + }; + + pin_mut!(stdin_to_ws, ws_to_stdout); + future::select(stdin_to_ws, ws_to_stdout).await; } // Our helper method which will read data from stdin and send it along the diff --git a/examples/echo-server.rs b/examples/echo-server.rs new file mode 100644 index 0000000..43af835 --- /dev/null +++ b/examples/echo-server.rs @@ -0,0 +1,60 @@ +//! A simple echo server. +//! +//! You can test this out by running: +//! +//! cargo run --example server 127.0.0.1:12345 +//! +//! And then in another window run: +//! +//! cargo run --example client ws://127.0.0.1:12345/ + +use std::{env, io::Error}; + +use async_std::net::{TcpListener, TcpStream, ToSocketAddrs}; +use async_std::task; +use futures::StreamExt; +use log::info; + +async fn run() -> Result<(), Error> { + let _ = env_logger::try_init(); + let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); + let addr = addr + .to_socket_addrs() + .await + .expect("Not a valid address") + .next() + .expect("Not a socket address"); + + // Create the event loop and TCP listener we'll accept connections on. + let try_socket = TcpListener::bind(&addr).await; + let listener = try_socket.expect("Failed to bind"); + info!("Listening on: {}", addr); + + while let Ok((stream, _)) = listener.accept().await { + task::spawn(accept_connection(stream)); + } + + Ok(()) +} + +async fn accept_connection(stream: TcpStream) { + let addr = stream + .peer_addr() + .expect("connected streams should have a peer address"); + info!("Peer address: {}", addr); + + let ws_stream = async_tungstenite::accept_async(stream) + .await + .expect("Error during the websocket handshake occurred"); + + info!("New WebSocket connection: {}", addr); + + let (write, read) = ws_stream.split(); + read.forward(write) + .await + .expect("Failed to forward message") +} + +fn main() -> Result<(), Error> { + task::block_on(run()) +} diff --git a/examples/server.rs b/examples/server.rs deleted file mode 100644 index 5bdf76b..0000000 --- a/examples/server.rs +++ /dev/null @@ -1,107 +0,0 @@ -//! A chat server that broadcasts a message to all connections. -//! -//! This is a simple line-based server which accepts WebSocket connections, -//! reads lines from those connections, and broadcasts the lines to all other -//! connected clients. -//! -//! You can test this out by running: -//! -//! cargo run --example server 127.0.0.1:12345 -//! -//! And then in another window run: -//! -//! cargo run --example client ws://127.0.0.1:12345/ -//! -//! You can run the second command in multiple windows and then chat between the -//! two, seeing the messages from the other client as they're received. For all -//! connected clients they'll all join the same room and see everyone else's -//! messages. - -use std::env; -use std::io::Error; - -use async_std::net::{SocketAddr, ToSocketAddrs}; -use async_std::net::{TcpListener, TcpStream}; -use async_std::task; -use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; -use futures::{SinkExt, StreamExt}; -use log::*; -use tungstenite::protocol::Message; - -struct Connection { - addr: SocketAddr, - rx: UnboundedReceiver, - tx: UnboundedSender, -} - -async fn handle_connection(connection: Connection) { - let mut connection = connection; - while let Some(msg) = connection.rx.next().await { - info!("Received a message from {}: {}", connection.addr, msg); - connection - .tx - .unbounded_send(msg) - .expect("Failed to forward message"); - } -} - -async fn accept_connection(stream: TcpStream) { - let addr = stream - .peer_addr() - .expect("connected streams should have a peer address"); - info!("Peer address: {}", addr); - - let mut ws_stream = async_tungstenite::accept_async(stream) - .await - .expect("Error during the websocket handshake occurred"); - - info!("New WebSocket connection: {}", addr); - - // Create a channel for our stream, which other sockets will use to - // send us messages. Then register our address with the stream to send - // data to us. - let (msg_tx, msg_rx) = futures::channel::mpsc::unbounded(); - let (response_tx, mut response_rx) = futures::channel::mpsc::unbounded(); - let c = Connection { - addr: addr, - rx: msg_rx, - tx: response_tx, - }; - task::spawn(handle_connection(c)); - - while let Some(message) = ws_stream.next().await { - let message = message.expect("Failed to get request"); - msg_tx - .unbounded_send(message) - .expect("Failed to forward request"); - if let Some(resp) = response_rx.next().await { - ws_stream.send(resp).await.expect("Failed to send response"); - } - } -} - -async fn run() -> Result<(), Error> { - let _ = env_logger::try_init(); - let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); - let addr = addr - .to_socket_addrs() - .await - .expect("Not a valid address") - .next() - .expect("Not a socket address"); - - // Create the event loop and TCP listener we'll accept connections on. - let try_socket = TcpListener::bind(&addr).await; - let listener = try_socket.expect("Failed to bind"); - info!("Listening on: {}", addr); - - while let Ok((stream, _)) = listener.accept().await { - task::spawn(accept_connection(stream)); - } - - Ok(()) -} - -fn main() -> Result<(), Error> { - task::block_on(run()) -} diff --git a/examples/split-client.rs b/examples/split-client.rs deleted file mode 100644 index 91a27f6..0000000 --- a/examples/split-client.rs +++ /dev/null @@ -1,84 +0,0 @@ -//! A simple example of hooking up stdin/stdout to a WebSocket stream. -//! -//! This example will connect to a server specified in the argument list and -//! then forward all data read on stdin to the server, printing out all data -//! received on stdout. -//! -//! Note that this is not currently optimized for performance, especially around -//! buffer management. Rather it's intended to show an example of working with a -//! client. -//! -//! You can use this example together with the `server` example. - -use std::env; - -use async_std::io; -use async_std::prelude::*; -use async_std::task; -use async_tungstenite::connect_async; -use futures::{SinkExt, StreamExt}; -use log::*; -use tungstenite::protocol::Message; - -async fn run() { - let _ = env_logger::try_init(); - - // Specify the server address to which the client will be connecting. - let connect_addr = env::args() - .nth(1) - .unwrap_or_else(|| panic!("this program requires at least one argument")); - - let url = url::Url::parse(&connect_addr).unwrap(); - - // Spawn a new task that will read data from stdin and then send it to the event loop over a - // standard futures channel. - let (stdin_tx, mut stdin_rx) = futures::channel::mpsc::unbounded(); - task::spawn(read_stdin(stdin_tx)); - - // After the TCP connection has been established, we set up our client to - // start forwarding data. - // - // First we do a WebSocket handshake on a TCP stream, i.e. do the upgrade - // request. - // - // Half of the work we're going to do is to take all data we receive on - // stdin (`stdin_rx`) and send that along the WebSocket stream (`sink`). - // The second half is to take all the data we receive (`stream`) and then - // write that to stdout. Currently we just write to stdout in a synchronous - // fashion. - // - // Finally we set the client to terminate once either half of this work - // finishes. If we don't have any more data to read or we won't receive any - // more work from the remote then we can exit. - let mut stdout = io::stdout(); - let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); - let (mut ws_tx, mut ws_rx) = ws_stream.split(); - info!("WebSocket handshake has been successfully completed"); - - while let Some(msg) = stdin_rx.next().await { - ws_tx.send(msg).await.expect("Failed to send request"); - if let Some(msg) = ws_rx.next().await { - let msg = msg.expect("Failed to get response"); - stdout.write_all(&msg.into_data()).await.unwrap(); - } - } -} - -// Our helper method which will read data from stdin and send it along the -// sender provided. -async fn read_stdin(tx: futures::channel::mpsc::UnboundedSender) { - let mut stdin = io::stdin(); - loop { - let mut buf = vec![0; 1024]; - let n = match stdin.read(&mut buf).await { - Err(_) | Ok(0) => break, - Ok(n) => n, - }; - buf.truncate(n); - tx.unbounded_send(Message::binary(buf)).unwrap(); - } -} - -fn main() { - task::block_on(run()) -}