From 7e3d036e9619426d83113bfa6ffedf783c768814 Mon Sep 17 00:00:00 2001
From: Niko PLP
Date: Wed, 1 May 2024 16:53:20 +0300
Subject: [PATCH] PublishEvent actor

---
 ng-broker/src/rocksdb_server_storage.rs      | 126 ++++++++++++++++---
 ng-broker/src/server_broker.rs               |   9 ++
 ng-broker/src/server_storage/core/commit.rs  |  24 +++-
 ng-broker/src/server_storage/core/overlay.rs |   1 +
 ng-net/src/actors/client/event.rs            |  23 +++-
 ng-net/src/server_broker.rs                  |   7 ++
 ng-repo/src/block_storage.rs                 |  17 ++-
 ng-repo/src/commit.rs                        |   2 +-
 ng-repo/src/errors.rs                        |   3 +
 ng-repo/src/event.rs                         |  32 ++++-
 ng-repo/src/object.rs                        | 109 +++++++++-------
 ng-repo/src/store.rs                         |  27 +++-
 ng-repo/src/types.rs                         |   5 +
 ng-storage-rocksdb/src/block_storage.rs      |  17 ++-
 14 files changed, 321 insertions(+), 81 deletions(-)

diff --git a/ng-broker/src/rocksdb_server_storage.rs b/ng-broker/src/rocksdb_server_storage.rs
index fd1a463..9ea7d0b 100644
--- a/ng-broker/src/rocksdb_server_storage.rs
+++ b/ng-broker/src/rocksdb_server_storage.rs
@@ -13,7 +13,7 @@ use std::collections::HashMap;
 use std::fs::{read, File, OpenOptions};
 use std::io::Write;
 use std::path::{Path, PathBuf};
-use std::sync::Mutex;
+use std::sync::{Arc, Mutex};
 
 use crate::server_broker::*;
 use crate::server_storage::admin::account::Account;
@@ -22,9 +22,12 @@ use crate::server_storage::admin::wallet::Wallet;
 use crate::server_storage::core::*;
 use crate::types::*;
 use ng_net::types::*;
+use ng_repo::block_storage::{BlockStorage, HashMapBlockStorage};
 use ng_repo::errors::{ProtocolError, ServerError, StorageError};
 use ng_repo::log::*;
+use ng_repo::object::Object;
+use ng_repo::store::Store;
 use ng_repo::types::*;
 use ng_storage_rocksdb::block_storage::RocksDbBlockStorage;
 use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage;
@@ -400,42 +403,46 @@ impl RocksDbServerStorage {
         }
     }
 
-    pub(crate) fn topic_sub(
-        &self,
-        overlay: &OverlayId,
-        repo: &RepoHash,
-        topic: &TopicId,
-        user_id: &UserId,
-        publisher: Option<&PublisherAdvert>,
-    ) -> Result<TopicSubRes, ServerError> {
+    fn check_overlay(&self, overlay: &OverlayId) -> Result<OverlayId, ServerError> {
         let mut overlay_storage =
             OverlayStorage::open(overlay, &self.core_storage).map_err(|e| match e {
                 StorageError::NotFound => ServerError::OverlayNotFound,
                 _ => e.into(),
             })?;
-        let overlay = match overlay_storage.overlay_type() {
+        Ok(match overlay_storage.overlay_type() {
             OverlayType::Outer(_) => {
                 if overlay.is_outer() {
-                    overlay
+                    *overlay
                 } else {
                     return Err(ServerError::OverlayMismatch);
                 }
             }
             OverlayType::Inner(outer) => {
                 if outer.is_outer() {
-                    outer
+                    *outer
                 } else {
                     return Err(ServerError::OverlayMismatch);
                 }
             }
             OverlayType::InnerOnly => {
                 if overlay.is_inner() {
-                    overlay
+                    *overlay
                 } else {
                     return Err(ServerError::OverlayMismatch);
                 }
             }
-        };
+        })
+    }
+
+    pub(crate) fn topic_sub(
+        &self,
+        overlay: &OverlayId,
+        repo: &RepoHash,
+        topic: &TopicId,
+        user_id: &UserId,
+        publisher: Option<&PublisherAdvert>,
+    ) -> Result<TopicSubRes, ServerError> {
+        let overlay = self.check_overlay(overlay)?;
 
         // now we check that the repo was previously pinned.
         // if it was opened but not pinned, then this should be dealt with in the ServerBroker, in memory, not here
@@ -443,14 +450,14 @@ impl RocksDbServerStorage {
         // (we already checked that the advert is valid)
 
         let mut topic_storage =
-            TopicStorage::create(topic, overlay, repo, &self.core_storage, true)?;
+            TopicStorage::create(topic, &overlay, repo, &self.core_storage, true)?;
         let _ = TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &is_publisher)?;
 
         if is_publisher {
             let _ = TopicStorage::ADVERT.get_or_set(&mut topic_storage, publisher.unwrap())?;
         }
 
-        let mut repo_info = RepoHashStorage::open(repo, overlay, &self.core_storage)?;
+        let mut repo_info = RepoHashStorage::open(repo, &overlay, &self.core_storage)?;
         RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic)?;
 
         Ok(TopicSubRes::new_from_heads(
@@ -468,4 +475,91 @@ impl RocksDbServerStorage {
         //TODO: implement correctly !
         Ok(vec![Block::dummy()])
     }
+
+    fn add_block(
+        &self,
+        overlay_id: &OverlayId,
+        overlay_storage: &mut OverlayStorage,
+        block: Block,
+    ) -> Result<BlockId, StorageError> {
+        let block_id = self.block_storage.put(overlay_id, &block, true)?;
+        OverlayStorage::BLOCKS.increment(overlay_storage, &block_id)?;
+        Ok(block_id)
+    }
+
+    pub(crate) fn save_event(
+        &self,
+        overlay: &OverlayId,
+        mut event: Event,
+        user_id: &UserId,
+    ) -> Result<(), ServerError> {
+        if overlay.is_outer() {
+            // we don't publish events on the outer overlay!
+            return Err(ServerError::OverlayMismatch);
+        }
+        let overlay = self.check_overlay(overlay)?;
+        let overlay = &overlay;
+
+        // check that the sequence number is correct
+
+        // check that the topic exists and that this user has pinned it as publisher
+        let mut topic_storage = TopicStorage::open(event.topic_id(), overlay, &self.core_storage)
+            .map_err(|e| match e {
+                StorageError::NotFound => ServerError::TopicNotFound,
+                _ => e.into(),
+            })?;
+        let is_publisher = TopicStorage::USERS
+            .get(&mut topic_storage, user_id)
+            .map_err(|e| match e {
+                StorageError::NotFound => ServerError::AccessDenied,
+                _ => e.into(),
+            })?;
+        if !is_publisher {
+            return Err(ServerError::AccessDenied);
+        }
+
+        // remove the blocks from inside the event, and save the empty event and each block separately.
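+        // (each extracted block is stored once per overlay with its refcount incremented,
+        // the block ids are kept in an EventInfo next to the emptied event, and a temporary
+        // in-memory store is only used to parse the commit header out of the first block)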
+ match event { + Event::V0(mut v0) => { + let mut overlay_storage = OverlayStorage::new(overlay, &self.core_storage); + let mut extracted_blocks_ids = Vec::with_capacity(v0.content.blocks.len()); + let first_block_copy = v0.content.blocks[0].clone(); + let temp_mini_block_storage = HashMapBlockStorage::new(); + for block in v0.content.blocks { + let _ = temp_mini_block_storage.put(overlay, &block, false)?; + extracted_blocks_ids.push(self.add_block( + overlay, + &mut overlay_storage, + block, + )?); + } + + // creating a temporary store to access the blocks + let temp_store = Store::new_from_overlay_id( + overlay, + Arc::new(std::sync::RwLock::new(temp_mini_block_storage)), + ); + let commit_id = extracted_blocks_ids[0]; + let header = Object::load_header(&first_block_copy, &temp_store) + .map_err(|_| ServerError::InvalidHeader)?; + + v0.content.blocks = vec![]; + let event_info = EventInfo { + event: Event::V0(v0), + blocks: extracted_blocks_ids, + }; + + CommitStorage::create( + &commit_id, + overlay, + event_info, + &header, + true, + &self.core_storage, + )?; + } + } + + Ok(()) + } } diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index 93b7c77..623c0aa 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -213,4 +213,13 @@ impl IServerBroker for ServerBroker { fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result, ServerError> { self.storage.get_commit(overlay, id) } + + fn dispatch_event( + &self, + overlay: &OverlayId, + event: Event, + user_id: &UserId, + ) -> Result<(), ServerError> { + self.storage.save_event(overlay, event, user_id) + } } diff --git a/ng-broker/src/server_storage/core/commit.rs b/ng-broker/src/server_storage/core/commit.rs index 549a235..345b6bd 100644 --- a/ng-broker/src/server_storage/core/commit.rs +++ b/ng-broker/src/server_storage/core/commit.rs @@ -114,15 +114,33 @@ impl<'a> CommitStorage<'a> { pub fn create( id: &ObjectId, overlay: &OverlayId, - event: &Option, + event: EventInfo, + header: &CommitHeader, + home_pinned: bool, storage: &'a dyn KCVStorage, ) -> Result, StorageError> { let mut creating = CommitStorage::new(id, overlay, storage); if creating.exists() { return Err(StorageError::AlreadyExists); } - creating.event.set(event)?; - ExistentialValue::save(&creating, event)?; + let event_opt = Some(event); + creating.event.set(&event_opt)?; + ExistentialValue::save(&creating, &event_opt)?; + + if home_pinned { + Self::HOME_PINNED.set(&mut creating, &true)?; + } + + // adding all the references + for ack in header.acks() { + Self::ACKS.add(&mut creating, &ack)?; + } + for dep in header.deps() { + Self::DEPS.add(&mut creating, &dep)?; + } + for file in header.files() { + Self::FILES.add(&mut creating, file)?; + } Ok(creating) } diff --git a/ng-broker/src/server_storage/core/overlay.rs b/ng-broker/src/server_storage/core/overlay.rs index 30fb1d7..4b04204 100644 --- a/ng-broker/src/server_storage/core/overlay.rs +++ b/ng-broker/src/server_storage/core/overlay.rs @@ -51,6 +51,7 @@ impl<'a> OverlayStorage<'a> { // Overlay properties pub const TYPE: ExistentialValueColumn = ExistentialValueColumn::new(b'y'); + /// BE CAREFUL: this property is exceptionally stored on the InnerOverlay pub const TOPIC: SingleValueColumn = SingleValueColumn::new(b't'); // Overlay <-> Block refcount diff --git a/ng-net/src/actors/client/event.rs b/ng-net/src/actors/client/event.rs index a6ebafc..83c44d7 100644 --- a/ng-net/src/actors/client/event.rs +++ b/ng-net/src/actors/client/event.rs @@ -33,9 +33,15 @@ 
impl PublishEvent {
         self.1 = Some(overlay);
     }
 
-    // pub fn overlay(&self) -> &OverlayId {
-    //     self.1.as_ref().unwrap()
-    // }
+    pub fn overlay(&self) -> &OverlayId {
+        self.1.as_ref().unwrap()
+    }
+    pub fn event(&self) -> &Event {
+        &self.0
+    }
+    pub fn take_event(self) -> Event {
+        self.0
+    }
 }
 
 impl TryFrom<ProtocolMessage> for PublishEvent {
@@ -69,9 +75,16 @@ impl EActor for Actor<'_, PublishEvent, ()> {
     ) -> Result<(), ProtocolError> {
         let req = PublishEvent::try_from(msg)?;
 
-        //TODO implement all the server side logic
+        // send a ProtocolError if invalid signatures (will disconnect the client)
+        req.event().verify()?;
 
-        let res: Result<(), ServerError> = Ok(());
+        let broker = BROKER.read().await;
+        let overlay = req.overlay().clone();
+        let res = broker.get_server_broker()?.dispatch_event(
+            &overlay,
+            req.take_event(),
+            &fsm.lock().await.user_id_or_err()?,
+        );
 
         fsm.lock()
             .await
diff --git a/ng-net/src/server_broker.rs b/ng-net/src/server_broker.rs
index d7045e6..805fa6f 100644
--- a/ng-net/src/server_broker.rs
+++ b/ng-net/src/server_broker.rs
@@ -73,4 +73,11 @@ pub trait IServerBroker: Send + Sync {
     ) -> Result<TopicSubRes, ServerError>;
 
     fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError>;
+
+    fn dispatch_event(
+        &self,
+        overlay: &OverlayId,
+        event: Event,
+        user_id: &UserId,
+    ) -> Result<(), ServerError>;
 }
diff --git a/ng-repo/src/block_storage.rs b/ng-repo/src/block_storage.rs
index 75a5d61..2c7a761 100644
--- a/ng-repo/src/block_storage.rs
+++ b/ng-repo/src/block_storage.rs
@@ -31,7 +31,7 @@ pub trait BlockStorage: Send + Sync {
     // }
 
     /// Save a block to the storage.
-    fn put(&self, overlay: &OverlayId, block: &Block) -> Result<BlockId, StorageError>;
+    fn put(&self, overlay: &OverlayId, block: &Block, lazy: bool) -> Result<BlockId, StorageError>;
 
     /// Delete a block from the storage.
     fn del(&self, overlay: &OverlayId, id: &BlockId) -> Result<usize, StorageError>;
@@ -99,7 +99,7 @@ impl HashMapBlockStorage {
     pub async fn from_block_stream(overlay: &OverlayId, mut blockstream: Receiver<Block>) -> Self {
         let this = Self::new();
         while let Some(block) = blockstream.next().await {
-            this.put(overlay, &block).unwrap();
+            this.put(overlay, &block, false).unwrap();
         }
         this
     }
@@ -116,10 +116,14 @@ impl HashMapBlockStorage {
             .map(|x| x.clone())
             .collect()
     }
+    pub fn put_local(&self, block: &Block) -> Result<BlockId, StorageError> {
+        let overlay = OverlayId::nil();
+        self.put(&overlay, block, false)
+    }
 }
 
 impl BlockStorage for HashMapBlockStorage {
-    fn get(&self, overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError> {
+    fn get(&self, _overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError> {
         match self.blocks.read().unwrap().get(id) {
             Some(block) => {
                 let mut b = block.clone();
@@ -138,7 +142,12 @@ impl BlockStorage for HashMapBlockStorage {
         Ok(self.get_len())
     }
 
-    fn put(&self, overlay: &OverlayId, block: &Block) -> Result<BlockId, StorageError> {
+    fn put(
+        &self,
+        _overlay: &OverlayId,
+        block: &Block,
+        _lazy: bool,
+    ) -> Result<BlockId, StorageError> {
         let id = block.id();
         //log_debug!("PUTTING {}", id);
         let mut b = block.clone();
diff --git a/ng-repo/src/commit.rs b/ng-repo/src/commit.rs
index c67ddec..37a31eb 100644
--- a/ng-repo/src/commit.rs
+++ b/ng-repo/src/commit.rs
@@ -428,7 +428,7 @@ impl Commit {
         self.content().branch()
     }
 
-    /// Get commit signature
+    /// Get commit header
     pub fn header(&self) -> &Option<CommitHeader> {
         match self {
             Commit::V0(c) => &c.header,
diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs
index 223784d..af9c9ef 100644
--- a/ng-repo/src/errors.rs
+++ b/ng-repo/src/errors.rs
@@ -68,6 +68,7 @@ pub enum NgError {
     SiteNotFoundOnBroker,
     BrokerConfigErrorStr(&'static str),
     BrokerConfigError(String),
+    MalformedEvent,
 }
 
 impl Error
for NgError {} @@ -222,6 +223,8 @@ pub enum ServerError { OverlayMismatch, OverlayNotFound, TopicNotFound, + AccessDenied, + InvalidHeader, } impl From for ServerError { diff --git a/ng-repo/src/event.rs b/ng-repo/src/event.rs index 3587cb8..60808a6 100644 --- a/ng-repo/src/event.rs +++ b/ng-repo/src/event.rs @@ -91,6 +91,18 @@ impl Event { } } + pub fn publisher(&self) -> &PeerId { + match self { + Event::V0(v0) => &v0.content.publisher, + } + } + + pub fn verify(&self) -> Result<(), NgError> { + match self { + Event::V0(v0) => v0.verify(), + } + } + /// opens an event with the key derived from information kept in Repo. /// /// returns the Commit object and optional list of additional block IDs. @@ -125,6 +137,23 @@ impl Event { } impl EventV0 { + pub fn verify(&self) -> Result<(), NgError> { + let content_ser = serde_bare::to_vec(&self.content)?; + verify(&content_ser, self.topic_sig, self.content.topic)?; + match self.content.publisher { + PeerId::Forwarded(peer_id) => verify(&content_ser, self.peer_sig, peer_id)?, + PeerId::ForwardedObfuscated(_) => { + panic!("cannot verify an Event with obfuscated publisher") + } + PeerId::Direct(_) => panic!("direct events are not supported"), + } + if self.content.blocks.len() < 2 { + // an event is always containing a commit, which always has at least 2 blocks (one for the commit content, and one for the commit body) + return Err(NgError::MalformedEvent); + } + Ok(()) + } + pub fn derive_key( repo_id: &RepoId, branch_id: &BranchId, @@ -231,7 +260,8 @@ impl EventV0 { branch_id: &BranchId, branch_secret: &ReadCapSecret, ) -> Result { - // TODO: verifier event signature + // verifying event signatures + self.verify()?; let publisher_pubkey = self.content.publisher.get_pub_key(); let key = Self::derive_key(repo_id, branch_id, branch_secret, &publisher_pubkey); diff --git a/ng-repo/src/object.rs b/ng-repo/src/object.rs index 1fb4d34..6697bf3 100644 --- a/ng-repo/src/object.rs +++ b/ng-repo/src/object.rs @@ -427,6 +427,53 @@ impl Object { Self::load(reference.id.clone(), Some(reference.key.clone()), store) } + pub fn load_header( + root_block: &Block, + store: &Store, + ) -> Result { + Self::load_header_(root_block, store)? + .0 + .ok_or(ObjectParseError::InvalidHeader) + } + + fn load_header_( + root: &Block, + store: &Store, + ) -> Result<(Option, Vec), ObjectParseError> { + match root.header_ref() { + Some(header_ref) => match header_ref.obj { + CommitHeaderObject::None | CommitHeaderObject::RandomAccess => { + panic!("shouldn't happen") + } + CommitHeaderObject::Id(id) => { + let obj_res = Object::load(id, Some(header_ref.key.clone()), store); + match obj_res { + Err(e) => return Err(e), + Ok(obj) => match obj.content()? 
{ + ObjectContent::V0(ObjectContentV0::CommitHeader(mut commit_header)) => { + commit_header.set_id(id); + Ok((Some(commit_header), obj.blocks().cloned().collect())) + } + _ => return Err(ObjectParseError::InvalidHeader), + }, + } + } + CommitHeaderObject::EncryptedContent(content) => { + let (_, decrypted_content) = + Block::new_with_encrypted_content(content, None).read(&header_ref.key)?; + match serde_bare::from_slice(&decrypted_content) { + Ok(ObjectContent::V0(ObjectContentV0::CommitHeader(commit_header))) => { + Ok((Some(commit_header), vec![])) + } + Err(_e) => return Err(ObjectParseError::InvalidHeader), + _ => return Err(ObjectParseError::InvalidHeader), + } + } + }, + None => Ok((None, vec![])), + } + } + /// Load an Object from BlockStorage /// /// Returns Ok(Object) or an Err(ObjectParseError::MissingBlocks(Vec)) of missing BlockIds @@ -485,57 +532,29 @@ impl Object { root.set_key(key); } - let header = match root.header_ref() { - Some(header_ref) => match header_ref.obj { - CommitHeaderObject::None | CommitHeaderObject::RandomAccess => { - panic!("shouldn't happen") - } - CommitHeaderObject::Id(id) => { - let obj_res = Object::load(id, Some(header_ref.key.clone()), store); - match obj_res { - Err(ObjectParseError::MissingBlocks(m)) => { - return Err(ObjectParseError::MissingHeaderBlocks(( - Object { - blocks, - block_contents, - header: None, - header_blocks: vec![], - #[cfg(test)] - already_saved: false, - }, - m, - ))); - } - Err(e) => return Err(e), - Ok(obj) => match obj.content()? { - ObjectContent::V0(ObjectContentV0::CommitHeader(mut commit_header)) => { - commit_header.set_id(id); - (Some(commit_header), Some(obj.blocks().cloned().collect())) - } - _ => return Err(ObjectParseError::InvalidHeader), - }, - } - } - CommitHeaderObject::EncryptedContent(content) => { - let (_, decrypted_content) = - Block::new_with_encrypted_content(content, None).read(&header_ref.key)?; - match serde_bare::from_slice(&decrypted_content) { - Ok(ObjectContent::V0(ObjectContentV0::CommitHeader(commit_header))) => { - (Some(commit_header), None) - } - Err(_e) => return Err(ObjectParseError::InvalidHeader), - _ => return Err(ObjectParseError::InvalidHeader), - } - } - }, - None => (None, None), + let header = match Self::load_header_(root, store) { + Err(ObjectParseError::MissingBlocks(m)) => { + return Err(ObjectParseError::MissingHeaderBlocks(( + Object { + blocks, + block_contents, + header: None, + header_blocks: vec![], + #[cfg(test)] + already_saved: false, + }, + m, + ))); + } + Err(e) => return Err(e), + Ok(h) => h, }; Ok(Object { blocks, block_contents, header: header.0, - header_blocks: header.1.unwrap_or(vec![]), + header_blocks: header.1, #[cfg(test)] already_saved: true, }) diff --git a/ng-repo/src/store.rs b/ng-repo/src/store.rs index 58b3ce2..80dabca 100644 --- a/ng-repo/src/store.rs +++ b/ng-repo/src/store.rs @@ -15,7 +15,7 @@ use core::fmt; use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; -use crate::block_storage::BlockStorage; +use crate::block_storage::{BlockStorage, HashMapBlockStorage}; use crate::errors::{NgError, StorageError}; use crate::object::Object; use crate::repo::{BranchInfo, Repo}; @@ -60,6 +60,29 @@ impl fmt::Debug for Store { } impl Store { + pub fn new_temp_in_mem() -> Self { + Store { + store_repo: StoreRepo::nil(), + store_readcap: ReadCap::nil(), + store_overlay_branch_readcap: ReadCap::nil(), + overlay_id: OverlayId::nil(), + storage: Arc::new(RwLock::new(HashMapBlockStorage::new())), + } + } + + pub fn new_from_overlay_id( + 
+        overlay: &OverlayId,
+        storage: Arc<RwLock<dyn BlockStorage + Send + Sync>>,
+    ) -> Store {
+        Store {
+            store_repo: StoreRepo::nil(),
+            store_readcap: ReadCap::nil(),
+            store_overlay_branch_readcap: ReadCap::nil(),
+            overlay_id: overlay.clone(),
+            storage,
+        }
+    }
+
     pub fn new_from(
         update: &StoreUpdate,
         storage: Arc<RwLock<dyn BlockStorage + Send + Sync>>,
@@ -125,7 +148,7 @@ impl Store {
         self.storage
             .write()
             .map_err(|_| StorageError::BackendError)?
-            .put(&self.overlay_id, block)
+            .put(&self.overlay_id, block, true)
     }
 
     /// Delete a block from the storage.
diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs
index cd3b571..50b980e 100644
--- a/ng-repo/src/types.rs
+++ b/ng-repo/src/types.rs
@@ -728,6 +728,11 @@ impl StoreRepo {
         StoreRepo::V0(StoreRepoV0::PublicStore(repo_pubkey))
     }
 
+    pub fn nil() -> Self {
+        let store_pubkey = PubKey::nil();
+        StoreRepo::V0(StoreRepoV0::PublicStore(store_pubkey))
+    }
+
     pub fn new_private(repo_pubkey: PubKey) -> Self {
         StoreRepo::V0(StoreRepoV0::PrivateStore(repo_pubkey))
     }
diff --git a/ng-storage-rocksdb/src/block_storage.rs b/ng-storage-rocksdb/src/block_storage.rs
index f0e01f0..7688dbb 100644
--- a/ng-storage-rocksdb/src/block_storage.rs
+++ b/ng-storage-rocksdb/src/block_storage.rs
@@ -118,13 +118,22 @@ impl BlockStorage for RocksDbBlockStorage {
     }
 
     /// Save a block to the storage.
-    fn put(&self, overlay: &OverlayId, block: &Block) -> Result<BlockId, StorageError> {
-        // TODO? return an error if already present in blockstorage?
+    fn put(&self, overlay: &OverlayId, block: &Block, lazy: bool) -> Result<BlockId, StorageError> {
+        // TODO? return an error if already present in blockstorage and !lazy ?
        let block_id = block.id();
         let ser = serde_bare::to_vec(block)?;
         let tx = self.db.transaction();
-        tx.put(Self::compute_key(overlay, &block_id), &ser)
-            .map_err(|_e| StorageError::BackendError)?;
+        let key = Self::compute_key(overlay, &block_id);
+        if lazy {
+            if let Some(block_ser) = tx
+                .get(key.clone())
+                .map_err(|_e| StorageError::BackendError)?
+            {
+                let block: Block = serde_bare::from_slice(&block_ser)?;
+                return Ok(block.id());
+            }
+        }
+        tx.put(key, &ser).map_err(|_e| StorageError::BackendError)?;
         tx.commit().map_err(|_| StorageError::BackendError)?;
         Ok(block_id)
     }
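
Note for reviewers: the sketch below is not part of the patch. It only illustrates how the APIs introduced above (HashMapBlockStorage::put with the new lazy flag, Store::new_from_overlay_id, Object::load_header) fit together to parse a commit header out of an event's blocks, the same way save_event does it with a throw-away in-memory store. The helper name, the import paths, and the mapping of every failure to NgError::MalformedEvent are assumptions made for the example.

// Illustrative sketch only, mirroring the body of RocksDbServerStorage::save_event.
use std::sync::{Arc, RwLock};

use ng_repo::block_storage::{BlockStorage, HashMapBlockStorage};
use ng_repo::errors::NgError;
use ng_repo::object::Object;
use ng_repo::store::Store;
use ng_repo::types::{Block, CommitHeader, OverlayId};

fn header_from_event_blocks(
    overlay: &OverlayId,
    blocks: &[Block],
) -> Result<CommitHeader, NgError> {
    // the first block of an event is the root block of the commit object
    let first_block = blocks.first().ok_or(NgError::MalformedEvent)?;

    // stage all the blocks in a temporary, non-persisted block storage
    let temp_storage = HashMapBlockStorage::new();
    for block in blocks {
        temp_storage
            .put(overlay, block, false)
            .map_err(|_| NgError::MalformedEvent)?;
    }

    // wrap it in a temporary Store so load_header can resolve the header's child blocks
    let temp_store = Store::new_from_overlay_id(overlay, Arc::new(RwLock::new(temp_storage)));

    Object::load_header(first_block, &temp_store).map_err(|_| NgError::MalformedEvent)
}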