From 744c7fd1240224da5542f773174a5de1f0cb2628 Mon Sep 17 00:00:00 2001 From: Nathan Torchia Date: Wed, 8 Jan 2020 16:27:20 +0900 Subject: [PATCH] Added server example --- Cargo.toml | 4 ++ examples/server.rs | 107 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 examples/server.rs diff --git a/Cargo.toml b/Cargo.toml index 7a866cc..72ce465 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,10 @@ required-features = ["async-std-runtime"] name = "autobahn-server" required-features = ["async-std-runtime"] +[[example]] +name = "server" +required-features = ["async-std-runtime"] + [[example]] name = "echo-server" required-features = ["async-std-runtime"] diff --git a/examples/server.rs b/examples/server.rs new file mode 100644 index 0000000..a2c54e0 --- /dev/null +++ b/examples/server.rs @@ -0,0 +1,107 @@ +//! A chat server that broadcasts a message to all connections. +//! +//! This is a simple line-based server which accepts WebSocket connections, +//! reads lines from those connections, and broadcasts the lines to all other +//! connected clients. +//! +//! You can test this out by running: +//! +//! cargo run --example server 127.0.0.1:12345 +//! +//! And then in another window run: +//! +//! cargo run --example client ws://127.0.0.1:12345/ +//! +//! You can run the second command in multiple windows and then chat between the +//! two, seeing the messages from the other client as they're received. For all +//! connected clients they'll all join the same room and see everyone else's +//! messages. + +use std::env; +use std::io::Error; + +use async_std::net::{TcpListener, TcpStream}; +use async_std::task; +use futures::channel::mpsc::UnboundedSender; +use futures::stream::SplitStream; +use futures::{SinkExt, StreamExt}; +use log::*; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use tungstenite::protocol::Message; + +type Tx = UnboundedSender; + +type WsRx = SplitStream>; + +type PeerMap = Arc>>; + +async fn handle_message(addr: SocketAddr, peer_map: PeerMap, ws_rx: WsRx) { + let mut ws_rx = ws_rx; + while let Some(msg) = ws_rx.next().await { + let msg = msg.expect("Failed to get response"); + info!( + "Received a message from {}: {}", + addr, + msg.to_text().unwrap() + ); + + for peer in peer_map.lock().unwrap().iter_mut() { + let _ = peer.1.unbounded_send(msg.clone()); + } + } +} + +async fn accept_connection(peer_map: PeerMap, raw_stream: TcpStream) { + let addr = raw_stream + .peer_addr() + .expect("connected streams should have a peer address"); + info!("Peer address: {}", addr); + + let ws_stream = async_tungstenite::accept_async(raw_stream) + .await + .expect("Error during the websocket handshake occurred"); + + info!("New WebSocket connection: {}", addr); + + let (mut ws_tx, ws_rx) = ws_stream.split(); + let (msg_tx, mut msg_rx) = futures::channel::mpsc::unbounded(); + + //We want to be able to send data to a specific peer, use this tx. + peer_map.lock().unwrap().insert(addr, msg_tx); + + //Handle incoming messages + task::spawn(handle_message(addr, peer_map.clone(), ws_rx)); + + //If msg_rx get's some data, send it on the websocket. + while let Some(msg) = msg_rx.next().await { + info!("Sending message to {}", addr); + ws_tx.send(msg).await.expect("Failed to send request"); + } +} + +async fn run() -> Result<(), Error> { + let _ = env_logger::try_init(); + let addr = env::args() + .nth(1) + .unwrap_or_else(|| "127.0.0.1:8080".to_string()); + + let hm: HashMap = HashMap::new(); + let state = PeerMap::new(Mutex::new(hm)); + + // Create the event loop and TCP listener we'll accept connections on. + let try_socket = TcpListener::bind(&addr).await; + let listener = try_socket.expect("Failed to bind"); + info!("Listening on: {}", addr); + + while let Ok((stream, _)) = listener.accept().await { + task::spawn(accept_connection(state.clone(), stream)); + } + + Ok(()) +} + +fn main() -> Result<(), Error> { + task::block_on(run()) +}