From 1fa0eb0dc7ef7bd8bb9dc5a1586785d8a12da328 Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Wed, 1 May 2024 01:43:51 +0300 Subject: [PATCH] PinRepo --- ng-broker/src/rocksdb_server_storage.rs | 168 ++++++++++++++++--- ng-broker/src/server_broker.rs | 83 +++++---- ng-broker/src/server_storage/core/commit.rs | 2 +- ng-broker/src/server_storage/core/overlay.rs | 14 +- ng-broker/src/server_storage/core/repo.rs | 34 +++- ng-broker/src/server_storage/core/topic.rs | 9 +- ng-net/src/actors/client/commit_get.rs | 2 +- ng-net/src/actors/client/pin_repo.rs | 74 ++++++-- ng-net/src/actors/client/repo_pin_status.rs | 8 +- ng-net/src/actors/client/topic_sub.rs | 1 + ng-net/src/connection.rs | 7 + ng-net/src/server_broker.rs | 17 +- ng-net/src/types.rs | 82 ++++++++- ng-repo/src/errors.rs | 24 ++- ng-repo/src/kcv_storage.rs | 116 +++++++++++-- 15 files changed, 551 insertions(+), 90 deletions(-) diff --git a/ng-broker/src/rocksdb_server_storage.rs b/ng-broker/src/rocksdb_server_storage.rs index 5ea9431..95559a2 100644 --- a/ng-broker/src/rocksdb_server_storage.rs +++ b/ng-broker/src/rocksdb_server_storage.rs @@ -15,15 +15,15 @@ use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::Mutex; +use crate::server_broker::*; use crate::server_storage::admin::account::Account; use crate::server_storage::admin::invitation::Invitation; use crate::server_storage::admin::wallet::Wallet; use crate::server_storage::core::*; use crate::types::*; -use ng_net::server_broker::*; use ng_net::types::*; use ng_repo::errors::{ProtocolError, ServerError, StorageError}; -use ng_repo::kcv_storage::KCVStorage; + use ng_repo::log::*; use ng_repo::types::*; use ng_storage_rocksdb::block_storage::RocksDbBlockStorage; @@ -239,36 +239,163 @@ impl RocksDbServerStorage { &self, overlay: &OverlayId, repo: &RepoHash, + user: &UserId, ) -> Result { - Err(ServerError::False) - //TODO: implement correctly ! - // Ok(RepoPinStatus::V0(RepoPinStatusV0 { - // hash: repo.clone(), - - // // only possible for RW overlays - // expose_outer: false, + let repo_info = RepoHashStorage::load_for_user(user, repo, overlay, &self.core_storage)?; + let mut topics = vec![]; + for topic in repo_info.topics { + if let Ok(mut model) = TopicStorage::open(&topic, overlay, &self.core_storage) { + match TopicStorage::USERS.get(&mut model, user) { + Err(_) => {} + Ok(publisher) => topics.push(TopicSubRes::new_from_heads( + TopicStorage::get_all_heads(&mut model)?, + publisher, + topic, + )), + } + } + } + if topics.len() == 0 { + return Err(ServerError::False); + } - // // list of topics that are subscribed to - // topics: vec![], - // })) + Ok(RepoPinStatus::V0(RepoPinStatusV0 { + hash: repo.clone(), + expose_outer: repo_info.expose_outer.len() > 0, + topics, + })) } - pub(crate) fn pin_repo( + pub(crate) fn pin_repo_write( &self, - overlay: &OverlayId, + overlay_access: &OverlayAccess, repo: &RepoHash, + user_id: &UserId, ro_topics: &Vec, rw_topics: &Vec, + overlay_root_topic: &Option, + expose_outer: bool, ) -> Result { - //TODO: implement correctly ! - let mut opened = Vec::with_capacity(ro_topics.len() + rw_topics.len()); - for topic in ro_topics { - opened.push((*topic).into()); + assert!(!overlay_access.is_read_only()); + + let inner_overlay = overlay_access.overlay_id_for_client_protocol_purpose(); + let mut inner_overlay_storage = + match OverlayStorage::open(inner_overlay, &self.core_storage) { + Err(StorageError::NotFound) => { + // inner overlay doesn't exist, we need to create it + OverlayStorage::create( + inner_overlay, + &(*overlay_access).into(), + &self.core_storage, + )? + } + Err(e) => return Err(e.into()), + Ok(os) => os, + }; + // the overlay we use to store all the info is: the outer for a RW access, and the inner for a WO access. + let overlay = match inner_overlay_storage.overlay_type() { + OverlayType::Outer(_) => { + panic!("shouldnt happen: we are pinning to an inner overlay. why is it outer type?") + } + OverlayType::Inner(outer) => outer, + OverlayType::InnerOnly => inner_overlay, + } + .clone(); + + // if an overlay_root_topic was provided, we update it in the DB: + // this information is stored on the inner overlay record, contrary to the rest of the info below, that is stored on the outer (except for WO) + if overlay_root_topic.is_some() { + OverlayStorage::TOPIC.set( + &mut inner_overlay_storage, + overlay_root_topic.as_ref().unwrap(), + )?; } + + // we now do the pinning : + + let mut result: RepoOpened = vec![]; + let mut repo_info = RepoHashStorage::open(repo, &overlay, &self.core_storage)?; + + if expose_outer { + RepoHashStorage::EXPOSE_OUTER.add(&mut repo_info, user_id)?; + } + + let mut rw_topics_added: HashMap = + HashMap::with_capacity(rw_topics.len()); for topic in rw_topics { - opened.push((*topic).into()); + let topic_id = topic.topic_id(); + let mut topic_storage = + TopicStorage::create(topic_id, &overlay, repo, &self.core_storage, true)?; + + RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic_id)?; + + let _ = TopicStorage::ADVERT.get_or_set(&mut topic_storage, topic)?; + + TopicStorage::USERS.add_or_change(&mut topic_storage, user_id, &true)?; + + rw_topics_added.insert( + *topic_id, + TopicSubRes::new_from_heads( + TopicStorage::get_all_heads(&mut topic_storage)?, + true, + *topic_id, + ), + ); + } + + for topic in ro_topics { + if rw_topics_added.contains_key(topic) { + continue; + //we do not want to add again as read_only, a topic that was just opened as RW (publisher) + } + + let mut topic_storage = + TopicStorage::create(topic, &overlay, repo, &self.core_storage, true)?; + + RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic)?; + + let _ = TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &false)?; + + result.push(TopicSubRes::new_from_heads( + TopicStorage::get_all_heads(&mut topic_storage)?, + false, + *topic, + )); + } + result.extend(rw_topics_added.into_values()); + Ok(result) + } + + pub(crate) fn pin_repo_read( + &self, + overlay: &OverlayId, + repo: &RepoHash, + user_id: &UserId, + ro_topics: &Vec, + ) -> Result { + let mut overlay_storage = OverlayStorage::open(overlay, &self.core_storage)?; + match overlay_storage.overlay_type() { + OverlayType::Outer(_) => { + let mut result: RepoOpened = vec![]; + let repo_info = RepoHashStorage::load_topics(repo, overlay, &self.core_storage)?; + for topic in ro_topics { + if repo_info.topics.contains(topic) { + let mut topic_storage = + TopicStorage::open(topic, overlay, &self.core_storage)?; + let _ = + TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &false)?; + + result.push(TopicSubRes::new_from_heads( + TopicStorage::get_all_heads(&mut topic_storage)?, + false, + *topic, + )); + } + } + Ok(result) + } + _ => return Err(ServerError::NotFound), } - Ok(opened) } pub(crate) fn topic_sub( @@ -278,7 +405,6 @@ impl RocksDbServerStorage { topic: &TopicId, publisher: Option<&PublisherAdvert>, ) -> Result { - //TODO: implement correctly ! Ok(TopicSubRes::V0(TopicSubResV0 { topic: topic.clone(), known_heads: vec![], diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index 6cfaf48..e5ab9f6 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -66,6 +66,27 @@ pub enum OverlayType { InnerOnly, } +impl OverlayType { + pub fn is_inner_get_outer(&self) -> Option<&OverlayId> { + match self { + Self::Inner(outer) => Some(outer), + _ => None, + } + } +} + +impl From for OverlayType { + fn from(oa: OverlayAccess) -> OverlayType { + match oa { + OverlayAccess::ReadOnly(_) => { + panic!("cannot create an OverlayType from a ReadOnly OverlayAccess") + } + OverlayAccess::ReadWrite((inner, outer)) => OverlayType::Inner(outer), + OverlayAccess::WriteOnly(inner) => OverlayType::InnerOnly, + } + } +} + pub struct OverlayInfo { pub overlay_type: OverlayType, pub overlay_topic: Option, @@ -75,6 +96,7 @@ pub struct OverlayInfo { pub struct ServerBroker { storage: RocksDbServerStorage, + overlays: HashMap, inner_overlays: HashMap>, } @@ -93,6 +115,9 @@ impl ServerBroker { } } +//TODO: the purpose of this trait is to have a level of indirection so we can keep some data in memory (cache) and avoid hitting the storage backend (rocksdb) at every call. +//for now this cache is not implemented, but the structs are ready (see above), and it would just require to change slightly the implementation of the trait functions here below. + impl IServerBroker for ServerBroker { fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError> { self.storage.next_seq_for_peer(peer, seq) @@ -136,36 +161,41 @@ impl IServerBroker for ServerBroker { &self, overlay: &OverlayId, repo: &RepoHash, + user: &UserId, ) -> Result { - Err(ServerError::False) - //TODO: implement correctly ! - // Ok(RepoPinStatus::V0(RepoPinStatusV0 { - // hash: repo.clone(), - - // // only possible for RW overlays - // expose_outer: false, + self.storage.get_repo_pin_status(overlay, repo, user) + } - // // list of topics that are subscribed to - // topics: vec![], - // })) + fn pin_repo_write( + &self, + overlay: &OverlayAccess, + repo: &RepoHash, + user_id: &UserId, + ro_topics: &Vec, + rw_topics: &Vec, + overlay_root_topic: &Option, + expose_outer: bool, + ) -> Result { + self.storage.pin_repo_write( + overlay, + repo, + user_id, + ro_topics, + rw_topics, + overlay_root_topic, + expose_outer, + ) } - fn pin_repo( + fn pin_repo_read( &self, overlay: &OverlayId, repo: &RepoHash, + user_id: &UserId, ro_topics: &Vec, - rw_topics: &Vec, ) -> Result { - //TODO: implement correctly ! - let mut opened = Vec::with_capacity(ro_topics.len() + rw_topics.len()); - for topic in ro_topics { - opened.push((*topic).into()); - } - for topic in rw_topics { - opened.push((*topic).into()); - } - Ok(opened) + self.storage + .pin_repo_read(overlay, repo, user_id, ro_topics) } fn topic_sub( @@ -173,18 +203,13 @@ impl IServerBroker for ServerBroker { overlay: &OverlayId, repo: &RepoHash, topic: &TopicId, + user: &UserId, publisher: Option<&PublisherAdvert>, ) -> Result { - //TODO: implement correctly ! - Ok(TopicSubRes::V0(TopicSubResV0 { - topic: topic.clone(), - known_heads: vec![], - publisher: publisher.is_some(), - })) + self.storage.topic_sub(overlay, repo, topic, publisher) } fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result, ServerError> { - //TODO: implement correctly ! - Ok(vec![Block::dummy()]) + self.storage.get_commit(overlay, id) } } diff --git a/ng-broker/src/server_storage/core/commit.rs b/ng-broker/src/server_storage/core/commit.rs index be61c44..549a235 100644 --- a/ng-broker/src/server_storage/core/commit.rs +++ b/ng-broker/src/server_storage/core/commit.rs @@ -115,7 +115,7 @@ impl<'a> CommitStorage<'a> { id: &ObjectId, overlay: &OverlayId, event: &Option, - storage: &'a mut dyn KCVStorage, + storage: &'a dyn KCVStorage, ) -> Result, StorageError> { let mut creating = CommitStorage::new(id, overlay, storage); if creating.exists() { diff --git a/ng-broker/src/server_storage/core/overlay.rs b/ng-broker/src/server_storage/core/overlay.rs index 3709133..30fb1d7 100644 --- a/ng-broker/src/server_storage/core/overlay.rs +++ b/ng-broker/src/server_storage/core/overlay.rs @@ -100,7 +100,7 @@ impl<'a> OverlayStorage<'a> { pub fn create( id: &OverlayId, overlay_type: &OverlayType, - storage: &'a mut dyn KCVStorage, + storage: &'a dyn KCVStorage, ) -> Result, StorageError> { let mut overlay = OverlayStorage::new(id, storage); if overlay.exists() { @@ -109,6 +109,18 @@ impl<'a> OverlayStorage<'a> { overlay.overlay_type.set(overlay_type)?; ExistentialValue::save(&overlay, overlay_type)?; + if id.is_inner() { + if let Some(outer) = overlay_type.is_inner_get_outer() { + match OverlayStorage::create(outer, &OverlayType::Outer(*id), storage) { + Err(StorageError::AlreadyExists) => { + //it is ok if the Outer overlay already exists. someone else had pinned it before, in read_only, and the broker ahd subscribed to it from another broker + } + Err(e) => return Err(e), //TODO: remove the existentialvalue that was previously saved (or use a transaction) + Ok(_) => {} + } + } + } + Ok(overlay) } diff --git a/ng-broker/src/server_storage/core/repo.rs b/ng-broker/src/server_storage/core/repo.rs index 1ca5088..19a40f4 100644 --- a/ng-broker/src/server_storage/core/repo.rs +++ b/ng-broker/src/server_storage/core/repo.rs @@ -61,7 +61,6 @@ impl<'a> RepoHashStorage<'a> { storage: &'a dyn KCVStorage, ) -> Result { let mut opening = Self::new(repo, overlay, storage); - let info = RepoInfo { topics: Self::TOPICS.get_all(&mut opening)?, expose_outer: Self::EXPOSE_OUTER.get_all(&mut opening)?, @@ -69,6 +68,37 @@ impl<'a> RepoHashStorage<'a> { Ok(info) } + pub fn load_topics( + repo: &RepoHash, + overlay: &OverlayId, + storage: &'a dyn KCVStorage, + ) -> Result { + let mut opening = Self::new(repo, overlay, storage); + let info = RepoInfo { + topics: Self::TOPICS.get_all(&mut opening)?, + expose_outer: HashSet::new(), + }; + Ok(info) + } + + pub fn load_for_user( + user: &UserId, + repo: &RepoHash, + overlay: &OverlayId, + storage: &'a dyn KCVStorage, + ) -> Result { + let mut opening = Self::new(repo, overlay, storage); + let mut expose_outer = HashSet::new(); + if let Ok(()) = Self::EXPOSE_OUTER.has(&mut opening, user) { + expose_outer.insert(*user); + } + let info = RepoInfo { + topics: Self::TOPICS.get_all(&mut opening)?, + expose_outer, + }; + Ok(info) + } + pub fn new(repo: &RepoHash, overlay: &OverlayId, storage: &'a dyn KCVStorage) -> Self { let mut key: Vec = Vec::with_capacity(33 + 33); key.append(&mut to_vec(overlay).unwrap()); @@ -87,7 +117,7 @@ impl<'a> RepoHashStorage<'a> { pub fn create( repo: &RepoHash, overlay: &OverlayId, - storage: &'a mut dyn KCVStorage, + storage: &'a dyn KCVStorage, ) -> Result, StorageError> { let mut creating = Self::new(repo, overlay, storage); Ok(creating) diff --git a/ng-broker/src/server_storage/core/topic.rs b/ng-broker/src/server_storage/core/topic.rs index 88722eb..34562a5 100644 --- a/ng-broker/src/server_storage/core/topic.rs +++ b/ng-broker/src/server_storage/core/topic.rs @@ -109,11 +109,16 @@ impl<'a> TopicStorage<'a> { id: &TopicId, overlay: &OverlayId, repo: &RepoHash, - storage: &'a mut dyn KCVStorage, + storage: &'a dyn KCVStorage, + or_open: bool, ) -> Result, StorageError> { let mut topic = TopicStorage::new(id, overlay, storage); if topic.exists() { - return Err(StorageError::AlreadyExists); + if or_open { + return Ok(topic); + } else { + return Err(StorageError::AlreadyExists); + } } topic.repo.set(repo)?; ExistentialValue::save(&topic, repo)?; diff --git a/ng-net/src/actors/client/commit_get.rs b/ng-net/src/actors/client/commit_get.rs index 7878609..406af02 100644 --- a/ng-net/src/actors/client/commit_get.rs +++ b/ng-net/src/actors/client/commit_get.rs @@ -92,7 +92,7 @@ impl EActor for Actor<'_, CommitGet, Block> { let blocks_res = broker .get_server_broker()? .get_commit(req.overlay(), req.id()); - + // IF NEEDED, the get_commit could be changed to be async, and then the send_in_reply_to would be also totally async match blocks_res { Ok(blocks) => { if blocks.len() == 0 { diff --git a/ng-net/src/actors/client/pin_repo.rs b/ng-net/src/actors/client/pin_repo.rs index 5ed6017..a87a8eb 100644 --- a/ng-net/src/actors/client/pin_repo.rs +++ b/ng-net/src/actors/client/pin_repo.rs @@ -25,8 +25,7 @@ impl PinRepo { Actor::::new_responder(id) } pub fn from_repo(repo: &Repo, broker_id: &DirectPeerId) -> PinRepo { - let overlay = - OverlayAccess::ReadWrite((repo.store.inner_overlay(), repo.store.outer_overlay())); + let overlay = OverlayAccess::new_write_access_from_store(&repo.store); let mut rw_topics = Vec::with_capacity(repo.branches.len()); let mut ro_topics = vec![]; for (_, branch) in repo.branches.iter() { @@ -107,18 +106,73 @@ impl EActor for Actor<'_, PinRepo, RepoOpened> { ) -> Result<(), ProtocolError> { let req = PinRepo::try_from(msg)?; - //TODO implement all the server side logic let broker = BROKER.read().await; - let res = broker.get_server_broker()?.pin_repo( - req.overlay(), - req.hash(), - req.ro_topics(), - req.rw_topics(), - ); + // check the validity of the PublisherAdvert(s) + let server_peer_id = broker.get_config().unwrap().peer_id; + for pub_ad in req.rw_topics() { + pub_ad.verify_for_broker(&server_peer_id)?; + } + + let result = { + match req.overlay_access() { + OverlayAccess::ReadOnly(r) => { + if r.is_inner() + || req.overlay() != r + || req.rw_topics().len() > 0 + || req.overlay_root_topic().is_some() + { + Err(ServerError::InvalidRequest) + } else { + broker.get_server_broker()?.pin_repo_read( + req.overlay(), + req.hash(), + &fsm.lock().await.user_id_or_err()?, + req.ro_topics(), + ) + } + } + OverlayAccess::ReadWrite((w, r)) => { + if req.overlay() != w + || !w.is_inner() + || r.is_inner() + || req.expose_outer() && req.rw_topics().len() == 0 + { + // we do not allow to expose_outer if not a publisher for at least one topic + // TODO add a check on "|| overlay_root_topic.is_none()" because it should be mandatory to have one (not sent by client at the moment) + Err(ServerError::InvalidRequest) + } else { + broker.get_server_broker()?.pin_repo_write( + req.overlay_access(), + req.hash(), + &fsm.lock().await.user_id_or_err()?, + req.ro_topics(), + req.rw_topics(), + req.overlay_root_topic(), + req.expose_outer(), + ) + } + } + OverlayAccess::WriteOnly(w) => { + if !w.is_inner() || req.overlay() != w || req.expose_outer() { + Err(ServerError::InvalidRequest) + } else { + broker.get_server_broker()?.pin_repo_write( + req.overlay_access(), + req.hash(), + &fsm.lock().await.user_id_or_err()?, + req.ro_topics(), + req.rw_topics(), + req.overlay_root_topic(), + false, + ) + } + } + } + }; fsm.lock() .await - .send_in_reply_to(res.into(), self.id()) + .send_in_reply_to(result.into(), self.id()) .await?; Ok(()) } diff --git a/ng-net/src/actors/client/repo_pin_status.rs b/ng-net/src/actors/client/repo_pin_status.rs index 9ebfb6f..9ea8639 100644 --- a/ng-net/src/actors/client/repo_pin_status.rs +++ b/ng-net/src/actors/client/repo_pin_status.rs @@ -78,9 +78,11 @@ impl EActor for Actor<'_, RepoPinStatusReq, RepoPinStatus> { ) -> Result<(), ProtocolError> { let req = RepoPinStatusReq::try_from(msg)?; let broker = BROKER.read().await; - let res = broker - .get_server_broker()? - .get_repo_pin_status(req.overlay(), req.hash()); + let res = broker.get_server_broker()?.get_repo_pin_status( + req.overlay(), + req.hash(), + &fsm.lock().await.user_id_or_err()?, + ); fsm.lock() .await .send_in_reply_to(res.into(), self.id()) diff --git a/ng-net/src/actors/client/topic_sub.rs b/ng-net/src/actors/client/topic_sub.rs index 69f6764..5942d03 100644 --- a/ng-net/src/actors/client/topic_sub.rs +++ b/ng-net/src/actors/client/topic_sub.rs @@ -104,6 +104,7 @@ impl EActor for Actor<'_, TopicSub, TopicSubRes> { req.overlay(), req.hash(), req.topic(), + &fsm.lock().await.user_id_or_err()?, req.publisher(), ); diff --git a/ng-net/src/connection.rs b/ng-net/src/connection.rs index 0d9c882..fa22efe 100644 --- a/ng-net/src/connection.rs +++ b/ng-net/src/connection.rs @@ -261,6 +261,13 @@ impl NoiseFSM { } } + pub fn user_id_or_err(&self) -> Result { + match &self.config { + Some(start_config) => start_config.get_user().ok_or(ProtocolError::ActorError), + _ => Err(ProtocolError::ActorError), + } + } + fn decrypt(&mut self, ciphertext: &Noise) -> Result { let ser = self .noise_cipher_state_dec diff --git a/ng-net/src/server_broker.rs b/ng-net/src/server_broker.rs index b3671a8..d7045e6 100644 --- a/ng-net/src/server_broker.rs +++ b/ng-net/src/server_broker.rs @@ -41,14 +41,26 @@ pub trait IServerBroker: Send + Sync { &self, overlay: &OverlayId, repo: &RepoHash, + user_id: &UserId, ) -> Result; - fn pin_repo( + fn pin_repo_write( &self, - overlay: &OverlayId, + overlay: &OverlayAccess, repo: &RepoHash, + user_id: &UserId, ro_topics: &Vec, rw_topics: &Vec, + overlay_root_topic: &Option, + expose_outer: bool, + ) -> Result; + + fn pin_repo_read( + &self, + overlay: &OverlayId, + repo: &RepoHash, + user_id: &UserId, + ro_topics: &Vec, ) -> Result; fn topic_sub( @@ -56,6 +68,7 @@ pub trait IServerBroker: Send + Sync { overlay: &OverlayId, repo: &RepoHash, topic: &TopicId, + user_id: &UserId, publisher: Option<&PublisherAdvert>, ) -> Result; diff --git a/ng-net/src/types.rs b/ng-net/src/types.rs index dc8623d..14c0f70 100644 --- a/ng-net/src/types.rs +++ b/ng-net/src/types.rs @@ -20,8 +20,11 @@ use crate::{actor::EActor, actors::admin::*, actors::*}; use core::fmt; use ng_repo::errors::*; use ng_repo::log::*; +use ng_repo::store::Store; use ng_repo::types::*; +use ng_repo::utils::{sign, verify}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::{ any::{Any, TypeId}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, @@ -1213,6 +1216,38 @@ pub enum OverlayAccess { } impl OverlayAccess { + pub fn is_read_only(&self) -> bool { + match self { + Self::ReadOnly(_) => true, + _ => false, + } + } + pub fn new_write_access_from_store(store: &Store) -> OverlayAccess { + match store.get_store_repo() { + StoreRepo::V0(StoreRepoV0::PrivateStore(_)) | StoreRepo::V0(StoreRepoV0::Dialog(_)) => { + OverlayAccess::WriteOnly(store.inner_overlay()) + } + StoreRepo::V0(StoreRepoV0::ProtectedStore(_)) + | StoreRepo::V0(StoreRepoV0::Group(_)) + | StoreRepo::V0(StoreRepoV0::PublicStore(_)) => { + OverlayAccess::ReadWrite((store.inner_overlay(), store.outer_overlay())) + } + } + } + + pub fn new_read_access_from_store(store: &Store) -> OverlayAccess { + match store.get_store_repo() { + StoreRepo::V0(StoreRepoV0::PrivateStore(_)) | StoreRepo::V0(StoreRepoV0::Dialog(_)) => { + panic!("cannot get read access to a private or dialog store"); + } + StoreRepo::V0(StoreRepoV0::ProtectedStore(_)) + | StoreRepo::V0(StoreRepoV0::Group(_)) + | StoreRepo::V0(StoreRepoV0::PublicStore(_)) => { + OverlayAccess::ReadOnly(store.outer_overlay()) + } + } + } + pub fn new_ro(outer_overlay: OverlayId) -> Result { if let OverlayId::Outer(_digest) = outer_overlay { Ok(OverlayAccess::ReadOnly(outer_overlay)) @@ -1479,8 +1514,6 @@ pub enum PublisherAdvert { V0(PublisherAdvertV0), } -use ng_repo::utils::sign; - impl PublisherAdvert { pub fn new( topic_id: TopicId, @@ -1500,6 +1533,27 @@ impl PublisherAdvert { Self::V0(v0) => &v0.content.topic, } } + + pub fn verify(&self) -> Result<(), NgError> { + match self { + Self::V0(v0) => verify( + &serde_bare::to_vec(&v0.content).unwrap(), + v0.sig, + v0.content.topic, + ), + } + } + + pub fn verify_for_broker(&self, peer_id: &DirectPeerId) -> Result<(), ProtocolError> { + match self { + Self::V0(v0) => { + if v0.content.peer != *peer_id { + return Err(ProtocolError::InvalidPublisherAdvert); + } + } + } + Ok(self.verify()?) + } } /// Topic subscription request by a peer @@ -2684,6 +2738,23 @@ impl PinRepo { PinRepo::V0(o) => &o.overlay.overlay_id_for_client_protocol_purpose(), } } + pub fn overlay_access(&self) -> &OverlayAccess { + match self { + PinRepo::V0(o) => &o.overlay, + } + } + + pub fn overlay_root_topic(&self) -> &Option { + match self { + PinRepo::V0(o) => &o.overlay_root_topic, + } + } + + pub fn expose_outer(&self) -> bool { + match self { + PinRepo::V0(o) => o.expose_outer, + } + } } /// Request to refresh the Pinning of a previously pinned repo. @@ -3244,6 +3315,13 @@ impl TopicSubRes { Self::V0(v0) => v0.publisher, } } + pub fn new_from_heads(topics: HashSet, publisher: bool, topic: TopicId) -> Self { + TopicSubRes::V0(TopicSubResV0 { + topic, + known_heads: topics.into_iter().collect(), + publisher, + }) + } } impl From for TopicSubRes { diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 82b64d2..50833ca 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -215,6 +215,28 @@ pub enum ServerError { RepoAlreadyOpened, NotFound, EmptyStream, + StorageError, + InvalidRequest, + InvalidSignature, + OtherError, +} + +impl From for ServerError { + fn from(e: StorageError) -> Self { + match e { + StorageError::NotFound => ServerError::NotFound, + _ => ServerError::StorageError, + } + } +} + +impl From for ServerError { + fn from(e: NgError) -> Self { + match e { + NgError::InvalidSignature => ServerError::InvalidSignature, + _ => ServerError::OtherError, + } + } } impl ServerError { @@ -318,7 +340,6 @@ pub enum ProtocolError { WsError, ActorError, InvalidState, - SignatureError, InvalidSignature, SerializationError, AccessDenied, @@ -330,6 +351,7 @@ pub enum ProtocolError { InvalidValue, AlreadyExists, RepoIdRequired, + InvalidPublisherAdvert, ConnectionError, Timeout, diff --git a/ng-repo/src/kcv_storage.rs b/ng-repo/src/kcv_storage.rs index 12049aa..b6432db 100644 --- a/ng-repo/src/kcv_storage.rs +++ b/ng-repo/src/kcv_storage.rs @@ -229,6 +229,22 @@ impl< let key = Self::compute_key(model, column)?; model.storage().put(self.prefix, &key, None, &vec![], &None) } + + pub fn add_lazy(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> { + model.check_exists()?; + let key = Self::compute_key(model, column)?; + model.storage().write_transaction(&mut |tx| { + match tx.has_property_value(self.prefix, &key, None, &vec![], &None) { + Ok(_) => {} + Err(StorageError::NotFound) => { + tx.put(self.prefix, &key, None, &vec![], &None)?; + } + Err(e) => return Err(e), + }; + Ok(()) + }) + } + pub fn remove(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> { model.check_exists()?; let key = Self::compute_key(model, column)?; @@ -281,7 +297,7 @@ impl< pub struct MultiMapColumn< Model: IModel, Column: Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>, - Value: Serialize + for<'a> Deserialize<'a>, + Value: Serialize + for<'a> Deserialize<'a> + Clone + PartialEq, > { prefix: u8, phantom_column: PhantomData, @@ -293,7 +309,7 @@ pub struct MultiMapColumn< impl< Model: IModel, Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, - Value: Serialize + for<'a> Deserialize<'a>, + Value: Serialize + for<'a> Deserialize<'a> + Clone + PartialEq, > MultiMapColumn { pub const fn new(prefix: u8) -> Self { @@ -351,14 +367,60 @@ impl< .has_property_value(self.prefix, &key, None, &to_vec(value)?, &None) } - pub fn has_regardless_value( + pub fn get(&self, model: &mut Model, column: &Column) -> Result { + model.check_exists()?; + let key = MultiValueColumn::compute_key(model, column)?; + let val_ser = model.storage().get(self.prefix, &key, None, &None)?; + Ok(from_slice(&val_ser)?) + } + + pub fn get_or_add( + &self, + model: &mut Model, + column: &Column, + value: &Value, + ) -> Result { + model.check_exists()?; + let key = MultiValueColumn::compute_key(model, column)?; + let mut found: Option = None; + model.storage().write_transaction(&mut |tx| { + found = match tx.get(self.prefix, &key, None, &None) { + Ok(val_ser) => Some(from_slice(&val_ser)?), + Err(StorageError::NotFound) => { + tx.put(self.prefix, &key, None, &to_vec(value)?, &None)?; + None + } + Err(e) => return Err(e), + }; + Ok(()) + })?; + Ok(found.unwrap_or(value.clone())) + } + + pub fn add_or_change( &self, model: &mut Model, column: &Column, + value: &Value, ) -> Result<(), StorageError> { model.check_exists()?; let key = MultiValueColumn::compute_key(model, column)?; - model.storage().get(self.prefix, &key, None, &None)?; + let mut found: Option = None; + model.storage().write_transaction(&mut |tx| { + found = match tx.get(self.prefix, &key, None, &None) { + Ok(val_ser) => Some(from_slice(&val_ser)?), + Err(StorageError::NotFound) => { + tx.put(self.prefix, &key, None, &to_vec(value)?, &None)?; + None + } + Err(e) => return Err(e), + }; + if found.is_some() && found.as_ref().unwrap() != value { + // we change it + tx.put(self.prefix, &key, None, &to_vec(value)?, &None)?; + } + Ok(()) + })?; Ok(()) } @@ -387,7 +449,7 @@ impl< impl< Model: IModel, Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, - Value: Serialize + for<'a> Deserialize<'a>, + Value: Serialize + for<'a> Deserialize<'a> + Clone + PartialEq, > IMultiValueColumn for MultiMapColumn { fn value_size(&self) -> Result { @@ -526,7 +588,7 @@ pub struct SingleValueColumn Deseriali phantom_model: PhantomData, } -impl Deserialize<'d>> ISingleValueColumn +impl Deserialize<'d>> ISingleValueColumn for SingleValueColumn { fn suffix(&self) -> u8 { @@ -534,7 +596,9 @@ impl Deserialize<'d>> ISingleValueColu } } -impl Deserialize<'d>> SingleValueColumn { +impl Deserialize<'d>> + SingleValueColumn +{ pub const fn new(suffix: u8) -> Self { SingleValueColumn { suffix, @@ -553,6 +617,7 @@ impl Deserialize<'d>> SingleValueColum &None, ) } + pub fn get(&self, model: &mut Model) -> Result { model.check_exists()?; match model @@ -564,6 +629,29 @@ impl Deserialize<'d>> SingleValueColum } } + pub fn get_or_set(&self, model: &mut Model, value: &Value) -> Result { + model.check_exists()?; + let mut found: Option = None; + model.storage().write_transaction(&mut |tx| { + found = match tx.get(model.prefix(), model.key(), Some(self.suffix), &None) { + Ok(val_ser) => Some(from_slice(&val_ser)?), + Err(StorageError::NotFound) => { + tx.put( + model.prefix(), + model.key(), + Some(self.suffix), + &to_vec(value)?, + &None, + )?; + None + } + Err(e) => return Err(e), + }; + Ok(()) + })?; + Ok(found.unwrap_or(value.clone())) + } + pub fn has(&self, model: &mut Model, value: &Value) -> Result<(), StorageError> { model.check_exists()?; model.storage().has_property_value( @@ -575,14 +663,12 @@ impl Deserialize<'d>> SingleValueColum ) } - // should call the Model::del() instead - // pub fn del( - // &self, - // model: &mut Model, - // tx: &mut dyn WriteTransaction, - // ) -> Result<(), StorageError> { - // tx.del(model.prefix(), model.key(), Some(self.suffix), &None) - // } + pub fn del(&self, model: &mut Model) -> Result<(), StorageError> { + model.check_exists()?; + model + .storage() + .del(model.prefix(), model.key(), Some(self.suffix), &None) + } } pub struct ExistentialValue Deserialize<'d>> {