From c02a5399b20261c5834d3c2214be39b03b64ede7 Mon Sep 17 00:00:00 2001 From: Daniel Abramov Date: Thu, 9 Jan 2020 13:51:47 +0100 Subject: [PATCH] examples: improve server example --- examples/server.rs | 103 +++++++++++++++++++++++---------------------- 1 file changed, 52 insertions(+), 51 deletions(-) diff --git a/examples/server.rs b/examples/server.rs index a2c54e0..243146b 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -17,91 +17,92 @@ //! connected clients they'll all join the same room and see everyone else's //! messages. -use std::env; -use std::io::Error; +use std::{ + collections::HashMap, + env, + io::Error as IoError, + net::SocketAddr, + sync::{Arc, Mutex}, +}; + +use futures::{ + channel::mpsc::{unbounded, UnboundedSender}, + future, pin_mut, + stream::TryStreamExt, + StreamExt, +}; use async_std::net::{TcpListener, TcpStream}; use async_std::task; -use futures::channel::mpsc::UnboundedSender; -use futures::stream::SplitStream; -use futures::{SinkExt, StreamExt}; -use log::*; -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; use tungstenite::protocol::Message; type Tx = UnboundedSender; +type PeerMap = Arc>>; -type WsRx = SplitStream>; +async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr) { + println!("Incoming TCP connection from: {}", addr); -type PeerMap = Arc>>; + let ws_stream = async_tungstenite::accept_async(raw_stream) + .await + .expect("Error during the websocket handshake occurred"); + println!("WebSocket connection established: {}", addr); -async fn handle_message(addr: SocketAddr, peer_map: PeerMap, ws_rx: WsRx) { - let mut ws_rx = ws_rx; - while let Some(msg) = ws_rx.next().await { - let msg = msg.expect("Failed to get response"); - info!( + // Insert the write part of this peer to the peer map. + let (tx, rx) = unbounded(); + peer_map.lock().unwrap().insert(addr, tx); + + let (outgoing, incoming) = ws_stream.split(); + + let broadcast_incoming = incoming.try_for_each(|msg| { + println!( "Received a message from {}: {}", addr, msg.to_text().unwrap() ); + let peers = peer_map.lock().unwrap(); - for peer in peer_map.lock().unwrap().iter_mut() { - let _ = peer.1.unbounded_send(msg.clone()); - } - } -} - -async fn accept_connection(peer_map: PeerMap, raw_stream: TcpStream) { - let addr = raw_stream - .peer_addr() - .expect("connected streams should have a peer address"); - info!("Peer address: {}", addr); - - let ws_stream = async_tungstenite::accept_async(raw_stream) - .await - .expect("Error during the websocket handshake occurred"); + // We want to broadcast the message to everyone except ourselves. + let broadcast_recipients = peers + .iter() + .filter(|(peer_addr, _)| peer_addr != &&addr) + .map(|(_, ws_sink)| ws_sink); - info!("New WebSocket connection: {}", addr); + for recp in broadcast_recipients { + recp.unbounded_send(msg.clone()).unwrap(); + } - let (mut ws_tx, ws_rx) = ws_stream.split(); - let (msg_tx, mut msg_rx) = futures::channel::mpsc::unbounded(); + future::ok(()) + }); - //We want to be able to send data to a specific peer, use this tx. - peer_map.lock().unwrap().insert(addr, msg_tx); + let receive_from_others = rx.map(Ok).forward(outgoing); - //Handle incoming messages - task::spawn(handle_message(addr, peer_map.clone(), ws_rx)); + pin_mut!(broadcast_incoming, receive_from_others); + future::select(broadcast_incoming, receive_from_others).await; - //If msg_rx get's some data, send it on the websocket. - while let Some(msg) = msg_rx.next().await { - info!("Sending message to {}", addr); - ws_tx.send(msg).await.expect("Failed to send request"); - } + println!("{} disconnected", &addr); + peer_map.lock().unwrap().remove(&addr); } -async fn run() -> Result<(), Error> { - let _ = env_logger::try_init(); +async fn run() -> Result<(), IoError> { let addr = env::args() .nth(1) .unwrap_or_else(|| "127.0.0.1:8080".to_string()); - let hm: HashMap = HashMap::new(); - let state = PeerMap::new(Mutex::new(hm)); + let state = PeerMap::new(Mutex::new(HashMap::new())); // Create the event loop and TCP listener we'll accept connections on. let try_socket = TcpListener::bind(&addr).await; let listener = try_socket.expect("Failed to bind"); - info!("Listening on: {}", addr); + println!("Listening on: {}", addr); - while let Ok((stream, _)) = listener.accept().await { - task::spawn(accept_connection(state.clone(), stream)); + // Let's spawn the handling of each connection in a separate task. + while let Ok((stream, addr)) = listener.accept().await { + task::spawn(handle_connection(state.clone(), stream, addr)); } Ok(()) } -fn main() -> Result<(), Error> { +fn main() -> Result<(), IoError> { task::block_on(run()) }