From 42d6771357500bd573f80c8ab00e17f96ecefe67 Mon Sep 17 00:00:00 2001 From: hbgl Date: Sun, 2 Feb 2020 22:49:20 +0100 Subject: [PATCH] Add example of server periodically updating client --- Cargo.toml | 6 ++- examples/README.md | 1 + examples/interval-server.rs | 77 +++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 examples/interval-server.rs diff --git a/Cargo.toml b/Cargo.toml index a3b7431..36aaa68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,7 @@ version = "0.9" [dev-dependencies] url = "2.0.0" env_logger = "0.7" -async-std = { version = "1.0", features = ["attributes"] } +async-std = { version = "1.0", features = ["attributes", "unstable"] } [[example]] name = "autobahn-client" @@ -95,6 +95,10 @@ required-features = ["async-std-runtime"] name = "echo-server" required-features = ["async-std-runtime"] +[[example]] +name = "interval-server" +required-features = ["async-std-runtime"] + [[example]] name = "gio-echo" required-features = ["gio-runtime"] diff --git a/examples/README.md b/examples/README.md index 36dd7b5..f885548 100644 --- a/examples/README.md +++ b/examples/README.md @@ -5,3 +5,4 @@ Examples * [client.rs](https://github.com/sdroege/async-tungstenite/blob/master/examples/client.rs) * [echo-server.rs](https://github.com/sdroege/async-tungstenite/blob/master/examples/echo-server.rs) * [server.rs](https://github.com/sdroege/async-tungstenite/blob/master/examples/server.rs) +* [interval-server.rs](https://github.com/sdroege/async-tungstenite/blob/master/examples/interval-server.rs) diff --git a/examples/interval-server.rs b/examples/interval-server.rs new file mode 100644 index 0000000..f091207 --- /dev/null +++ b/examples/interval-server.rs @@ -0,0 +1,77 @@ +use async_std::net::{TcpListener, TcpStream}; +use async_std::task; +use async_tungstenite::{accept_async, tungstenite::Error}; +use futures::future::{select, Either}; +use futures::{SinkExt, StreamExt}; +use log::*; +use std::net::SocketAddr; +use std::time::Duration; +use tungstenite::{Message, Result}; + +async fn accept_connection(peer: SocketAddr, stream: TcpStream) { + if let Err(e) = handle_connection(peer, stream).await { + match e { + Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (), + err => error!("Error processing connection: {}", err), + } + } +} + +async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> { + let ws_stream = accept_async(stream).await.expect("Failed to accept"); + info!("New WebSocket connection: {}", peer); + let (mut ws_sender, mut ws_receiver) = ws_stream.split(); + let mut interval = async_std::stream::interval(Duration::from_millis(1000)); + + // Echo incoming WebSocket messages and send a message periodically every second. + + let mut msg_fut = ws_receiver.next(); + let mut tick_fut = interval.next(); + loop { + match select(msg_fut, tick_fut).await { + Either::Left((msg, tick_fut_continue)) => { + match msg { + Some(msg) => { + let msg = msg?; + if msg.is_text() || msg.is_binary() { + ws_sender.send(msg).await?; + } else if msg.is_close() { + break; + } + tick_fut = tick_fut_continue; // Continue waiting for tick. + msg_fut = ws_receiver.next(); // Receive next WebSocket message. + } + None => break, // WebSocket stream terminated. + }; + } + Either::Right((_, msg_fut_continue)) => { + ws_sender.send(Message::Text("tick".to_owned())).await?; + msg_fut = msg_fut_continue; // Continue receiving the WebSocket message. + tick_fut = interval.next(); // Wait for next tick. + } + } + } + + Ok(()) +} + +async fn run() { + env_logger::init(); + + let addr = "127.0.0.1:9002"; + let listener = TcpListener::bind(&addr).await.expect("Can't listen"); + info!("Listening on: {}", addr); + + while let Ok((stream, _)) = listener.accept().await { + let peer = stream + .peer_addr() + .expect("connected streams should have a peer address"); + info!("Peer address: {}", peer); + + task::spawn(accept_connection(peer, stream)); + } +} + +fn main() { + task::block_on(run()); +}