From 776c88e3ae2ddb7299ed35386fb44ca3dbe78e99 Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Wed, 1 May 2024 13:33:44 +0300 Subject: [PATCH] TopicSub --- ng-broker/src/rocksdb_server_storage.rs | 58 ++++++++++++++++++++++--- ng-broker/src/server_broker.rs | 3 +- ng-net/src/actors/client/pin_repo.rs | 2 +- ng-net/src/actors/client/topic_sub.rs | 8 +++- ng-repo/src/errors.rs | 3 ++ ng-repo/src/types.rs | 7 +++ 6 files changed, 73 insertions(+), 8 deletions(-) diff --git a/ng-broker/src/rocksdb_server_storage.rs b/ng-broker/src/rocksdb_server_storage.rs index 95559a2..fd1a463 100644 --- a/ng-broker/src/rocksdb_server_storage.rs +++ b/ng-broker/src/rocksdb_server_storage.rs @@ -278,6 +278,8 @@ impl RocksDbServerStorage { ) -> Result { assert!(!overlay_access.is_read_only()); + // TODO: all the below DB operations should be done inside a single transaction. need refactor of Object-KCV-Mapping to take an optional transaction. + let inner_overlay = overlay_access.overlay_id_for_client_protocol_purpose(); let mut inner_overlay_storage = match OverlayStorage::open(inner_overlay, &self.core_storage) { @@ -403,13 +405,59 @@ impl RocksDbServerStorage { overlay: &OverlayId, repo: &RepoHash, topic: &TopicId, + user_id: &UserId, publisher: Option<&PublisherAdvert>, ) -> Result { - Ok(TopicSubRes::V0(TopicSubResV0 { - topic: topic.clone(), - known_heads: vec![], - publisher: publisher.is_some(), - })) + let mut overlay_storage = + OverlayStorage::open(overlay, &self.core_storage).map_err(|e| match e { + StorageError::NotFound => ServerError::OverlayNotFound, + _ => e.into(), + })?; + let overlay = match overlay_storage.overlay_type() { + OverlayType::Outer(_) => { + if overlay.is_outer() { + overlay + } else { + return Err(ServerError::OverlayMismatch); + } + } + OverlayType::Inner(outer) => { + if outer.is_outer() { + outer + } else { + return Err(ServerError::OverlayMismatch); + } + } + OverlayType::InnerOnly => { + if overlay.is_inner() { + overlay + } else { + return Err(ServerError::OverlayMismatch); + } + } + }; + // now we check that the repo was previously pinned. + // if it was opened but not pinned, then this should be deal with in the ServerBroker, in memory, not here) + + let is_publisher = publisher.is_some(); + // (we already checked that the advert is valid) + + let mut topic_storage = + TopicStorage::create(topic, overlay, repo, &self.core_storage, true)?; + let _ = TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &is_publisher)?; + + if is_publisher { + let _ = TopicStorage::ADVERT.get_or_set(&mut topic_storage, publisher.unwrap())?; + } + + let mut repo_info = RepoHashStorage::open(repo, overlay, &self.core_storage)?; + RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic)?; + + Ok(TopicSubRes::new_from_heads( + TopicStorage::get_all_heads(&mut topic_storage)?, + is_publisher, + *topic, + )) } pub(crate) fn get_commit( diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index e5ab9f6..93b7c77 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -206,7 +206,8 @@ impl IServerBroker for ServerBroker { user: &UserId, publisher: Option<&PublisherAdvert>, ) -> Result { - self.storage.topic_sub(overlay, repo, topic, publisher) + self.storage + .topic_sub(overlay, repo, topic, user, publisher) } fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result, ServerError> { diff --git a/ng-net/src/actors/client/pin_repo.rs b/ng-net/src/actors/client/pin_repo.rs index a87a8eb..3d5ee69 100644 --- a/ng-net/src/actors/client/pin_repo.rs +++ b/ng-net/src/actors/client/pin_repo.rs @@ -108,7 +108,7 @@ impl EActor for Actor<'_, PinRepo, RepoOpened> { let broker = BROKER.read().await; - // check the validity of the PublisherAdvert(s) + // check the validity of the PublisherAdvert(s). this will return a ProtocolError (will close the connection) let server_peer_id = broker.get_config().unwrap().peer_id; for pub_ad in req.rw_topics() { pub_ad.verify_for_broker(&server_peer_id)?; diff --git a/ng-net/src/actors/client/topic_sub.rs b/ng-net/src/actors/client/topic_sub.rs index 5942d03..7a2e33c 100644 --- a/ng-net/src/actors/client/topic_sub.rs +++ b/ng-net/src/actors/client/topic_sub.rs @@ -98,8 +98,14 @@ impl EActor for Actor<'_, TopicSub, TopicSubRes> { ) -> Result<(), ProtocolError> { let req = TopicSub::try_from(msg)?; - //TODO implement all the server side logic let broker = BROKER.read().await; + + // check the validity of the PublisherAdvert. this will return a ProtocolError (will close the connection) + if let Some(advert) = req.publisher() { + let server_peer_id = broker.get_config().unwrap().peer_id; + advert.verify_for_broker(&server_peer_id)?; + } + let res = broker.get_server_broker()?.topic_sub( req.overlay(), req.hash(), diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 50833ca..223784d 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -219,6 +219,9 @@ pub enum ServerError { InvalidRequest, InvalidSignature, OtherError, + OverlayMismatch, + OverlayNotFound, + TopicNotFound, } impl From for ServerError { diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs index 28b6f89..cd3b571 100644 --- a/ng-repo/src/types.rs +++ b/ng-repo/src/types.rs @@ -575,6 +575,13 @@ impl OverlayId { _ => false, } } + + pub fn is_outer(&self) -> bool { + match self { + Self::Outer(_) => true, + _ => false, + } + } } /// List of Store Overlay types