PublishEvent actor

pull/19/head
Niko PLP 9 months ago
parent 776c88e3ae
commit 7e3d036e96
  1. 126
      ng-broker/src/rocksdb_server_storage.rs
  2. 9
      ng-broker/src/server_broker.rs
  3. 24
      ng-broker/src/server_storage/core/commit.rs
  4. 1
      ng-broker/src/server_storage/core/overlay.rs
  5. 23
      ng-net/src/actors/client/event.rs
  6. 7
      ng-net/src/server_broker.rs
  7. 17
      ng-repo/src/block_storage.rs
  8. 2
      ng-repo/src/commit.rs
  9. 3
      ng-repo/src/errors.rs
  10. 32
      ng-repo/src/event.rs
  11. 109
      ng-repo/src/object.rs
  12. 27
      ng-repo/src/store.rs
  13. 5
      ng-repo/src/types.rs
  14. 17
      ng-storage-rocksdb/src/block_storage.rs

@ -13,7 +13,7 @@ use std::collections::HashMap;
use std::fs::{read, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use crate::server_broker::*;
use crate::server_storage::admin::account::Account;
@ -22,9 +22,12 @@ use crate::server_storage::admin::wallet::Wallet;
use crate::server_storage::core::*;
use crate::types::*;
use ng_net::types::*;
use ng_repo::block_storage::{BlockStorage, HashMapBlockStorage};
use ng_repo::errors::{ProtocolError, ServerError, StorageError};
use ng_repo::log::*;
use ng_repo::object::Object;
use ng_repo::store::Store;
use ng_repo::types::*;
use ng_storage_rocksdb::block_storage::RocksDbBlockStorage;
use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage;
@ -400,42 +403,46 @@ impl RocksDbServerStorage {
}
}
pub(crate) fn topic_sub(
&self,
overlay: &OverlayId,
repo: &RepoHash,
topic: &TopicId,
user_id: &UserId,
publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ServerError> {
fn check_overlay(&self, overlay: &OverlayId) -> Result<OverlayId, ServerError> {
let mut overlay_storage =
OverlayStorage::open(overlay, &self.core_storage).map_err(|e| match e {
StorageError::NotFound => ServerError::OverlayNotFound,
_ => e.into(),
})?;
let overlay = match overlay_storage.overlay_type() {
Ok(match overlay_storage.overlay_type() {
OverlayType::Outer(_) => {
if overlay.is_outer() {
overlay
*overlay
} else {
return Err(ServerError::OverlayMismatch);
}
}
OverlayType::Inner(outer) => {
if outer.is_outer() {
outer
*outer
} else {
return Err(ServerError::OverlayMismatch);
}
}
OverlayType::InnerOnly => {
if overlay.is_inner() {
overlay
*overlay
} else {
return Err(ServerError::OverlayMismatch);
}
}
};
})
}
pub(crate) fn topic_sub(
&self,
overlay: &OverlayId,
repo: &RepoHash,
topic: &TopicId,
user_id: &UserId,
publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ServerError> {
let overlay = self.check_overlay(overlay)?;
// now we check that the repo was previously pinned.
// if it was opened but not pinned, then this should be deal with in the ServerBroker, in memory, not here)
@ -443,14 +450,14 @@ impl RocksDbServerStorage {
// (we already checked that the advert is valid)
let mut topic_storage =
TopicStorage::create(topic, overlay, repo, &self.core_storage, true)?;
TopicStorage::create(topic, &overlay, repo, &self.core_storage, true)?;
let _ = TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &is_publisher)?;
if is_publisher {
let _ = TopicStorage::ADVERT.get_or_set(&mut topic_storage, publisher.unwrap())?;
}
let mut repo_info = RepoHashStorage::open(repo, overlay, &self.core_storage)?;
let mut repo_info = RepoHashStorage::open(repo, &overlay, &self.core_storage)?;
RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic)?;
Ok(TopicSubRes::new_from_heads(
@ -468,4 +475,91 @@ impl RocksDbServerStorage {
//TODO: implement correctly !
Ok(vec![Block::dummy()])
}
fn add_block(
&self,
overlay_id: &OverlayId,
overlay_storage: &mut OverlayStorage,
block: Block,
) -> Result<BlockId, StorageError> {
let block_id = self.block_storage.put(overlay_id, &block, true)?;
OverlayStorage::BLOCKS.increment(overlay_storage, &block_id)?;
Ok(block_id)
}
pub(crate) fn save_event(
&self,
overlay: &OverlayId,
mut event: Event,
user_id: &UserId,
) -> Result<(), ServerError> {
if overlay.is_outer() {
// we don't publish events on the outer overlay!
return Err(ServerError::OverlayMismatch);
}
let overlay = self.check_overlay(overlay)?;
let overlay = &overlay;
// check that the sequence number is correct
// check that the topic exists and that this user has pinned it as publisher
let mut topic_storage = TopicStorage::open(event.topic_id(), overlay, &self.core_storage)
.map_err(|e| match e {
StorageError::NotFound => ServerError::TopicNotFound,
_ => e.into(),
})?;
let is_publisher = TopicStorage::USERS
.get(&mut topic_storage, user_id)
.map_err(|e| match e {
StorageError::NotFound => ServerError::AccessDenied,
_ => e.into(),
})?;
if !is_publisher {
return Err(ServerError::AccessDenied);
}
// remove the blocks from inside the event, and save the empty event and each block separately.
match event {
Event::V0(mut v0) => {
let mut overlay_storage = OverlayStorage::new(overlay, &self.core_storage);
let mut extracted_blocks_ids = Vec::with_capacity(v0.content.blocks.len());
let first_block_copy = v0.content.blocks[0].clone();
let temp_mini_block_storage = HashMapBlockStorage::new();
for block in v0.content.blocks {
let _ = temp_mini_block_storage.put(overlay, &block, false)?;
extracted_blocks_ids.push(self.add_block(
overlay,
&mut overlay_storage,
block,
)?);
}
// creating a temporary store to access the blocks
let temp_store = Store::new_from_overlay_id(
overlay,
Arc::new(std::sync::RwLock::new(temp_mini_block_storage)),
);
let commit_id = extracted_blocks_ids[0];
let header = Object::load_header(&first_block_copy, &temp_store)
.map_err(|_| ServerError::InvalidHeader)?;
v0.content.blocks = vec![];
let event_info = EventInfo {
event: Event::V0(v0),
blocks: extracted_blocks_ids,
};
CommitStorage::create(
&commit_id,
overlay,
event_info,
&header,
true,
&self.core_storage,
)?;
}
}
Ok(())
}
}

@ -213,4 +213,13 @@ impl IServerBroker for ServerBroker {
fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError> {
self.storage.get_commit(overlay, id)
}
fn dispatch_event(
&self,
overlay: &OverlayId,
event: Event,
user_id: &UserId,
) -> Result<(), ServerError> {
self.storage.save_event(overlay, event, user_id)
}
}

@ -114,15 +114,33 @@ impl<'a> CommitStorage<'a> {
pub fn create(
id: &ObjectId,
overlay: &OverlayId,
event: &Option<EventInfo>,
event: EventInfo,
header: &CommitHeader,
home_pinned: bool,
storage: &'a dyn KCVStorage,
) -> Result<CommitStorage<'a>, StorageError> {
let mut creating = CommitStorage::new(id, overlay, storage);
if creating.exists() {
return Err(StorageError::AlreadyExists);
}
creating.event.set(event)?;
ExistentialValue::save(&creating, event)?;
let event_opt = Some(event);
creating.event.set(&event_opt)?;
ExistentialValue::save(&creating, &event_opt)?;
if home_pinned {
Self::HOME_PINNED.set(&mut creating, &true)?;
}
// adding all the references
for ack in header.acks() {
Self::ACKS.add(&mut creating, &ack)?;
}
for dep in header.deps() {
Self::DEPS.add(&mut creating, &dep)?;
}
for file in header.files() {
Self::FILES.add(&mut creating, file)?;
}
Ok(creating)
}

@ -51,6 +51,7 @@ impl<'a> OverlayStorage<'a> {
// Overlay properties
pub const TYPE: ExistentialValueColumn = ExistentialValueColumn::new(b'y');
/// BE CAREFUL: this property is exceptionally stored on the InnerOverlay
pub const TOPIC: SingleValueColumn<Self, TopicId> = SingleValueColumn::new(b't');
// Overlay <-> Block refcount

@ -33,9 +33,15 @@ impl PublishEvent {
self.1 = Some(overlay);
}
// pub fn overlay(&self) -> &OverlayId {
// self.1.as_ref().unwrap()
// }
pub fn overlay(&self) -> &OverlayId {
self.1.as_ref().unwrap()
}
pub fn event(&self) -> &Event {
&self.0
}
pub fn take_event(self) -> Event {
self.0
}
}
impl TryFrom<ProtocolMessage> for PublishEvent {
@ -69,9 +75,16 @@ impl EActor for Actor<'_, PublishEvent, ()> {
) -> Result<(), ProtocolError> {
let req = PublishEvent::try_from(msg)?;
//TODO implement all the server side logic
// send a ProtocolError if invalid signatures (will disconnect the client)
req.event().verify()?;
let res: Result<(), ServerError> = Ok(());
let broker = BROKER.read().await;
let overlay = req.overlay().clone();
let res = broker.get_server_broker()?.dispatch_event(
&overlay,
req.take_event(),
&fsm.lock().await.user_id_or_err()?,
);
fsm.lock()
.await

@ -73,4 +73,11 @@ pub trait IServerBroker: Send + Sync {
) -> Result<TopicSubRes, ServerError>;
fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError>;
fn dispatch_event(
&self,
overlay: &OverlayId,
event: Event,
user_id: &UserId,
) -> Result<(), ServerError>;
}

@ -31,7 +31,7 @@ pub trait BlockStorage: Send + Sync {
// }
/// Save a block to the storage.
fn put(&self, overlay: &OverlayId, block: &Block) -> Result<BlockId, StorageError>;
fn put(&self, overlay: &OverlayId, block: &Block, lazy: bool) -> Result<BlockId, StorageError>;
/// Delete a block from the storage.
fn del(&self, overlay: &OverlayId, id: &BlockId) -> Result<usize, StorageError>;
@ -99,7 +99,7 @@ impl HashMapBlockStorage {
pub async fn from_block_stream(overlay: &OverlayId, mut blockstream: Receiver<Block>) -> Self {
let this = Self::new();
while let Some(block) = blockstream.next().await {
this.put(overlay, &block).unwrap();
this.put(overlay, &block, false).unwrap();
}
this
}
@ -116,10 +116,14 @@ impl HashMapBlockStorage {
.map(|x| x.clone())
.collect()
}
pub fn put_local(&self, block: &Block) -> Result<BlockId, StorageError> {
let overlay = OverlayId::nil();
self.put(&overlay, block, false)
}
}
impl BlockStorage for HashMapBlockStorage {
fn get(&self, overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError> {
fn get(&self, _overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError> {
match self.blocks.read().unwrap().get(id) {
Some(block) => {
let mut b = block.clone();
@ -138,7 +142,12 @@ impl BlockStorage for HashMapBlockStorage {
Ok(self.get_len())
}
fn put(&self, overlay: &OverlayId, block: &Block) -> Result<BlockId, StorageError> {
fn put(
&self,
_overlay: &OverlayId,
block: &Block,
_lazy: bool,
) -> Result<BlockId, StorageError> {
let id = block.id();
//log_debug!("PUTTING {}", id);
let mut b = block.clone();

@ -428,7 +428,7 @@ impl Commit {
self.content().branch()
}
/// Get commit signature
/// Get commit header
pub fn header(&self) -> &Option<CommitHeader> {
match self {
Commit::V0(c) => &c.header,

@ -68,6 +68,7 @@ pub enum NgError {
SiteNotFoundOnBroker,
BrokerConfigErrorStr(&'static str),
BrokerConfigError(String),
MalformedEvent,
}
impl Error for NgError {}
@ -222,6 +223,8 @@ pub enum ServerError {
OverlayMismatch,
OverlayNotFound,
TopicNotFound,
AccessDenied,
InvalidHeader,
}
impl From<StorageError> for ServerError {

@ -91,6 +91,18 @@ impl Event {
}
}
pub fn publisher(&self) -> &PeerId {
match self {
Event::V0(v0) => &v0.content.publisher,
}
}
pub fn verify(&self) -> Result<(), NgError> {
match self {
Event::V0(v0) => v0.verify(),
}
}
/// opens an event with the key derived from information kept in Repo.
///
/// returns the Commit object and optional list of additional block IDs.
@ -125,6 +137,23 @@ impl Event {
}
impl EventV0 {
pub fn verify(&self) -> Result<(), NgError> {
let content_ser = serde_bare::to_vec(&self.content)?;
verify(&content_ser, self.topic_sig, self.content.topic)?;
match self.content.publisher {
PeerId::Forwarded(peer_id) => verify(&content_ser, self.peer_sig, peer_id)?,
PeerId::ForwardedObfuscated(_) => {
panic!("cannot verify an Event with obfuscated publisher")
}
PeerId::Direct(_) => panic!("direct events are not supported"),
}
if self.content.blocks.len() < 2 {
// an event is always containing a commit, which always has at least 2 blocks (one for the commit content, and one for the commit body)
return Err(NgError::MalformedEvent);
}
Ok(())
}
pub fn derive_key(
repo_id: &RepoId,
branch_id: &BranchId,
@ -231,7 +260,8 @@ impl EventV0 {
branch_id: &BranchId,
branch_secret: &ReadCapSecret,
) -> Result<Commit, NgError> {
// TODO: verifier event signature
// verifying event signatures
self.verify()?;
let publisher_pubkey = self.content.publisher.get_pub_key();
let key = Self::derive_key(repo_id, branch_id, branch_secret, &publisher_pubkey);

@ -427,6 +427,53 @@ impl Object {
Self::load(reference.id.clone(), Some(reference.key.clone()), store)
}
pub fn load_header(
root_block: &Block,
store: &Store,
) -> Result<CommitHeader, ObjectParseError> {
Self::load_header_(root_block, store)?
.0
.ok_or(ObjectParseError::InvalidHeader)
}
fn load_header_(
root: &Block,
store: &Store,
) -> Result<(Option<CommitHeader>, Vec<Block>), ObjectParseError> {
match root.header_ref() {
Some(header_ref) => match header_ref.obj {
CommitHeaderObject::None | CommitHeaderObject::RandomAccess => {
panic!("shouldn't happen")
}
CommitHeaderObject::Id(id) => {
let obj_res = Object::load(id, Some(header_ref.key.clone()), store);
match obj_res {
Err(e) => return Err(e),
Ok(obj) => match obj.content()? {
ObjectContent::V0(ObjectContentV0::CommitHeader(mut commit_header)) => {
commit_header.set_id(id);
Ok((Some(commit_header), obj.blocks().cloned().collect()))
}
_ => return Err(ObjectParseError::InvalidHeader),
},
}
}
CommitHeaderObject::EncryptedContent(content) => {
let (_, decrypted_content) =
Block::new_with_encrypted_content(content, None).read(&header_ref.key)?;
match serde_bare::from_slice(&decrypted_content) {
Ok(ObjectContent::V0(ObjectContentV0::CommitHeader(commit_header))) => {
Ok((Some(commit_header), vec![]))
}
Err(_e) => return Err(ObjectParseError::InvalidHeader),
_ => return Err(ObjectParseError::InvalidHeader),
}
}
},
None => Ok((None, vec![])),
}
}
/// Load an Object from BlockStorage
///
/// Returns Ok(Object) or an Err(ObjectParseError::MissingBlocks(Vec<ObjectId>)) of missing BlockIds
@ -485,57 +532,29 @@ impl Object {
root.set_key(key);
}
let header = match root.header_ref() {
Some(header_ref) => match header_ref.obj {
CommitHeaderObject::None | CommitHeaderObject::RandomAccess => {
panic!("shouldn't happen")
}
CommitHeaderObject::Id(id) => {
let obj_res = Object::load(id, Some(header_ref.key.clone()), store);
match obj_res {
Err(ObjectParseError::MissingBlocks(m)) => {
return Err(ObjectParseError::MissingHeaderBlocks((
Object {
blocks,
block_contents,
header: None,
header_blocks: vec![],
#[cfg(test)]
already_saved: false,
},
m,
)));
}
Err(e) => return Err(e),
Ok(obj) => match obj.content()? {
ObjectContent::V0(ObjectContentV0::CommitHeader(mut commit_header)) => {
commit_header.set_id(id);
(Some(commit_header), Some(obj.blocks().cloned().collect()))
}
_ => return Err(ObjectParseError::InvalidHeader),
},
}
}
CommitHeaderObject::EncryptedContent(content) => {
let (_, decrypted_content) =
Block::new_with_encrypted_content(content, None).read(&header_ref.key)?;
match serde_bare::from_slice(&decrypted_content) {
Ok(ObjectContent::V0(ObjectContentV0::CommitHeader(commit_header))) => {
(Some(commit_header), None)
}
Err(_e) => return Err(ObjectParseError::InvalidHeader),
_ => return Err(ObjectParseError::InvalidHeader),
}
}
},
None => (None, None),
let header = match Self::load_header_(root, store) {
Err(ObjectParseError::MissingBlocks(m)) => {
return Err(ObjectParseError::MissingHeaderBlocks((
Object {
blocks,
block_contents,
header: None,
header_blocks: vec![],
#[cfg(test)]
already_saved: false,
},
m,
)));
}
Err(e) => return Err(e),
Ok(h) => h,
};
Ok(Object {
blocks,
block_contents,
header: header.0,
header_blocks: header.1.unwrap_or(vec![]),
header_blocks: header.1,
#[cfg(test)]
already_saved: true,
})

@ -15,7 +15,7 @@ use core::fmt;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use crate::block_storage::BlockStorage;
use crate::block_storage::{BlockStorage, HashMapBlockStorage};
use crate::errors::{NgError, StorageError};
use crate::object::Object;
use crate::repo::{BranchInfo, Repo};
@ -60,6 +60,29 @@ impl fmt::Debug for Store {
}
impl Store {
pub fn new_temp_in_mem() -> Self {
Store {
store_repo: StoreRepo::nil(),
store_readcap: ReadCap::nil(),
store_overlay_branch_readcap: ReadCap::nil(),
overlay_id: OverlayId::nil(),
storage: Arc::new(RwLock::new(HashMapBlockStorage::new())),
}
}
pub fn new_from_overlay_id(
overlay: &OverlayId,
storage: Arc<RwLock<dyn BlockStorage + Send + Sync>>,
) -> Store {
Store {
store_repo: StoreRepo::nil(),
store_readcap: ReadCap::nil(),
store_overlay_branch_readcap: ReadCap::nil(),
overlay_id: overlay.clone(),
storage,
}
}
pub fn new_from(
update: &StoreUpdate,
storage: Arc<RwLock<dyn BlockStorage + Send + Sync>>,
@ -125,7 +148,7 @@ impl Store {
self.storage
.write()
.map_err(|_| StorageError::BackendError)?
.put(&self.overlay_id, block)
.put(&self.overlay_id, block, true)
}
/// Delete a block from the storage.

@ -728,6 +728,11 @@ impl StoreRepo {
StoreRepo::V0(StoreRepoV0::PublicStore(repo_pubkey))
}
pub fn nil() -> Self {
let store_pubkey = PubKey::nil();
StoreRepo::V0(StoreRepoV0::PublicStore(store_pubkey))
}
pub fn new_private(repo_pubkey: PubKey) -> Self {
StoreRepo::V0(StoreRepoV0::PrivateStore(repo_pubkey))
}

@ -118,13 +118,22 @@ impl BlockStorage for RocksDbBlockStorage {
}
/// Save a block to the storage.
fn put(&self, overlay: &OverlayId, block: &Block) -> Result<BlockId, StorageError> {
// TODO? return an error if already present in blockstorage?
fn put(&self, overlay: &OverlayId, block: &Block, lazy: bool) -> Result<BlockId, StorageError> {
// TODO? return an error if already present in blockstorage and !lazy ?
let block_id = block.id();
let ser = serde_bare::to_vec(block)?;
let tx = self.db.transaction();
tx.put(Self::compute_key(overlay, &block_id), &ser)
.map_err(|_e| StorageError::BackendError)?;
let key = Self::compute_key(overlay, &block_id);
if (lazy) {
if let Some(block_ser) = tx
.get(key.clone())
.map_err(|_e| StorageError::BackendError)?
{
let block: Block = serde_bare::from_slice(&block_ser)?;
return Ok(block.id());
}
}
tx.put(key, &ser).map_err(|_e| StorageError::BackendError)?;
tx.commit().map_err(|_| StorageError::BackendError)?;
Ok(block_id)
}

Loading…
Cancel
Save