From 273a13b4e548c2adbd1d1ffd7eab9da99f6a740b Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Fri, 17 May 2024 05:42:02 +0300 Subject: [PATCH] session_stop and session_headless_stop with force_close --- nextgraph/src/local_broker.rs | 48 ++++++++++++++++++++++++++++---- ng-broker/src/server_broker.rs | 33 ++++++++++++++++++++-- ng-net/src/actors/app/request.rs | 4 +-- ng-net/src/actors/app/session.rs | 33 ++++++++++++++++++---- ng-net/src/app_protocol.rs | 14 +++++++++- ng-net/src/server_broker.rs | 7 ++++- ng-repo/src/errors.rs | 1 + ng-sdk-js/app-node/index.js | 3 ++ ng-sdk-js/src/lib.rs | 13 +++++++++ ng-verifier/src/verifier.rs | 1 + 10 files changed, 139 insertions(+), 18 deletions(-) diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index c0039cd..fa662e9 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -625,10 +625,11 @@ impl LocalBroker { Ok(()) } - pub(crate) async fn send_request_headless(&self, req: ProtocolMessage) -> Result { + pub(crate) async fn send_request_headless + std::fmt::Debug + Sync + Send + 'static, + B: TryFrom + std::fmt::Debug + Sync + Send + 'static,>(&self, req: A) -> Result { self.err_if_not_headless()?; - match BROKER.read().await.request::(&None, &Some(self.config.headless_config().server_peer_id), req).await { + match BROKER.read().await.request::(&None, &Some(self.config.headless_config().server_peer_id), req).await { Err(e) => Err(e), Ok(SoS::Stream(_)) => Err(NgError::InvalidResponse), Ok(SoS::Single(res)) => Ok(res), @@ -636,10 +637,11 @@ impl LocalBroker { } #[allow(dead_code)] - pub(crate) async fn send_request_stream_headless(&self, req: AppRequest) -> Result<(Receiver, CancelFn), NgError> { + pub(crate) async fn send_request_stream_headless + std::fmt::Debug + Sync + Send + 'static, + B: TryFrom + std::fmt::Debug + Sync + Send + 'static,>(&self, req: A) -> Result<(Receiver, CancelFn), NgError> { self.err_if_not_headless()?; - match BROKER.read().await.request::(&None, &Some(self.config.headless_config().server_peer_id), req).await { + match BROKER.read().await.request::(&None, &Some(self.config.headless_config().server_peer_id), req).await { Err(e) => Err(e), Ok(SoS::Single(_)) => Err(NgError::InvalidResponse), Ok(SoS::Stream(stream)) => { @@ -1641,7 +1643,7 @@ pub async fn session_start(config: SessionConfig) -> Result Result<(), NgError> { let request = AppSessionStop::V0(AppSessionStopV0{ session_id, + force_close: false, }); - let _res = broker.send_request_headless(request.into()).await; + let _res = broker.send_request_headless(request).await?; } _ => { // TODO implement for Remote @@ -1940,6 +1943,39 @@ pub async fn session_stop(user_id: &UserId) -> Result<(), NgError> { Ok(()) } +/// Stops the session, that can be resumed later on. All the local data is flushed from RAM. +#[doc(hidden)] +pub async fn session_headless_stop(session_id: u64, force_close: bool) -> Result<(), NgError> { + let mut broker = match LOCAL_BROKER.get() { + None | Some(Err(_)) => return Err(NgError::LocalBrokerNotInitialized), + Some(Ok(broker)) => broker.write().await, + }; + + match broker.config { + LocalBrokerConfig::Headless(_) => { + + let session = broker + .headless_sessions + .remove(&session_id) + .ok_or(NgError::SessionNotFound)?; + + let _ = broker.opened_sessions.remove(&session.user_id).ok_or(NgError::SessionNotFound)?; + + let request = AppSessionStop::V0(AppSessionStopV0{ + session_id, + force_close, + }); + + let _res = broker.send_request_headless(request).await?; + } + _ => { + return Err(NgError::LocalBrokerIsNotHeadless); + } + } + + Ok(()) +} + /// Disconnects the user from the Server Broker(s), but keep all the local data opened and ready. pub async fn user_disconnect(user_id: &UserId) -> Result<(), NgError> { let mut broker = match LOCAL_BROKER.get() { diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index 926c197..9bb61c0 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -574,8 +574,37 @@ impl IServerBroker for ServerBroker { Ok(res) } - fn app_session_stop(&self, _req: AppSessionStop) -> Result { - //TODO + async fn app_session_stop( + &self, + req: AppSessionStop, + remote_peer_id: &DirectPeerId, + ) -> Result { + let id = (*remote_peer_id, req.session_id()); + + let mut write_lock = self.state.write().await; + let must_be_destroyed = { + let session_user = write_lock + .remote_apps + .remove(&id) + .ok_or(ServerError::SessionNotFound)?; + let session = Arc::clone( + write_lock + .verifiers + .get(&session_user) + .ok_or(ServerError::SessionNotFound)?, + ); + let mut verifier_lock = session.write().await; + if !req.is_force_close() && verifier_lock.detach { + verifier_lock.attached = None; + None + } else { + Some(session_user) + } + }; + if let Some(user) = must_be_destroyed { + let verifier = write_lock.verifiers.remove(&user); + verifier.unwrap().read().await.verifier.close().await; + } Ok(EmptyAppResponse(())) } diff --git a/ng-net/src/actors/app/request.rs b/ng-net/src/actors/app/request.rs index aa57792..a51a387 100644 --- a/ng-net/src/actors/app/request.rs +++ b/ng-net/src/actors/app/request.rs @@ -63,7 +63,7 @@ impl TryFrom for AppResponse { if let AppMessageContentV0::Response(res) = msg.try_into()? { Ok(res) } else { - log_debug!("INVALID AppMessageContentV0::Response"); + log_info!("INVALID AppMessageContentV0::Response"); Err(ProtocolError::InvalidValue) } } @@ -83,7 +83,7 @@ impl TryFrom for AppMessageContentV0 { Err(ProtocolError::ServerError) } } else { - log_debug!("INVALID AppMessageContentV0"); + log_info!("INVALID AppMessageContentV0 {:?}", msg); Err(ProtocolError::InvalidValue) } } diff --git a/ng-net/src/actors/app/session.rs b/ng-net/src/actors/app/session.rs index b74b544..8d582b9 100644 --- a/ng-net/src/actors/app/session.rs +++ b/ng-net/src/actors/app/session.rs @@ -116,7 +116,7 @@ impl EActor for Actor<'_, AppSessionStart, AppSessionStartResponse> { impl AppSessionStop { pub fn get_actor(&self, id: i64) -> Box { - Actor::::new_responder(id) + Actor::::new_responder(id) } } @@ -132,6 +132,19 @@ impl TryFrom for AppSessionStop { } } +impl TryFrom for EmptyAppResponse { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + let res: Result = msg.try_into(); + if let AppMessageContentV0::EmptyResponse = res? { + Ok(EmptyAppResponse(())) + } else { + log_debug!("INVALID AppMessageContentV0::EmptyResponse"); + Err(ProtocolError::InvalidValue) + } + } +} + impl From for ProtocolMessage { fn from(request: AppSessionStop) -> ProtocolMessage { AppMessageContentV0::SessionStop(request).into() @@ -152,10 +165,10 @@ impl From> for ProtocolMessage { } } -impl Actor<'_, AppSessionStop, ()> {} +impl Actor<'_, AppSessionStop, EmptyAppResponse> {} #[async_trait::async_trait] -impl EActor for Actor<'_, AppSessionStop, ()> { +impl EActor for Actor<'_, AppSessionStop, EmptyAppResponse> { async fn respond( &mut self, msg: ProtocolMessage, @@ -163,10 +176,18 @@ impl EActor for Actor<'_, AppSessionStop, ()> { ) -> Result<(), ProtocolError> { let req = AppSessionStop::try_from(msg)?; let res = { - let sb = { BROKER.read().await.get_server_broker()? }; - let lock = sb.read().await; - lock.app_session_stop(req) + let lock = fsm.lock().await; + let remote = lock.remote_peer(); + + if remote.is_none() { + Err(ServerError::BrokerError) + } else { + let sb = { BROKER.read().await.get_server_broker()? }; + let lock = sb.read().await; + lock.app_session_stop(req, remote.as_ref().unwrap()).await + } }; + fsm.lock() .await .send_in_reply_to(res.into(), self.id()) diff --git a/ng-net/src/app_protocol.rs b/ng-net/src/app_protocol.rs index 9b18187..df3ea38 100644 --- a/ng-net/src/app_protocol.rs +++ b/ng-net/src/app_protocol.rs @@ -189,13 +189,25 @@ impl AppRequest { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AppSessionStopV0 { pub session_id: u64, + pub force_close: bool, } #[derive(Clone, Debug, Serialize, Deserialize)] pub enum AppSessionStop { V0(AppSessionStopV0), } - +impl AppSessionStop { + pub fn session_id(&self) -> u64 { + match self { + Self::V0(v0) => v0.session_id, + } + } + pub fn is_force_close(&self) -> bool { + match self { + Self::V0(v0) => v0.force_close, + } + } +} #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AppSessionStartV0 { pub session_id: u64, diff --git a/ng-net/src/server_broker.rs b/ng-net/src/server_broker.rs index 183866d..bf752ef 100644 --- a/ng-net/src/server_broker.rs +++ b/ng-net/src/server_broker.rs @@ -64,13 +64,18 @@ pub trait IServerBroker: Send + Sync { request_id: i64, fsm: &Mutex, ) -> Result<(), ServerError>; + async fn app_session_start( &self, req: AppSessionStart, remote_peer_id: DirectPeerId, local_peer_id: DirectPeerId, ) -> Result; - fn app_session_stop(&self, req: AppSessionStop) -> Result; + async fn app_session_stop( + &self, + req: AppSessionStop, + remote_peer_id: &DirectPeerId, + ) -> Result; fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError>; diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 92782aa..1aacc14 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -79,6 +79,7 @@ pub enum NgError { OxiGraphError(String), ConfigError(String), LocalBrokerIsHeadless, + LocalBrokerIsNotHeadless, } impl Error for NgError {} diff --git a/ng-sdk-js/app-node/index.js b/ng-sdk-js/app-node/index.js index 3490aa0..1752f5f 100644 --- a/ng-sdk-js/app-node/index.js +++ b/ng-sdk-js/app-node/index.js @@ -30,6 +30,9 @@ ng.init_headless(config).then( async() => { let session = await ng.session_headless_start(user_id); console.log(session); + let res = await ng.session_headless_stop(session.session_id, true); + console.log(res); + } catch (e) { console.error(e); } diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index c2065b3..fe91365 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -187,6 +187,19 @@ pub async fn session_headless_start(user_js: String) -> Result Ok(serde_wasm_bindgen::to_value(&res).unwrap()) } +#[cfg(wasmpack_target = "nodejs")] +#[wasm_bindgen] +pub async fn session_headless_stop(session_id: JsValue, force_close: bool) -> Result<(), String> { + let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) + .map_err(|_| "Invalid session_id".to_string())?; + + let _ = nextgraph::local_broker::session_headless_stop(session_id, force_close) + .await + .map_err(|e: NgError| e.to_string())?; + + Ok(()) +} + #[cfg(wasmpack_target = "nodejs")] #[wasm_bindgen] pub async fn admin_create_user(js_config: JsValue) -> Result { diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 27eed42..61aec7f 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -139,6 +139,7 @@ impl Verifier { } pub async fn close(&self) { + log_debug!("VERIFIER CLOSED {}", self.user_privkey().to_pub()); BROKER .write() .await