diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index d0c38d0..ea6fd6c 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -13,6 +13,7 @@ use std::fs::{read, remove_file, write}; use std::path::PathBuf; use async_once_cell::OnceCell; +use async_std::prelude::FutureExt; use async_std::sync::{Arc, Condvar, Mutex, RwLock}; use futures::channel::mpsc; use futures::{SinkExt, StreamExt}; @@ -571,7 +572,6 @@ impl fmt::Debug for LocalBroker { #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] pub trait ILocalBroker: Send + Sync + EActor { async fn deliver(&mut self, event: Event, overlay: OverlayId, user: UserId); - async fn inbox(&mut self, user_id: UserId, msg: InboxMsg, from_queue: bool); async fn user_disconnected(&mut self, user_id: UserId); } @@ -584,11 +584,6 @@ impl ILocalBroker for LocalBroker { session.verifier.deliver(event, overlay).await; } } - async fn inbox(&mut self, user_id: UserId, msg: InboxMsg, from_queue: bool) { - if let Some(session) = self.get_mut_session_for_user(&user_id) { - session.verifier.inbox(msg, from_queue).await; - } - } async fn user_disconnected(&mut self, user_id: UserId) { if let Some(session) = self.get_mut_session_for_user(&user_id) { session.verifier.connection_lost(); @@ -626,20 +621,38 @@ async fn pump( running = cvar.wait(running).await; } - let mut broker = match LOCAL_BROKER.get() { - None | Some(Err(_)) => return Err(Box::new(NgError::LocalBrokerNotInitialized)), - Some(Ok(broker)) => broker.write().await, - }; match message { LocalBrokerMessage::Deliver { event, overlay, user, - } => broker.deliver(event, overlay, user).await, + } => { + let mut broker = match LOCAL_BROKER.get() { + None | Some(Err(_)) => return Err(Box::new(NgError::LocalBrokerNotInitialized)), + Some(Ok(broker)) => broker.write().await, + }; + broker.deliver(event, overlay, user).await + }, LocalBrokerMessage::Inbox {msg, user_id, from_queue} => { - broker.inbox(user_id, msg, from_queue).await + async_std::task::spawn_local(async move { + let mut broker = match LOCAL_BROKER.get() { + None | Some(Err(_)) => return Err(Box::new(NgError::LocalBrokerNotInitialized)), + Some(Ok(broker)) => broker.write().await, + }; + if let Some(session) = broker.get_mut_session_for_user(&user_id) { + session.verifier.inbox(&msg, from_queue).await; + } + Ok(()) + }).await?; + + }, + LocalBrokerMessage::Disconnected { user_id } => { + let mut broker = match LOCAL_BROKER.get() { + None | Some(Err(_)) => return Err(Box::new(NgError::LocalBrokerNotInitialized)), + Some(Ok(broker)) => broker.write().await, + }; + broker.user_disconnected(user_id).await }, - LocalBrokerMessage::Disconnected { user_id } => broker.user_disconnected(user_id).await, } } diff --git a/ng-verifier/src/inbox_processor.rs b/ng-verifier/src/inbox_processor.rs index 8453ec0..287bc58 100644 --- a/ng-verifier/src/inbox_processor.rs +++ b/ng-verifier/src/inbox_processor.rs @@ -184,7 +184,7 @@ impl Verifier { pub(crate) async fn process_inbox( &mut self, - msg: InboxMsg, + msg: &InboxMsg, content: InboxMsgContent, ) -> Result<(), VerifierError> { diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index e80f9cd..494337e 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -1642,7 +1642,7 @@ impl Verifier { } } - pub async fn inbox(&mut self, msg: InboxMsg, from_queue: bool) { + pub async fn inbox(&mut self, msg: &InboxMsg, from_queue: bool) { //log_info!("RECEIVED INBOX MSG {:?}", msg);