From 93b59304804874e4c1bb86dce08f11ec2a0e139e Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Wed, 1 May 2024 18:21:35 +0300 Subject: [PATCH] CommitGet actor --- ng-broker/src/rocksdb_server_storage.rs | 45 +++++++++++++++++--- ng-broker/src/server_broker.rs | 13 ++++++ ng-broker/src/server_storage/core/overlay.rs | 22 +++++++--- ng-verifier/src/verifier.rs | 7 +-- 4 files changed, 74 insertions(+), 13 deletions(-) diff --git a/ng-broker/src/rocksdb_server_storage.rs b/ng-broker/src/rocksdb_server_storage.rs index 9ea7d0b..adfdbab 100644 --- a/ng-broker/src/rocksdb_server_storage.rs +++ b/ng-broker/src/rocksdb_server_storage.rs @@ -291,6 +291,7 @@ impl RocksDbServerStorage { OverlayStorage::create( inner_overlay, &(*overlay_access).into(), + expose_outer, &self.core_storage, )? } @@ -299,7 +300,7 @@ impl RocksDbServerStorage { }; // the overlay we use to store all the info is: the outer for a RW access, and the inner for a WO access. let overlay = match inner_overlay_storage.overlay_type() { - OverlayType::Outer(_) => { + OverlayType::Outer(_) | OverlayType::OuterOnly => { panic!("shouldnt happen: we are pinning to an inner overlay. why is it outer type?") } OverlayType::Inner(outer) => outer, @@ -410,6 +411,13 @@ impl RocksDbServerStorage { _ => e.into(), })?; Ok(match overlay_storage.overlay_type() { + OverlayType::OuterOnly => { + if overlay.is_outer() { + *overlay + } else { + return Err(ServerError::OverlayMismatch); + } + } OverlayType::Outer(_) => { if overlay.is_outer() { *overlay @@ -472,8 +480,35 @@ impl RocksDbServerStorage { overlay: &OverlayId, id: &ObjectId, ) -> Result, ServerError> { - //TODO: implement correctly ! - Ok(vec![Block::dummy()]) + let overlay = self.check_overlay(overlay)?; + + let mut commit_storage = CommitStorage::open(id, &overlay, &self.core_storage)?; + + let event_info = commit_storage + .event() + .as_ref() + .ok_or(ServerError::NotFound)?; + + // // rehydrate the event : + // let mut blocks = Vec::with_capacity(event_info.blocks.len()); + // for block_id in event_info.blocks { + // let block = self.block_storage.get(&overlay, &block_id)?; + // blocks.push(block); + // } + + // match event_info.event { + // Event::V0(mut v0) => { + // v0.content.blocks = blocks; + // } + // } + + let mut blocks = Vec::with_capacity(event_info.blocks.len()); + for block_id in event_info.blocks.iter() { + let block = self.block_storage.get(&overlay, block_id)?; + blocks.push(block); + } + + Ok(blocks) } fn add_block( @@ -500,7 +535,7 @@ impl RocksDbServerStorage { let overlay = self.check_overlay(overlay)?; let overlay = &overlay; - // check that the sequence number is correct + // TODO: check that the sequence number is correct // check that the topic exists and that this user has pinned it as publisher let mut topic_storage = TopicStorage::open(event.topic_id(), overlay, &self.core_storage) @@ -518,7 +553,7 @@ impl RocksDbServerStorage { return Err(ServerError::AccessDenied); } - // remove the blocks from inside the event, and save the empty event and each block separately. + // remove the blocks from inside the event, and save the "dehydrated" event and each block separately. match event { Event::V0(mut v0) => { let mut overlay_storage = OverlayStorage::new(overlay, &self.core_storage); diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index 623c0aa..29a24f2 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -61,6 +61,7 @@ pub struct CommitInfo { #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum OverlayType { + OuterOnly, Outer(OverlayId), // the ID of the inner overlay corresponding to this outer. Inner(OverlayId), // the ID of the outer overlay corresponding to the inner InnerOnly, @@ -73,6 +74,18 @@ impl OverlayType { _ => None, } } + pub fn is_outer_to_inner(&self) -> bool { + match self { + Self::Outer(_) => true, + _ => false, + } + } + pub fn is_outer_only(&self) -> bool { + match self { + Self::OuterOnly => true, + _ => false, + } + } } impl From for OverlayType { diff --git a/ng-broker/src/server_storage/core/overlay.rs b/ng-broker/src/server_storage/core/overlay.rs index 4b04204..f3b761b 100644 --- a/ng-broker/src/server_storage/core/overlay.rs +++ b/ng-broker/src/server_storage/core/overlay.rs @@ -101,10 +101,19 @@ impl<'a> OverlayStorage<'a> { pub fn create( id: &OverlayId, overlay_type: &OverlayType, + expose_outer: bool, storage: &'a dyn KCVStorage, ) -> Result, StorageError> { let mut overlay = OverlayStorage::new(id, storage); if overlay.exists() { + if !expose_outer + && overlay_type.is_outer_to_inner() + && overlay.overlay_type().is_outer_only() + { + // we are asked to upgrade an OuterOnly to an Outer(). + // let's do it + ExistentialValue::save(&overlay, overlay_type)?; + } return Err(StorageError::AlreadyExists); } overlay.overlay_type.set(overlay_type)?; @@ -112,12 +121,15 @@ impl<'a> OverlayStorage<'a> { if id.is_inner() { if let Some(outer) = overlay_type.is_inner_get_outer() { - match OverlayStorage::create(outer, &OverlayType::Outer(*id), storage) { - Err(StorageError::AlreadyExists) => { - //it is ok if the Outer overlay already exists. someone else had pinned it before, in read_only, and the broker ahd subscribed to it from another broker + if expose_outer { + match OverlayStorage::create(outer, &OverlayType::Outer(*id), false, storage) { + Err(StorageError::AlreadyExists) => { + //it is ok if the Outer overlay already exists. someone else had pinned it before, in read_only, and the broker had subscribed to it from another broker + // or some other user pinned it before as expose_outer. + } + Err(e) => return Err(e), //TODO: in case of error, remove the existentialvalue that was previously saved (or use a transaction) + Ok(_) => {} } - Err(e) => return Err(e), //TODO: remove the existentialvalue that was previously saved (or use a transaction) - Ok(_) => {} } } } diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 179e4a3..05d4619 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -768,13 +768,14 @@ impl Verifier { let user = self.config.user_priv_key.to_pub(); let remote = self.connected_server_id.to_owned().unwrap(); let read_cap = self.config.private_store_read_cap.as_ref().unwrap(); + let private_store_id = self.config.private_store_id.as_ref().unwrap(); + let private_inner_overlay_id = OverlayId::inner(private_store_id, &read_cap.key); + // first we fetch the read_cap commit of private store repo. let msg = CommitGet::V0(CommitGetV0 { id: read_cap.id, topic: None, // we dont have the topic (only available from RepoLink/BranchLink) but we are pretty sure the Broker has the commit anyway. - overlay: Some(OverlayId::outer( - self.config.private_store_id.as_ref().unwrap(), - )), + overlay: Some(private_inner_overlay_id), }); match broker .request::(&user, &remote, msg)