broker singleton and timeout management (for tests and prod)

Niko 2 years ago
parent 8c4915d9c5
commit 960cdf3189
  1. 6
      Cargo.lock
  2. 23
      README.md
  3. 7
      ng-app-js/Cargo.toml
  4. 42
      ng-app-js/src/lib.rs
  5. 3
      p2p-client-ws/Cargo.toml
  6. 110
      p2p-client-ws/src/remote_ws.rs
  7. 142
      p2p-client-ws/src/remote_ws_wasm.rs
  8. 6
      p2p-net/Cargo.toml
  9. 229
      p2p-net/src/broker.rs
  10. 55
      p2p-net/src/connection.rs
  11. 2
      p2p-net/src/errors.rs
  12. 12
      p2p-net/src/lib.rs

6
Cargo.lock generated

@ -1076,6 +1076,8 @@ dependencies = [
"debug_print", "debug_print",
"futures", "futures",
"getrandom 0.1.16", "getrandom 0.1.16",
"gloo-timers",
"js-sys",
"p2p-client-ws", "p2p-client-ws",
"p2p-net", "p2p-net",
"p2p-repo", "p2p-repo",
@ -1085,7 +1087,9 @@ dependencies = [
"serde_bytes", "serde_bytes",
"snow", "snow",
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-futures",
"wasm-bindgen-test", "wasm-bindgen-test",
"web-sys",
"ws_stream_wasm", "ws_stream_wasm",
] ]
@ -1249,7 +1253,9 @@ dependencies = [
"blake3", "blake3",
"debug_print", "debug_print",
"futures", "futures",
"gloo-timers",
"num_enum", "num_enum",
"once_cell",
"p2p-repo", "p2p-repo",
"serde", "serde",
"serde_bare", "serde_bare",

@ -9,7 +9,7 @@ This repository is in active development at [https://git.nextgraph.org/NextGraph
> NextGraph brings about the convergence between P2P and Semantic Web technologies, towards a decentralized, secure and privacy-preserving cloud, based on CRDTs. > NextGraph brings about the convergence between P2P and Semantic Web technologies, towards a decentralized, secure and privacy-preserving cloud, based on CRDTs.
> >
> This open source ecosystem provides solutions for end-users and software developers alike, wishing to use or create **decentralized** apps featuring: **live collaboration** on rich-text documents, peer to peer communication with end-to-end encryption, offline-first, **local-first**, portable and interoperable data, total ownership of data and software, security and privacy. Centered on repositories containing **semantic data** (RDF), **rich text**, and structured data formats like **JSON**, synced between peers belonging to permissioned groups of users, it offers strong eventual consistency, thanks to the use of **CRDTs**. Documents can be linked together, signed, shared securely, queried using the **SPARQL** language and organized into sites and containers. > This open source ecosystem provides solutions for end-users and software developers alike, wishing to use or create **decentralized** apps featuring: **live collaboration** on rich-text documents, peer to peer communication with end-to-end encryption, offline-first, **local-first**, portable and interoperable data, total ownership of data and software, security and privacy. Centered on repositories containing **semantic data** (RDF), **rich text**, and structured data formats like **JSON**, synced between peers belonging to permissioned groups of users, it offers strong eventual consistency, thanks to the use of **CRDTs**. Documents can be linked together, signed, shared securely, queried using the **SPARQL** language and organized into sites and containers.
> >
> More info here [https://nextgraph.org](https://nextgraph.org) > More info here [https://nextgraph.org](https://nextgraph.org)
## Support ## Support
@ -21,13 +21,14 @@ And our community forum where you can ask questions is here [https://forum.nextg
## How to use NextGraph ## How to use NextGraph
NextGraph is not ready yet. You can subscribe to [our newsletter](https://list.nextgraph.org/subscription/form) to get updates, and support us with a [donation](https://nextgraph.org/donate/). NextGraph is not ready yet. You can subscribe to [our newsletter](https://list.nextgraph.org/subscription/form) to get updates, and support us with a [donation](https://nextgraph.org/donate/).
## For developers ## For developers
Read our [getting started guide](https://docs.nextgraph.org/en/getting-started/). Read our [getting started guide](https://docs.nextgraph.org/en/getting-started/).
## For contributors ## For contributors
- [Install Rust](https://www.rust-lang.org/tools/install) - [Install Rust](https://www.rust-lang.org/tools/install) minimum required 1.64.0
- [Install Nodejs](https://nodejs.org/en/download/) - [Install Nodejs](https://nodejs.org/en/download/)
``` ```
@ -36,11 +37,11 @@ cargo install wasm-pack
git clone git@git.nextgraph.org:NextGraph/nextgraph-rs.git git clone git@git.nextgraph.org:NextGraph/nextgraph-rs.git
cd nextgraph-rs cd nextgraph-rs
cargo build cargo build
``` ```
### Packages ### Packages
The crates are organized as follow : The crates are organized as follow :
- p2p-repo : all the common types, traits and structs for the P2P repositories - p2p-repo : all the common types, traits and structs for the P2P repositories
- p2p-net : all the common types, traits and structs for the P2P networks - p2p-net : all the common types, traits and structs for the P2P networks
@ -81,17 +82,20 @@ cargo test --package p2p-repo --lib -- branch::test --nocapture
``` ```
Test end-to-end client and server: Test end-to-end client and server:
```
```
cargo test --package ngcli -- --nocapture cargo test --package ngcli -- --nocapture
``` ```
Test WASM websocket Test WASM websocket
``` ```
cd ng-app-js cd ng-app-js
wasm-pack test --chrome --headless wasm-pack test --chrome --headless
``` ```
Test Rust websocket Test Rust websocket
``` ```
cargo test --package p2p-client-ws --lib -- --nocapture cargo test --package p2p-client-ws --lib -- --nocapture
``` ```
@ -123,9 +127,10 @@ additional terms or conditions.
## License ## License
Licensed under either of Licensed under either of
* Apache License, Version 2.0 ([LICENSE-APACHE2](LICENSE-APACHE2) or http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) - Apache License, Version 2.0 ([LICENSE-APACHE2](LICENSE-APACHE2) or http://www.apache.org/licenses/LICENSE-2.0)
at your option. - MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
at your option.
`SPDX-License-Identifier: Apache-2.0 OR MIT` `SPDX-License-Identifier: Apache-2.0 OR MIT`

@ -34,8 +34,11 @@ getrandom = { version = "0.1.1", features = ["wasm-bindgen"] }
# version = "0.2.7" # version = "0.2.7"
# features = ["js"] # features = ["js"]
# [target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
# wasm-bindgen-futures = "0.4.34" js-sys = "0.3.61"
wasm-bindgen-futures = "0.4.34"
web-sys = { version = "0.3.61", features = ["Window"] }
gloo-timers = "0.2.6"
[dev-dependencies] [dev-dependencies]
wasm-bindgen-test = "^0.3" wasm-bindgen-test = "^0.3"

@ -1,10 +1,11 @@
use async_std::task; use async_std::task;
use js_sys::Reflect;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
use p2p_client_ws::remote_ws_wasm::ConnectionWebSocket; use p2p_client_ws::remote_ws_wasm::ConnectionWebSocket;
use p2p_net::broker::*; use p2p_net::broker::*;
use p2p_net::log; use p2p_net::types::{DirectPeerId, IP};
use p2p_net::types::IP;
use p2p_net::utils::{spawn_and_log_error, ResultSend}; use p2p_net::utils::{spawn_and_log_error, ResultSend};
use p2p_net::{log, sleep};
use p2p_repo::utils::generate_keypair; use p2p_repo::utils::generate_keypair;
use std::net::IpAddr; use std::net::IpAddr;
use std::str::FromStr; use std::str::FromStr;
@ -18,19 +19,20 @@ extern "C" {
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
#[wasm_bindgen] #[wasm_bindgen]
pub fn greet(name: &str) { pub async fn greet(name: &str) {
log!("I say: {}", name); log!("I say: {}", name);
let mut random_buf = [0u8; 32]; let mut random_buf = [0u8; 32];
getrandom::getrandom(&mut random_buf).unwrap(); getrandom::getrandom(&mut random_buf).unwrap();
//spawn_and_log_error(testt("ws://127.0.0.1:3012")); //spawn_and_log_error(testt("ws://127.0.0.1:3012"));
async fn method() -> ResultSend<()> { async fn method() -> ResultSend<()> {
log!("start connecting"); log!("start connecting");
let cnx = Arc::new(ConnectionWebSocket {}); //let cnx = Arc::new();
let (priv_key, pub_key) = generate_keypair(); let (priv_key, pub_key) = generate_keypair();
let broker = Broker::new(); let res = BROKER
let res = broker .write()
.await
.connect( .connect(
cnx, Box::new(ConnectionWebSocket {}),
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
None, None,
priv_key, priv_key,
@ -39,10 +41,32 @@ pub fn greet(name: &str) {
) )
.await; .await;
log!("broker.connect : {:?}", res); log!("broker.connect : {:?}", res);
BROKER.read().await.print_status();
//res.expect_throw("assume the connection succeeds"); //res.expect_throw("assume the connection succeeds");
async fn timer_close(remote_peer_id: DirectPeerId) -> ResultSend<()> {
async move {
sleep!(std::time::Duration::from_secs(10));
log!("timeout");
BROKER
.write()
.await
.close_peer_connection(&remote_peer_id)
.await;
}
.await;
Ok(())
}
spawn_and_log_error(timer_close(pub_key));
//Broker::graceful_shutdown().await;
Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(12)).await;
Ok(()) Ok(())
} }
spawn_and_log_error(method()); spawn_and_log_error(method()).await;
//spawn_and_log_error(Arc::clone(&cnx).open("ws://127.0.0.1:3012", priv_key, pub_key)); //spawn_and_log_error(Arc::clone(&cnx).open("ws://127.0.0.1:3012", priv_key, pub_key));
} }
@ -69,6 +93,6 @@ mod test {
#[wasm_bindgen_test] #[wasm_bindgen_test]
pub async fn test_greet() { pub async fn test_greet() {
greet("test"); greet("test").await;
} }
} }

@ -31,9 +31,6 @@ wasm-bindgen-test = "^0.3"
version = "0.2.7" version = "0.2.7"
features = ["js"] features = ["js"]
# [target.'cfg(target_arch = "wasm32")'.dependencies]
[target.'cfg(not(target_arch = "wasm32"))'.dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
getrandom = "0.2.7" getrandom = "0.2.7"
xactor = "0.7.11" xactor = "0.7.11"

@ -21,8 +21,8 @@ use debug_print::*;
use async_std::sync::Mutex; use async_std::sync::Mutex;
use futures::io::Close; use futures::io::Close;
use futures::FutureExt;
use futures::{future, pin_mut, select, stream, StreamExt}; use futures::{future, pin_mut, select, stream, StreamExt};
use futures::{FutureExt, SinkExt};
use async_std::task; use async_std::task;
use p2p_net::errors::*; use p2p_net::errors::*;
@ -42,13 +42,13 @@ pub struct ConnectionWebSocket {}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl IConnection for ConnectionWebSocket { impl IConnection for ConnectionWebSocket {
async fn open( async fn open(
self: Arc<Self>, &self,
ip: IP, ip: IP,
peer_pubk: PrivKey, peer_pubk: PrivKey,
peer_privk: PubKey, peer_privk: PubKey,
remote_peer: DirectPeerId, remote_peer: DirectPeerId,
) -> Result<(), NetError> { ) -> Result<ConnectionBase, NetError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client); let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, WS_PORT); let url = format!("ws://{}:{}", ip, WS_PORT);
@ -108,23 +108,33 @@ impl IConnection for ConnectionWebSocket {
cnx.start_read_loop(); cnx.start_read_loop();
let s = cnx.take_sender(); let s = cnx.take_sender();
let r = cnx.take_receiver(); let r = cnx.take_receiver();
let mut shutdown = cnx.set_shutdown();
//let ws_in_task = Arc::clone(&ws); //let ws_in_task = Arc::clone(&ws);
task::spawn(async move { let join = task::spawn(async move {
debug_println!("START of WS loop"); debug_println!("START of WS loop");
//let w = ws_in_task.lock().await; //let w = ws_in_task.lock().await;
ws_loop(websocket, s, r).await; let res = ws_loop(websocket, s, r).await;
// .close(Some(CloseFrame { // .close(Some(CloseFrame {
// code: CloseCode::Library(4000), // code: CloseCode::Library(4000),
// reason: std::borrow::Cow::Borrowed(""), // reason: std::borrow::Cow::Borrowed(""),
// })) // }))
// .await; // .await;
if res.is_err() {
let _ = shutdown.send(res.err().unwrap()).await;
}
debug_println!("END of WS loop"); debug_println!("END of WS loop");
}); });
//spawn_and_log_error(ws_loop(ws, cnx.take_sender(), cnx.take_receiver())); //spawn_and_log_error(ws_loop(ws, cnx.take_sender(), cnx.take_receiver()));
log!("sending..."); log!("sending...");
//
//cnx.close().await;
//// let res = cnx.join_shutdown().await;
//// log!("JOIN SHUTDOWN {:?}", res);
// cnx.send(ConnectionCommand::Close).await; // cnx.send(ConnectionCommand::Close).await;
// cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start( // cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start(
@ -146,20 +156,16 @@ impl IConnection for ConnectionWebSocket {
//log!("WS closed {:?}", last_event.clone()); //log!("WS closed {:?}", last_event.clone());
//Ok(cnx) Ok(cnx)
Ok(()) //Ok(())
} }
} }
} }
async fn accept(&mut self) -> Result<(), NetError> { async fn accept(&self) -> Result<ConnectionBase, NetError> {
let cnx = ConnectionBase::new(ConnectionDir::Server); let cnx = ConnectionBase::new(ConnectionDir::Server, TransportProtocol::WS);
Ok(()) unimplemented!();
}
fn tp(&self) -> TransportProtocol {
TransportProtocol::WS
} }
} }
@ -169,8 +175,19 @@ async fn close_ws(
code: u16, code: u16,
reason: &str, reason: &str,
) -> Result<(), NetError> { ) -> Result<(), NetError> {
log!("close_ws"); log!("close_ws {:?}", code);
let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await;
let cmd = if code == 1000 {
ConnectionCommand::Close
} else if code < 4000 {
ConnectionCommand::Error(NetError::WsError)
} else if code < 4950 {
ConnectionCommand::ProtocolError(ProtocolError::try_from(code - 4000).unwrap())
} else {
ConnectionCommand::Error(NetError::try_from(code - 4949).unwrap())
};
let _ = futures::SinkExt::send(receiver, cmd).await;
stream stream
.close(Some(CloseFrame { .close(Some(CloseFrame {
code: CloseCode::Library(code), code: CloseCode::Library(code),
@ -200,8 +217,8 @@ async fn ws_loop(
log!("GOT MESSAGE {:?}", msg); log!("GOT MESSAGE {:?}", msg);
if msg.is_close() { if msg.is_close() {
if let Message::Close(Some(cf)) = msg { if let Message::Close(Some(cf)) = msg {
log!("CLOSE from server: {}",cf.reason); log!("CLOSE from server with closeframe: {}",cf.reason);
let last_command = match cf.code { let last_command = match cf.code {
CloseCode::Normal => CloseCode::Normal =>
ConnectionCommand::Close, ConnectionCommand::Close,
@ -227,9 +244,9 @@ async fn ws_loop(
futures::SinkExt::send(receiver,ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&msg.into_data())?)).await futures::SinkExt::send(receiver,ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&msg.into_data())?)).await
.map_err(|_e| NetError::IoError)?; .map_err(|_e| NetError::IoError)?;
} }
return Ok(ProtocolError::Closing); //return Ok(ProtocolError::Closing);
}, },
Some(Err(e)) => break, Some(Err(e)) => {log!("GOT ERROR {:?}",e);return Err(NetError::WsError);},
None => break None => break
}, },
s = sender.next().fuse() => match s { s = sender.next().fuse() => match s {
@ -261,6 +278,7 @@ async fn ws_loop(
match inner_loop(&mut ws, sender, &mut receiver).await { match inner_loop(&mut ws, sender, &mut receiver).await {
Ok(proto_err) => { Ok(proto_err) => {
if proto_err == ProtocolError::Closing { if proto_err == ProtocolError::Closing {
//FIXME: remove this case
ws.close(None).await.map_err(|_e| NetError::WsError)?; ws.close(None).await.map_err(|_e| NetError::WsError)?;
} else if proto_err == ProtocolError::NoError { } else if proto_err == ProtocolError::NoError {
close_ws(&mut ws, &mut receiver, 1000, "").await?; close_ws(&mut ws, &mut receiver, 1000, "").await?;
@ -295,9 +313,9 @@ mod test {
use async_std::task; use async_std::task;
use p2p_net::broker::*; use p2p_net::broker::*;
use p2p_net::errors::NetError; use p2p_net::errors::NetError;
use p2p_net::log;
use p2p_net::types::IP; use p2p_net::types::IP;
use p2p_net::utils::{spawn_and_log_error, ResultSend}; use p2p_net::utils::{spawn_and_log_error, ResultSend};
use p2p_net::{log, sleep};
use p2p_repo::utils::generate_keypair; use p2p_repo::utils::generate_keypair;
use std::net::IpAddr; use std::net::IpAddr;
use std::str::FromStr; use std::str::FromStr;
@ -309,22 +327,42 @@ mod test {
//spawn_and_log_error(testt("ws://127.0.0.1:3012")); //spawn_and_log_error(testt("ws://127.0.0.1:3012"));
log!("start connecting"); log!("start connecting");
let cnx = Arc::new(ConnectionWebSocket {});
let (priv_key, pub_key) = generate_keypair(); let (priv_key, pub_key) = generate_keypair();
let broker = Broker::new(); {
let res = broker let res = BROKER
.connect( .write()
cnx, .await
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), .connect(
None, Box::new(ConnectionWebSocket {}),
priv_key, IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
pub_key, None,
pub_key, priv_key,
) pub_key,
pub_key,
)
.await;
log!("broker.connect : {:?}", res);
//res.expect_throw("assume the connection succeeds");
}
async fn timer_close(remote_peer_id: DirectPeerId) -> ResultSend<()> {
async move {
sleep!(std::time::Duration::from_secs(10));
log!("timeout");
BROKER
.write()
.await
.close_peer_connection(&remote_peer_id)
.await;
}
.await; .await;
log!("broker.connect : {:?}", res); Ok(())
//res.expect_throw("assume the connection succeeds"); }
spawn_and_log_error(timer_close(pub_key));
//Broker::graceful_shutdown().await;
Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(12)).await;
Ok(()) Ok(())
} }
} }

@ -34,19 +34,15 @@ pub struct ConnectionWebSocket {}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl IConnection for ConnectionWebSocket { impl IConnection for ConnectionWebSocket {
fn tp(&self) -> TransportProtocol {
TransportProtocol::WS
}
async fn open( async fn open(
self: Arc<Self>, &self,
ip: IP, ip: IP,
peer_pubk: PrivKey, peer_pubk: PrivKey,
peer_privk: PubKey, peer_privk: PubKey,
remote_peer: DirectPeerId, remote_peer: DirectPeerId,
) -> Result<(), NetError> { ) -> Result<ConnectionBase, NetError> {
//pub async fn testt(url: &str) -> ResultSend<()> { //pub async fn testt(url: &str) -> ResultSend<()> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client); let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, WS_PORT); let url = format!("ws://{}:{}", ip, WS_PORT);
@ -55,75 +51,62 @@ impl IConnection for ConnectionWebSocket {
NetError::ConnectionError NetError::ConnectionError
})?; })?;
let mut evts = ws
.observe(ObserveConfig::default())
//.observe(Filter::Pointer(WsEvent::is_closed).into())
.await
.expect_throw("observe");
//let (mut sender_tx, sender_rx) = mpsc::unbounded(); //let (mut sender_tx, sender_rx) = mpsc::unbounded();
//let (mut receiver_tx, receiver_rx) = mpsc::unbounded(); //let (mut receiver_tx, receiver_rx) = mpsc::unbounded();
cnx.start_read_loop(); cnx.start_read_loop();
let mut shutdown = cnx.set_shutdown();
spawn_and_log_error(ws_loop(ws, wsio, cnx.take_sender(), cnx.take_receiver())); spawn_and_log_error(ws_loop(
ws,
wsio,
cnx.take_sender(),
cnx.take_receiver(),
shutdown,
));
//spawn_and_log_error(read_loop(receiver_rx, sender_tx.clone())); //spawn_and_log_error(read_loop(receiver_rx, sender_tx.clone()));
log!("sending..."); log!("sending...");
//cnx.close().await;
// spawn_and_log_error(async move {
// TimeoutFuture::new(10_000).await;
// cnx.close().await;
// Ok(())
// // // Do something here after the one second timeout is up!
// });
// cnx.send(ConnectionCommand::Close).await; // cnx.send(ConnectionCommand::Close).await;
// cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start( // cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start(
// StartProtocol::Auth(ClientHello::V0()), // StartProtocol::Auth(ClientHello::V0()),
// ))) // )))
// .await; // .await;
log!("waiting...");
cnx.close().await; //let res = join.next().await;
log!("finished...");
//log!("JOIN SHUTDOWN {:?}", res);
// Note that since WsMeta::connect resolves to an opened connection, we don't see // Note that since WsMeta::connect resolves to an opened connection, we don't see
// any Open events here. // any Open events here.
// //
//assert!(evts.next().await.unwrap_throw().is_closing()); //assert!(events.next().await.unwrap_throw().is_closing());
let last_event = evts.next().await;
log!("WS closed {:?}", last_event.clone());
let last_command = match last_event {
None => ConnectionCommand::Close,
Some(WsEvent::Open) => ConnectionCommand::Error(NetError::WsError), // this should never happen
Some(WsEvent::Error) => ConnectionCommand::Error(NetError::ConnectionError),
Some(WsEvent::Closing) => ConnectionCommand::Close,
Some(WsEvent::Closed(ce)) => {
if ce.code == 1000 {
ConnectionCommand::Close
} else if ce.code < 4000 {
ConnectionCommand::Error(NetError::WsError)
} else if ce.code < 4950 {
ConnectionCommand::ProtocolError(
ProtocolError::try_from(ce.code - 4000).unwrap(),
)
} else {
ConnectionCommand::Error(NetError::try_from(ce.code - 4949).unwrap())
}
}
Some(WsEvent::WsErr(_e)) => ConnectionCommand::Error(NetError::WsError),
};
let _ = cnx.inject(last_command).await;
let _ = cnx.close_streams().await;
//Ok(cnx) //Ok(cnx)
Ok(()) Ok(cnx)
} }
async fn accept(&mut self) -> Result<(), NetError> { async fn accept(&self) -> Result<ConnectionBase, NetError> {
!unimplemented!() !unimplemented!()
} }
} }
async fn ws_loop( async fn ws_loop(
ws: WsMeta, mut ws: WsMeta,
mut stream: WsStream, mut stream: WsStream,
sender: Receiver<ConnectionCommand>, sender: Receiver<ConnectionCommand>,
receiver: Sender<ConnectionCommand>, mut receiver: Sender<ConnectionCommand>,
mut shutdown: Sender<NetError>,
) -> ResultSend<()> { ) -> ResultSend<()> {
async fn inner_loop( async fn inner_loop(
stream: &mut WsStream, stream: &mut WsStream,
@ -132,6 +115,7 @@ async fn ws_loop(
) -> Result<ProtocolError, NetError> { ) -> Result<ProtocolError, NetError> {
//let mut rx_sender = sender.fuse(); //let mut rx_sender = sender.fuse();
loop { loop {
log!("BEFORE SELECT");
select! { select! {
r = stream.next().fuse() => match r { r = stream.next().fuse() => match r {
Some(msg) => { Some(msg) => {
@ -170,29 +154,73 @@ async fn ws_loop(
}, },
} }
} }
log!("END SELECT");
Ok(ProtocolError::NoError) Ok(ProtocolError::NoError)
} }
match inner_loop(&mut stream, sender, receiver).await {
let mut events = ws
.observe(ObserveConfig::default())
//.observe(Filter::Pointer(WsEvent::is_closed).into())
.await
.expect_throw("observe");
log!("OBSERVED");
match inner_loop(&mut stream, sender, receiver.clone()).await {
Ok(proto_err) => { Ok(proto_err) => {
if proto_err == ProtocolError::NoError { if proto_err == ProtocolError::NoError {
ws.close_code(1000).await.map_err(|_e| NetError::WsError)?; let _ = ws.close_code(1000).await; //.map_err(|_e| NetError::WsError)?;
log!("CLOSED GRACEFULLY");
} else { } else {
log!("PROTOCOL ERR");
let mut code = proto_err.clone() as u16; let mut code = proto_err.clone() as u16;
if code > 949 { if code > 949 {
code = ProtocolError::OtherError as u16; code = ProtocolError::OtherError as u16;
} }
ws.close_reason(code + 4000, proto_err.to_string()) let _ = ws.close_reason(code + 4000, proto_err.to_string()).await;
.await //.map_err(|_e| NetError::WsError)?;
.map_err(|_e| NetError::WsError)?; //return Err(Box::new(proto_err));
return Err(Box::new(proto_err));
} }
} }
Err(e) => { Err(e) => {
ws.close_reason(e.clone() as u16 + 4949, e.to_string()) let _ = ws
.await .close_reason(e.clone() as u16 + 4949, e.to_string())
.map_err(|_e| NetError::WsError)?; .await;
return Err(Box::new(e)); //.map_err(|_e| NetError::WsError)?;
//return Err(Box::new(e));
log!("ERR");
} }
} }
log!("waiting for closing event");
let last_event = events.next().await;
log!("WS closed {:?}", last_event.clone());
let last_command = match last_event {
None => ConnectionCommand::Close,
Some(WsEvent::Open) => ConnectionCommand::Error(NetError::WsError), // this should never happen
Some(WsEvent::Error) => ConnectionCommand::Error(NetError::ConnectionError),
Some(WsEvent::Closing) => ConnectionCommand::Close,
Some(WsEvent::Closed(ce)) => {
if ce.code == 1000 {
ConnectionCommand::Close
} else if ce.code < 4000 {
ConnectionCommand::Error(NetError::WsError)
} else if ce.code < 4950 {
ConnectionCommand::ProtocolError(ProtocolError::try_from(ce.code - 4000).unwrap())
} else {
ConnectionCommand::Error(NetError::try_from(ce.code - 4949).unwrap())
}
}
Some(WsEvent::WsErr(_e)) => ConnectionCommand::Error(NetError::WsError),
};
if let ConnectionCommand::Error(err) = last_command.clone() {
let _ = shutdown.send(err).await;
} else if let ConnectionCommand::ProtocolError(err) = last_command.clone() {
let _ = shutdown.send(NetError::ProtocolError).await;
} // otherwise, shutdown gracefully (with None). it is done automatically during destroy of shutdown
receiver
.send(last_command)
.await
.map_err(|_e| NetError::IoError)?;
Ok(()) Ok(())
} }

@ -20,4 +20,8 @@ async-trait = "0.1.64"
blake3 = "1.3.1" blake3 = "1.3.1"
async-std = { version = "1.12.0", features = ["attributes","unstable"] } async-std = { version = "1.12.0", features = ["attributes","unstable"] }
wasm-bindgen = "0.2" wasm-bindgen = "0.2"
unique_id = "0.1.5" unique_id = "0.1.5"
once_cell = "1.17.1"
[target.'cfg(target_arch = "wasm32")'.dependencies]
gloo-timers = "0.2.6"

@ -1,84 +1,271 @@
use crate::actor::*;
use crate::connection::*; use crate::connection::*;
use crate::errors::*; use crate::errors::*;
use crate::types::*; use crate::types::*;
use crate::utils::spawn_and_log_error;
use crate::utils::ResultSend; use crate::utils::ResultSend;
use crate::{log, sleep};
use async_std::stream::StreamExt;
use async_std::sync::{Arc, RwLock};
use futures::channel::mpsc;
use futures::SinkExt;
use once_cell::sync::Lazy;
use p2p_repo::types::{PrivKey, PubKey}; use p2p_repo::types::{PrivKey, PubKey};
use p2p_repo::utils::generate_keypair; use p2p_repo::utils::generate_keypair;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::sync::{Arc, RwLock};
use crate::actor::*;
#[derive(Debug)]
pub enum PeerConnection { pub enum PeerConnection {
Core(IP), Core(IP),
Client(Box<Arc<dyn IConnection>>), Client(ConnectionBase),
NONE, NONE,
} }
#[derive(Debug)]
pub struct BrokerPeerInfo { pub struct BrokerPeerInfo {
lastPeerAdvert: Option<PeerAdvert>, //FIXME: remove Option lastPeerAdvert: Option<PeerAdvert>, //FIXME: remove Option
connected: PeerConnection, connected: PeerConnection,
} }
#[derive(Debug)]
pub struct DirectConnection { pub struct DirectConnection {
ip: IP, ip: IP,
interface: String, interface: String,
remote_peer_id: DirectPeerId, remote_peer_id: DirectPeerId,
tp: TransportProtocol, tp: TransportProtocol,
//dir: ConnectionDir, //dir: ConnectionDir,
cnx: Box<Arc<dyn IConnection>>, cnx: ConnectionBase,
} }
pub static BROKER: Lazy<Arc<RwLock<Broker>>> = Lazy::new(|| Arc::new(RwLock::new(Broker::new())));
pub struct Broker { pub struct Broker {
//actors: Arc<RwLock<HashMap<i64, Box<dyn IActor>>>>, //actors: Arc<RwLock<HashMap<i64, Box<dyn IActor>>>>,
direct_connections: Arc<RwLock<HashMap<IP, DirectConnection>>>, direct_connections: HashMap<IP, DirectConnection>,
peers: Arc<RwLock<HashMap<DirectPeerId, BrokerPeerInfo>>>, peers: HashMap<DirectPeerId, BrokerPeerInfo>,
shutdown: Option<Receiver<ProtocolError>>,
shutdown_sender: Sender<ProtocolError>,
closing: bool,
} }
impl Broker { impl Broker {
pub fn reconnecting(&mut self, peer_id: &DirectPeerId) {
let mut peerinfo = self.peers.get_mut(peer_id);
match peerinfo {
Some(info) => match &info.connected {
PeerConnection::NONE => {}
PeerConnection::Client(cb) => {
info.connected = PeerConnection::NONE;
}
PeerConnection::Core(ip) => {
self.direct_connections.remove(&ip);
info.connected = PeerConnection::NONE;
}
},
None => {}
}
}
pub fn remove(&mut self, peer_id: &DirectPeerId) {
let removed = self.peers.remove(peer_id);
match removed {
Some(info) => match info.connected {
PeerConnection::NONE => {}
PeerConnection::Client(cb) => {}
PeerConnection::Core(ip) => {
self.direct_connections.remove(&ip);
}
},
None => {}
}
}
pub fn new() -> Self { pub fn new() -> Self {
let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<ProtocolError>();
Broker { Broker {
direct_connections: Arc::new(RwLock::new(HashMap::new())), shutdown: Some(shutdown_receiver),
peers: Arc::new(RwLock::new(HashMap::new())), shutdown_sender,
direct_connections: HashMap::new(),
peers: HashMap::new(),
closing: false,
}
}
fn take_shutdown(&mut self) -> Receiver<ProtocolError> {
self.shutdown.take().unwrap()
}
pub async fn join_shutdown() -> Result<(), ProtocolError> {
let mut shutdown_join: Receiver<ProtocolError>;
{
shutdown_join = BROKER.write().await.take_shutdown();
}
match shutdown_join.next().await {
Some(ProtocolError::Closing) => Ok(()),
Some(error) => Err(error),
None => Ok(()),
}
}
/// Used in tests mostly
pub async fn join_shutdown_with_timeout(
timeout: std::time::Duration,
) -> Result<(), ProtocolError> {
async fn timer_shutdown(timeout: std::time::Duration) -> ResultSend<()> {
async move {
sleep!(timeout);
log!("timeout for shutdown");
let _ = BROKER
.write()
.await
.shutdown_sender
.send(ProtocolError::Timeout)
.await;
}
.await;
Ok(())
} }
spawn_and_log_error(timer_shutdown(timeout));
Broker::join_shutdown().await
}
pub async fn graceful_shutdown() {
let keys;
{
let mut broker = BROKER.write().await;
if broker.closing {
return;
}
broker.closing = true;
keys = Vec::from_iter(broker.peers.keys().cloned());
}
for peer_id in keys {
BROKER.write().await.close_peer_connection(&peer_id).await;
}
let _ = BROKER
.write()
.await
.shutdown_sender
.send(ProtocolError::Closing)
.await;
}
pub async fn shutdown(&mut self) {
if self.closing {
return;
}
self.closing = true;
let _ = self.shutdown_sender.send(ProtocolError::Closing).await;
} }
pub async fn connect( pub async fn connect(
&self, &mut self,
cnx: Arc<dyn IConnection>, cnx: Box<dyn IConnection>,
ip: IP, ip: IP,
core: Option<String>, core: Option<String>, // the interface used as egress for this connection
peer_pubk: PrivKey, peer_pubk: PrivKey,
peer_privk: PubKey, peer_privk: PubKey,
remote_peer_id: DirectPeerId, remote_peer_id: DirectPeerId,
) -> Result<(), NetError> { ) -> Result<(), NetError> {
if self.closing {
return Err(NetError::Closing);
}
// TODO check that not already connected to peer // TODO check that not already connected to peer
//IpAddr::from_str("127.0.0.1"); //IpAddr::from_str("127.0.0.1");
//cnx.open(url, peer_pubk, peer_privk).await?; //cnx.open(url, peer_pubk, peer_privk).await?;
//let cnx = Arc::new(); //let cnx = Arc::new();
let (priv_key, pub_key) = generate_keypair(); let (priv_key, pub_key) = generate_keypair();
Arc::clone(&cnx) log!("CONNECTING");
.open(ip, priv_key, pub_key, remote_peer_id) let connection_res = cnx.open(ip, priv_key, pub_key, remote_peer_id).await;
.await?; log!("CONNECTED {:?}", connection_res);
let mut connection = connection_res.unwrap();
let join = connection.take_shutdown();
let connected = if core.is_some() { let connected = if core.is_some() {
let dc = DirectConnection { let dc = DirectConnection {
ip, ip,
interface: core.unwrap(), interface: core.clone().unwrap(),
remote_peer_id, remote_peer_id,
tp: cnx.tp(), tp: connection.transport_protocol(),
cnx: Box::new(Arc::clone(&cnx)), cnx: connection,
}; };
self.direct_connections.write().unwrap().insert(ip, dc); self.direct_connections.insert(ip, dc);
PeerConnection::Core(ip) PeerConnection::Core(ip)
} else { } else {
PeerConnection::Client(Box::new(Arc::clone(&cnx))) PeerConnection::Client(connection)
}; };
let bpi = BrokerPeerInfo { let bpi = BrokerPeerInfo {
lastPeerAdvert: None, lastPeerAdvert: None,
connected, connected,
}; };
self.peers.write().unwrap().insert(remote_peer_id, bpi); self.peers.insert(remote_peer_id, bpi);
async fn watch_close(
mut join: Receiver<NetError>,
cnx: Box<dyn IConnection>,
ip: IP,
core: Option<String>, // the interface used as egress for this connection
peer_pubk: PrivKey,
peer_privk: PubKey,
remote_peer_id: DirectPeerId,
) -> ResultSend<()> {
async move {
let res = join.next().await;
log!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id);
if res.is_some() {
// we intend to reconnect
let mut broker = BROKER.write().await;
broker.reconnecting(&remote_peer_id);
// TODO: deal with cycle error https://users.rust-lang.org/t/recursive-async-method-causes-cycle-error/84628/5
// let result = broker
// .connect(cnx, ip, core, peer_pubk, peer_privk, remote_peer_id)
// .await;
// log!("SOCKET RECONNECTION {:?} {:?}", result, &remote_peer_id);
// TODO: deal with error and incremental backoff
} else {
log!("REMOVED");
BROKER.write().await.remove(&remote_peer_id);
}
}
.await;
Ok(())
}
spawn_and_log_error(watch_close(
join,
cnx,
ip,
core,
peer_pubk,
peer_privk,
remote_peer_id,
));
Ok(()) Ok(())
} }
pub async fn close_peer_connection(&mut self, peer_id: &DirectPeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) {
match &mut peer.connected {
PeerConnection::Core(_) => {
//TODO
unimplemented!();
}
PeerConnection::Client(cb) => {
cb.close().await;
}
PeerConnection::NONE => {}
}
//self.peers.remove(peer_id); // this is done in the watch_close instead
}
}
pub fn print_status(&self) {
self.peers.iter().for_each(|(peerId, peerInfo)| {
log!("PEER in BROKER {:?} {:?}", peerId, peerInfo);
});
self.direct_connections.iter().for_each(|(ip, directCnx)| {
log!("direct_connection in BROKER {:?} {:?}", ip, directCnx)
});
}
} }

@ -18,7 +18,7 @@ use unique_id::GeneratorFromSeed;
pub type Sender<T> = mpsc::UnboundedSender<T>; pub type Sender<T> = mpsc::UnboundedSender<T>;
pub type Receiver<T> = mpsc::UnboundedReceiver<T>; pub type Receiver<T> = mpsc::UnboundedReceiver<T>;
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum ConnectionCommand { pub enum ConnectionCommand {
Msg(ProtocolMessage), Msg(ProtocolMessage),
Error(NetError), Error(NetError),
@ -28,45 +28,70 @@ pub enum ConnectionCommand {
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait IConnection { pub trait IConnection: Send + Sync {
async fn open( async fn open(
self: Arc<Self>, &self,
ip: IP, ip: IP,
peer_pubk: PrivKey, peer_pubk: PrivKey,
peer_privk: PubKey, peer_privk: PubKey,
remote_peer: DirectPeerId, remote_peer: DirectPeerId,
) -> Result<(), NetError>; ) -> Result<ConnectionBase, NetError>;
async fn accept(&mut self) -> Result<(), NetError>; async fn accept(&self) -> Result<ConnectionBase, NetError>;
fn tp(&self) -> TransportProtocol;
} }
#[derive(PartialEq)] #[derive(PartialEq, Debug)]
pub enum ConnectionDir { pub enum ConnectionDir {
Server, Server,
Client, Client,
} }
#[derive(Debug)]
pub struct ConnectionBase { pub struct ConnectionBase {
sender: Option<Receiver<ConnectionCommand>>, sender: Option<Receiver<ConnectionCommand>>,
receiver: Option<Sender<ConnectionCommand>>, receiver: Option<Sender<ConnectionCommand>>,
sender_tx: Option<Sender<ConnectionCommand>>, sender_tx: Option<Sender<ConnectionCommand>>,
receiver_tx: Option<Sender<ConnectionCommand>>, receiver_tx: Option<Sender<ConnectionCommand>>,
shutdown: Option<Receiver<NetError>>,
dir: ConnectionDir, dir: ConnectionDir,
next_request_id: SequenceGenerator, next_request_id: SequenceGenerator,
tp: TransportProtocol,
} }
impl ConnectionBase { impl ConnectionBase {
pub fn new(dir: ConnectionDir) -> Self { pub fn new(dir: ConnectionDir, tp: TransportProtocol) -> Self {
Self { Self {
receiver: None, receiver: None,
sender: None, sender: None,
sender_tx: None, sender_tx: None,
receiver_tx: None, receiver_tx: None,
shutdown: None,
next_request_id: SequenceGenerator::new(1), next_request_id: SequenceGenerator::new(1),
dir, dir,
tp,
} }
} }
pub fn transport_protocol(&self) -> TransportProtocol {
self.tp
}
pub fn take_shutdown(&mut self) -> Receiver<NetError> {
self.shutdown.take().unwrap()
}
pub async fn join_shutdown(&mut self) -> Result<(), NetError> {
match self.take_shutdown().next().await {
Some(error) => Err(error),
None => Ok(()),
}
}
pub fn set_shutdown(&mut self) -> Sender<NetError> {
let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<NetError>();
self.shutdown = Some(shutdown_receiver);
shutdown_sender
}
pub fn take_sender(&mut self) -> Receiver<ConnectionCommand> { pub fn take_sender(&mut self) -> Receiver<ConnectionCommand> {
self.sender.take().unwrap() self.sender.take().unwrap()
} }
@ -115,14 +140,14 @@ impl ConnectionBase {
let _ = self.sender_tx.as_mut().unwrap().send(cmd).await; let _ = self.sender_tx.as_mut().unwrap().send(cmd).await;
} }
pub async fn inject(&mut self, cmd: ConnectionCommand) { // pub async fn inject(&mut self, cmd: ConnectionCommand) {
let _ = self.receiver_tx.as_mut().unwrap().send(cmd).await; // let _ = self.receiver_tx.as_mut().unwrap().send(cmd).await;
} // }
pub async fn close_streams(&mut self) { // pub async fn close_streams(&mut self) {
let _ = self.receiver_tx.as_mut().unwrap().close_channel(); // let _ = self.receiver_tx.as_mut().unwrap().close_channel();
let _ = self.sender_tx.as_mut().unwrap().close_channel(); // let _ = self.sender_tx.as_mut().unwrap().close_channel();
} // }
pub async fn close(&mut self) { pub async fn close(&mut self) {
log!("closing..."); log!("closing...");

@ -29,6 +29,7 @@ pub enum NetError {
ConnectionError, ConnectionError,
SerializationError, SerializationError,
ProtocolError, ProtocolError,
Closing,
} //MAX 50 NetErrors } //MAX 50 NetErrors
impl Error for NetError {} impl Error for NetError {}
@ -64,6 +65,7 @@ pub enum ProtocolError {
RepoIdRequired, RepoIdRequired,
ConnectionError, ConnectionError,
Timeout,
PeerAlreadyConnected, PeerAlreadyConnected,
NoError, NoError,

@ -51,3 +51,15 @@ macro_rules! log {
macro_rules! log { macro_rules! log {
($($t:tt)*) => (debug_print::debug_println!($($t)*)) ($($t:tt)*) => (debug_print::debug_println!($($t)*))
} }
#[cfg(target_arch = "wasm32")]
#[macro_export]
macro_rules! sleep {
($($t:tt)*) => (gloo_timers::future::sleep($($t)*).await)
}
#[cfg(not(target_arch = "wasm32"))]
#[macro_export]
macro_rules! sleep {
($($t:tt)*) => (std::thread::sleep($($t)*))
}

Loading…
Cancel
Save