From a3e6ec98a01dbddd840263949aec7ef3bf0b7c5b Mon Sep 17 00:00:00 2001 From: Niko Date: Mon, 1 May 2023 17:40:00 +0300 Subject: [PATCH] broker singleton and timeout management (for tests and prod) --- Cargo.lock | 6 + README.md | 23 +-- ng-app-js/Cargo.toml | 7 +- ng-app-js/src/lib.rs | 42 +++-- p2p-client-ws/Cargo.toml | 3 - p2p-client-ws/src/remote_ws.rs | 110 ++++++++----- p2p-client-ws/src/remote_ws_wasm.rs | 142 ++++++++++------- p2p-net/Cargo.toml | 6 +- p2p-net/src/broker.rs | 229 +++++++++++++++++++++++++--- p2p-net/src/connection.rs | 55 +++++-- p2p-net/src/errors.rs | 2 + p2p-net/src/lib.rs | 12 ++ 12 files changed, 484 insertions(+), 153 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cbf77b9..027416c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1076,6 +1076,8 @@ dependencies = [ "debug_print", "futures", "getrandom 0.1.16", + "gloo-timers", + "js-sys", "p2p-client-ws", "p2p-net", "p2p-repo", @@ -1085,7 +1087,9 @@ dependencies = [ "serde_bytes", "snow", "wasm-bindgen", + "wasm-bindgen-futures", "wasm-bindgen-test", + "web-sys", "ws_stream_wasm", ] @@ -1249,7 +1253,9 @@ dependencies = [ "blake3", "debug_print", "futures", + "gloo-timers", "num_enum", + "once_cell", "p2p-repo", "serde", "serde_bare", diff --git a/README.md b/README.md index e288e44..3fee3cf 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ This repository is in active development at [https://git.nextgraph.org/NextGraph > NextGraph brings about the convergence between P2P and Semantic Web technologies, towards a decentralized, secure and privacy-preserving cloud, based on CRDTs. > > This open source ecosystem provides solutions for end-users and software developers alike, wishing to use or create **decentralized** apps featuring: **live collaboration** on rich-text documents, peer to peer communication with end-to-end encryption, offline-first, **local-first**, portable and interoperable data, total ownership of data and software, security and privacy. Centered on repositories containing **semantic data** (RDF), **rich text**, and structured data formats like **JSON**, synced between peers belonging to permissioned groups of users, it offers strong eventual consistency, thanks to the use of **CRDTs**. Documents can be linked together, signed, shared securely, queried using the **SPARQL** language and organized into sites and containers. -> +> > More info here [https://nextgraph.org](https://nextgraph.org) ## Support @@ -21,13 +21,14 @@ And our community forum where you can ask questions is here [https://forum.nextg ## How to use NextGraph NextGraph is not ready yet. You can subscribe to [our newsletter](https://list.nextgraph.org/subscription/form) to get updates, and support us with a [donation](https://nextgraph.org/donate/). + ## For developers Read our [getting started guide](https://docs.nextgraph.org/en/getting-started/). ## For contributors -- [Install Rust](https://www.rust-lang.org/tools/install) +- [Install Rust](https://www.rust-lang.org/tools/install) minimum required 1.64.0 - [Install Nodejs](https://nodejs.org/en/download/) ``` @@ -36,11 +37,11 @@ cargo install wasm-pack git clone git@git.nextgraph.org:NextGraph/nextgraph-rs.git cd nextgraph-rs cargo build -``` +``` ### Packages -The crates are organized as follow : +The crates are organized as follow : - p2p-repo : all the common types, traits and structs for the P2P repositories - p2p-net : all the common types, traits and structs for the P2P networks @@ -81,17 +82,20 @@ cargo test --package p2p-repo --lib -- branch::test --nocapture ``` Test end-to-end client and server: -``` + +``` cargo test --package ngcli -- --nocapture -``` +``` Test WASM websocket + ``` cd ng-app-js wasm-pack test --chrome --headless ``` Test Rust websocket + ``` cargo test --package p2p-client-ws --lib -- --nocapture ``` @@ -123,9 +127,10 @@ additional terms or conditions. ## License Licensed under either of - * Apache License, Version 2.0 ([LICENSE-APACHE2](LICENSE-APACHE2) or http://www.apache.org/licenses/LICENSE-2.0) - * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) -at your option. + +- Apache License, Version 2.0 ([LICENSE-APACHE2](LICENSE-APACHE2) or http://www.apache.org/licenses/LICENSE-2.0) +- MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + at your option. `SPDX-License-Identifier: Apache-2.0 OR MIT` diff --git a/ng-app-js/Cargo.toml b/ng-app-js/Cargo.toml index b4c650e..ec425bf 100644 --- a/ng-app-js/Cargo.toml +++ b/ng-app-js/Cargo.toml @@ -34,8 +34,11 @@ getrandom = { version = "0.1.1", features = ["wasm-bindgen"] } # version = "0.2.7" # features = ["js"] -# [target.'cfg(target_arch = "wasm32")'.dependencies] -# wasm-bindgen-futures = "0.4.34" +[target.'cfg(target_arch = "wasm32")'.dependencies] +js-sys = "0.3.61" +wasm-bindgen-futures = "0.4.34" +web-sys = { version = "0.3.61", features = ["Window"] } +gloo-timers = "0.2.6" [dev-dependencies] wasm-bindgen-test = "^0.3" \ No newline at end of file diff --git a/ng-app-js/src/lib.rs b/ng-app-js/src/lib.rs index cfdafc9..cce70f4 100644 --- a/ng-app-js/src/lib.rs +++ b/ng-app-js/src/lib.rs @@ -1,10 +1,11 @@ use async_std::task; +use js_sys::Reflect; #[cfg(target_arch = "wasm32")] use p2p_client_ws::remote_ws_wasm::ConnectionWebSocket; use p2p_net::broker::*; -use p2p_net::log; -use p2p_net::types::IP; +use p2p_net::types::{DirectPeerId, IP}; use p2p_net::utils::{spawn_and_log_error, ResultSend}; +use p2p_net::{log, sleep}; use p2p_repo::utils::generate_keypair; use std::net::IpAddr; use std::str::FromStr; @@ -18,19 +19,20 @@ extern "C" { #[cfg(target_arch = "wasm32")] #[wasm_bindgen] -pub fn greet(name: &str) { +pub async fn greet(name: &str) { log!("I say: {}", name); let mut random_buf = [0u8; 32]; getrandom::getrandom(&mut random_buf).unwrap(); //spawn_and_log_error(testt("ws://127.0.0.1:3012")); async fn method() -> ResultSend<()> { log!("start connecting"); - let cnx = Arc::new(ConnectionWebSocket {}); + //let cnx = Arc::new(); let (priv_key, pub_key) = generate_keypair(); - let broker = Broker::new(); - let res = broker + let res = BROKER + .write() + .await .connect( - cnx, + Box::new(ConnectionWebSocket {}), IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), None, priv_key, @@ -39,10 +41,32 @@ pub fn greet(name: &str) { ) .await; log!("broker.connect : {:?}", res); + BROKER.read().await.print_status(); + //res.expect_throw("assume the connection succeeds"); + + async fn timer_close(remote_peer_id: DirectPeerId) -> ResultSend<()> { + async move { + sleep!(std::time::Duration::from_secs(10)); + log!("timeout"); + BROKER + .write() + .await + .close_peer_connection(&remote_peer_id) + .await; + } + .await; + Ok(()) + } + spawn_and_log_error(timer_close(pub_key)); + + //Broker::graceful_shutdown().await; + + Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(12)).await; + Ok(()) } - spawn_and_log_error(method()); + spawn_and_log_error(method()).await; //spawn_and_log_error(Arc::clone(&cnx).open("ws://127.0.0.1:3012", priv_key, pub_key)); } @@ -69,6 +93,6 @@ mod test { #[wasm_bindgen_test] pub async fn test_greet() { - greet("test"); + greet("test").await; } } diff --git a/p2p-client-ws/Cargo.toml b/p2p-client-ws/Cargo.toml index 39cfe85..7a0b72f 100644 --- a/p2p-client-ws/Cargo.toml +++ b/p2p-client-ws/Cargo.toml @@ -31,9 +31,6 @@ wasm-bindgen-test = "^0.3" version = "0.2.7" features = ["js"] -# [target.'cfg(target_arch = "wasm32")'.dependencies] - - [target.'cfg(not(target_arch = "wasm32"))'.dependencies] getrandom = "0.2.7" xactor = "0.7.11" diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index 56f8ccd..6f7199c 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -21,8 +21,8 @@ use debug_print::*; use async_std::sync::Mutex; use futures::io::Close; -use futures::FutureExt; use futures::{future, pin_mut, select, stream, StreamExt}; +use futures::{FutureExt, SinkExt}; use async_std::task; use p2p_net::errors::*; @@ -42,13 +42,13 @@ pub struct ConnectionWebSocket {} #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] impl IConnection for ConnectionWebSocket { async fn open( - self: Arc, + &self, ip: IP, peer_pubk: PrivKey, peer_privk: PubKey, remote_peer: DirectPeerId, - ) -> Result<(), NetError> { - let mut cnx = ConnectionBase::new(ConnectionDir::Client); + ) -> Result { + let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let url = format!("ws://{}:{}", ip, WS_PORT); @@ -108,23 +108,33 @@ impl IConnection for ConnectionWebSocket { cnx.start_read_loop(); let s = cnx.take_sender(); let r = cnx.take_receiver(); - + let mut shutdown = cnx.set_shutdown(); //let ws_in_task = Arc::clone(&ws); - task::spawn(async move { + let join = task::spawn(async move { debug_println!("START of WS loop"); //let w = ws_in_task.lock().await; - ws_loop(websocket, s, r).await; + let res = ws_loop(websocket, s, r).await; // .close(Some(CloseFrame { // code: CloseCode::Library(4000), // reason: std::borrow::Cow::Borrowed(""), // })) // .await; + if res.is_err() { + let _ = shutdown.send(res.err().unwrap()).await; + } debug_println!("END of WS loop"); }); //spawn_and_log_error(ws_loop(ws, cnx.take_sender(), cnx.take_receiver())); log!("sending..."); + + // + + //cnx.close().await; + + //// let res = cnx.join_shutdown().await; + //// log!("JOIN SHUTDOWN {:?}", res); // cnx.send(ConnectionCommand::Close).await; // cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start( @@ -146,20 +156,16 @@ impl IConnection for ConnectionWebSocket { //log!("WS closed {:?}", last_event.clone()); - //Ok(cnx) - Ok(()) + Ok(cnx) + //Ok(()) } } } - async fn accept(&mut self) -> Result<(), NetError> { - let cnx = ConnectionBase::new(ConnectionDir::Server); + async fn accept(&self) -> Result { + let cnx = ConnectionBase::new(ConnectionDir::Server, TransportProtocol::WS); - Ok(()) - } - - fn tp(&self) -> TransportProtocol { - TransportProtocol::WS + unimplemented!(); } } @@ -169,8 +175,19 @@ async fn close_ws( code: u16, reason: &str, ) -> Result<(), NetError> { - log!("close_ws"); - let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await; + log!("close_ws {:?}", code); + + let cmd = if code == 1000 { + ConnectionCommand::Close + } else if code < 4000 { + ConnectionCommand::Error(NetError::WsError) + } else if code < 4950 { + ConnectionCommand::ProtocolError(ProtocolError::try_from(code - 4000).unwrap()) + } else { + ConnectionCommand::Error(NetError::try_from(code - 4949).unwrap()) + }; + + let _ = futures::SinkExt::send(receiver, cmd).await; stream .close(Some(CloseFrame { code: CloseCode::Library(code), @@ -200,8 +217,8 @@ async fn ws_loop( log!("GOT MESSAGE {:?}", msg); if msg.is_close() { - if let Message::Close(Some(cf)) = msg { - log!("CLOSE from server: {}",cf.reason); + if let Message::Close(Some(cf)) = msg { + log!("CLOSE from server with closeframe: {}",cf.reason); let last_command = match cf.code { CloseCode::Normal => ConnectionCommand::Close, @@ -227,9 +244,9 @@ async fn ws_loop( futures::SinkExt::send(receiver,ConnectionCommand::Msg(serde_bare::from_slice::(&msg.into_data())?)).await .map_err(|_e| NetError::IoError)?; } - return Ok(ProtocolError::Closing); + //return Ok(ProtocolError::Closing); }, - Some(Err(e)) => break, + Some(Err(e)) => {log!("GOT ERROR {:?}",e);return Err(NetError::WsError);}, None => break }, s = sender.next().fuse() => match s { @@ -261,6 +278,7 @@ async fn ws_loop( match inner_loop(&mut ws, sender, &mut receiver).await { Ok(proto_err) => { if proto_err == ProtocolError::Closing { + //FIXME: remove this case ws.close(None).await.map_err(|_e| NetError::WsError)?; } else if proto_err == ProtocolError::NoError { close_ws(&mut ws, &mut receiver, 1000, "").await?; @@ -295,9 +313,9 @@ mod test { use async_std::task; use p2p_net::broker::*; use p2p_net::errors::NetError; - use p2p_net::log; use p2p_net::types::IP; use p2p_net::utils::{spawn_and_log_error, ResultSend}; + use p2p_net::{log, sleep}; use p2p_repo::utils::generate_keypair; use std::net::IpAddr; use std::str::FromStr; @@ -309,22 +327,42 @@ mod test { //spawn_and_log_error(testt("ws://127.0.0.1:3012")); log!("start connecting"); - let cnx = Arc::new(ConnectionWebSocket {}); let (priv_key, pub_key) = generate_keypair(); - let broker = Broker::new(); - let res = broker - .connect( - cnx, - IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), - None, - priv_key, - pub_key, - pub_key, - ) + { + let res = BROKER + .write() + .await + .connect( + Box::new(ConnectionWebSocket {}), + IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), + None, + priv_key, + pub_key, + pub_key, + ) + .await; + log!("broker.connect : {:?}", res); + //res.expect_throw("assume the connection succeeds"); + } + + async fn timer_close(remote_peer_id: DirectPeerId) -> ResultSend<()> { + async move { + sleep!(std::time::Duration::from_secs(10)); + log!("timeout"); + BROKER + .write() + .await + .close_peer_connection(&remote_peer_id) + .await; + } .await; - log!("broker.connect : {:?}", res); - //res.expect_throw("assume the connection succeeds"); + Ok(()) + } + spawn_and_log_error(timer_close(pub_key)); + + //Broker::graceful_shutdown().await; + Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(12)).await; Ok(()) } } diff --git a/p2p-client-ws/src/remote_ws_wasm.rs b/p2p-client-ws/src/remote_ws_wasm.rs index 22fcfe1..f9039ad 100644 --- a/p2p-client-ws/src/remote_ws_wasm.rs +++ b/p2p-client-ws/src/remote_ws_wasm.rs @@ -34,19 +34,15 @@ pub struct ConnectionWebSocket {} #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] impl IConnection for ConnectionWebSocket { - fn tp(&self) -> TransportProtocol { - TransportProtocol::WS - } - async fn open( - self: Arc, + &self, ip: IP, peer_pubk: PrivKey, peer_privk: PubKey, remote_peer: DirectPeerId, - ) -> Result<(), NetError> { + ) -> Result { //pub async fn testt(url: &str) -> ResultSend<()> { - let mut cnx = ConnectionBase::new(ConnectionDir::Client); + let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let url = format!("ws://{}:{}", ip, WS_PORT); @@ -55,75 +51,62 @@ impl IConnection for ConnectionWebSocket { NetError::ConnectionError })?; - let mut evts = ws - .observe(ObserveConfig::default()) - //.observe(Filter::Pointer(WsEvent::is_closed).into()) - .await - .expect_throw("observe"); - //let (mut sender_tx, sender_rx) = mpsc::unbounded(); //let (mut receiver_tx, receiver_rx) = mpsc::unbounded(); cnx.start_read_loop(); + let mut shutdown = cnx.set_shutdown(); - spawn_and_log_error(ws_loop(ws, wsio, cnx.take_sender(), cnx.take_receiver())); + spawn_and_log_error(ws_loop( + ws, + wsio, + cnx.take_sender(), + cnx.take_receiver(), + shutdown, + )); //spawn_and_log_error(read_loop(receiver_rx, sender_tx.clone())); log!("sending..."); + + //cnx.close().await; + // spawn_and_log_error(async move { + // TimeoutFuture::new(10_000).await; + // cnx.close().await; + + // Ok(()) + // // // Do something here after the one second timeout is up! + // }); // cnx.send(ConnectionCommand::Close).await; // cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start( // StartProtocol::Auth(ClientHello::V0()), // ))) // .await; - - cnx.close().await; - + log!("waiting..."); + //let res = join.next().await; + log!("finished..."); + //log!("JOIN SHUTDOWN {:?}", res); // Note that since WsMeta::connect resolves to an opened connection, we don't see // any Open events here. // - //assert!(evts.next().await.unwrap_throw().is_closing()); - let last_event = evts.next().await; - log!("WS closed {:?}", last_event.clone()); - - let last_command = match last_event { - None => ConnectionCommand::Close, - Some(WsEvent::Open) => ConnectionCommand::Error(NetError::WsError), // this should never happen - Some(WsEvent::Error) => ConnectionCommand::Error(NetError::ConnectionError), - Some(WsEvent::Closing) => ConnectionCommand::Close, - Some(WsEvent::Closed(ce)) => { - if ce.code == 1000 { - ConnectionCommand::Close - } else if ce.code < 4000 { - ConnectionCommand::Error(NetError::WsError) - } else if ce.code < 4950 { - ConnectionCommand::ProtocolError( - ProtocolError::try_from(ce.code - 4000).unwrap(), - ) - } else { - ConnectionCommand::Error(NetError::try_from(ce.code - 4949).unwrap()) - } - } - Some(WsEvent::WsErr(_e)) => ConnectionCommand::Error(NetError::WsError), - }; - let _ = cnx.inject(last_command).await; - let _ = cnx.close_streams().await; + //assert!(events.next().await.unwrap_throw().is_closing()); //Ok(cnx) - Ok(()) + Ok(cnx) } - async fn accept(&mut self) -> Result<(), NetError> { + async fn accept(&self) -> Result { !unimplemented!() } } async fn ws_loop( - ws: WsMeta, + mut ws: WsMeta, mut stream: WsStream, sender: Receiver, - receiver: Sender, + mut receiver: Sender, + mut shutdown: Sender, ) -> ResultSend<()> { async fn inner_loop( stream: &mut WsStream, @@ -132,6 +115,7 @@ async fn ws_loop( ) -> Result { //let mut rx_sender = sender.fuse(); loop { + log!("BEFORE SELECT"); select! { r = stream.next().fuse() => match r { Some(msg) => { @@ -170,29 +154,73 @@ async fn ws_loop( }, } } + log!("END SELECT"); Ok(ProtocolError::NoError) } - match inner_loop(&mut stream, sender, receiver).await { + + let mut events = ws + .observe(ObserveConfig::default()) + //.observe(Filter::Pointer(WsEvent::is_closed).into()) + .await + .expect_throw("observe"); + log!("OBSERVED"); + match inner_loop(&mut stream, sender, receiver.clone()).await { Ok(proto_err) => { if proto_err == ProtocolError::NoError { - ws.close_code(1000).await.map_err(|_e| NetError::WsError)?; + let _ = ws.close_code(1000).await; //.map_err(|_e| NetError::WsError)?; + log!("CLOSED GRACEFULLY"); } else { + log!("PROTOCOL ERR"); let mut code = proto_err.clone() as u16; if code > 949 { code = ProtocolError::OtherError as u16; } - ws.close_reason(code + 4000, proto_err.to_string()) - .await - .map_err(|_e| NetError::WsError)?; - return Err(Box::new(proto_err)); + let _ = ws.close_reason(code + 4000, proto_err.to_string()).await; + //.map_err(|_e| NetError::WsError)?; + //return Err(Box::new(proto_err)); } } Err(e) => { - ws.close_reason(e.clone() as u16 + 4949, e.to_string()) - .await - .map_err(|_e| NetError::WsError)?; - return Err(Box::new(e)); + let _ = ws + .close_reason(e.clone() as u16 + 4949, e.to_string()) + .await; + //.map_err(|_e| NetError::WsError)?; + //return Err(Box::new(e)); + log!("ERR"); } } + + log!("waiting for closing event"); + let last_event = events.next().await; + log!("WS closed {:?}", last_event.clone()); + let last_command = match last_event { + None => ConnectionCommand::Close, + Some(WsEvent::Open) => ConnectionCommand::Error(NetError::WsError), // this should never happen + Some(WsEvent::Error) => ConnectionCommand::Error(NetError::ConnectionError), + Some(WsEvent::Closing) => ConnectionCommand::Close, + Some(WsEvent::Closed(ce)) => { + if ce.code == 1000 { + ConnectionCommand::Close + } else if ce.code < 4000 { + ConnectionCommand::Error(NetError::WsError) + } else if ce.code < 4950 { + ConnectionCommand::ProtocolError(ProtocolError::try_from(ce.code - 4000).unwrap()) + } else { + ConnectionCommand::Error(NetError::try_from(ce.code - 4949).unwrap()) + } + } + Some(WsEvent::WsErr(_e)) => ConnectionCommand::Error(NetError::WsError), + }; + if let ConnectionCommand::Error(err) = last_command.clone() { + let _ = shutdown.send(err).await; + } else if let ConnectionCommand::ProtocolError(err) = last_command.clone() { + let _ = shutdown.send(NetError::ProtocolError).await; + } // otherwise, shutdown gracefully (with None). it is done automatically during destroy of shutdown + + receiver + .send(last_command) + .await + .map_err(|_e| NetError::IoError)?; + Ok(()) } diff --git a/p2p-net/Cargo.toml b/p2p-net/Cargo.toml index 199b0b2..1ae762f 100644 --- a/p2p-net/Cargo.toml +++ b/p2p-net/Cargo.toml @@ -20,4 +20,8 @@ async-trait = "0.1.64" blake3 = "1.3.1" async-std = { version = "1.12.0", features = ["attributes","unstable"] } wasm-bindgen = "0.2" -unique_id = "0.1.5" \ No newline at end of file +unique_id = "0.1.5" +once_cell = "1.17.1" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +gloo-timers = "0.2.6" \ No newline at end of file diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index 7b45751..a5a19f6 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -1,84 +1,271 @@ +use crate::actor::*; use crate::connection::*; use crate::errors::*; use crate::types::*; +use crate::utils::spawn_and_log_error; use crate::utils::ResultSend; +use crate::{log, sleep}; +use async_std::stream::StreamExt; +use async_std::sync::{Arc, RwLock}; +use futures::channel::mpsc; +use futures::SinkExt; +use once_cell::sync::Lazy; use p2p_repo::types::{PrivKey, PubKey}; use p2p_repo::utils::generate_keypair; use std::collections::HashMap; use std::net::IpAddr; -use std::sync::{Arc, RwLock}; - -use crate::actor::*; +#[derive(Debug)] pub enum PeerConnection { Core(IP), - Client(Box>), + Client(ConnectionBase), NONE, } +#[derive(Debug)] pub struct BrokerPeerInfo { lastPeerAdvert: Option, //FIXME: remove Option connected: PeerConnection, } +#[derive(Debug)] pub struct DirectConnection { ip: IP, interface: String, remote_peer_id: DirectPeerId, tp: TransportProtocol, //dir: ConnectionDir, - cnx: Box>, + cnx: ConnectionBase, } +pub static BROKER: Lazy>> = Lazy::new(|| Arc::new(RwLock::new(Broker::new()))); + pub struct Broker { //actors: Arc>>>, - direct_connections: Arc>>, - peers: Arc>>, + direct_connections: HashMap, + peers: HashMap, + shutdown: Option>, + shutdown_sender: Sender, + closing: bool, } impl Broker { + pub fn reconnecting(&mut self, peer_id: &DirectPeerId) { + let mut peerinfo = self.peers.get_mut(peer_id); + match peerinfo { + Some(info) => match &info.connected { + PeerConnection::NONE => {} + PeerConnection::Client(cb) => { + info.connected = PeerConnection::NONE; + } + PeerConnection::Core(ip) => { + self.direct_connections.remove(&ip); + info.connected = PeerConnection::NONE; + } + }, + None => {} + } + } + pub fn remove(&mut self, peer_id: &DirectPeerId) { + let removed = self.peers.remove(peer_id); + match removed { + Some(info) => match info.connected { + PeerConnection::NONE => {} + PeerConnection::Client(cb) => {} + PeerConnection::Core(ip) => { + self.direct_connections.remove(&ip); + } + }, + None => {} + } + } + pub fn new() -> Self { + let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); Broker { - direct_connections: Arc::new(RwLock::new(HashMap::new())), - peers: Arc::new(RwLock::new(HashMap::new())), + shutdown: Some(shutdown_receiver), + shutdown_sender, + direct_connections: HashMap::new(), + peers: HashMap::new(), + closing: false, + } + } + + fn take_shutdown(&mut self) -> Receiver { + self.shutdown.take().unwrap() + } + + pub async fn join_shutdown() -> Result<(), ProtocolError> { + let mut shutdown_join: Receiver; + { + shutdown_join = BROKER.write().await.take_shutdown(); + } + match shutdown_join.next().await { + Some(ProtocolError::Closing) => Ok(()), + Some(error) => Err(error), + None => Ok(()), + } + } + + /// Used in tests mostly + pub async fn join_shutdown_with_timeout( + timeout: std::time::Duration, + ) -> Result<(), ProtocolError> { + async fn timer_shutdown(timeout: std::time::Duration) -> ResultSend<()> { + async move { + sleep!(timeout); + log!("timeout for shutdown"); + let _ = BROKER + .write() + .await + .shutdown_sender + .send(ProtocolError::Timeout) + .await; + } + .await; + Ok(()) } + spawn_and_log_error(timer_shutdown(timeout)); + Broker::join_shutdown().await + } + + pub async fn graceful_shutdown() { + let keys; + { + let mut broker = BROKER.write().await; + if broker.closing { + return; + } + broker.closing = true; + keys = Vec::from_iter(broker.peers.keys().cloned()); + } + for peer_id in keys { + BROKER.write().await.close_peer_connection(&peer_id).await; + } + let _ = BROKER + .write() + .await + .shutdown_sender + .send(ProtocolError::Closing) + .await; + } + + pub async fn shutdown(&mut self) { + if self.closing { + return; + } + self.closing = true; + + let _ = self.shutdown_sender.send(ProtocolError::Closing).await; } pub async fn connect( - &self, - cnx: Arc, + &mut self, + cnx: Box, ip: IP, - core: Option, + core: Option, // the interface used as egress for this connection peer_pubk: PrivKey, peer_privk: PubKey, remote_peer_id: DirectPeerId, ) -> Result<(), NetError> { + if self.closing { + return Err(NetError::Closing); + } + // TODO check that not already connected to peer //IpAddr::from_str("127.0.0.1"); //cnx.open(url, peer_pubk, peer_privk).await?; //let cnx = Arc::new(); let (priv_key, pub_key) = generate_keypair(); - Arc::clone(&cnx) - .open(ip, priv_key, pub_key, remote_peer_id) - .await?; + log!("CONNECTING"); + let connection_res = cnx.open(ip, priv_key, pub_key, remote_peer_id).await; + log!("CONNECTED {:?}", connection_res); + let mut connection = connection_res.unwrap(); + let join = connection.take_shutdown(); + let connected = if core.is_some() { let dc = DirectConnection { ip, - interface: core.unwrap(), + interface: core.clone().unwrap(), remote_peer_id, - tp: cnx.tp(), - cnx: Box::new(Arc::clone(&cnx)), + tp: connection.transport_protocol(), + cnx: connection, }; - self.direct_connections.write().unwrap().insert(ip, dc); + self.direct_connections.insert(ip, dc); PeerConnection::Core(ip) } else { - PeerConnection::Client(Box::new(Arc::clone(&cnx))) + PeerConnection::Client(connection) }; let bpi = BrokerPeerInfo { lastPeerAdvert: None, connected, }; - self.peers.write().unwrap().insert(remote_peer_id, bpi); + self.peers.insert(remote_peer_id, bpi); + + async fn watch_close( + mut join: Receiver, + cnx: Box, + ip: IP, + core: Option, // the interface used as egress for this connection + peer_pubk: PrivKey, + peer_privk: PubKey, + remote_peer_id: DirectPeerId, + ) -> ResultSend<()> { + async move { + let res = join.next().await; + log!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id); + if res.is_some() { + // we intend to reconnect + let mut broker = BROKER.write().await; + broker.reconnecting(&remote_peer_id); + // TODO: deal with cycle error https://users.rust-lang.org/t/recursive-async-method-causes-cycle-error/84628/5 + // let result = broker + // .connect(cnx, ip, core, peer_pubk, peer_privk, remote_peer_id) + // .await; + // log!("SOCKET RECONNECTION {:?} {:?}", result, &remote_peer_id); + // TODO: deal with error and incremental backoff + } else { + log!("REMOVED"); + BROKER.write().await.remove(&remote_peer_id); + } + } + .await; + Ok(()) + } + spawn_and_log_error(watch_close( + join, + cnx, + ip, + core, + peer_pubk, + peer_privk, + remote_peer_id, + )); Ok(()) } + + pub async fn close_peer_connection(&mut self, peer_id: &DirectPeerId) { + if let Some(peer) = self.peers.get_mut(peer_id) { + match &mut peer.connected { + PeerConnection::Core(_) => { + //TODO + unimplemented!(); + } + PeerConnection::Client(cb) => { + cb.close().await; + } + PeerConnection::NONE => {} + } + //self.peers.remove(peer_id); // this is done in the watch_close instead + } + } + + pub fn print_status(&self) { + self.peers.iter().for_each(|(peerId, peerInfo)| { + log!("PEER in BROKER {:?} {:?}", peerId, peerInfo); + }); + self.direct_connections.iter().for_each(|(ip, directCnx)| { + log!("direct_connection in BROKER {:?} {:?}", ip, directCnx) + }); + } } diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index 3452e98..2349944 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -18,7 +18,7 @@ use unique_id::GeneratorFromSeed; pub type Sender = mpsc::UnboundedSender; pub type Receiver = mpsc::UnboundedReceiver; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ConnectionCommand { Msg(ProtocolMessage), Error(NetError), @@ -28,45 +28,70 @@ pub enum ConnectionCommand { #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] -pub trait IConnection { +pub trait IConnection: Send + Sync { async fn open( - self: Arc, + &self, ip: IP, peer_pubk: PrivKey, peer_privk: PubKey, remote_peer: DirectPeerId, - ) -> Result<(), NetError>; - async fn accept(&mut self) -> Result<(), NetError>; - fn tp(&self) -> TransportProtocol; + ) -> Result; + async fn accept(&self) -> Result; } -#[derive(PartialEq)] +#[derive(PartialEq, Debug)] pub enum ConnectionDir { Server, Client, } +#[derive(Debug)] pub struct ConnectionBase { sender: Option>, receiver: Option>, sender_tx: Option>, receiver_tx: Option>, + shutdown: Option>, dir: ConnectionDir, next_request_id: SequenceGenerator, + tp: TransportProtocol, } impl ConnectionBase { - pub fn new(dir: ConnectionDir) -> Self { + pub fn new(dir: ConnectionDir, tp: TransportProtocol) -> Self { Self { receiver: None, sender: None, sender_tx: None, receiver_tx: None, + shutdown: None, next_request_id: SequenceGenerator::new(1), dir, + tp, } } + pub fn transport_protocol(&self) -> TransportProtocol { + self.tp + } + + pub fn take_shutdown(&mut self) -> Receiver { + self.shutdown.take().unwrap() + } + + pub async fn join_shutdown(&mut self) -> Result<(), NetError> { + match self.take_shutdown().next().await { + Some(error) => Err(error), + None => Ok(()), + } + } + + pub fn set_shutdown(&mut self) -> Sender { + let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); + self.shutdown = Some(shutdown_receiver); + shutdown_sender + } + pub fn take_sender(&mut self) -> Receiver { self.sender.take().unwrap() } @@ -115,14 +140,14 @@ impl ConnectionBase { let _ = self.sender_tx.as_mut().unwrap().send(cmd).await; } - pub async fn inject(&mut self, cmd: ConnectionCommand) { - let _ = self.receiver_tx.as_mut().unwrap().send(cmd).await; - } + // pub async fn inject(&mut self, cmd: ConnectionCommand) { + // let _ = self.receiver_tx.as_mut().unwrap().send(cmd).await; + // } - pub async fn close_streams(&mut self) { - let _ = self.receiver_tx.as_mut().unwrap().close_channel(); - let _ = self.sender_tx.as_mut().unwrap().close_channel(); - } + // pub async fn close_streams(&mut self) { + // let _ = self.receiver_tx.as_mut().unwrap().close_channel(); + // let _ = self.sender_tx.as_mut().unwrap().close_channel(); + // } pub async fn close(&mut self) { log!("closing..."); diff --git a/p2p-net/src/errors.rs b/p2p-net/src/errors.rs index 157bdec..011318f 100644 --- a/p2p-net/src/errors.rs +++ b/p2p-net/src/errors.rs @@ -29,6 +29,7 @@ pub enum NetError { ConnectionError, SerializationError, ProtocolError, + Closing, } //MAX 50 NetErrors impl Error for NetError {} @@ -64,6 +65,7 @@ pub enum ProtocolError { RepoIdRequired, ConnectionError, + Timeout, PeerAlreadyConnected, NoError, diff --git a/p2p-net/src/lib.rs b/p2p-net/src/lib.rs index ec7c512..1bf94ee 100644 --- a/p2p-net/src/lib.rs +++ b/p2p-net/src/lib.rs @@ -51,3 +51,15 @@ macro_rules! log { macro_rules! log { ($($t:tt)*) => (debug_print::debug_println!($($t)*)) } + +#[cfg(target_arch = "wasm32")] +#[macro_export] +macro_rules! sleep { + ($($t:tt)*) => (gloo_timers::future::sleep($($t)*).await) +} + +#[cfg(not(target_arch = "wasm32"))] +#[macro_export] +macro_rules! sleep { + ($($t:tt)*) => (std::thread::sleep($($t)*)) +}