@ -16,6 +16,8 @@ use std::collections::HashMap;
use std ::collections ::HashSet ;
use async_std ::stream ::StreamExt ;
#[ cfg(not(target_arch = " wasm32 " )) ]
use async_std ::sync ::Mutex ;
use async_std ::sync ::{ Arc , RwLock } ;
use either ::Either ;
use futures ::channel ::mpsc ;
@ -34,13 +36,57 @@ use crate::types::*;
use crate ::utils ::spawn_and_log_error ;
use crate ::utils ::{ Receiver , ResultSend , Sender } ;
#[ doc(hidden) ]
#[ derive(Debug, Clone) ]
pub enum ClientPeerId {
Local ( ( UserId , DirectPeerId ) ) ,
Remote ( DirectPeerId ) ,
}
impl ClientPeerId {
pub fn key ( & self ) -> & DirectPeerId {
match self {
Self ::Remote ( dpi ) = > dpi ,
Self ::Local ( ( _user , dpi ) ) = > dpi ,
}
}
pub fn value ( & self ) -> Option < UserId > {
match self {
Self ::Remote ( _ ) = > None ,
Self ::Local ( ( user , _ ) ) = > Some ( * user ) ,
}
}
pub fn new_from ( peer : & DirectPeerId , local_user : & Option < UserId > ) -> Self {
match local_user {
Some ( user ) = > ClientPeerId ::Local ( ( * user , * peer ) ) ,
None = > ClientPeerId ::Remote ( * peer ) ,
}
}
}
#[ derive(Debug) ]
enum PeerConnection {
Core ( BindAddress ) ,
Client ( ConnectionBase ) ,
Local ( LocalTransport ) ,
NONE ,
}
#[ derive(Debug) ]
struct LocalTransport {
#[ allow(dead_code) ]
client_peer_id : DirectPeerId ,
client_cnx : ConnectionBase ,
server_cnx : ConnectionBase ,
}
impl LocalTransport {
async fn close ( & mut self ) {
self . client_cnx . close ( ) . await ;
self . server_cnx . close ( ) . await ;
}
}
#[ derive(Debug) ]
struct BrokerPeerInfo {
#[ allow(dead_code) ]
@ -81,8 +127,8 @@ pub static BROKER: Lazy<Arc<RwLock<Broker>>> = Lazy::new(|| Arc::new(RwLock::new
pub struct Broker {
direct_connections : HashMap < BindAddress , DirectConnection > ,
/// tuple of optional userId and peer key in montgomery form. userId is always None on the server side.
peers : HashMap < ( Option < PubKey > , X25519PubKey ) , BrokerPeerInfo > ,
/// tuple of optional userId and peer key in montgomery form. userId is always None on the server side (except for local transport) .
peers : HashMap < ( Option < PubKey > , Option < X25519PubKey > ) , BrokerPeerInfo > ,
/// (local,remote) -> ConnectionBase
anonymous_connections : HashMap < ( BindAddress , BindAddress ) , ConnectionBase > ,
@ -90,7 +136,7 @@ pub struct Broker {
shutdown : Option < Receiver < ProtocolError > > ,
shutdown_sender : Sender < ProtocolError > ,
closing : bool ,
server_broker : Option < Box < dyn IServerBroker + Send + Sync > > ,
server_broker : Option < Arc < RwLock < dyn IServerBroker + Send + Sync > > > ,
//local_broker: Option<Box<dyn ILocalBroker + Send + Sync + 'a>>,
local_broker : Option < Arc < RwLock < dyn ILocalBroker > > > ,
@ -100,7 +146,7 @@ pub struct Broker {
#[ cfg(not(target_arch = " wasm32 " )) ]
bind_addresses : HashMap < BindAddress , String > ,
#[ cfg(not(target_arch = " wasm32 " )) ]
users_peers : HashMap < UserId , HashSet < X25519PubKey > > ,
users_peers : HashMap < UserId , HashSet < Option < X25519PubKey > > > ,
}
impl Broker {
@ -123,6 +169,10 @@ impl Broker {
// }
// }
pub fn get_server_peer_id ( & self ) -> DirectPeerId {
self . config . as_ref ( ) . unwrap ( ) . peer_id
}
pub ( crate ) fn get_config ( & self ) -> Option < & ServerConfig > {
self . config . as_ref ( )
}
@ -143,7 +193,7 @@ impl Broker {
#[ doc(hidden) ]
pub fn set_server_broker ( & mut self , broker : impl IServerBroker + ' static ) {
//log_debug!("set_server_broker");
self . server_broker = Some ( Box ::new ( broker ) ) ;
self . server_broker = Some ( Arc ::new ( RwLock ::new ( broker ) ) ) ;
}
#[ doc(hidden) ]
@ -174,23 +224,26 @@ impl Broker {
( copy_listeners , copy_bind_addresses )
}
pub ( crate ) fn get_server_broker (
#[ doc(hidden) ]
pub fn get_server_broker (
& self ,
) -> Result < & Box < dyn IServerBroker + Send + Sync > , ProtocolError > {
) -> Result < Arc < RwLock < dyn IServerBroker + Send + Sync > > , ProtocolError > {
//log_debug!("GET STORAGE {:?}", self.server_storage);
Ok ( Arc ::clone (
self . server_broker
. as_ref ( )
. ok_or ( ProtocolError ::BrokerError )
. ok_or ( ProtocolError ::BrokerError ) ? ,
) )
}
pub ( crate ) fn get_server_broker_mut (
& mut self ,
) -> Result < & mut Box < dyn IServerBroker + Send + Sync > , ProtocolError > {
//log_debug!("GET STORAGE {:?}", self.server_storage);
self . server_broker
. as_mut ( )
. ok_or ( ProtocolError ::BrokerError )
}
// pub(crate) fn get_server_broker_mut(
// &mut self,
// ) -> Result<&mut Box<dyn IServerBroker + Send + Sync>, ProtocolError> {
// //log_debug!("GET STORAGE {:?}", self.server_storage);
// self.server_broker
// .as_mut()
// .ok_or(ProtocolError::BrokerError)
// }
//Option<Arc<RwLock<dyn ILocalBroker>>>,
pub ( crate ) fn get_local_broker ( & self ) -> Result < Arc < RwLock < dyn ILocalBroker > > , ProtocolError > {
@ -202,7 +255,7 @@ impl Broker {
}
#[ cfg(not(target_arch = " wasm32 " )) ]
pub ( crate ) fn authorize (
pub ( crate ) async fn authorize (
& self ,
bind_addresses : & ( BindAddress , BindAddress ) ,
auth : Authorization ,
@ -230,7 +283,8 @@ impl Broker {
Authorization ::Client ( user_and_registration ) = > {
if user_and_registration . 1. is_some ( ) {
// user wants to register
let storage = self . get_server_broker ( ) ? ;
let lock = self . get_server_broker ( ) ? ;
let storage = lock . read ( ) . await ;
if storage . get_user ( user_and_registration . 0 ) . is_ok ( ) {
return Ok ( ( ) ) ;
}
@ -274,8 +328,7 @@ impl Broker {
storage . remove_invitation ( code ) ? ;
}
}
self . get_server_broker ( ) ?
. add_user ( user_and_registration . 0 , is_admin ) ? ;
storage . add_user ( user_and_registration . 0 , is_admin ) ? ;
Ok ( ( ) )
}
} ;
@ -298,7 +351,7 @@ impl Broker {
return Ok ( ( ) ) ;
}
}
let found = self . get_server_broker ( ) ? . get_user ( admin_user ) ;
let found = self . get_server_broker ( ) ? . read ( ) . await . get_user ( admin_user ) ;
if found . is_ok ( ) & & found . unwrap ( ) {
return Ok ( ( ) ) ;
}
@ -310,7 +363,7 @@ impl Broker {
}
fn reconnecting ( & mut self , peer_id : X25519PrivKey , user : Option < PubKey > ) {
let peerinfo = self . peers . get_mut ( & ( user , peer_id ) ) ;
let peerinfo = self . peers . get_mut ( & ( user , Some ( peer_id ) ) ) ;
match peerinfo {
Some ( info ) = > match & info . connected {
PeerConnection ::NONE = > { }
@ -321,11 +374,24 @@ impl Broker {
self . direct_connections . remove ( & ip ) ;
info . connected = PeerConnection ::NONE ;
}
PeerConnection ::Local ( _ ) = > {
panic! ( "local transport connections cannot disconnect. shouldn't reconnect" )
}
} ,
None = > { }
}
}
async fn remove_peer_id ( & mut self , peer_id : X25519PrivKey , user : Option < PubKey > ) {
self . remove_peer_id_ ( Some ( peer_id ) , user ) . await
}
#[ allow(dead_code) ]
async fn remove_local_transport ( & mut self , user : PubKey ) {
self . remove_peer_id_ ( None , Some ( user ) ) . await
}
async fn remove_peer_id_ ( & mut self , peer_id : Option < X25519PrivKey > , user : Option < PubKey > ) {
let removed = self . peers . remove ( & ( user , peer_id ) ) ;
match removed {
Some ( info ) = > match info . connected {
@ -336,19 +402,43 @@ impl Broker {
// server side
if let Some ( fsm ) = _cb . fsm {
if let Ok ( user ) = fsm . lock ( ) . await . user_id ( ) {
let _ = self . remove_user_peer ( & user , & peer_id ) ;
let _ = self
. remove_user_peer ( & user , & Some ( peer_id . to_owned ( ) . unwrap ( ) ) ) ;
}
}
let peer = PubKey ::X25519PubKey ( peer_id ) ;
let peer = PubKey ::X25519PubKey ( peer_id . unwrap ( ) ) ;
log_debug ! ( "unsubscribing peer {}" , peer ) ;
self . get_server_broker_mut ( )
self . get_server_broker ( )
. unwrap ( )
. remove_all_subscriptions_of_peer ( & peer ) ;
. read ( )
. await
. remove_all_subscriptions_of_client ( & ClientPeerId ::new_from (
& peer , & user ,
) )
. await ;
}
}
PeerConnection ::Core ( ip ) = > {
self . direct_connections . remove ( & ip ) ;
}
PeerConnection ::Local ( _lt ) = > {
#[ cfg(not(target_arch = " wasm32 " )) ]
if peer_id . is_none ( ) & & user . is_some ( ) {
// server side
let _ = self . remove_user_peer ( user . as_ref ( ) . unwrap ( ) , & None ) ;
log_debug ! ( "unsubscribing local peer {}" , _lt . client_peer_id ) ;
self . get_server_broker ( )
. unwrap ( )
. read ( )
. await
. remove_all_subscriptions_of_client ( & ClientPeerId ::new_from (
& _lt . client_peer_id ,
& user ,
) )
. await ;
}
}
} ,
None = > { }
}
@ -489,12 +579,14 @@ impl Broker {
anonymous = Vec ::from_iter ( broker . anonymous_connections . keys ( ) . cloned ( ) ) ;
}
for peer_id in peer_ids {
if peer_id . 1. is_some ( ) {
BROKER
. write ( )
. await
. close_peer_connection_x ( peer_id . 1 , peer_id . 0 )
. await ;
}
}
for anon in anonymous {
BROKER . write ( ) . await . close_anonymous ( anon . 1 , anon . 0 ) . await ;
}
@ -577,7 +669,11 @@ impl Broker {
}
#[ cfg(not(target_arch = " wasm32 " )) ]
fn add_user_peer ( & mut self , user : UserId , peer : X25519PrivKey ) -> Result < ( ) , ProtocolError > {
fn add_user_peer (
& mut self ,
user : UserId ,
peer : Option < X25519PrivKey > ,
) -> Result < ( ) , ProtocolError > {
let peers_set = self
. users_peers
. entry ( user )
@ -593,7 +689,7 @@ impl Broker {
fn remove_user_peer (
& mut self ,
user : & UserId ,
peer : & X25519PrivKey ,
peer : & Option < X25519PrivKey > ,
) -> Result < ( ) , ProtocolError > {
let peers_set = self
. users_peers
@ -603,6 +699,54 @@ impl Broker {
if ! peers_set . remove ( peer ) {
return Err ( ProtocolError ::PeerNotConnected ) ;
}
if peers_set . is_empty ( ) {
let _ = self . users_peers . remove ( user ) ;
}
Ok ( ( ) )
}
#[ cfg(not(target_arch = " wasm32 " )) ]
pub ( crate ) async fn attach_and_authorize_app (
& mut self ,
remote_bind_address : BindAddress ,
local_bind_address : BindAddress ,
remote_peer_id : X25519PrivKey ,
user : & Option < UserId > ,
_info : & ClientInfo ,
) -> Result < ( ) , ProtocolError > {
let already = self . peers . get ( & ( None , Some ( remote_peer_id ) ) ) ;
if already . is_some ( ) {
match already . unwrap ( ) . connected {
PeerConnection ::NONE = > { }
_ = > {
return Err ( ProtocolError ::PeerAlreadyConnected ) ;
}
} ;
}
//TODO: check permissions for user/remote_bind_address or headless if no user
//TODO: keep the info
let mut connection = self
. anonymous_connections
. remove ( & ( local_bind_address , remote_bind_address ) )
. ok_or ( ProtocolError ::BrokerError ) ? ;
connection . reset_shutdown ( remote_peer_id ) . await ;
if user . is_some ( ) {
self . add_user_peer ( user . unwrap ( ) , Some ( remote_peer_id ) ) ? ;
}
let connected = PeerConnection ::Client ( connection ) ;
let bpi = BrokerPeerInfo {
last_peer_advert : None ,
connected ,
} ;
self . peers . insert ( ( None , Some ( remote_peer_id ) ) , bpi ) ;
Ok ( ( ) )
}
@ -618,7 +762,7 @@ impl Broker {
) -> Result < ( ) , ProtocolError > {
log_debug ! ( "ATTACH PEER_ID {:?}" , remote_peer_id ) ;
let already = self . peers . get ( & ( None , remote_peer_id ) ) ;
let already = self . peers . get ( & ( None , Some ( remote_peer_id ) ) ) ;
if already . is_some ( ) {
match already . unwrap ( ) . connected {
PeerConnection ::NONE = > { }
@ -653,7 +797,8 @@ impl Broker {
self . authorize (
& ( local_bind_address , remote_bind_address ) ,
Authorization ::Client ( ( client . user . clone ( ) , client . registration . clone ( ) ) ) ,
) ? ;
)
. await ? ;
// TODO add client to storage
false
@ -668,7 +813,7 @@ impl Broker {
let connected = if ! is_core {
let user = client . unwrap ( ) . user ;
fsm . set_user_id ( user ) ;
self . add_user_peer ( user , remote_peer_id ) ? ;
self . add_user_peer ( user , Some ( remote_peer_id ) ) ? ;
PeerConnection ::Client ( connection )
} else {
@ -685,7 +830,7 @@ impl Broker {
last_peer_advert : None ,
connected ,
} ;
self . peers . insert ( ( None , remote_peer_id ) , bpi ) ;
self . peers . insert ( ( None , Some ( remote_peer_id ) ) , bpi ) ;
Ok ( ( ) )
}
@ -741,6 +886,27 @@ impl Broker {
connection . admin ::< A > ( ) . await
}
#[ doc(hidden) ]
pub fn connect_local ( & mut self , peer_pubk : PubKey , user : UserId ) -> Result < ( ) , ProtocolError > {
if self . closing {
return Err ( ProtocolError ::Closing ) ;
}
let ( client_cnx , server_cnx ) = ConnectionBase ::create_local_transport_pipe ( user , peer_pubk ) ;
let bpi = BrokerPeerInfo {
last_peer_advert : None ,
connected : PeerConnection ::Local ( LocalTransport {
client_peer_id : peer_pubk ,
client_cnx ,
server_cnx ,
} ) ,
} ;
self . peers . insert ( ( Some ( user ) , None ) , bpi ) ;
Ok ( ( ) )
}
pub async fn connect (
& mut self ,
cnx : Arc < Box < dyn IConnect > > ,
@ -760,7 +926,7 @@ impl Broker {
if config . is_keep_alive ( ) {
let already = self
. peers
. get ( & ( config . get_user ( ) , * remote_peer_id_dh . slice ( ) ) ) ;
. get ( & ( config . get_user ( ) , Some ( * remote_peer_id_dh . slice ( ) ) ) ) ;
if already . is_some ( ) {
match already . unwrap ( ) . connected {
PeerConnection ::NONE = > { }
@ -799,7 +965,7 @@ impl Broker {
self . direct_connections . insert ( config . addr , dc ) ;
PeerConnection ::Core ( config . addr )
}
StartConfig ::Client ( _c onfig ) = > PeerConnection ::Client ( connection ) ,
StartConfig ::Client ( _ ) | StartC onfig::App ( _ ) = > PeerConnection ::Client ( connection ) ,
_ = > unimplemented! ( ) ,
} ;
@ -809,7 +975,7 @@ impl Broker {
} ;
self . peers
. insert ( ( config . get_user ( ) , * remote_peer_id_dh . slice ( ) ) , bpi ) ;
. insert ( ( config . get_user ( ) , Some ( * remote_peer_id_dh . slice ( ) ) ) , bpi ) ;
async fn watch_close (
mut join : Receiver < Either < NetError , X25519PrivKey > > ,
@ -831,6 +997,7 @@ impl Broker {
let mut broker = BROKER . write ( ) . await ;
broker . reconnecting ( remote_peer_id , config . get_user ( ) ) ;
// TODO: deal with cycle error https://users.rust-lang.org/t/recursive-async-method-causes-cycle-error/84628/5
// there is async_recursion now. use that
// use a channel and send the reconnect job to it.
// create a spawned loop to read the channel and process the reconnection requests.
// let result = broker
@ -874,48 +1041,77 @@ impl Broker {
B : TryFrom < ProtocolMessage , Error = ProtocolError > + std ::fmt ::Debug + Sync + Send + ' static ,
> (
& self ,
user : & UserId ,
remote_peer_id : & DirectPeerId ,
user : & Option < UserId > ,
remote_peer_id : & Option < DirectPeerId > , // None means local
msg : A ,
) -> Result < SoS < B > , NgError > {
let bpi = self
. peers
. get ( & ( Some ( * user ) , remote_peer_id . to_dh_slice ( ) ) )
. get ( & ( * user , remote_peer_id . map ( | rpi | rpi . to_dh_slice ( ) ) ) )
. ok_or ( NgError ::ConnectionNotFound ) ? ;
if let PeerConnection ::Client ( cnx ) = & bpi . connected {
cnx . request ( msg ) . await
match & bpi . connected {
PeerConnection ::Client ( cnx ) = > cnx . request ( msg ) . await ,
PeerConnection ::Local ( lt ) = > lt . client_cnx . request ( msg ) . await ,
_ = > Err ( NgError ::BrokerError ) ,
}
}
#[ cfg(not(target_arch = " wasm32 " )) ]
fn get_fsm_for_client ( & self , client : & ClientPeerId ) -> Option < Arc < Mutex < NoiseFSM > > > {
match client {
ClientPeerId ::Local ( ( user , _ ) ) = > {
if let Some ( BrokerPeerInfo {
connected :
PeerConnection ::Local ( LocalTransport {
server_cnx : ConnectionBase { fsm : Some ( fsm ) , .. } ,
..
} ) ,
..
} ) = self . peers . get ( & ( Some ( * user ) , None ) )
{
Some ( Arc ::clone ( fsm ) )
} else {
Err ( NgError ::BrokerError )
None
}
}
ClientPeerId ::Remote ( peer ) = > {
if let Some ( BrokerPeerInfo {
connected : PeerConnection ::Client ( ConnectionBase { fsm : Some ( fsm ) , .. } ) ,
..
} ) = self . peers . get ( & ( None , Some ( peer . to_dh ( ) ) ) )
{
Some ( Arc ::clone ( fsm ) )
} else {
None
}
}
}
}
#[ cfg(not(target_arch = " wasm32 " )) ]
pub ( crate ) async fn dispatch_event (
& mut self ,
& self ,
overlay : & OverlayId ,
event : Event ,
user_id : & UserId ,
remote_peer : & PubKey ,
) -> Result < ( ) , ServerError > {
) -> Result < Vec < ClientPeerId > , ServerError > {
// TODO: deal with subscriptions on the outer overlay. for now we assume everything is on the inner overlay
let peers_for_local_dispatch : Vec < PubKey > = self
. get_server_broker ( ) ?
. dispatch_event ( overlay , event . clone ( ) , user_id , remote_peer ) ?
. into_iter ( )
. cloned ( )
. collect ( ) ;
let mut clients_to_remove = vec! [ ] ;
//log_debug!("dispatch_event {:?}", peers_for_local_dispatch);
let peers_for_local_dispatch = {
self . get_server_broker ( ) ?
. read ( )
. await
. dispatch_event ( overlay , event . clone ( ) , user_id , remote_peer )
. await ?
} ;
for peer in peers_for_local_dispatch {
//log_debug!("dispatch_event peer {:?}", peer);
if let Some ( BrokerPeerInfo {
connected : PeerConnection ::Client ( ConnectionBase { fsm : Some ( fsm ) , .. } ) ,
..
} ) = self . peers . get ( & ( None , peer . to_dh ( ) ) )
{
//log_debug!("ForwardedEvent peer {:?}", peer);
for client in peers_for_local_dispatch {
//log_debug!("dispatch_event peer {:?}", client);
if let Some ( fsm ) = self . get_fsm_for_client ( & client ) {
//log_debug!("ForwardedEvent peer {:?}", client);
let _ = fsm
. lock ( )
. await
@ -929,15 +1125,19 @@ impl Broker {
. await ;
} else {
// we remove the peer from all local_subscriptions
self . get_server_broker_mut ( ) ?
. remove_all_subscriptions_of_peer ( & peer ) ;
clients_to_remove . push ( client ) ;
}
}
Ok ( ( ) )
Ok ( clients_to_remove )
}
async fn close_peer_connection_x ( & mut self , peer_id : X25519PubKey , user : Option < PubKey > ) {
#[ doc(hidden) ]
pub async fn close_peer_connection_x (
& mut self ,
peer_id : Option < X25519PubKey > ,
user : Option < PubKey > ,
) {
if let Some ( peer ) = self . peers . get_mut ( & ( user , peer_id ) ) {
match & mut peer . connected {
PeerConnection ::Core ( _ ) = > {
@ -948,13 +1148,24 @@ impl Broker {
cb . close ( ) . await ;
}
PeerConnection ::NONE = > { }
PeerConnection ::Local ( lt ) = > {
assert! ( peer_id . is_none ( ) ) ;
assert! ( user . is_some ( ) ) ;
lt . close ( ) . await ;
if self . peers . remove ( & ( user , None ) ) . is_some ( ) {
log_debug ! (
"Local transport connection closed ! {}" ,
user . unwrap ( ) . to_string ( )
) ;
}
}
}
//self.peers.remove(peer_id); // this is done in the watch_close instead
}
}
pub async fn close_peer_connection ( & mut self , peer_id : & DirectPeerId , user : Option < PubKey > ) {
self . close_peer_connection_x ( peer_id . to_dh_slice ( ) , user )
self . close_peer_connection_x ( Some ( peer_id . to_dh_slice ( ) ) , user )
. await
}