//! A chat server that broadcasts a message to all connections. //! //! This is a simple line-based server which accepts WebSocket connections, //! reads lines from those connections, and broadcasts the lines to all other //! connected clients. //! //! You can test this out by running: //! //! cargo run --features="async-std-runtime" --example server 127.0.0.1:12345 //! //! And then in another window run: //! //! cargo run --features="async-std-runtime" --example client ws://127.0.0.1:12345/ //! //! You can run the second command in multiple windows and then chat between the //! two, seeing the messages from the other client as they're received. For all //! connected clients they'll all join the same room and see everyone else's //! messages. use std::{ collections::HashMap, env, io::Error as IoError, net::SocketAddr, sync::{Arc, Mutex}, }; use futures::prelude::*; use futures::{ channel::mpsc::{unbounded, UnboundedSender}, future, pin_mut, }; use async_std::net::{TcpListener, TcpStream}; use async_std::task; use async_tungstenite::tungstenite::protocol::Message; type Tx = UnboundedSender; type PeerMap = Arc>>; async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr) { println!("Incoming TCP connection from: {}", addr); let ws_stream = async_tungstenite::accept_async(raw_stream) .await .expect("Error during the websocket handshake occurred"); println!("WebSocket connection established: {}", addr); // Insert the write part of this peer to the peer map. let (tx, rx) = unbounded(); peer_map.lock().unwrap().insert(addr, tx); let (outgoing, incoming) = ws_stream.split(); let broadcast_incoming = incoming .try_filter(|msg| { // Broadcasting a Close message from one client // will close the other clients. future::ready(!msg.is_close()) }) .try_for_each(|msg| { println!( "Received a message from {}: {}", addr, msg.to_text().unwrap() ); let peers = peer_map.lock().unwrap(); // We want to broadcast the message to everyone except ourselves. let broadcast_recipients = peers .iter() .filter(|(peer_addr, _)| peer_addr != &&addr) .map(|(_, ws_sink)| ws_sink); for recp in broadcast_recipients { recp.unbounded_send(msg.clone()).unwrap(); } future::ok(()) }); let receive_from_others = rx.map(Ok).forward(outgoing); pin_mut!(broadcast_incoming, receive_from_others); future::select(broadcast_incoming, receive_from_others).await; println!("{} disconnected", &addr); peer_map.lock().unwrap().remove(&addr); } async fn run() -> Result<(), IoError> { let addr = env::args() .nth(1) .unwrap_or_else(|| "127.0.0.1:8080".to_string()); let state = PeerMap::new(Mutex::new(HashMap::new())); // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(&addr).await; let listener = try_socket.expect("Failed to bind"); println!("Listening on: {}", addr); // Let's spawn the handling of each connection in a separate task. while let Ok((stream, addr)) = listener.accept().await { task::spawn(handle_connection(state.clone(), stream, addr)); } Ok(()) } fn main() -> Result<(), IoError> { task::block_on(run()) }