/* * Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers * All rights reserved. * Licensed under the Apache License, Version 2.0 * * or the MIT license , * at your option. All files in the project carrying such * notice may not be copied, modified, or distributed except * according to those terms. */ //! Implementation of the Server Broker use std::{ collections::{BTreeMap, HashMap, HashSet}, path::PathBuf, sync::Arc, time::{Duration, SystemTime}, }; use async_std::sync::{Mutex, RwLock}; use either::Either; use futures::{channel::mpsc, SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use ng_repo::{ block_storage::BlockStorage, errors::{NgError, ProtocolError, ServerError}, log::*, types::*, }; use ng_net::{ app_protocol::*, broker::{ClientPeerId, BROKER}, connection::NoiseFSM, server_broker::IServerBroker, types::*, utils::{spawn_and_log_error, Receiver, ResultSend, Sender}, }; use ng_verifier::{ site::SiteV0, types::{BrokerPeerId, VerifierConfig, VerifierConfigType}, verifier::Verifier, }; use crate::rocksdb_server_storage::RocksDbServerStorage; pub struct TopicInfo { pub repo: RepoHash, pub publisher_advert: Option, pub current_heads: HashSet, pub root_commit: Option, /// indicates which users have opened the topic (boolean says if as publisher or not) pub users: HashMap, } pub struct RepoInfo { /// set of users that requested the repo to be exposed on the outer overlay /// only possible if the user is a publisher pub expose_outer: HashSet, /// set of topics of this repo pub topics: HashSet, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EventInfo { pub event: Event, pub blocks: Vec, } pub struct CommitInfo { pub event: Either, pub home_pinned: bool, pub acks: HashSet, pub deps: HashSet, pub futures: HashSet, pub files: HashSet, } #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum OverlayType { OuterOnly, Outer(OverlayId), // the ID of the inner overlay corresponding to this outer. Inner(OverlayId), // the ID of the outer overlay corresponding to the inner InnerOnly, } impl OverlayType { pub fn is_inner_get_outer(&self) -> Option<&OverlayId> { match self { Self::Inner(outer) => Some(outer), _ => None, } } pub fn is_outer_to_inner(&self) -> bool { match self { Self::Outer(_) => true, _ => false, } } pub fn is_outer_only(&self) -> bool { match self { Self::OuterOnly => true, _ => false, } } } impl From for OverlayType { fn from(oa: OverlayAccess) -> OverlayType { match oa { OverlayAccess::ReadOnly(_) => { panic!("cannot create an OverlayType from a ReadOnly OverlayAccess") } OverlayAccess::ReadWrite((_inner, outer)) => OverlayType::Inner(outer), OverlayAccess::WriteOnly(_inner) => OverlayType::InnerOnly, } } } #[allow(dead_code)] pub(crate) struct OverlayInfo { pub overlay_type: OverlayType, pub overlay_topic: Option, pub topics: HashMap, pub repos: HashMap, } struct DetachableVerifier { detach: bool, attached: Option<(DirectPeerId, u64)>, verifier: Verifier, } pub struct ServerBrokerState { #[allow(dead_code)] overlays: HashMap, #[allow(dead_code)] inner_overlays: HashMap>, local_subscriptions: HashMap<(OverlayId, TopicId), HashMap>>, verifiers: HashMap>>, remote_apps: HashMap<(DirectPeerId, u64), UserId>, wallet_rendezvous: HashMap>, wallet_exports: HashMap, wallet_exports_timestamp: BTreeMap, } pub struct ServerBroker { storage: RocksDbServerStorage, state: RwLock, path_users: PathBuf, } impl ServerBroker { pub(crate) fn new(storage: RocksDbServerStorage, path_users: PathBuf) -> Self { ServerBroker { storage: storage, state: RwLock::new(ServerBrokerState { overlays: HashMap::new(), inner_overlays: HashMap::new(), local_subscriptions: HashMap::new(), verifiers: HashMap::new(), remote_apps: HashMap::new(), wallet_rendezvous: HashMap::new(), wallet_exports: HashMap::new(), wallet_exports_timestamp: BTreeMap::new(), }), path_users, } } pub fn load(&mut self) -> Result<(), NgError> { Ok(()) } async fn add_subscription( &self, overlay: OverlayId, topic: TopicId, peer: ClientPeerId, ) -> Result<(), ServerError> { let mut lock = self.state.write().await; let peers_map = lock .local_subscriptions .entry((overlay, topic)) .or_insert(HashMap::with_capacity(1)); log_debug!( "SUBSCRIBING PEER {:?} TOPIC {} OVERLAY {}", peer, topic, overlay ); if peers_map.insert(*peer.key(), peer.value()).is_some() { //return Err(ServerError::PeerAlreadySubscribed); } Ok(()) } #[allow(dead_code)] async fn remove_subscription( &self, overlay: &OverlayId, topic: &TopicId, peer: &PubKey, ) -> Result<(), ServerError> { let mut lock = self.state.write().await; let peers_set = lock .local_subscriptions .get_mut(&(*overlay, *topic)) .ok_or(ServerError::SubscriptionNotFound)?; if peers_set.remove(peer).is_none() { return Err(ServerError::SubscriptionNotFound); } Ok(()) } async fn new_verifier_from_credentials( &self, user_id: &UserId, credentials: Credentials, local_peer_id: DirectPeerId, partial_credentials: bool, ) -> Result { let block_storage = self.get_block_storage(); let mut path = self.get_path_users(); let user_hash: Digest = user_id.into(); path.push(user_hash.to_string()); std::fs::create_dir_all(path.clone()).unwrap(); let peer_id_dh = credentials.peer_priv_key.to_pub().to_dh_from_ed(); let mut verifier = Verifier::new( VerifierConfig { config_type: VerifierConfigType::RocksDb(path), user_master_key: *credentials.user_master_key.slice(), peer_priv_key: credentials.peer_priv_key, user_priv_key: credentials.user_key, private_store_read_cap: if partial_credentials { None } else { Some(credentials.read_cap) }, private_store_id: if partial_credentials { None } else { Some(credentials.private_store) }, protected_store_id: if partial_credentials { None } else { Some(credentials.protected_store) }, public_store_id: if partial_credentials { None } else { Some(credentials.public_store) }, locator: Locator::empty(), }, block_storage, )?; if !partial_credentials { verifier.connected_broker = BrokerPeerId::Local(local_peer_id); // start the local transport connection let mut lock = BROKER.write().await; lock.connect_local(peer_id_dh, *user_id)?; } Ok(verifier) } } use async_std::future::timeout; async fn wait_for_wallet( mut internal_receiver: Receiver, mut sender: Sender>, rendezvous: SymKey, ) -> ResultSend<()> { let wallet_future = internal_receiver.next(); let _ = sender .send( match timeout(Duration::from_millis(5 * 60_000), wallet_future).await { Err(_) => Err(ServerError::ExportWalletTimeOut), Ok(Some(w)) => Ok(w), Ok(None) => Err(ServerError::BrokerError), }, ) .await; BROKER .read() .await .get_server_broker()? .read() .await .remove_rendezvous(&rendezvous) .await; Ok(()) } //TODO: the purpose of this trait is to have a level of indirection so we can keep some data in memory (cache) and avoid hitting the storage backend (rocksdb) at every call. //for now this cache is not implemented, but the structs are ready (see above), and it would just require to change slightly the implementation of the trait functions here below. #[async_trait::async_trait] impl IServerBroker for ServerBroker { async fn remove_rendezvous(&self, rendezvous: &SymKey) { let mut lock = self.state.write().await; let _ = lock.wallet_rendezvous.remove(&rendezvous); } async fn wait_for_wallet_at_rendezvous( &self, rendezvous: SymKey, ) -> Receiver> { let (internal_sender, internal_receiver) = mpsc::unbounded(); let (mut sender, receiver) = mpsc::unbounded(); { let mut state = self.state.write().await; if state.wallet_rendezvous.contains_key(&rendezvous) { let _ = sender.send(Err(ServerError::BrokerError)).await; sender.close_channel(); return receiver; } else { let _ = state .wallet_rendezvous .insert(rendezvous.clone(), internal_sender); } } spawn_and_log_error(wait_for_wallet(internal_receiver, sender, rendezvous)); receiver } async fn get_wallet_export(&self, rendezvous: SymKey) -> Result { let mut state = self.state.write().await; match state.wallet_exports.remove(&rendezvous) { Some(wallet) => Ok(wallet), None => Err(ServerError::NotFound), } } async fn put_wallet_export(&self, rendezvous: SymKey, export: ExportedWallet) { let mut state = self.state.write().await; let _ = state.wallet_exports.insert(rendezvous.clone(), export); let _ = state .wallet_exports_timestamp .insert(SystemTime::now(), rendezvous); } // TODO: periodically (every 5 min) remove entries in wallet_exports_timestamp and wallet_exports async fn put_wallet_at_rendezvous( &self, rendezvous: SymKey, export: ExportedWallet, ) -> Result<(), ServerError> { let mut state = self.state.write().await; match state.wallet_rendezvous.remove(&rendezvous) { None => Err(ServerError::NotFound), Some(mut sender) => { let _ = sender.send(export).await; Ok(()) } } } fn get_block_storage( &self, ) -> std::sync::Arc> { self.storage.get_block_storage() } fn get_path_users(&self) -> PathBuf { self.path_users.clone() } fn has_block(&self, overlay_id: &OverlayId, block_id: &BlockId) -> Result<(), ServerError> { self.storage.has_block(overlay_id, block_id) } fn get_block(&self, overlay_id: &OverlayId, block_id: &BlockId) -> Result { self.storage.get_block(overlay_id, block_id) } fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError> { self.storage.next_seq_for_peer(peer, seq) } fn put_block(&self, overlay_id: &OverlayId, block: Block) -> Result<(), ServerError> { self.storage.add_block(overlay_id, block)?; Ok(()) } async fn create_user(&self, broker_id: &DirectPeerId) -> Result { let user_privkey = PrivKey::random_ed(); let user_id = user_privkey.to_pub(); let mut creds = Credentials::new_partial(&user_privkey); let mut verifier = self .new_verifier_from_credentials(&user_id, creds.clone(), *broker_id, true) .await?; let _site = SiteV0::create_personal(user_privkey.clone(), &mut verifier) .await .map_err(|e| { log_err!("create_personal failed with {e}"); ProtocolError::BrokerError })?; // update credentials from config of verifier. verifier.complement_credentials(&mut creds); //verifier.close().await; // save credentials and user self.add_user_credentials(&user_id, &creds)?; verifier.connected_broker = BrokerPeerId::Local(*broker_id); // start the local transport connection { let mut lock = BROKER.write().await; let peer_id_dh = creds.peer_priv_key.to_pub().to_dh_from_ed(); lock.connect_local(peer_id_dh, user_id)?; } let _res = verifier.send_outbox().await; if _res.is_err() { log_err!("{:?}", _res); } Ok(user_id) } fn get_user(&self, user_id: PubKey) -> Result { self.storage.get_user(user_id) } fn add_user_credentials( &self, user_id: &PubKey, credentials: &Credentials, ) -> Result<(), ProtocolError> { self.storage.add_user_credentials(user_id, credentials) } fn get_user_credentials(&self, user_id: &PubKey) -> Result { self.storage.get_user_credentials(user_id) } fn add_user(&self, user_id: PubKey, is_admin: bool) -> Result<(), ProtocolError> { self.storage.add_user(user_id, is_admin) } fn del_user(&self, user_id: PubKey) -> Result<(), ProtocolError> { self.storage.del_user(user_id) } fn list_users(&self, admins: bool) -> Result, ProtocolError> { self.storage.list_users(admins) } fn list_invitations( &self, admin: bool, unique: bool, multi: bool, ) -> Result)>, ProtocolError> { self.storage.list_invitations(admin, unique, multi) } fn add_invitation( &self, invite_code: &InvitationCode, expiry: u32, memo: &Option, ) -> Result<(), ProtocolError> { self.storage.add_invitation(invite_code, expiry, memo) } fn get_invitation_type(&self, invite_code: [u8; 32]) -> Result { self.storage.get_invitation_type(invite_code) } fn remove_invitation(&self, invite_code: [u8; 32]) -> Result<(), ProtocolError> { self.storage.remove_invitation(invite_code) } async fn app_process_request( &self, req: AppRequest, request_id: i64, fsm: &Mutex, ) -> Result<(), ServerError> { // get the session let remote = { fsm.lock() .await .remote_peer() .ok_or(ServerError::SessionNotFound)? }; let session_id = (remote, req.session_id()); let session_lock = { let lock = self.state.read().await; let user_id = lock .remote_apps .get(&session_id) .ok_or(ServerError::SessionNotFound)? .to_owned(); Arc::clone( lock.verifiers .get(&user_id) .ok_or(ServerError::SessionNotFound)?, ) }; let mut session = session_lock.write().await; if session.attached.is_none() || session.attached.unwrap() != session_id { return Err(ServerError::SessionDetached); } if req.command().is_stream() { let res = session.verifier.app_request_stream(req).await; match res { Err(e) => { let error: ServerError = e.into(); let error_res: AppMessage = error.into(); fsm.lock() .await .send_in_reply_to(error_res.into(), request_id) .await?; } Ok((mut receiver, _cancel)) => { //TODO: implement cancel let mut some_sent = false; while let Some(response) = receiver.next().await { some_sent = true; let mut msg: AppMessage = response.into(); msg.set_result(ServerError::PartialContent.into()); fsm.lock() .await .send_in_reply_to(msg.into(), request_id) .await?; } let end: Result = if some_sent { Err(ServerError::EndOfStream) } else { Err(ServerError::EmptyStream) }; fsm.lock() .await .send_in_reply_to(end.into(), request_id) .await?; } } } else { let res = session.verifier.app_request(req).await; //log_debug!("GOT RES {:?}", res); let app_message: AppMessage = match res { Err(e) => { log_debug!("AppRequest error NgError {e}"); let server_err: ServerError = e.into(); server_err.into() } Ok(app_res) => app_res.into(), }; fsm.lock() .await .send_in_reply_to(app_message.into(), request_id) .await?; } Ok(()) } async fn app_session_start( &self, req: AppSessionStart, remote: DirectPeerId, local_peer_id: DirectPeerId, ) -> Result { let user_id = req.user_id(); let id = (remote, req.session_id()); let verifier_lock_res = { let lock = self.state.read().await; lock.verifiers.get(user_id).map(|l| Arc::clone(l)) }; let verifier_lock = match verifier_lock_res { Some(session_lock) => { let mut session = session_lock.write().await; if let Some((peer_id, session_id)) = session.attached { if peer_id != remote || session_id == req.session_id() { // remove the previous session let mut write_lock = self.state.write().await; let _ = write_lock.remote_apps.remove(&(peer_id, session_id)); } } session.attached = Some(id); Arc::clone(&session_lock) } None => { // we create and load a new verifier let credentials = if req.credentials().is_none() { // headless do not have credentials. we fetch them from server_storage self.storage.get_user_credentials(user_id)? } else { req.credentials().clone().unwrap() }; if *user_id != credentials.user_key.to_pub() { log_debug!("InvalidRequest"); return Err(ServerError::InvalidRequest); } let verifier = self .new_verifier_from_credentials(user_id, credentials, local_peer_id, false) .await; if verifier.is_err() { log_err!( "new_verifier failed with: {:?}", verifier.as_ref().unwrap_err() ); } let mut verifier = verifier?; // TODO : key.zeroize(); //load verifier from local_storage let _ = verifier.load(); //TODO: save opened_branches in user_storage, so that when we open again the verifier, the syncing can work verifier.sync().await; let session = DetachableVerifier { detach: true, attached: Some(id), verifier, }; let mut write_lock = self.state.write().await; Arc::clone( write_lock .verifiers .entry(*user_id) .or_insert(Arc::new(RwLock::new(session))), ) } }; let verifier = &verifier_lock.read().await.verifier; let res = AppSessionStartResponse::V0(AppSessionStartResponseV0 { private_store: *verifier.private_store_id(), protected_store: *verifier.protected_store_id(), public_store: *verifier.public_store_id(), }); let mut write_lock = self.state.write().await; if let Some(previous_user) = write_lock.remote_apps.insert(id, *user_id) { // weird. another session was opened for this id. // we have to stop it otherwise it would be dangling. if previous_user != *user_id { if let Some(previous_session_lock) = write_lock .verifiers .get(&previous_user) .map(|v| Arc::clone(v)) { let mut previous_session = previous_session_lock.write().await; if previous_session.detach { previous_session.attached = None; } else { // we stop it and drop it let verifier = write_lock.verifiers.remove(&previous_user); verifier.unwrap().read().await.verifier.close().await; } } } } Ok(res) } async fn app_session_stop( &self, req: AppSessionStop, remote_peer_id: &DirectPeerId, ) -> Result { let id = (*remote_peer_id, req.session_id()); let mut write_lock = self.state.write().await; let must_be_destroyed = { let session_user = write_lock .remote_apps .remove(&id) .ok_or(ServerError::SessionNotFound)?; let session = Arc::clone( write_lock .verifiers .get(&session_user) .ok_or(ServerError::SessionNotFound)?, ); let mut verifier_lock = session.write().await; if !req.is_force_close() && verifier_lock.detach { verifier_lock.attached = None; None } else { Some(session_user) } }; if let Some(user) = must_be_destroyed { let verifier = write_lock.verifiers.remove(&user); verifier.unwrap().read().await.verifier.close().await; } Ok(EmptyAppResponse(())) } fn get_repo_pin_status( &self, overlay: &OverlayId, repo: &RepoHash, user: &UserId, ) -> Result { self.storage.get_repo_pin_status(overlay, repo, user) } async fn pin_repo_write( &self, overlay: &OverlayAccess, repo: &RepoHash, user_id: &UserId, ro_topics: &Vec, rw_topics: &Vec, overlay_root_topic: &Option, expose_outer: bool, peer: &ClientPeerId, ) -> Result { let res = self.storage.pin_repo_write( overlay, repo, user_id, ro_topics, rw_topics, overlay_root_topic, expose_outer, )?; for topic in res.iter() { self.add_subscription( *overlay.overlay_id_for_client_protocol_purpose(), *topic.topic_id(), peer.clone(), ) .await?; } Ok(res) } async fn pin_repo_read( &self, overlay: &OverlayId, repo: &RepoHash, user_id: &UserId, ro_topics: &Vec, peer: &ClientPeerId, ) -> Result { let res = self .storage .pin_repo_read(overlay, repo, user_id, ro_topics)?; for topic in res.iter() { // TODO: those outer subscriptions are not handled yet. they will not emit events. self.add_subscription(*overlay, *topic.topic_id(), peer.clone()) .await?; } Ok(res) } async fn topic_sub( &self, overlay: &OverlayId, repo: &RepoHash, topic: &TopicId, user: &UserId, publisher: Option<&PublisherAdvert>, peer: &ClientPeerId, ) -> Result { let res = self .storage .topic_sub(overlay, repo, topic, user, publisher)?; self.add_subscription(*overlay, *topic, peer.clone()) .await?; Ok(res) } fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result, ServerError> { self.storage.get_commit(overlay, id) } async fn remove_all_subscriptions_of_client(&self, client: &ClientPeerId) { let remote_peer = client.key(); let mut lock = self.state.write().await; for ((overlay, topic), peers) in lock.local_subscriptions.iter_mut() { if peers.remove(remote_peer).is_some() { log_debug!( "subscription of peer {} to topic {} in overlay {} removed", remote_peer, topic, overlay ); } } } async fn dispatch_event( &self, overlay: &OverlayId, event: Event, user_id: &UserId, remote_peer: &PubKey, ) -> Result, ServerError> { let topic = self.storage.save_event(overlay, event, user_id)?; // log_debug!( // "DISPATCH EVENT {} {} {:?}", // overlay, // topic, // self.local_subscriptions // ); let lock = self.state.read().await; let mut map = lock .local_subscriptions .get(&(*overlay, topic)) .map(|map| map.iter().collect()) .unwrap_or(HashMap::new()); map.remove(remote_peer); Ok(map .iter() .map(|(k, v)| ClientPeerId::new_from(k, v)) .collect()) } fn topic_sync_req( &self, overlay: &OverlayId, topic: &TopicId, known_heads: &Vec, target_heads: &Vec, known_commits: &Option, ) -> Result, ServerError> { self.storage .topic_sync_req(overlay, topic, known_heads, target_heads, known_commits) } }