diff --git a/Cargo.lock b/Cargo.lock index 596f0ca..654face 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3228,6 +3228,7 @@ dependencies = [ "async-std", "async-trait", "base64-url", + "futures", "ng-client-ws", "ng-net", "ng-repo", diff --git a/nextgraph/Cargo.toml b/nextgraph/Cargo.toml index 8a025ea..872f096 100644 --- a/nextgraph/Cargo.toml +++ b/nextgraph/Cargo.toml @@ -32,6 +32,7 @@ async-std = { version = "1.12.0", features = [ "attributes", "unstable" ] } zeroize = { version = "1.6.0", features = ["zeroize_derive"] } serde_json = "1.0" async-trait = "0.1.64" +futures = "0.3.24" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.0" } diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index fe3df93..90df58f 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -10,6 +10,8 @@ use async_once_cell::OnceCell; use async_std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; use core::fmt; +use futures::channel::mpsc; +use futures::SinkExt; use ng_net::actor::EActor; use ng_net::connection::{ClientConfig, IConnect, NoiseFSM, StartConfig}; use ng_net::types::{ClientInfo, ClientType, ProtocolMessage}; @@ -386,6 +388,9 @@ struct LocalBroker { pub opened_sessions_list: Vec>, tauri_streams: HashMap, + + disconnections_sender: Sender, + disconnections_receiver: Option>, } impl fmt::Debug for LocalBroker { @@ -407,6 +412,12 @@ impl ILocalBroker for LocalBroker { session.verifier.deliver(event, overlay).await; } } + async fn user_disconnected(&mut self, user_id: UserId) { + if let Some(session) = self.get_mut_session_for_user(&user_id) { + session.verifier.connection_lost(); + let _ = self.disconnections_sender.send(user_id.to_string()).await; + } + } } // this is used if an Actor does a BROKER.local_broker.respond @@ -512,7 +523,7 @@ impl LocalBroker { let session = self.opened_sessions_list[*session as usize] .as_mut() .ok_or(NgError::SessionNotFound)?; - session.verifier.connected_server_id = None; + session.verifier.connection_lost(); } None => {} } @@ -895,7 +906,7 @@ async fn init_(config: LocalBrokerConfig) -> Result>, Ng } } }; - + let (disconnections_sender, disconnections_receiver) = mpsc::unbounded::(); let local_broker = LocalBroker { config, wallets, @@ -904,6 +915,8 @@ async fn init_(config: LocalBrokerConfig) -> Result>, Ng opened_sessions: HashMap::new(), opened_sessions_list: vec![], tauri_streams: HashMap::new(), + disconnections_sender, + disconnections_receiver: Some(disconnections_receiver), }; //log_debug!("{:?}", &local_broker); @@ -948,6 +961,7 @@ pub async fn tauri_stream_cancel(stream_id: String) -> Result<(), NgError> { broker.tauri_stream_cancel(stream_id); Ok(()) } + /// Initialize the configuration of your local broker /// /// , by passing in a function (or closure) that returns a `LocalBrokerConfig`. @@ -1383,20 +1397,11 @@ pub async fn user_connect_with_device_info( )); } if tried.is_some() && tried.as_ref().unwrap().3.is_none() { - session.verifier.connected_server_id = Some(server_key); - // successful. we can stop here - - // load verifier from remote connection (if not RocksDb type, or after import on tauri) - if let Err(e) = session.verifier.bootstrap().await { - session.verifier.connected_server_id = None; + if let Err(e) = + session.verifier.connection_opened(server_key).await + { Broker::close_all_connections().await; tried.as_mut().unwrap().3 = Some(e.to_string()); - } else { - // we can send outbox now that the verifier is loaded - let res = session.verifier.send_outbox().await; - log_info!("SENDING EVENTS FROM OUTBOX RETURNED: {:?}", res); - - //log_info!("VERIFIER DUMP {:?}", session.verifier); } break; @@ -1591,6 +1596,19 @@ pub async fn personal_site_store(session_id: u64, store: SiteStoreType) -> Resul } } +#[doc(hidden)] +pub async fn take_disconnections_receiver() -> Result, NgError> { + let mut broker = match LOCAL_BROKER.get() { + None | Some(Err(_)) => return Err(NgError::LocalBrokerNotInitialized), + Some(Ok(broker)) => broker.write().await, + }; + + broker + .disconnections_receiver + .take() + .ok_or(NgError::BrokerError) +} + #[cfg(test)] mod test { use super::*; diff --git a/ng-app/src-tauri/src/lib.rs b/ng-app/src-tauri/src/lib.rs index 858fd67..1a1c035 100644 --- a/ng-app/src-tauri/src/lib.rs +++ b/ng-app/src-tauri/src/lib.rs @@ -335,14 +335,12 @@ async fn cancel_stream(stream_id: &str) -> Result<(), String> { } #[tauri::command(rename_all = "snake_case")] -async fn disconnections_subscribe(app: tauri::AppHandle) -> Result<(), ()> { +async fn disconnections_subscribe(app: tauri::AppHandle) -> Result<(), String> { let main_window = app.get_window("main").unwrap(); - let reader = BROKER - .write() + let reader = nextgraph::local_broker::take_disconnections_receiver() .await - .take_disconnections_receiver() - .ok_or(())?; + .map_err(|e: NgError| e.to_string())?; async fn inner_task( mut reader: Receiver, diff --git a/ng-app/src/lib/FullLayout.svelte b/ng-app/src/lib/FullLayout.svelte index 195b1be..028c4da 100644 --- a/ng-app/src/lib/FullLayout.svelte +++ b/ng-app/src/lib/FullLayout.svelte @@ -62,6 +62,7 @@ "flex items-center p-2 text-base font-normal text-gray-900 rounded-lg dark:text-white hover:bg-gray-200 dark:hover:bg-gray-700"; + {#if mobile}
@@ -185,7 +186,6 @@
{/if} -