fix promise not send for inbox processing and sparql queries

master
Niko PLP 18 hours ago
parent ca72009ae7
commit 6808d64a3c
  1. 39
      nextgraph/src/local_broker.rs
  2. 2
      ng-verifier/src/inbox_processor.rs
  3. 2
      ng-verifier/src/verifier.rs

@ -13,6 +13,7 @@ use std::fs::{read, remove_file, write};
use std::path::PathBuf;
use async_once_cell::OnceCell;
use async_std::prelude::FutureExt;
use async_std::sync::{Arc, Condvar, Mutex, RwLock};
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
@ -571,7 +572,6 @@ impl fmt::Debug for LocalBroker {
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait ILocalBroker: Send + Sync + EActor {
async fn deliver(&mut self, event: Event, overlay: OverlayId, user: UserId);
async fn inbox(&mut self, user_id: UserId, msg: InboxMsg, from_queue: bool);
async fn user_disconnected(&mut self, user_id: UserId);
}
@ -584,11 +584,6 @@ impl ILocalBroker for LocalBroker {
session.verifier.deliver(event, overlay).await;
}
}
async fn inbox(&mut self, user_id: UserId, msg: InboxMsg, from_queue: bool) {
if let Some(session) = self.get_mut_session_for_user(&user_id) {
session.verifier.inbox(msg, from_queue).await;
}
}
async fn user_disconnected(&mut self, user_id: UserId) {
if let Some(session) = self.get_mut_session_for_user(&user_id) {
session.verifier.connection_lost();
@ -626,20 +621,38 @@ async fn pump(
running = cvar.wait(running).await;
}
let mut broker = match LOCAL_BROKER.get() {
None | Some(Err(_)) => return Err(Box::new(NgError::LocalBrokerNotInitialized)),
Some(Ok(broker)) => broker.write().await,
};
match message {
LocalBrokerMessage::Deliver {
event,
overlay,
user,
} => broker.deliver(event, overlay, user).await,
} => {
let mut broker = match LOCAL_BROKER.get() {
None | Some(Err(_)) => return Err(Box::new(NgError::LocalBrokerNotInitialized)),
Some(Ok(broker)) => broker.write().await,
};
broker.deliver(event, overlay, user).await
},
LocalBrokerMessage::Inbox {msg, user_id, from_queue} => {
broker.inbox(user_id, msg, from_queue).await
async_std::task::spawn_local(async move {
let mut broker = match LOCAL_BROKER.get() {
None | Some(Err(_)) => return Err(Box::new(NgError::LocalBrokerNotInitialized)),
Some(Ok(broker)) => broker.write().await,
};
if let Some(session) = broker.get_mut_session_for_user(&user_id) {
session.verifier.inbox(&msg, from_queue).await;
}
Ok(())
}).await?;
},
LocalBrokerMessage::Disconnected { user_id } => {
let mut broker = match LOCAL_BROKER.get() {
None | Some(Err(_)) => return Err(Box::new(NgError::LocalBrokerNotInitialized)),
Some(Ok(broker)) => broker.write().await,
};
broker.user_disconnected(user_id).await
},
LocalBrokerMessage::Disconnected { user_id } => broker.user_disconnected(user_id).await,
}
}

@ -184,7 +184,7 @@ impl Verifier {
pub(crate) async fn process_inbox(
&mut self,
msg: InboxMsg,
msg: &InboxMsg,
content: InboxMsgContent,
) -> Result<(), VerifierError> {

@ -1642,7 +1642,7 @@ impl Verifier {
}
}
pub async fn inbox(&mut self, msg: InboxMsg, from_queue: bool) {
pub async fn inbox(&mut self, msg: &InboxMsg, from_queue: bool) {
//log_info!("RECEIVED INBOX MSG {:?}", msg);

Loading…
Cancel
Save