broker singleton and timeout management (for tests and prod)

master
Niko 1 year ago
parent 9c970a8ceb
commit a3e6ec98a0
  1. 6
      Cargo.lock
  2. 23
      README.md
  3. 7
      ng-app-js/Cargo.toml
  4. 42
      ng-app-js/src/lib.rs
  5. 3
      p2p-client-ws/Cargo.toml
  6. 110
      p2p-client-ws/src/remote_ws.rs
  7. 142
      p2p-client-ws/src/remote_ws_wasm.rs
  8. 6
      p2p-net/Cargo.toml
  9. 229
      p2p-net/src/broker.rs
  10. 55
      p2p-net/src/connection.rs
  11. 2
      p2p-net/src/errors.rs
  12. 12
      p2p-net/src/lib.rs

6
Cargo.lock generated

@ -1076,6 +1076,8 @@ dependencies = [
"debug_print",
"futures",
"getrandom 0.1.16",
"gloo-timers",
"js-sys",
"p2p-client-ws",
"p2p-net",
"p2p-repo",
@ -1085,7 +1087,9 @@ dependencies = [
"serde_bytes",
"snow",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-bindgen-test",
"web-sys",
"ws_stream_wasm",
]
@ -1249,7 +1253,9 @@ dependencies = [
"blake3",
"debug_print",
"futures",
"gloo-timers",
"num_enum",
"once_cell",
"p2p-repo",
"serde",
"serde_bare",

@ -9,7 +9,7 @@ This repository is in active development at [https://git.nextgraph.org/NextGraph
> NextGraph brings about the convergence between P2P and Semantic Web technologies, towards a decentralized, secure and privacy-preserving cloud, based on CRDTs.
>
> This open source ecosystem provides solutions for end-users and software developers alike, wishing to use or create **decentralized** apps featuring: **live collaboration** on rich-text documents, peer to peer communication with end-to-end encryption, offline-first, **local-first**, portable and interoperable data, total ownership of data and software, security and privacy. Centered on repositories containing **semantic data** (RDF), **rich text**, and structured data formats like **JSON**, synced between peers belonging to permissioned groups of users, it offers strong eventual consistency, thanks to the use of **CRDTs**. Documents can be linked together, signed, shared securely, queried using the **SPARQL** language and organized into sites and containers.
>
>
> More info here [https://nextgraph.org](https://nextgraph.org)
## Support
@ -21,13 +21,14 @@ And our community forum where you can ask questions is here [https://forum.nextg
## How to use NextGraph
NextGraph is not ready yet. You can subscribe to [our newsletter](https://list.nextgraph.org/subscription/form) to get updates, and support us with a [donation](https://nextgraph.org/donate/).
## For developers
Read our [getting started guide](https://docs.nextgraph.org/en/getting-started/).
## For contributors
- [Install Rust](https://www.rust-lang.org/tools/install)
- [Install Rust](https://www.rust-lang.org/tools/install) minimum required 1.64.0
- [Install Nodejs](https://nodejs.org/en/download/)
```
@ -36,11 +37,11 @@ cargo install wasm-pack
git clone git@git.nextgraph.org:NextGraph/nextgraph-rs.git
cd nextgraph-rs
cargo build
```
```
### Packages
The crates are organized as follow :
The crates are organized as follow :
- p2p-repo : all the common types, traits and structs for the P2P repositories
- p2p-net : all the common types, traits and structs for the P2P networks
@ -81,17 +82,20 @@ cargo test --package p2p-repo --lib -- branch::test --nocapture
```
Test end-to-end client and server:
```
```
cargo test --package ngcli -- --nocapture
```
```
Test WASM websocket
```
cd ng-app-js
wasm-pack test --chrome --headless
```
Test Rust websocket
```
cargo test --package p2p-client-ws --lib -- --nocapture
```
@ -123,9 +127,10 @@ additional terms or conditions.
## License
Licensed under either of
* Apache License, Version 2.0 ([LICENSE-APACHE2](LICENSE-APACHE2) or http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
at your option.
- Apache License, Version 2.0 ([LICENSE-APACHE2](LICENSE-APACHE2) or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
at your option.
`SPDX-License-Identifier: Apache-2.0 OR MIT`

@ -34,8 +34,11 @@ getrandom = { version = "0.1.1", features = ["wasm-bindgen"] }
# version = "0.2.7"
# features = ["js"]
# [target.'cfg(target_arch = "wasm32")'.dependencies]
# wasm-bindgen-futures = "0.4.34"
[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3.61"
wasm-bindgen-futures = "0.4.34"
web-sys = { version = "0.3.61", features = ["Window"] }
gloo-timers = "0.2.6"
[dev-dependencies]
wasm-bindgen-test = "^0.3"

@ -1,10 +1,11 @@
use async_std::task;
use js_sys::Reflect;
#[cfg(target_arch = "wasm32")]
use p2p_client_ws::remote_ws_wasm::ConnectionWebSocket;
use p2p_net::broker::*;
use p2p_net::log;
use p2p_net::types::IP;
use p2p_net::types::{DirectPeerId, IP};
use p2p_net::utils::{spawn_and_log_error, ResultSend};
use p2p_net::{log, sleep};
use p2p_repo::utils::generate_keypair;
use std::net::IpAddr;
use std::str::FromStr;
@ -18,19 +19,20 @@ extern "C" {
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub fn greet(name: &str) {
pub async fn greet(name: &str) {
log!("I say: {}", name);
let mut random_buf = [0u8; 32];
getrandom::getrandom(&mut random_buf).unwrap();
//spawn_and_log_error(testt("ws://127.0.0.1:3012"));
async fn method() -> ResultSend<()> {
log!("start connecting");
let cnx = Arc::new(ConnectionWebSocket {});
//let cnx = Arc::new();
let (priv_key, pub_key) = generate_keypair();
let broker = Broker::new();
let res = broker
let res = BROKER
.write()
.await
.connect(
cnx,
Box::new(ConnectionWebSocket {}),
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
None,
priv_key,
@ -39,10 +41,32 @@ pub fn greet(name: &str) {
)
.await;
log!("broker.connect : {:?}", res);
BROKER.read().await.print_status();
//res.expect_throw("assume the connection succeeds");
async fn timer_close(remote_peer_id: DirectPeerId) -> ResultSend<()> {
async move {
sleep!(std::time::Duration::from_secs(10));
log!("timeout");
BROKER
.write()
.await
.close_peer_connection(&remote_peer_id)
.await;
}
.await;
Ok(())
}
spawn_and_log_error(timer_close(pub_key));
//Broker::graceful_shutdown().await;
Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(12)).await;
Ok(())
}
spawn_and_log_error(method());
spawn_and_log_error(method()).await;
//spawn_and_log_error(Arc::clone(&cnx).open("ws://127.0.0.1:3012", priv_key, pub_key));
}
@ -69,6 +93,6 @@ mod test {
#[wasm_bindgen_test]
pub async fn test_greet() {
greet("test");
greet("test").await;
}
}

@ -31,9 +31,6 @@ wasm-bindgen-test = "^0.3"
version = "0.2.7"
features = ["js"]
# [target.'cfg(target_arch = "wasm32")'.dependencies]
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
getrandom = "0.2.7"
xactor = "0.7.11"

@ -21,8 +21,8 @@ use debug_print::*;
use async_std::sync::Mutex;
use futures::io::Close;
use futures::FutureExt;
use futures::{future, pin_mut, select, stream, StreamExt};
use futures::{FutureExt, SinkExt};
use async_std::task;
use p2p_net::errors::*;
@ -42,13 +42,13 @@ pub struct ConnectionWebSocket {}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl IConnection for ConnectionWebSocket {
async fn open(
self: Arc<Self>,
&self,
ip: IP,
peer_pubk: PrivKey,
peer_privk: PubKey,
remote_peer: DirectPeerId,
) -> Result<(), NetError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client);
) -> Result<ConnectionBase, NetError> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, WS_PORT);
@ -108,23 +108,33 @@ impl IConnection for ConnectionWebSocket {
cnx.start_read_loop();
let s = cnx.take_sender();
let r = cnx.take_receiver();
let mut shutdown = cnx.set_shutdown();
//let ws_in_task = Arc::clone(&ws);
task::spawn(async move {
let join = task::spawn(async move {
debug_println!("START of WS loop");
//let w = ws_in_task.lock().await;
ws_loop(websocket, s, r).await;
let res = ws_loop(websocket, s, r).await;
// .close(Some(CloseFrame {
// code: CloseCode::Library(4000),
// reason: std::borrow::Cow::Borrowed(""),
// }))
// .await;
if res.is_err() {
let _ = shutdown.send(res.err().unwrap()).await;
}
debug_println!("END of WS loop");
});
//spawn_and_log_error(ws_loop(ws, cnx.take_sender(), cnx.take_receiver()));
log!("sending...");
//
//cnx.close().await;
//// let res = cnx.join_shutdown().await;
//// log!("JOIN SHUTDOWN {:?}", res);
// cnx.send(ConnectionCommand::Close).await;
// cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start(
@ -146,20 +156,16 @@ impl IConnection for ConnectionWebSocket {
//log!("WS closed {:?}", last_event.clone());
//Ok(cnx)
Ok(())
Ok(cnx)
//Ok(())
}
}
}
async fn accept(&mut self) -> Result<(), NetError> {
let cnx = ConnectionBase::new(ConnectionDir::Server);
async fn accept(&self) -> Result<ConnectionBase, NetError> {
let cnx = ConnectionBase::new(ConnectionDir::Server, TransportProtocol::WS);
Ok(())
}
fn tp(&self) -> TransportProtocol {
TransportProtocol::WS
unimplemented!();
}
}
@ -169,8 +175,19 @@ async fn close_ws(
code: u16,
reason: &str,
) -> Result<(), NetError> {
log!("close_ws");
let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await;
log!("close_ws {:?}", code);
let cmd = if code == 1000 {
ConnectionCommand::Close
} else if code < 4000 {
ConnectionCommand::Error(NetError::WsError)
} else if code < 4950 {
ConnectionCommand::ProtocolError(ProtocolError::try_from(code - 4000).unwrap())
} else {
ConnectionCommand::Error(NetError::try_from(code - 4949).unwrap())
};
let _ = futures::SinkExt::send(receiver, cmd).await;
stream
.close(Some(CloseFrame {
code: CloseCode::Library(code),
@ -200,8 +217,8 @@ async fn ws_loop(
log!("GOT MESSAGE {:?}", msg);
if msg.is_close() {
if let Message::Close(Some(cf)) = msg {
log!("CLOSE from server: {}",cf.reason);
if let Message::Close(Some(cf)) = msg {
log!("CLOSE from server with closeframe: {}",cf.reason);
let last_command = match cf.code {
CloseCode::Normal =>
ConnectionCommand::Close,
@ -227,9 +244,9 @@ async fn ws_loop(
futures::SinkExt::send(receiver,ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&msg.into_data())?)).await
.map_err(|_e| NetError::IoError)?;
}
return Ok(ProtocolError::Closing);
//return Ok(ProtocolError::Closing);
},
Some(Err(e)) => break,
Some(Err(e)) => {log!("GOT ERROR {:?}",e);return Err(NetError::WsError);},
None => break
},
s = sender.next().fuse() => match s {
@ -261,6 +278,7 @@ async fn ws_loop(
match inner_loop(&mut ws, sender, &mut receiver).await {
Ok(proto_err) => {
if proto_err == ProtocolError::Closing {
//FIXME: remove this case
ws.close(None).await.map_err(|_e| NetError::WsError)?;
} else if proto_err == ProtocolError::NoError {
close_ws(&mut ws, &mut receiver, 1000, "").await?;
@ -295,9 +313,9 @@ mod test {
use async_std::task;
use p2p_net::broker::*;
use p2p_net::errors::NetError;
use p2p_net::log;
use p2p_net::types::IP;
use p2p_net::utils::{spawn_and_log_error, ResultSend};
use p2p_net::{log, sleep};
use p2p_repo::utils::generate_keypair;
use std::net::IpAddr;
use std::str::FromStr;
@ -309,22 +327,42 @@ mod test {
//spawn_and_log_error(testt("ws://127.0.0.1:3012"));
log!("start connecting");
let cnx = Arc::new(ConnectionWebSocket {});
let (priv_key, pub_key) = generate_keypair();
let broker = Broker::new();
let res = broker
.connect(
cnx,
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
None,
priv_key,
pub_key,
pub_key,
)
{
let res = BROKER
.write()
.await
.connect(
Box::new(ConnectionWebSocket {}),
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(),
None,
priv_key,
pub_key,
pub_key,
)
.await;
log!("broker.connect : {:?}", res);
//res.expect_throw("assume the connection succeeds");
}
async fn timer_close(remote_peer_id: DirectPeerId) -> ResultSend<()> {
async move {
sleep!(std::time::Duration::from_secs(10));
log!("timeout");
BROKER
.write()
.await
.close_peer_connection(&remote_peer_id)
.await;
}
.await;
log!("broker.connect : {:?}", res);
//res.expect_throw("assume the connection succeeds");
Ok(())
}
spawn_and_log_error(timer_close(pub_key));
//Broker::graceful_shutdown().await;
Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(12)).await;
Ok(())
}
}

@ -34,19 +34,15 @@ pub struct ConnectionWebSocket {}
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl IConnection for ConnectionWebSocket {
fn tp(&self) -> TransportProtocol {
TransportProtocol::WS
}
async fn open(
self: Arc<Self>,
&self,
ip: IP,
peer_pubk: PrivKey,
peer_privk: PubKey,
remote_peer: DirectPeerId,
) -> Result<(), NetError> {
) -> Result<ConnectionBase, NetError> {
//pub async fn testt(url: &str) -> ResultSend<()> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client);
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, WS_PORT);
@ -55,75 +51,62 @@ impl IConnection for ConnectionWebSocket {
NetError::ConnectionError
})?;
let mut evts = ws
.observe(ObserveConfig::default())
//.observe(Filter::Pointer(WsEvent::is_closed).into())
.await
.expect_throw("observe");
//let (mut sender_tx, sender_rx) = mpsc::unbounded();
//let (mut receiver_tx, receiver_rx) = mpsc::unbounded();
cnx.start_read_loop();
let mut shutdown = cnx.set_shutdown();
spawn_and_log_error(ws_loop(ws, wsio, cnx.take_sender(), cnx.take_receiver()));
spawn_and_log_error(ws_loop(
ws,
wsio,
cnx.take_sender(),
cnx.take_receiver(),
shutdown,
));
//spawn_and_log_error(read_loop(receiver_rx, sender_tx.clone()));
log!("sending...");
//cnx.close().await;
// spawn_and_log_error(async move {
// TimeoutFuture::new(10_000).await;
// cnx.close().await;
// Ok(())
// // // Do something here after the one second timeout is up!
// });
// cnx.send(ConnectionCommand::Close).await;
// cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start(
// StartProtocol::Auth(ClientHello::V0()),
// )))
// .await;
cnx.close().await;
log!("waiting...");
//let res = join.next().await;
log!("finished...");
//log!("JOIN SHUTDOWN {:?}", res);
// Note that since WsMeta::connect resolves to an opened connection, we don't see
// any Open events here.
//
//assert!(evts.next().await.unwrap_throw().is_closing());
let last_event = evts.next().await;
log!("WS closed {:?}", last_event.clone());
let last_command = match last_event {
None => ConnectionCommand::Close,
Some(WsEvent::Open) => ConnectionCommand::Error(NetError::WsError), // this should never happen
Some(WsEvent::Error) => ConnectionCommand::Error(NetError::ConnectionError),
Some(WsEvent::Closing) => ConnectionCommand::Close,
Some(WsEvent::Closed(ce)) => {
if ce.code == 1000 {
ConnectionCommand::Close
} else if ce.code < 4000 {
ConnectionCommand::Error(NetError::WsError)
} else if ce.code < 4950 {
ConnectionCommand::ProtocolError(
ProtocolError::try_from(ce.code - 4000).unwrap(),
)
} else {
ConnectionCommand::Error(NetError::try_from(ce.code - 4949).unwrap())
}
}
Some(WsEvent::WsErr(_e)) => ConnectionCommand::Error(NetError::WsError),
};
let _ = cnx.inject(last_command).await;
let _ = cnx.close_streams().await;
//assert!(events.next().await.unwrap_throw().is_closing());
//Ok(cnx)
Ok(())
Ok(cnx)
}
async fn accept(&mut self) -> Result<(), NetError> {
async fn accept(&self) -> Result<ConnectionBase, NetError> {
!unimplemented!()
}
}
async fn ws_loop(
ws: WsMeta,
mut ws: WsMeta,
mut stream: WsStream,
sender: Receiver<ConnectionCommand>,
receiver: Sender<ConnectionCommand>,
mut receiver: Sender<ConnectionCommand>,
mut shutdown: Sender<NetError>,
) -> ResultSend<()> {
async fn inner_loop(
stream: &mut WsStream,
@ -132,6 +115,7 @@ async fn ws_loop(
) -> Result<ProtocolError, NetError> {
//let mut rx_sender = sender.fuse();
loop {
log!("BEFORE SELECT");
select! {
r = stream.next().fuse() => match r {
Some(msg) => {
@ -170,29 +154,73 @@ async fn ws_loop(
},
}
}
log!("END SELECT");
Ok(ProtocolError::NoError)
}
match inner_loop(&mut stream, sender, receiver).await {
let mut events = ws
.observe(ObserveConfig::default())
//.observe(Filter::Pointer(WsEvent::is_closed).into())
.await
.expect_throw("observe");
log!("OBSERVED");
match inner_loop(&mut stream, sender, receiver.clone()).await {
Ok(proto_err) => {
if proto_err == ProtocolError::NoError {
ws.close_code(1000).await.map_err(|_e| NetError::WsError)?;
let _ = ws.close_code(1000).await; //.map_err(|_e| NetError::WsError)?;
log!("CLOSED GRACEFULLY");
} else {
log!("PROTOCOL ERR");
let mut code = proto_err.clone() as u16;
if code > 949 {
code = ProtocolError::OtherError as u16;
}
ws.close_reason(code + 4000, proto_err.to_string())
.await
.map_err(|_e| NetError::WsError)?;
return Err(Box::new(proto_err));
let _ = ws.close_reason(code + 4000, proto_err.to_string()).await;
//.map_err(|_e| NetError::WsError)?;
//return Err(Box::new(proto_err));
}
}
Err(e) => {
ws.close_reason(e.clone() as u16 + 4949, e.to_string())
.await
.map_err(|_e| NetError::WsError)?;
return Err(Box::new(e));
let _ = ws
.close_reason(e.clone() as u16 + 4949, e.to_string())
.await;
//.map_err(|_e| NetError::WsError)?;
//return Err(Box::new(e));
log!("ERR");
}
}
log!("waiting for closing event");
let last_event = events.next().await;
log!("WS closed {:?}", last_event.clone());
let last_command = match last_event {
None => ConnectionCommand::Close,
Some(WsEvent::Open) => ConnectionCommand::Error(NetError::WsError), // this should never happen
Some(WsEvent::Error) => ConnectionCommand::Error(NetError::ConnectionError),
Some(WsEvent::Closing) => ConnectionCommand::Close,
Some(WsEvent::Closed(ce)) => {
if ce.code == 1000 {
ConnectionCommand::Close
} else if ce.code < 4000 {
ConnectionCommand::Error(NetError::WsError)
} else if ce.code < 4950 {
ConnectionCommand::ProtocolError(ProtocolError::try_from(ce.code - 4000).unwrap())
} else {
ConnectionCommand::Error(NetError::try_from(ce.code - 4949).unwrap())
}
}
Some(WsEvent::WsErr(_e)) => ConnectionCommand::Error(NetError::WsError),
};
if let ConnectionCommand::Error(err) = last_command.clone() {
let _ = shutdown.send(err).await;
} else if let ConnectionCommand::ProtocolError(err) = last_command.clone() {
let _ = shutdown.send(NetError::ProtocolError).await;
} // otherwise, shutdown gracefully (with None). it is done automatically during destroy of shutdown
receiver
.send(last_command)
.await
.map_err(|_e| NetError::IoError)?;
Ok(())
}

@ -20,4 +20,8 @@ async-trait = "0.1.64"
blake3 = "1.3.1"
async-std = { version = "1.12.0", features = ["attributes","unstable"] }
wasm-bindgen = "0.2"
unique_id = "0.1.5"
unique_id = "0.1.5"
once_cell = "1.17.1"
[target.'cfg(target_arch = "wasm32")'.dependencies]
gloo-timers = "0.2.6"

@ -1,84 +1,271 @@
use crate::actor::*;
use crate::connection::*;
use crate::errors::*;
use crate::types::*;
use crate::utils::spawn_and_log_error;
use crate::utils::ResultSend;
use crate::{log, sleep};
use async_std::stream::StreamExt;
use async_std::sync::{Arc, RwLock};
use futures::channel::mpsc;
use futures::SinkExt;
use once_cell::sync::Lazy;
use p2p_repo::types::{PrivKey, PubKey};
use p2p_repo::utils::generate_keypair;
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::{Arc, RwLock};
use crate::actor::*;
#[derive(Debug)]
pub enum PeerConnection {
Core(IP),
Client(Box<Arc<dyn IConnection>>),
Client(ConnectionBase),
NONE,
}
#[derive(Debug)]
pub struct BrokerPeerInfo {
lastPeerAdvert: Option<PeerAdvert>, //FIXME: remove Option
connected: PeerConnection,
}
#[derive(Debug)]
pub struct DirectConnection {
ip: IP,
interface: String,
remote_peer_id: DirectPeerId,
tp: TransportProtocol,
//dir: ConnectionDir,
cnx: Box<Arc<dyn IConnection>>,
cnx: ConnectionBase,
}
pub static BROKER: Lazy<Arc<RwLock<Broker>>> = Lazy::new(|| Arc::new(RwLock::new(Broker::new())));
pub struct Broker {
//actors: Arc<RwLock<HashMap<i64, Box<dyn IActor>>>>,
direct_connections: Arc<RwLock<HashMap<IP, DirectConnection>>>,
peers: Arc<RwLock<HashMap<DirectPeerId, BrokerPeerInfo>>>,
direct_connections: HashMap<IP, DirectConnection>,
peers: HashMap<DirectPeerId, BrokerPeerInfo>,
shutdown: Option<Receiver<ProtocolError>>,
shutdown_sender: Sender<ProtocolError>,
closing: bool,
}
impl Broker {
pub fn reconnecting(&mut self, peer_id: &DirectPeerId) {
let mut peerinfo = self.peers.get_mut(peer_id);
match peerinfo {
Some(info) => match &info.connected {
PeerConnection::NONE => {}
PeerConnection::Client(cb) => {
info.connected = PeerConnection::NONE;
}
PeerConnection::Core(ip) => {
self.direct_connections.remove(&ip);
info.connected = PeerConnection::NONE;
}
},
None => {}
}
}
pub fn remove(&mut self, peer_id: &DirectPeerId) {
let removed = self.peers.remove(peer_id);
match removed {
Some(info) => match info.connected {
PeerConnection::NONE => {}
PeerConnection::Client(cb) => {}
PeerConnection::Core(ip) => {
self.direct_connections.remove(&ip);
}
},
None => {}
}
}
pub fn new() -> Self {
let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<ProtocolError>();
Broker {
direct_connections: Arc::new(RwLock::new(HashMap::new())),
peers: Arc::new(RwLock::new(HashMap::new())),
shutdown: Some(shutdown_receiver),
shutdown_sender,
direct_connections: HashMap::new(),
peers: HashMap::new(),
closing: false,
}
}
fn take_shutdown(&mut self) -> Receiver<ProtocolError> {
self.shutdown.take().unwrap()
}
pub async fn join_shutdown() -> Result<(), ProtocolError> {
let mut shutdown_join: Receiver<ProtocolError>;
{
shutdown_join = BROKER.write().await.take_shutdown();
}
match shutdown_join.next().await {
Some(ProtocolError::Closing) => Ok(()),
Some(error) => Err(error),
None => Ok(()),
}
}
/// Used in tests mostly
pub async fn join_shutdown_with_timeout(
timeout: std::time::Duration,
) -> Result<(), ProtocolError> {
async fn timer_shutdown(timeout: std::time::Duration) -> ResultSend<()> {
async move {
sleep!(timeout);
log!("timeout for shutdown");
let _ = BROKER
.write()
.await
.shutdown_sender
.send(ProtocolError::Timeout)
.await;
}
.await;
Ok(())
}
spawn_and_log_error(timer_shutdown(timeout));
Broker::join_shutdown().await
}
pub async fn graceful_shutdown() {
let keys;
{
let mut broker = BROKER.write().await;
if broker.closing {
return;
}
broker.closing = true;
keys = Vec::from_iter(broker.peers.keys().cloned());
}
for peer_id in keys {
BROKER.write().await.close_peer_connection(&peer_id).await;
}
let _ = BROKER
.write()
.await
.shutdown_sender
.send(ProtocolError::Closing)
.await;
}
pub async fn shutdown(&mut self) {
if self.closing {
return;
}
self.closing = true;
let _ = self.shutdown_sender.send(ProtocolError::Closing).await;
}
pub async fn connect(
&self,
cnx: Arc<dyn IConnection>,
&mut self,
cnx: Box<dyn IConnection>,
ip: IP,
core: Option<String>,
core: Option<String>, // the interface used as egress for this connection
peer_pubk: PrivKey,
peer_privk: PubKey,
remote_peer_id: DirectPeerId,
) -> Result<(), NetError> {
if self.closing {
return Err(NetError::Closing);
}
// TODO check that not already connected to peer
//IpAddr::from_str("127.0.0.1");
//cnx.open(url, peer_pubk, peer_privk).await?;
//let cnx = Arc::new();
let (priv_key, pub_key) = generate_keypair();
Arc::clone(&cnx)
.open(ip, priv_key, pub_key, remote_peer_id)
.await?;
log!("CONNECTING");
let connection_res = cnx.open(ip, priv_key, pub_key, remote_peer_id).await;
log!("CONNECTED {:?}", connection_res);
let mut connection = connection_res.unwrap();
let join = connection.take_shutdown();
let connected = if core.is_some() {
let dc = DirectConnection {
ip,
interface: core.unwrap(),
interface: core.clone().unwrap(),
remote_peer_id,
tp: cnx.tp(),
cnx: Box::new(Arc::clone(&cnx)),
tp: connection.transport_protocol(),
cnx: connection,
};
self.direct_connections.write().unwrap().insert(ip, dc);
self.direct_connections.insert(ip, dc);
PeerConnection::Core(ip)
} else {
PeerConnection::Client(Box::new(Arc::clone(&cnx)))
PeerConnection::Client(connection)
};
let bpi = BrokerPeerInfo {
lastPeerAdvert: None,
connected,
};
self.peers.write().unwrap().insert(remote_peer_id, bpi);
self.peers.insert(remote_peer_id, bpi);
async fn watch_close(
mut join: Receiver<NetError>,
cnx: Box<dyn IConnection>,
ip: IP,
core: Option<String>, // the interface used as egress for this connection
peer_pubk: PrivKey,
peer_privk: PubKey,
remote_peer_id: DirectPeerId,
) -> ResultSend<()> {
async move {
let res = join.next().await;
log!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id);
if res.is_some() {
// we intend to reconnect
let mut broker = BROKER.write().await;
broker.reconnecting(&remote_peer_id);
// TODO: deal with cycle error https://users.rust-lang.org/t/recursive-async-method-causes-cycle-error/84628/5
// let result = broker
// .connect(cnx, ip, core, peer_pubk, peer_privk, remote_peer_id)
// .await;
// log!("SOCKET RECONNECTION {:?} {:?}", result, &remote_peer_id);
// TODO: deal with error and incremental backoff
} else {
log!("REMOVED");
BROKER.write().await.remove(&remote_peer_id);
}
}
.await;
Ok(())
}
spawn_and_log_error(watch_close(
join,
cnx,
ip,
core,
peer_pubk,
peer_privk,
remote_peer_id,
));
Ok(())
}
pub async fn close_peer_connection(&mut self, peer_id: &DirectPeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) {
match &mut peer.connected {
PeerConnection::Core(_) => {
//TODO
unimplemented!();
}
PeerConnection::Client(cb) => {
cb.close().await;
}
PeerConnection::NONE => {}
}
//self.peers.remove(peer_id); // this is done in the watch_close instead
}
}
pub fn print_status(&self) {
self.peers.iter().for_each(|(peerId, peerInfo)| {
log!("PEER in BROKER {:?} {:?}", peerId, peerInfo);
});
self.direct_connections.iter().for_each(|(ip, directCnx)| {
log!("direct_connection in BROKER {:?} {:?}", ip, directCnx)
});
}
}

@ -18,7 +18,7 @@ use unique_id::GeneratorFromSeed;
pub type Sender<T> = mpsc::UnboundedSender<T>;
pub type Receiver<T> = mpsc::UnboundedReceiver<T>;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ConnectionCommand {
Msg(ProtocolMessage),
Error(NetError),
@ -28,45 +28,70 @@ pub enum ConnectionCommand {
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait IConnection {
pub trait IConnection: Send + Sync {
async fn open(
self: Arc<Self>,
&self,
ip: IP,
peer_pubk: PrivKey,
peer_privk: PubKey,
remote_peer: DirectPeerId,
) -> Result<(), NetError>;
async fn accept(&mut self) -> Result<(), NetError>;
fn tp(&self) -> TransportProtocol;
) -> Result<ConnectionBase, NetError>;
async fn accept(&self) -> Result<ConnectionBase, NetError>;
}
#[derive(PartialEq)]
#[derive(PartialEq, Debug)]
pub enum ConnectionDir {
Server,
Client,
}
#[derive(Debug)]
pub struct ConnectionBase {
sender: Option<Receiver<ConnectionCommand>>,
receiver: Option<Sender<ConnectionCommand>>,
sender_tx: Option<Sender<ConnectionCommand>>,
receiver_tx: Option<Sender<ConnectionCommand>>,
shutdown: Option<Receiver<NetError>>,
dir: ConnectionDir,
next_request_id: SequenceGenerator,
tp: TransportProtocol,
}
impl ConnectionBase {
pub fn new(dir: ConnectionDir) -> Self {
pub fn new(dir: ConnectionDir, tp: TransportProtocol) -> Self {
Self {
receiver: None,
sender: None,
sender_tx: None,
receiver_tx: None,
shutdown: None,
next_request_id: SequenceGenerator::new(1),
dir,
tp,
}
}
pub fn transport_protocol(&self) -> TransportProtocol {
self.tp
}
pub fn take_shutdown(&mut self) -> Receiver<NetError> {
self.shutdown.take().unwrap()
}
pub async fn join_shutdown(&mut self) -> Result<(), NetError> {
match self.take_shutdown().next().await {
Some(error) => Err(error),
None => Ok(()),
}
}
pub fn set_shutdown(&mut self) -> Sender<NetError> {
let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<NetError>();
self.shutdown = Some(shutdown_receiver);
shutdown_sender
}
pub fn take_sender(&mut self) -> Receiver<ConnectionCommand> {
self.sender.take().unwrap()
}
@ -115,14 +140,14 @@ impl ConnectionBase {
let _ = self.sender_tx.as_mut().unwrap().send(cmd).await;
}
pub async fn inject(&mut self, cmd: ConnectionCommand) {
let _ = self.receiver_tx.as_mut().unwrap().send(cmd).await;
}
// pub async fn inject(&mut self, cmd: ConnectionCommand) {
// let _ = self.receiver_tx.as_mut().unwrap().send(cmd).await;
// }
pub async fn close_streams(&mut self) {
let _ = self.receiver_tx.as_mut().unwrap().close_channel();
let _ = self.sender_tx.as_mut().unwrap().close_channel();
}
// pub async fn close_streams(&mut self) {
// let _ = self.receiver_tx.as_mut().unwrap().close_channel();
// let _ = self.sender_tx.as_mut().unwrap().close_channel();
// }
pub async fn close(&mut self) {
log!("closing...");

@ -29,6 +29,7 @@ pub enum NetError {
ConnectionError,
SerializationError,
ProtocolError,
Closing,
} //MAX 50 NetErrors
impl Error for NetError {}
@ -64,6 +65,7 @@ pub enum ProtocolError {
RepoIdRequired,
ConnectionError,
Timeout,
PeerAlreadyConnected,
NoError,

@ -51,3 +51,15 @@ macro_rules! log {
macro_rules! log {
($($t:tt)*) => (debug_print::debug_println!($($t)*))
}
#[cfg(target_arch = "wasm32")]
#[macro_export]
macro_rules! sleep {
($($t:tt)*) => (gloo_timers::future::sleep($($t)*).await)
}
#[cfg(not(target_arch = "wasm32"))]
#[macro_export]
macro_rules! sleep {
($($t:tt)*) => (std::thread::sleep($($t)*))
}

Loading…
Cancel
Save