diff --git a/Cargo.lock b/Cargo.lock index 3fa4f3a..5aa1415 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1516,6 +1516,9 @@ name = "either" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" +dependencies = [ + "serde", +] [[package]] name = "embed-resource" @@ -3269,6 +3272,7 @@ dependencies = [ "blake3", "chacha20", "default-net", + "either", "futures", "getrandom 0.2.10", "hex", diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index b5ade53..ac4b0ec 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -400,12 +400,10 @@ impl EActor for LocalBroker { fsm: Arc>, ) -> Result<(), ProtocolError> { // search opened_sessions by user_id of fsm - let session = match fsm.lock().await.user_id() { - Some(user) => self - .get_mut_session_for_user(&user) - .ok_or(ProtocolError::ActorError)?, - None => return Err(ProtocolError::ActorError), - }; + let user = fsm.lock().await.user_id()?; + let session = self + .get_mut_session_for_user(&user) + .ok_or(ProtocolError::ActorError)?; session.verifier.respond(msg, fsm).await } } @@ -796,7 +794,7 @@ impl LocalBroker { let lws_ser = LocalWalletStorage::v0_to_vec(&wallets_to_be_saved); let r = write(path.clone(), &lws_ser); if r.is_err() { - log_debug!("write {:?} {}", path, r.unwrap_err()); + log_debug!("write error {:?} {}", path, r.unwrap_err()); return Err(NgError::IoError); } } @@ -942,9 +940,11 @@ pub async fn wallet_create_v0(params: CreateWalletV0) -> Result Result { - log_debug!("{:?}", payload); + //log_debug!("{:?}", payload); payload.encode().ok_or(()) } diff --git a/ng-broker/Cargo.toml b/ng-broker/Cargo.toml index d523116..4e09bc6 100644 --- a/ng-broker/Cargo.toml +++ b/ng-broker/Cargo.toml @@ -35,6 +35,7 @@ blake3 = "1.3.1" once_cell = "1.17.1" rust-embed= { version = "6.7.0", features=["include-exclude"] } serde_json = "1.0.96" +either = { version = "1.8.1", features=["serde"] } [target.'cfg(target_arch = "wasm32")'.dependencies.getrandom] version = "0.2.7" diff --git a/ng-broker/src/rocksdb_server_storage.rs b/ng-broker/src/rocksdb_server_storage.rs index adfdbab..fe93cdf 100644 --- a/ng-broker/src/rocksdb_server_storage.rs +++ b/ng-broker/src/rocksdb_server_storage.rs @@ -9,7 +9,7 @@ * according to those terms. */ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs::{read, File, OpenOptions}; use std::io::Write; use std::path::{Path, PathBuf}; @@ -38,7 +38,7 @@ pub(crate) struct RocksDbServerStorage { //peers_storage: RocksDbKCVStorage, peers_last_seq_path: PathBuf, peers_last_seq: Mutex>, - block_storage: RocksDbBlockStorage, + block_storage: Arc>, core_storage: RocksDbKCVStorage, } @@ -115,7 +115,10 @@ impl RocksDbServerStorage { blocks_path.push("blocks"); std::fs::create_dir_all(blocks_path.clone()).unwrap(); let blocks_key = wallet.get_or_create_blocks_key()?; - let block_storage = RocksDbBlockStorage::open(&blocks_path, *blocks_key.slice())?; + let block_storage = Arc::new(std::sync::RwLock::new(RocksDbBlockStorage::open( + &blocks_path, + *blocks_key.slice(), + )?)); // create/open the PEERS storage log_debug!("opening core DB"); @@ -487,24 +490,12 @@ impl RocksDbServerStorage { let event_info = commit_storage .event() .as_ref() - .ok_or(ServerError::NotFound)?; - - // // rehydrate the event : - // let mut blocks = Vec::with_capacity(event_info.blocks.len()); - // for block_id in event_info.blocks { - // let block = self.block_storage.get(&overlay, &block_id)?; - // blocks.push(block); - // } - - // match event_info.event { - // Event::V0(mut v0) => { - // v0.content.blocks = blocks; - // } - // } + .left() + .ok_or(ServerError::NotFound)?; // TODO: for now we do not deal with events that have been removed from storage let mut blocks = Vec::with_capacity(event_info.blocks.len()); for block_id in event_info.blocks.iter() { - let block = self.block_storage.get(&overlay, block_id)?; + let block = self.block_storage.read().unwrap().get(&overlay, block_id)?; blocks.push(block); } @@ -517,7 +508,11 @@ impl RocksDbServerStorage { overlay_storage: &mut OverlayStorage, block: Block, ) -> Result { - let block_id = self.block_storage.put(overlay_id, &block, true)?; + let block_id = self + .block_storage + .write() + .unwrap() + .put(overlay_id, &block, true)?; OverlayStorage::BLOCKS.increment(overlay_storage, &block_id)?; Ok(block_id) } @@ -552,7 +547,7 @@ impl RocksDbServerStorage { if !is_publisher { return Err(ServerError::AccessDenied); } - + //log_info!("SAVED EVENT in overlay {:?} : {}", overlay, event); // remove the blocks from inside the event, and save the "dehydrated" event and each block separately. match event { Event::V0(mut v0) => { @@ -575,8 +570,10 @@ impl RocksDbServerStorage { Arc::new(std::sync::RwLock::new(temp_mini_block_storage)), ); let commit_id = extracted_blocks_ids[0]; - let header = Object::load_header(&first_block_copy, &temp_store) - .map_err(|_| ServerError::InvalidHeader)?; + let header = Object::load_header(&first_block_copy, &temp_store).map_err(|e| { + //log_err!("err : {:?}", e); + ServerError::InvalidHeader + })?; v0.content.blocks = vec![]; let event_info = EventInfo { @@ -592,9 +589,75 @@ impl RocksDbServerStorage { true, &self.core_storage, )?; + + let acks = if header.is_some() { + HashSet::from_iter(header.unwrap().acks()) + } else { + HashSet::new() + }; + let head = HashSet::from([commit_id]); + TopicStorage::HEADS.replace_with_new_set_if_old_set_exists( + &mut topic_storage, + acks, + head, + )?; } } Ok(()) } + + pub(crate) fn topic_sync_req( + &self, + overlay: &OverlayId, + topic: &TopicId, + known_heads: &Vec, + target_heads: &Vec, + ) -> Result, ServerError> { + let overlay = self.check_overlay(overlay)?; + // quick solution for now using the Branch::sync_req. TODO: use the saved references (ACKS,DEPS) in the server_storage, to have much quicker responses + + let target_heads = if target_heads.len() == 0 { + // get the current_heads + let mut topic_storage = TopicStorage::new(topic, &overlay, &self.core_storage); + let heads = TopicStorage::get_all_heads(&mut topic_storage)?; + if heads.len() == 0 { + return Err(ServerError::TopicNotFound); + } + Box::new(heads.into_iter()) as Box> + } else { + Box::new(target_heads.iter().cloned()) as Box> + }; + + let store = Store::new_from_overlay_id(&overlay, Arc::clone(&self.block_storage)); + + let commits = Branch::sync_req(target_heads, known_heads, &store) + .map_err(|_| ServerError::MalformedBranch)?; + + let mut result = Vec::with_capacity(commits.len()); + + for commit_id in commits { + let mut commit_storage = CommitStorage::open(&commit_id, &overlay, &self.core_storage)?; + let mut event_info = commit_storage + .take_event() + .left() + .ok_or(ServerError::NotFound)?; // TODO: for now we do not deal with events that have been removed from storage + + // rehydrate the event : + let mut blocks = Vec::with_capacity(event_info.blocks.len()); + for block_id in event_info.blocks { + let block = store.get(&block_id)?; + blocks.push(block); + } + + match event_info.event { + Event::V0(ref mut v0) => { + v0.content.blocks = blocks; + } + } + result.push(TopicSyncRes::V0(TopicSyncResV0::Event(event_info.event))); + } + + Ok(result) + } } diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index 29a24f2..69e2fe3 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -13,6 +13,7 @@ use std::collections::{HashMap, HashSet}; +use either::Either; use ng_net::{server_broker::IServerBroker, types::*}; use ng_repo::{ errors::{NgError, ProtocolError, ServerError}, @@ -51,7 +52,7 @@ pub struct EventInfo { } pub struct CommitInfo { - pub event: Option, + pub event: Either, pub home_pinned: bool, pub acks: HashSet, pub deps: HashSet, @@ -235,4 +236,15 @@ impl IServerBroker for ServerBroker { ) -> Result<(), ServerError> { self.storage.save_event(overlay, event, user_id) } + + fn topic_sync_req( + &self, + overlay: &OverlayId, + topic: &TopicId, + known_heads: &Vec, + target_heads: &Vec, + ) -> Result, ServerError> { + self.storage + .topic_sync_req(overlay, topic, known_heads, target_heads) + } } diff --git a/ng-broker/src/server_storage/core/commit.rs b/ng-broker/src/server_storage/core/commit.rs index 345b6bd..e1fb0ac 100644 --- a/ng-broker/src/server_storage/core/commit.rs +++ b/ng-broker/src/server_storage/core/commit.rs @@ -17,14 +17,17 @@ use ng_repo::errors::StorageError; use ng_repo::kcv_storage::*; use ng_repo::types::*; +use either::Either; use serde_bare::to_vec; use crate::server_broker::CommitInfo; use crate::server_broker::EventInfo; +use super::OverlayStorage; + pub struct CommitStorage<'a> { key: Vec, - event: ExistentialValue>, + event: ExistentialValue>, storage: &'a dyn KCVStorage, } @@ -78,7 +81,7 @@ impl<'a> CommitStorage<'a> { key.append(&mut to_vec(id).unwrap()); CommitStorage { key, - event: ExistentialValue::>::new(), + event: ExistentialValue::>::new(), storage, } } @@ -115,7 +118,7 @@ impl<'a> CommitStorage<'a> { id: &ObjectId, overlay: &OverlayId, event: EventInfo, - header: &CommitHeader, + header: &Option, home_pinned: bool, storage: &'a dyn KCVStorage, ) -> Result, StorageError> { @@ -123,29 +126,37 @@ impl<'a> CommitStorage<'a> { if creating.exists() { return Err(StorageError::AlreadyExists); } - let event_opt = Some(event); - creating.event.set(&event_opt)?; - ExistentialValue::save(&creating, &event_opt)?; + let event_either = Either::Left(event); + creating.event.set(&event_either)?; + ExistentialValue::save(&creating, &event_either)?; if home_pinned { Self::HOME_PINNED.set(&mut creating, &true)?; } - - // adding all the references - for ack in header.acks() { - Self::ACKS.add(&mut creating, &ack)?; - } - for dep in header.deps() { - Self::DEPS.add(&mut creating, &dep)?; - } - for file in header.files() { - Self::FILES.add(&mut creating, file)?; + if let Some(header) = header { + let mut overlay_storage = OverlayStorage::new(overlay, storage); + // adding all the references + for ack in header.acks() { + Self::ACKS.add(&mut creating, &ack)?; + OverlayStorage::OBJECTS.increment(&mut overlay_storage, &ack)?; + } + for dep in header.deps() { + Self::DEPS.add(&mut creating, &dep)?; + OverlayStorage::OBJECTS.increment(&mut overlay_storage, &dep)?; + } + for file in header.files() { + Self::FILES.add(&mut creating, file)?; + OverlayStorage::OBJECTS.increment(&mut overlay_storage, &file)?; + } } Ok(creating) } - pub fn event(&mut self) -> &Option { + pub fn event(&mut self) -> &Either { self.event.get().unwrap() } + pub fn take_event(mut self) -> Either { + self.event.take().unwrap() + } } diff --git a/ng-net/src/actors/client/commit_get.rs b/ng-net/src/actors/client/commit_get.rs index 406af02..3d3874b 100644 --- a/ng-net/src/actors/client/commit_get.rs +++ b/ng-net/src/actors/client/commit_get.rs @@ -87,12 +87,11 @@ impl EActor for Actor<'_, CommitGet, Block> { fsm: Arc>, ) -> Result<(), ProtocolError> { let req = CommitGet::try_from(msg)?; - log_info!("GOT CommitGet {:?}", req); let broker = BROKER.read().await; let blocks_res = broker .get_server_broker()? .get_commit(req.overlay(), req.id()); - // IF NEEDED, the get_commit could be changed to be async, and then the send_in_reply_to would be also totally async + // IF NEEDED, the get_commit could be changed to return a stream, and then the send_in_reply_to would be also totally async match blocks_res { Ok(blocks) => { if blocks.len() == 0 { diff --git a/ng-net/src/actors/client/event.rs b/ng-net/src/actors/client/event.rs index 83c44d7..122a134 100644 --- a/ng-net/src/actors/client/event.rs +++ b/ng-net/src/actors/client/event.rs @@ -83,7 +83,7 @@ impl EActor for Actor<'_, PublishEvent, ()> { let res = broker.get_server_broker()?.dispatch_event( &overlay, req.take_event(), - &fsm.lock().await.user_id_or_err()?, + &fsm.lock().await.user_id()?, ); fsm.lock() diff --git a/ng-net/src/actors/client/mod.rs b/ng-net/src/actors/client/mod.rs index a2a6672..53ed457 100644 --- a/ng-net/src/actors/client/mod.rs +++ b/ng-net/src/actors/client/mod.rs @@ -7,3 +7,5 @@ pub mod topic_sub; pub mod event; pub mod commit_get; + +pub mod topic_sync_req; diff --git a/ng-net/src/actors/client/pin_repo.rs b/ng-net/src/actors/client/pin_repo.rs index 3d5ee69..8c897ac 100644 --- a/ng-net/src/actors/client/pin_repo.rs +++ b/ng-net/src/actors/client/pin_repo.rs @@ -127,7 +127,7 @@ impl EActor for Actor<'_, PinRepo, RepoOpened> { broker.get_server_broker()?.pin_repo_read( req.overlay(), req.hash(), - &fsm.lock().await.user_id_or_err()?, + &fsm.lock().await.user_id()?, req.ro_topics(), ) } @@ -145,7 +145,7 @@ impl EActor for Actor<'_, PinRepo, RepoOpened> { broker.get_server_broker()?.pin_repo_write( req.overlay_access(), req.hash(), - &fsm.lock().await.user_id_or_err()?, + &fsm.lock().await.user_id()?, req.ro_topics(), req.rw_topics(), req.overlay_root_topic(), @@ -160,7 +160,7 @@ impl EActor for Actor<'_, PinRepo, RepoOpened> { broker.get_server_broker()?.pin_repo_write( req.overlay_access(), req.hash(), - &fsm.lock().await.user_id_or_err()?, + &fsm.lock().await.user_id()?, req.ro_topics(), req.rw_topics(), req.overlay_root_topic(), diff --git a/ng-net/src/actors/client/repo_pin_status.rs b/ng-net/src/actors/client/repo_pin_status.rs index 9ea8639..ffc0a96 100644 --- a/ng-net/src/actors/client/repo_pin_status.rs +++ b/ng-net/src/actors/client/repo_pin_status.rs @@ -81,7 +81,7 @@ impl EActor for Actor<'_, RepoPinStatusReq, RepoPinStatus> { let res = broker.get_server_broker()?.get_repo_pin_status( req.overlay(), req.hash(), - &fsm.lock().await.user_id_or_err()?, + &fsm.lock().await.user_id()?, ); fsm.lock() .await diff --git a/ng-net/src/actors/client/topic_sub.rs b/ng-net/src/actors/client/topic_sub.rs index 7a2e33c..acf591f 100644 --- a/ng-net/src/actors/client/topic_sub.rs +++ b/ng-net/src/actors/client/topic_sub.rs @@ -110,7 +110,7 @@ impl EActor for Actor<'_, TopicSub, TopicSubRes> { req.overlay(), req.hash(), req.topic(), - &fsm.lock().await.user_id_or_err()?, + &fsm.lock().await.user_id()?, req.publisher(), ); diff --git a/ng-net/src/actors/client/topic_sync_req.rs b/ng-net/src/actors/client/topic_sync_req.rs new file mode 100644 index 0000000..4aeaabc --- /dev/null +++ b/ng-net/src/actors/client/topic_sync_req.rs @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ +use crate::broker::{ServerConfig, BROKER}; +use crate::connection::NoiseFSM; +use crate::types::*; +use crate::{actor::*, types::ProtocolMessage}; +use async_std::sync::Mutex; +use ng_repo::errors::*; +use ng_repo::log::*; +use ng_repo::repo::{BranchInfo, Repo}; +use ng_repo::types::*; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +impl TopicSyncReq { + pub fn get_actor(&self, id: i64) -> Box { + Actor::::new_responder(id) + } + + pub fn new_empty(topic: TopicId, overlay: &OverlayId) -> Self { + TopicSyncReq::V0(TopicSyncReqV0 { + topic, + known_heads: vec![], + target_heads: vec![], + overlay: Some(*overlay), + }) + } + + pub fn new( + repo: &Repo, + topic_id: &TopicId, + known_heads: Vec, + target_heads: Vec, + ) -> TopicSyncReq { + TopicSyncReq::V0(TopicSyncReqV0 { + topic: *topic_id, + known_heads, + target_heads, + overlay: Some(repo.store.get_store_repo().overlay_id_for_read_purpose()), + }) + } +} + +impl TryFrom for TopicSyncReq { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + let req: ClientRequestContentV0 = msg.try_into()?; + if let ClientRequestContentV0::TopicSyncReq(a) = req { + Ok(a) + } else { + log_debug!("INVALID {:?}", req); + Err(ProtocolError::InvalidValue) + } + } +} + +impl From for ProtocolMessage { + fn from(msg: TopicSyncReq) -> ProtocolMessage { + let overlay = *msg.overlay(); + ProtocolMessage::from_client_request_v0(ClientRequestContentV0::TopicSyncReq(msg), overlay) + } +} + +impl TryFrom for TopicSyncRes { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + let res: ClientResponseContentV0 = msg.try_into()?; + if let ClientResponseContentV0::TopicSyncRes(a) = res { + Ok(a) + } else { + log_debug!("INVALID {:?}", res); + Err(ProtocolError::InvalidValue) + } + } +} + +impl From for ProtocolMessage { + fn from(b: TopicSyncRes) -> ProtocolMessage { + let mut cr: ClientResponse = ClientResponseContentV0::TopicSyncRes(b).into(); + cr.set_result(ServerError::PartialContent.into()); + cr.into() + } +} + +impl Actor<'_, TopicSyncReq, TopicSyncRes> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, TopicSyncReq, TopicSyncRes> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + let req = TopicSyncReq::try_from(msg)?; + + let broker = BROKER.read().await; + + let res = broker.get_server_broker()?.topic_sync_req( + req.overlay(), + req.topic(), + req.known_heads(), + req.target_heads(), + ); + + // IF NEEDED, the topic_sync_req could be changed to return a stream, and then the send_in_reply_to would be also totally async + match res { + Ok(blocks) => { + if blocks.len() == 0 { + let re: Result<(), ServerError> = Err(ServerError::EmptyStream); + fsm.lock() + .await + .send_in_reply_to(re.into(), self.id()) + .await?; + return Ok(()); + } + let mut lock = fsm.lock().await; + + for block in blocks { + lock.send_in_reply_to(block.into(), self.id()).await?; + } + let re: Result<(), ServerError> = Err(ServerError::EndOfStream); + lock.send_in_reply_to(re.into(), self.id()).await?; + } + Err(e) => { + let re: Result<(), ServerError> = Err(e); + fsm.lock() + .await + .send_in_reply_to(re.into(), self.id()) + .await?; + } + } + Ok(()) + } +} diff --git a/ng-net/src/broker.rs b/ng-net/src/broker.rs index 63052d9..d4c49a6 100644 --- a/ng-net/src/broker.rs +++ b/ng-net/src/broker.rs @@ -657,6 +657,7 @@ impl<'a> Broker<'a> { remote_peer_id: X25519PrivKey, // if client is None it means we are Core mode client: Option, + fsm: &mut NoiseFSM, ) -> Result<(), ProtocolError> { log_debug!("ATTACH PEER_ID {:?}", remote_peer_id); @@ -691,10 +692,10 @@ impl<'a> Broker<'a> { if !listener.config.accepts_client() { return Err(ProtocolError::AccessDenied); } - let client = client.unwrap(); + let client = client.as_ref().unwrap(); self.authorize( &(local_bind_address, remote_bind_address), - Authorization::Client((client.user, client.registration)), + Authorization::Client((client.user.clone(), client.registration.clone())), )?; // TODO add client to storage @@ -708,6 +709,7 @@ impl<'a> Broker<'a> { connection.reset_shutdown(remote_peer_id).await; let connected = if !is_core { + fsm.set_user_id(client.unwrap().user); PeerConnection::Client(connection) } else { let dc = DirectConnection { diff --git a/ng-net/src/connection.rs b/ng-net/src/connection.rs index fa22efe..280818a 100644 --- a/ng-net/src/connection.rs +++ b/ng-net/src/connection.rs @@ -136,6 +136,8 @@ pub struct NoiseFSM { nonce_for_hello: Vec, config: Option, + + user: Option, } impl fmt::Debug for NoiseFSM { @@ -251,20 +253,20 @@ impl NoiseFSM { remote, nonce_for_hello: vec![], config: None, + user: None, } } - pub fn user_id(&self) -> Option { + pub fn user_id(&self) -> Result { match &self.config { - Some(start_config) => start_config.get_user(), - _ => None, + Some(start_config) => start_config.get_user().ok_or(ProtocolError::ActorError), + _ => self.user.ok_or(ProtocolError::ActorError), } } - pub fn user_id_or_err(&self) -> Result { - match &self.config { - Some(start_config) => start_config.get_user().ok_or(ProtocolError::ActorError), - _ => Err(ProtocolError::ActorError), + pub(crate) fn set_user_id(&mut self, user: UserId) { + if self.user.is_none() { + self.user = Some(user); } } @@ -307,7 +309,7 @@ impl NoiseFSM { if in_reply_to != 0 { msg.set_id(in_reply_to); } - log_info!("SENDING: {:?}", msg); + log_debug!("SENDING: {:?}", msg); if self.noise_cipher_state_enc.is_some() { let cipher = self.encrypt(msg)?; self.sender @@ -785,6 +787,7 @@ impl NoiseFSM { local_bind_address, *self.remote.unwrap().slice(), Some(client_auth.content_v0()), + self, ) .await .err() @@ -857,7 +860,7 @@ impl NoiseFSM { #[derive(Debug)] pub struct ConnectionBase { - fsm: Option>>, + pub(crate) fsm: Option>>, sender: Option>, receiver: Option>, diff --git a/ng-net/src/server_broker.rs b/ng-net/src/server_broker.rs index 805fa6f..7b86c65 100644 --- a/ng-net/src/server_broker.rs +++ b/ng-net/src/server_broker.rs @@ -80,4 +80,12 @@ pub trait IServerBroker: Send + Sync { event: Event, user_id: &UserId, ) -> Result<(), ServerError>; + + fn topic_sync_req( + &self, + overlay: &OverlayId, + topic: &TopicId, + known_heads: &Vec, + target_heads: &Vec, + ) -> Result, ServerError>; } diff --git a/ng-net/src/types.rs b/ng-net/src/types.rs index 14c0f70..6420e4b 100644 --- a/ng-net/src/types.rs +++ b/ng-net/src/types.rs @@ -1709,6 +1709,9 @@ pub struct TopicSyncReqV0 { /// Stop synchronizing when these commits are met. /// if empty, the local HEAD at the responder is used instead pub target_heads: Vec, + + #[serde(skip)] + pub overlay: Option, } /// Topic synchronization request @@ -1718,6 +1721,16 @@ pub enum TopicSyncReq { } impl TopicSyncReq { + pub fn overlay(&self) -> &OverlayId { + match self { + Self::V0(v0) => v0.overlay.as_ref().unwrap(), + } + } + pub fn set_overlay(&mut self, overlay: OverlayId) { + match self { + Self::V0(v0) => v0.overlay = Some(overlay), + } + } pub fn topic(&self) -> &TopicId { match self { TopicSyncReq::V0(o) => &o.topic, @@ -1728,6 +1741,11 @@ impl TopicSyncReq { TopicSyncReq::V0(o) => &o.known_heads, } } + pub fn target_heads(&self) -> &Vec { + match self { + TopicSyncReq::V0(o) => &o.target_heads, + } + } } /// Status of a Forwarded Peer, sent in the Advert @@ -2248,6 +2266,26 @@ pub enum TopicSyncRes { V0(TopicSyncResV0), } +impl TopicSyncRes { + pub fn event(&self) -> &Event { + match self { + Self::V0(TopicSyncResV0::Event(e)) => e, + _ => panic!("this TopicSyncResV0 is not an event"), + } + } +} + +impl fmt::Display for TopicSyncRes { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0(v0) => match v0 { + TopicSyncResV0::Event(e) => writeln!(f, "====== Event ====== {e}"), + TopicSyncResV0::Block(b) => writeln!(f, "====== Block ID ====== {}", b.id()), + }, + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum CoreBrokerDisconnect { V0(CoreBrokerDisconnectV0), @@ -3183,6 +3221,7 @@ impl ClientRequestContentV0 { ClientRequestContentV0::PinRepo(a) => {} ClientRequestContentV0::PublishEvent(a) => a.set_overlay(overlay), ClientRequestContentV0::CommitGet(a) => a.set_overlay(overlay), + ClientRequestContentV0::TopicSyncReq(a) => a.set_overlay(overlay), _ => unimplemented!(), } } @@ -3230,6 +3269,7 @@ impl ClientRequest { ClientRequestContentV0::TopicSub(r) => r.get_actor(self.id()), ClientRequestContentV0::PublishEvent(r) => r.get_actor(self.id()), ClientRequestContentV0::CommitGet(r) => r.get_actor(self.id()), + ClientRequestContentV0::TopicSyncReq(r) => r.get_actor(self.id()), _ => unimplemented!(), }, } diff --git a/ng-net/src/utils.rs b/ng-net/src/utils.rs index acdab30..db58356 100644 --- a/ng-net/src/utils.rs +++ b/ng-net/src/utils.rs @@ -15,13 +15,13 @@ use crate::NG_BOOTSTRAP_LOCAL_PATH; use async_std::task; use ed25519_dalek::*; use futures::{channel::mpsc, Future}; -use noise_protocol::U8Array; -use noise_protocol::DH; -use noise_rust_crypto::sensitive::Sensitive; #[cfg(target_arch = "wasm32")] use ng_repo::errors::*; use ng_repo::types::PubKey; use ng_repo::{log::*, types::PrivKey}; +use noise_protocol::U8Array; +use noise_protocol::DH; +use noise_rust_crypto::sensitive::Sensitive; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use url::Host; use url::Url; @@ -170,7 +170,7 @@ pub async fn retrieve_local_bootstrap( None }; log_debug!("{}", location_string); - log_debug!("invite_String {:?} invite1{:?}", invite_string, invite1); + log_debug!("invite_string {:?} invite1{:?}", invite_string, invite1); let invite2: Option = { let info = retrieve_ng_bootstrap(&location_string).await; diff --git a/ng-repo/src/branch.rs b/ng-repo/src/branch.rs index a163aa5..aee342d 100644 --- a/ng-repo/src/branch.rs +++ b/ng-repo/src/branch.rs @@ -9,13 +9,16 @@ //! Branch of a Repository +use std::collections::HashMap; use std::collections::HashSet; +use std::fmt; use zeroize::{Zeroize, ZeroizeOnDrop}; // use fastbloom_rs::{BloomFilter as Filter, Membership}; use crate::block_storage::*; use crate::errors::*; +use crate::log::*; use crate::object::*; use crate::store::Store; use crate::types::*; @@ -45,6 +48,47 @@ impl BranchV0 { } } +#[derive(Debug)] +struct DagNode { + pub future: HashSet, +} + +//struct Dag<'a>(&'a HashMap); + +impl fmt::Display for DagNode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for fu in self.future.iter() { + write!(f, "{}", fu)?; + } + Ok(()) + } +} + +// impl<'a> fmt::Display for Dag<'a> { +// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +// for node in self.0.iter() { +// writeln!(f, "ID: {} FUTURES: {}", node.0, node.1)?; +// } +// Ok(()) +// } +// } + +impl DagNode { + fn new() -> Self { + Self { + future: HashSet::new(), + } + } + fn collapse(id: &ObjectId, dag: &HashMap) -> Vec { + let mut res = vec![*id]; + let this = dag.get(id).unwrap(); + for child in this.future.iter() { + res.append(&mut Self::collapse(child, dag)); + } + res + } +} + impl Branch { /// topic private key (a BranchWriteCapSecret), encrypted with a key derived as follow /// BLAKE3 derive_key ("NextGraph Branch WriteCap Secret BLAKE3 key", @@ -115,9 +159,9 @@ impl Branch { /// `known_heads` represents the list of current heads at the requester replica at the moment of request. /// an empty list means the requester has an empty branch locally /// - /// Return ObjectIds to send + /// Return ObjectIds to send, ordered in respect of causal partial order pub fn sync_req( - target_heads: &[ObjectId], + target_heads: impl Iterator, known_heads: &[ObjectId], //their_filter: &BloomFilter, store: &Store, @@ -132,25 +176,37 @@ impl Branch { fn load_causal_past( cobj: &Object, store: &Store, - theirs: &HashSet, - visited: &mut HashSet, + theirs: &HashMap, + visited: &mut HashMap, missing: &mut Option<&mut HashSet>, + future: Option, ) -> Result<(), ObjectParseError> { let id = cobj.id(); // check if this commit object is present in theirs or has already been visited in the current walk // load deps, stop at the root(including it in visited) or if this is a commit object from known_heads - if !theirs.contains(&id) && !visited.contains(&id) { - visited.insert(id); - for id in cobj.acks_and_nacks() { - match Object::load(id, None, store) { - Ok(o) => { - load_causal_past(&o, store, theirs, visited, missing)?; - } - Err(ObjectParseError::MissingBlocks(blocks)) => { - missing.as_mut().map(|m| m.extend(blocks)); + if !theirs.contains_key(&id) { + if let Some(past) = visited.get_mut(&id) { + // we update the future + if let Some(f) = future { + past.future.insert(f); + } + } else { + let mut insert = DagNode::new(); + if let Some(f) = future { + insert.future.insert(f); + } + visited.insert(id, insert); + for past_id in cobj.acks_and_nacks() { + match Object::load(past_id, None, store) { + Ok(o) => { + load_causal_past(&o, store, theirs, visited, missing, Some(id))?; + } + Err(ObjectParseError::MissingBlocks(blocks)) => { + missing.as_mut().map(|m| m.extend(blocks)); + } + Err(e) => return Err(e), } - Err(e) => return Err(e), } } } @@ -158,22 +214,22 @@ impl Branch { } // their commits - let mut theirs = HashSet::new(); + let mut theirs: HashMap = HashMap::new(); // collect causal past of known_heads for id in known_heads { if let Ok(cobj) = Object::load(*id, None, store) { - load_causal_past(&cobj, store, &HashSet::new(), &mut theirs, &mut None)?; + load_causal_past(&cobj, store, &HashMap::new(), &mut theirs, &mut None, None)?; } // we silently discard any load error on the known_heads as the responder might not know them (yet). } - let mut visited = HashSet::new(); + let mut visited = HashMap::new(); // collect all commits reachable from target_heads // up to the root or until encountering a commit from theirs for id in target_heads { - if let Ok(cobj) = Object::load(*id, None, store) { - load_causal_past(&cobj, store, &theirs, &mut visited, &mut None)?; + if let Ok(cobj) = Object::load(id, None, store) { + load_causal_past(&cobj, store, &theirs, &mut visited, &mut None, None)?; } // we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally. } @@ -193,7 +249,23 @@ impl Branch { // } // } //log_debug!("!! result filtered: {:?}", result); - Ok(Vec::from_iter(visited)) + + // now ordering to respect causal partial order. + let mut next_generations = HashSet::new(); + for (_, node) in visited.iter() { + for future in node.future.iter() { + next_generations.insert(future); + } + } + let all = HashSet::from_iter(visited.keys()); + let first_generation = all.difference(&next_generations); + + let mut result = Vec::with_capacity(visited.len()); + for first in first_generation { + result.append(&mut DagNode::collapse(first, &visited)); + } + + Ok(result) } } @@ -219,9 +291,6 @@ mod test { ) -> ObjectRef { let max_object_size = 4000; let mut obj = Object::new(ObjectContent::V0(content), header, max_object_size, store); - log_debug!(">>> add_obj"); - log_debug!(" id: {:?}", obj.id()); - log_debug!(" header: {:?}", obj.header()); obj.save_in_test(store).unwrap(); obj.reference().unwrap() } @@ -471,7 +540,7 @@ mod test { log_debug!(" their_commits: [br, t1, t2, a3, t5, a6]"); let ids = Branch::sync_req( - &[t5.id, a6.id, a7.id], + [t5.id, a6.id, a7.id].into_iter(), &[t5.id], //&their_commits, &repo.store, diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index af9c9ef..6fb9af0 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -63,6 +63,7 @@ pub enum NgError { ProtocolError(ProtocolError), ServerError(ServerError), InvalidResponse, + BootstrapError(String), NotAServerError, VerifierError(VerifierError), SiteNotFoundOnBroker, @@ -87,6 +88,9 @@ impl fmt::Display for NgError { Self::CommitLoadError(commit_load_error) => { write!(f, "CommitLoadError: {:?}", commit_load_error) } + Self::BootstrapError(error) => { + write!(f, "BootstrapError: {:?}", error) + } Self::ObjectParseError(error) => write!(f, "ObjectParseError: {:?}", error), Self::StorageError(storage_error) => write!(f, "StorageError: {:?}", storage_error), Self::BrokerConfigErrorStr(s) => write!(f, "BrokerConfigError: {s}"), @@ -190,6 +194,8 @@ pub enum StorageError { PropertyNotFound, NotAStoreRepo, OverlayBranchNotFound, + Abort, + NotEmpty, } impl core::fmt::Display for StorageError { @@ -225,6 +231,7 @@ pub enum ServerError { TopicNotFound, AccessDenied, InvalidHeader, + MalformedBranch, } impl From for ServerError { diff --git a/ng-repo/src/event.rs b/ng-repo/src/event.rs index 60808a6..1167e4e 100644 --- a/ng-repo/src/event.rs +++ b/ng-repo/src/event.rs @@ -113,6 +113,12 @@ impl Event { } } + pub fn commit_id(&self) -> ObjectId { + match self { + Self::V0(v0) => v0.content.blocks[0].id(), + } + } + pub fn open( &self, store: &Store, diff --git a/ng-repo/src/kcv_storage.rs b/ng-repo/src/kcv_storage.rs index b6432db..2a19f5f 100644 --- a/ng-repo/src/kcv_storage.rs +++ b/ng-repo/src/kcv_storage.rs @@ -156,8 +156,15 @@ pub trait IModel { let prefix = self.prefix(); let key = self.key(); let suffix = self.class().existential_column.unwrap().suffix(); + // log_info!( + // "EXISTENTIAL CHECK {} {} {:?}", + // prefix as char, + // suffix as char, + // key + // ); match self.storage().get(prefix, key, Some(suffix), &None) { Ok(res) => { + //log_info!("EXISTENTIAL CHECK GOT {:?}", res); self.existential().as_mut().unwrap().process_exists(res); true } @@ -194,7 +201,7 @@ pub trait IModel { use std::hash::Hash; pub struct MultiValueColumn< Model: IModel, - Column: Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>, + Column: std::fmt::Debug + Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>, > { prefix: u8, phantom: PhantomData, @@ -204,7 +211,7 @@ pub struct MultiValueColumn< impl< Model: IModel, - Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, + Column: std::fmt::Debug + Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, > MultiValueColumn { pub const fn new(prefix: u8) -> Self { @@ -259,6 +266,58 @@ impl< .has_property_value(self.prefix, &key, None, &vec![], &None) } + pub fn replace_with_new_set_if_old_set_exists( + &self, + model: &mut Model, + mut existing_set: HashSet, + replace_with: HashSet, + ) -> Result<(), StorageError> { + // if existing_set.len() == 0 { + // return Err(StorageError::InvalidValue); + // } + model.check_exists()?; + + let key_prefix = model.key(); + let key_prefix_len = key_prefix.len(); + let total_size = key_prefix_len + self.value_size()?; + + let empty_existing = existing_set.len() == 0; + + //log_debug!("REPLACE HEAD {:?} with {:?}", existing_set, replace_with); + + model.storage().write_transaction(&mut |tx| { + for found in tx.get_all_keys_and_values( + self.prefix, + total_size, + key_prefix.to_vec(), + None, + &None, + )? { + if found.0.len() == total_size + 1 { + let val: Column = from_slice(&found.0[1 + key_prefix_len..total_size + 1])?; + if (empty_existing) { + return Err(StorageError::NotEmpty); + } + if existing_set.remove(&val) { + tx.del(self.prefix, &found.0[1..].to_vec(), None, &None)?; + } + } + } + if existing_set.len() == 0 { + for add in replace_with.iter() { + let mut new = Vec::with_capacity(total_size); + new.extend(key_prefix); + let mut val = to_vec(add)?; + new.append(&mut val); + //log_debug!("PUTTING HEAD {} {:?}", self.prefix as char, new); + tx.put(self.prefix, &new, None, &vec![], &None)?; + } + return Ok(()); + } + Err(StorageError::Abort) + }) + } + pub fn get_all(&self, model: &mut Model) -> Result, StorageError> { model.check_exists()?; let key_prefix = model.key(); @@ -283,7 +342,7 @@ impl< impl< Model: IModel, - Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, + Column: std::fmt::Debug + Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, > IMultiValueColumn for MultiValueColumn { fn value_size(&self) -> Result { @@ -296,7 +355,7 @@ impl< pub struct MultiMapColumn< Model: IModel, - Column: Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>, + Column: std::fmt::Debug + Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>, Value: Serialize + for<'a> Deserialize<'a> + Clone + PartialEq, > { prefix: u8, @@ -308,7 +367,7 @@ pub struct MultiMapColumn< impl< Model: IModel, - Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, + Column: std::fmt::Debug + Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, Value: Serialize + for<'a> Deserialize<'a> + Clone + PartialEq, > MultiMapColumn { @@ -448,7 +507,7 @@ impl< } impl< Model: IModel, - Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, + Column: std::fmt::Debug + Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, Value: Serialize + for<'a> Deserialize<'a> + Clone + PartialEq, > IMultiValueColumn for MultiMapColumn { @@ -462,7 +521,7 @@ impl< pub struct MultiCounterColumn< Model: IModel, - Column: Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>, + Column: std::fmt::Debug + Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>, > { prefix: u8, phantom_column: PhantomData, @@ -471,7 +530,7 @@ pub struct MultiCounterColumn< impl< Model: IModel, - Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, + Column: std::fmt::Debug + Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, > MultiCounterColumn { pub const fn new(prefix: u8) -> Self { @@ -546,7 +605,7 @@ impl< } impl< Model: IModel, - Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, + Column: std::fmt::Debug + Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, > IMultiValueColumn for MultiCounterColumn { fn value_size(&self) -> Result { @@ -734,6 +793,11 @@ impl Deserialize<'d>> ExistentialValue Result { + self.get()?; + Ok(self.value.take().unwrap()) + } } pub trait WriteTransaction: ReadTransaction { diff --git a/ng-repo/src/object.rs b/ng-repo/src/object.rs index 6697bf3..a88cc5c 100644 --- a/ng-repo/src/object.rs +++ b/ng-repo/src/object.rs @@ -430,10 +430,8 @@ impl Object { pub fn load_header( root_block: &Block, store: &Store, - ) -> Result { - Self::load_header_(root_block, store)? - .0 - .ok_or(ObjectParseError::InvalidHeader) + ) -> Result, ObjectParseError> { + Ok(Self::load_header_(root_block, store)?.0) } fn load_header_( @@ -454,7 +452,9 @@ impl Object { commit_header.set_id(id); Ok((Some(commit_header), obj.blocks().cloned().collect())) } - _ => return Err(ObjectParseError::InvalidHeader), + _ => { + return Err(ObjectParseError::InvalidHeader); + } }, } } @@ -465,8 +465,12 @@ impl Object { Ok(ObjectContent::V0(ObjectContentV0::CommitHeader(commit_header))) => { Ok((Some(commit_header), vec![])) } - Err(_e) => return Err(ObjectParseError::InvalidHeader), - _ => return Err(ObjectParseError::InvalidHeader), + Err(e) => { + return Err(ObjectParseError::InvalidHeader); + } + _ => { + return Err(ObjectParseError::InvalidHeader); + } } } }, diff --git a/ng-repo/src/repo.rs b/ng-repo/src/repo.rs index f0c2144..fe5831c 100644 --- a/ng-repo/src/repo.rs +++ b/ng-repo/src/repo.rs @@ -163,6 +163,13 @@ impl Repo { Self::new_with_member(&pub_key, &pub_key, perms, OverlayId::dummy(), store) } + pub fn update_branch_current_head(&mut self, branch: &BranchId, commit_ref: ObjectRef) { + //log_info!("from branch {} HEAD UPDATED TO {}", branch, commit_ref.id); + if let Some(branch) = self.branches.get_mut(branch) { + branch.current_heads = vec![commit_ref]; + } + } + pub fn new_with_member( id: &PubKey, member: &UserId, diff --git a/ng-repo/src/store.rs b/ng-repo/src/store.rs index 80dabca..7107add 100644 --- a/ng-repo/src/store.rs +++ b/ng-repo/src/store.rs @@ -212,7 +212,7 @@ impl Store { )?; let branch_read_cap = branch_commit.reference().unwrap(); - log_debug!("{:?} BRANCH COMMIT {}", branch_type, branch_commit); + //log_debug!("{:?} BRANCH COMMIT {}", branch_type, branch_commit); // creating the AddBranch commit (on root_branch), deps to the RootBranch commit // author is the owner @@ -236,11 +236,11 @@ impl Store { self, )?; - log_debug!( - "ADD_BRANCH {:?} BRANCH COMMIT {}", - &branch_type, - add_branch_commit - ); + // log_debug!( + // "ADD_BRANCH {:?} BRANCH COMMIT {}", + // &branch_type, + // add_branch_commit + // ); let branch_info = BranchInfo { id: branch_pub_key, @@ -305,7 +305,7 @@ impl Store { &self, )?; - log_debug!("REPOSITORY COMMIT {}", repository_commit); + //log_debug!("REPOSITORY COMMIT {}", repository_commit); let repository_commit_ref = repository_commit.reference().unwrap(); @@ -348,7 +348,7 @@ impl Store { &self, )?; - log_debug!("ROOT_BRANCH COMMIT {}", root_branch_commit); + //log_debug!("ROOT_BRANCH COMMIT {}", root_branch_commit); let root_branch_readcap = root_branch_commit.reference().unwrap(); let root_branch_readcap_id = root_branch_readcap.id; // adding the 2 events for the Repository and Rootbranch commits @@ -641,6 +641,14 @@ impl Store { .overlay_id_for_write_purpose(&self.store_overlay_branch_readcap.key) } + pub fn overlay_for_read_on_client_protocol(&self) -> OverlayId { + match self.store_repo { + _ => self.inner_overlay(), + //StoreRepo::V0(StoreRepoV0::PrivateStore(_)) => self.inner_overlay(), + //_ => self.overlay_id, + } + } + pub fn outer_overlay(&self) -> OverlayId { self.store_repo.outer_overlay() } diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs index 50b980e..83b0090 100644 --- a/ng-repo/src/types.rs +++ b/ng-repo/src/types.rs @@ -1158,6 +1158,11 @@ pub enum RootBranch { } impl RootBranch { + pub fn topic(&self) -> &TopicId { + match self { + Self::V0(v0) => &v0.topic, + } + } pub fn owners(&self) -> &Vec { match self { Self::V0(v0) => &v0.owners, diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index f6e16bf..14133aa 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -403,11 +403,11 @@ pub fn client_info() -> JsValue { #[cfg(target_arch = "wasm32")] #[wasm_bindgen] pub fn encode_create_account(payload: JsValue) -> JsValue { - log_debug!("{:?}", payload); + //log_debug!("{:?}", payload); let create_account = serde_wasm_bindgen::from_value::(payload).unwrap(); - log_debug!("create_account {:?}", create_account); + //log_debug!("create_account {:?}", create_account); let res = create_account.encode(); - log_debug!("res {:?}", res); + //log_debug!("res {:?}", res); serde_wasm_bindgen::to_value(&res).unwrap() } diff --git a/ng-verifier/src/commits/mod.rs b/ng-verifier/src/commits/mod.rs index 0b327b8..b03ddeb 100644 --- a/ng-verifier/src/commits/mod.rs +++ b/ng-verifier/src/commits/mod.rs @@ -22,6 +22,8 @@ pub trait CommitVerifier { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError>; } @@ -56,6 +58,8 @@ impl CommitVerifier for RootBranch { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { match self { @@ -126,6 +130,8 @@ impl CommitVerifier for Branch { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { match self { @@ -179,6 +185,8 @@ impl CommitVerifier for SyncSignature { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { match self { @@ -202,7 +210,7 @@ impl CommitVerifier for SyncSignature { } let commits = list_dep_chain_until(deps[0].clone(), &ack.id, &store)?; for commit in commits { - verifier.verify_commit(commit, Arc::clone(&store))?; + verifier.verify_commit(commit, branch_id, repo_id, Arc::clone(&store))?; } } } @@ -215,6 +223,8 @@ impl CommitVerifier for AddBranch { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { match self { @@ -252,6 +262,8 @@ impl CommitVerifier for Repository { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { // left empty intentionally @@ -264,6 +276,8 @@ impl CommitVerifier for StoreUpdate { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { verifier.new_store_from_update(self) @@ -275,6 +289,8 @@ impl CommitVerifier for AddSignerCap { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { match self { @@ -288,6 +304,8 @@ impl CommitVerifier for AddMember { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -299,6 +317,8 @@ impl CommitVerifier for RemoveMember { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -310,6 +330,8 @@ impl CommitVerifier for AddPermission { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -321,6 +343,8 @@ impl CommitVerifier for RemovePermission { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -332,6 +356,8 @@ impl CommitVerifier for RemoveBranch { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -343,6 +369,8 @@ impl CommitVerifier for AddName { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -354,6 +382,8 @@ impl CommitVerifier for RemoveName { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -365,6 +395,8 @@ impl CommitVerifier for () { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -376,6 +408,8 @@ impl CommitVerifier for Snapshot { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -387,6 +421,8 @@ impl CommitVerifier for AddFile { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -398,6 +434,8 @@ impl CommitVerifier for RemoveFile { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -409,6 +447,8 @@ impl CommitVerifier for Compact { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -420,6 +460,8 @@ impl CommitVerifier for AsyncSignature { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -431,6 +473,8 @@ impl CommitVerifier for RootCapRefresh { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -442,6 +486,8 @@ impl CommitVerifier for BranchCapRefresh { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -453,6 +499,8 @@ impl CommitVerifier for AddRepo { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -464,6 +512,8 @@ impl CommitVerifier for RemoveRepo { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -475,6 +525,8 @@ impl CommitVerifier for AddLink { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -486,6 +538,8 @@ impl CommitVerifier for RemoveLink { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -497,6 +551,8 @@ impl CommitVerifier for RemoveSignerCap { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) @@ -508,6 +564,8 @@ impl CommitVerifier for WalletUpdate { &self, commit: &Commit, verifier: &mut Verifier, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { Ok(()) diff --git a/ng-verifier/src/site.rs b/ng-verifier/src/site.rs index d360a0a..e90ecc4 100644 --- a/ng-verifier/src/site.rs +++ b/ng-verifier/src/site.rs @@ -208,7 +208,7 @@ impl SiteV0 { // update the current_heads verifier.update_current_heads(&private_repo_id, &user_branch_id, vec![current_head])?; - // sending the events + // sending the additional events verifier .new_events(commits, private_repo_id, &private_store_repo) .await?; diff --git a/ng-verifier/src/user_storage/repo.rs b/ng-verifier/src/user_storage/repo.rs index c99e090..a9ec852 100644 --- a/ng-verifier/src/user_storage/repo.rs +++ b/ng-verifier/src/user_storage/repo.rs @@ -95,16 +95,20 @@ impl<'a> RepoStorage<'a> { const SUFFIX_FOR_EXIST_CHECK: u8 = Self::READ_CAP; pub fn open(id: &RepoId, storage: &'a dyn KCVStorage) -> Result, StorageError> { - let opening = RepoStorage { - id: id.clone(), - storage, - }; + let opening = RepoStorage::new(id, storage); if !opening.exists() { return Err(StorageError::NotFound); } Ok(opening) } + pub fn new(id: &RepoId, storage: &'a dyn KCVStorage) -> RepoStorage<'a> { + RepoStorage { + id: id.clone(), + storage, + } + } + pub fn create_from_repo( repo: &Repo, storage: &'a dyn KCVStorage, @@ -154,7 +158,7 @@ impl<'a> RepoStorage<'a> { storage: &'a dyn KCVStorage, ) -> Result<(), StorageError> { let repo_id = signer_cap.repo; - let _ = Self::open(&repo_id, storage)?; + let _ = Self::new(&repo_id, storage); storage.write_transaction(&mut |tx| { let id_ser = to_vec(&repo_id)?; let value = to_vec(signer_cap)?; diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 05d4619..e1844b5 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -17,7 +17,7 @@ use ng_repo::log::*; use ng_repo::object::Object; use ng_repo::repo::BranchInfo; use ng_repo::{ - block_storage::BlockStorage, + block_storage::{BlockStorage, HashMapBlockStorage}, errors::{NgError, ProtocolError, ServerError, StorageError, VerifierError}, file::RandomAccessFile, repo::Repo, @@ -87,7 +87,9 @@ pub struct Verifier { impl fmt::Debug for Verifier { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "Verifier\nconfig: {:?}", self.config)?; - writeln!(f, "connected_server_id: {:?}", self.connected_server_id) + writeln!(f, "connected_server_id: {:?}", self.connected_server_id)?; + writeln!(f, "stores: {:?}", self.stores)?; + writeln!(f, "repos: {:?}", self.repos) } } @@ -570,7 +572,6 @@ impl Verifier { overlay: OverlayId, ) -> Result<(), NgError> { assert!(overlay.is_inner()); - //log_info!("searching for topic {} {}", overlay, event.topic_id()); let (repo_id, branch_id) = self .topics .get(&(overlay, *event.topic_id())) @@ -603,7 +604,6 @@ impl Verifier { .await { Ok(SoS::Single(opened)) => { - //log_info!("OPENED {:?}", opened); self.repo_was_opened(&repo_id, &opened)?; //TODO: check that in the returned opened_repo, the branch we are interested in has effectively been subscribed as publisher by the broker. } @@ -646,8 +646,9 @@ impl Verifier { _ => return Err(NgError::InvalidResponse), } // TODO: deal with received known_heads. - // DO a TopicSync + // TODO a TopicSync } + let _ = broker .request::(user, remote, PublishEvent::new(event, overlay)) .await?; @@ -660,6 +661,8 @@ impl Verifier { pub fn verify_commit( &mut self, commit: Commit, + branch_id: &BranchId, + repo_id: &RepoId, store: Arc, ) -> Result<(), VerifierError> { //let quorum_type = commit.quorum_type(); @@ -669,17 +672,31 @@ impl Verifier { // commit, // store // ); - match commit.body().ok_or(VerifierError::CommitBodyNotFound)? { + let res = match commit.body().ok_or(VerifierError::CommitBodyNotFound)? { CommitBody::V0(v0) => match v0 { - CommitBodyV0::Repository(a) => a.verify(&commit, self, store), - CommitBodyV0::RootBranch(a) => a.verify(&commit, self, store), - CommitBodyV0::Branch(a) => a.verify(&commit, self, store), - CommitBodyV0::SyncSignature(a) => a.verify(&commit, self, store), - CommitBodyV0::AddBranch(a) => a.verify(&commit, self, store), - CommitBodyV0::StoreUpdate(a) => a.verify(&commit, self, store), - CommitBodyV0::AddSignerCap(a) => a.verify(&commit, self, store), - _ => unimplemented!(), + CommitBodyV0::Repository(a) => a.verify(&commit, self, branch_id, repo_id, store), + CommitBodyV0::RootBranch(a) => a.verify(&commit, self, branch_id, repo_id, store), + CommitBodyV0::Branch(a) => a.verify(&commit, self, branch_id, repo_id, store), + CommitBodyV0::SyncSignature(a) => { + a.verify(&commit, self, branch_id, repo_id, store) + } + CommitBodyV0::AddBranch(a) => a.verify(&commit, self, branch_id, repo_id, store), + CommitBodyV0::StoreUpdate(a) => a.verify(&commit, self, branch_id, repo_id, store), + CommitBodyV0::AddSignerCap(a) => a.verify(&commit, self, branch_id, repo_id, store), + _ => { + log_err!("unimplemented verifier {}", commit); + Err(VerifierError::NotImplemented) + } }, + }; + if res.is_ok() { + let commit_ref = commit.reference().unwrap(); + if let Some(repo) = self.repos.get_mut(repo_id) { + repo.update_branch_current_head(&branch_id, commit_ref); + } + Ok(()) + } else { + res } } @@ -761,58 +778,203 @@ impl Verifier { } pub async fn bootstrap(&mut self) -> Result<(), NgError> { - if self.is_in_memory() { - // TODO only bootstrap if 3P stores of personal site not already loaded (by replay) + if let Err(e) = self.bootstrap_from_remote().await { + log_warn!("bootstrap_from_remote failed with {}", e); + // maybe it failed because the 3P stores are still in the outbox and haven't been sent yet. + // we immediately try to send the events present in the outbox + let res = self.send_outbox().await; + log_info!("SENDING 3P EVENTS FROM OUTBOX RETURNED: {:?}", res); + + return res; + } + Ok(()) + } - let broker = BROKER.read().await; - let user = self.config.user_priv_key.to_pub(); - let remote = self.connected_server_id.to_owned().unwrap(); - let read_cap = self.config.private_store_read_cap.as_ref().unwrap(); - let private_store_id = self.config.private_store_id.as_ref().unwrap(); - let private_inner_overlay_id = OverlayId::inner(private_store_id, &read_cap.key); - - // first we fetch the read_cap commit of private store repo. - let msg = CommitGet::V0(CommitGetV0 { - id: read_cap.id, - topic: None, // we dont have the topic (only available from RepoLink/BranchLink) but we are pretty sure the Broker has the commit anyway. - overlay: Some(private_inner_overlay_id), - }); - match broker - .request::(&user, &remote, msg) - .await - { - Err(NgError::ServerError(ServerError::NotFound)) => { - // TODO: fallback to BlockGet, then Commit::load(with_body:true), which will return an Err(CommitLoadError::MissingBlocks), then do another BlockGet with those, and then again Commit::load... - return Err(NgError::SiteNotFoundOnBroker); + async fn do_sync_req<'a>( + &mut self, + broker: &RwLockReadGuard<'a, Broker<'a>>, + user: &UserId, + remote: &DirectPeerId, + topic: &TopicId, + branch_id: &BranchId, + branch_secret: &ReadCapSecret, + repo_id: &RepoId, + store: Arc, + ) -> Result<(), NgError> { + let msg = TopicSyncReq::new_empty(*topic, &store.overlay_for_read_on_client_protocol()); + match broker + .request::(user, remote, msg) + .await + { + Err(e) => return Err(e), + Ok(SoS::Stream(mut events)) => { + while let Some(event) = events.next().await { + let commit = event + .event() + .open(&store, repo_id, branch_id, branch_secret)?; + + self.verify_commit(commit, branch_id, repo_id, Arc::clone(&store))?; } - Ok(SoS::Stream(mut blockstream)) => { - while let Some(block) = blockstream.next().await { - log_info!("GOT BLOCK {:?}", block); + } + Ok(_) => return Err(NgError::InvalidResponse), + } + Ok(()) + } + + async fn load_store_from_read_cap<'a>( + &mut self, + broker: &RwLockReadGuard<'a, Broker<'a>>, + user: &UserId, + remote: &DirectPeerId, + store: Arc, + ) -> Result<(), NgError> { + // first we fetch the read_cap commit of private store repo. + let root_branch_commit = Self::get_commit( + store.get_store_readcap().clone(), + None, + &store.overlay_for_read_on_client_protocol(), + &broker, + &user, + &remote, + ) + .await?; + + match root_branch_commit + .body() + .ok_or(VerifierError::CommitBodyNotFound)? + { + CommitBody::V0(v0) => match v0 { + CommitBodyV0::RootBranch(root_branch) => { + // doing a SyncReq on the topic of root branch + + let topic = root_branch.topic(); + + let repo_id = store.id(); + self.do_sync_req( + &broker, + &user, + &remote, + topic, + repo_id, + store.get_store_readcap_secret(), + repo_id, + Arc::clone(&store), + ) + .await + .map_err(|e| NgError::BootstrapError(e.to_string()))?; + + let other_branches: Vec<(PubKey, PubKey, SymKey)> = self + .get_repo(repo_id, store.get_store_repo())? + .branches + .iter() + .map(|(branch_id, branch)| { + ( + branch_id.clone(), + branch.topic.clone(), + branch.read_cap.key.clone(), + ) + }) + .collect(); + + // loading the other Branches of store + for (branch_id, topic, secret) in other_branches { + if branch_id == *repo_id { + // root branch of store is already synced + continue; + } + self.do_sync_req( + &broker, + &user, + &remote, + &topic, + &branch_id, + &secret, + repo_id, + Arc::clone(&store), + ) + .await + .map_err(|e| NgError::BootstrapError(e.to_string()))?; } - Ok(()) + + log_info!("STORE loaded from read_cap {}", repo_id); + // TODO: deal with AddSignerCap that are saved on rocksdb for now, but do not make it to the Verifier.repos } - Ok(_) => return Err(NgError::InvalidResponse), - Err(e) => return Err(e), + _ => return Err(VerifierError::RootBranchNotFound.into()), + }, + } + + Ok(()) + } + + async fn get_commit<'a>( + commit_ref: ObjectRef, + topic_id: Option, + overlay: &OverlayId, + broker: &RwLockReadGuard<'a, Broker<'a>>, + user: &UserId, + remote: &DirectPeerId, + ) -> Result { + let msg = CommitGet::V0(CommitGetV0 { + id: commit_ref.id, + topic: topic_id, // we dont have the topic (only available from RepoLink/BranchLink) but we are pretty sure the Broker has the commit anyway. + overlay: Some(*overlay), + }); + match broker.request::(user, remote, msg).await { + Err(NgError::ServerError(ServerError::NotFound)) => { + // TODO: fallback to BlockGet, then Commit::load(with_body:true), which will return an Err(CommitLoadError::MissingBlocks), then do another BlockGet with those, and then again Commit::load... + return Err(NgError::SiteNotFoundOnBroker); } - } else { - Ok(()) + Ok(SoS::Stream(blockstream)) => { + // we could use the in_memory block_storage of the verifier, but then we would have to remove the blocks from there. + // instead we just create a new temporary in memory block storage + let temp_mem_block_storage = + HashMapBlockStorage::from_block_stream(overlay, blockstream).await; + // creating a temporary store to access the blocks + let temp_store = Store::new_from_overlay_id( + overlay, + Arc::new(std::sync::RwLock::new(temp_mem_block_storage)), + ); + Ok(Commit::load(commit_ref, &temp_store, true)?) + } + Ok(_) => return Err(NgError::InvalidResponse), + Err(e) => return Err(e), } } - fn load_from_credentials_and_events( - &mut self, - events: &Vec, - ) -> Result<(), VerifierError> { - let private_store_id = self.config.private_store_id.as_ref().unwrap(); - let private_outer_overlay_id = OverlayId::outer(private_store_id); - let private_inner_overlay_id = OverlayId::inner( - private_store_id, - &self.config.private_store_read_cap.as_ref().unwrap().key, - ); + async fn bootstrap_from_remote(&mut self) -> Result<(), NgError> { + if self.is_in_memory() { + let broker = BROKER.read().await; + let user = self.config.user_priv_key.to_pub(); + let remote = self.connected_server_id.to_owned().unwrap(); - // let storage = self.block_storage.as_ref().unwrap().write().unwrap(); + let private_store_id = self.config.private_store_id.to_owned().unwrap(); + let private_store = self.create_private_store_from_credentials()?; - let store_repo = StoreRepo::new_private(*private_store_id); + self.load_store_from_read_cap(&broker, &user, &remote, private_store) + .await?; + + let other_stores: Vec> = self + .stores + .iter() + .map(|(_, store)| Arc::clone(store)) + .collect(); + + // load the other stores (protected and public) + for store in other_stores { + if *store.id() == private_store_id { + continue; + // we already loaded the private store + } + self.load_store_from_read_cap(&broker, &user, &remote, store) + .await?; + } + } + Ok(()) + } + + fn create_private_store_from_credentials(&mut self) -> Result, VerifierError> { + let private_store_id = self.config.private_store_id.to_owned().unwrap(); + let store_repo = StoreRepo::new_private(private_store_id); let store = Arc::new(Store::new( store_repo, @@ -823,9 +985,23 @@ impl Verifier { let store = self .stores - .entry(private_outer_overlay_id) + .entry(store_repo.overlay_id_for_storage_purpose()) .or_insert_with(|| store); - let private_store = Arc::clone(store); + Ok(Arc::clone(store)) + } + + fn load_from_credentials_and_outbox( + &mut self, + events: &Vec, + ) -> Result<(), VerifierError> { + let private_store_id = self.config.private_store_id.as_ref().unwrap(); + let private_inner_overlay_id = OverlayId::inner( + private_store_id, + &self.config.private_store_read_cap.as_ref().unwrap().key, + ); + + let private_store = self.create_private_store_from_credentials()?; + // let storage = self.block_storage.as_ref().unwrap().write().unwrap(); // for e in events { // if e.overlay == private_inner_overlay_id { @@ -865,6 +1041,7 @@ impl Verifier { // 1st pass: load all events about private store let mut postponed_signer_caps = Vec::with_capacity(3); + let mut private_user_branch = None; for e in events { if e.overlay == private_inner_overlay_id { @@ -892,9 +1069,15 @@ impl Verifier { .ok_or(VerifierError::CommitBodyNotFound)? .is_add_signer_cap() { + private_user_branch = Some(branch_id.clone()); postponed_signer_caps.push(commit); } else { - self.verify_commit(commit, Arc::clone(&private_store))?; + self.verify_commit( + commit, + &branch_id.clone(), + private_store.id(), + Arc::clone(&private_store), + )?; } } } @@ -928,16 +1111,21 @@ impl Verifier { // } //log_info!("{:?}\n{:?}\n{:?}", self.repos, self.stores, self.topics); - + //log_info!("SECOND PASS"); // 2nd pass : load the other events (that are not from private store) - for (overlay, store) in self.stores.clone().iter() { - //log_info!("TRYING OVERLAY {} {}", overlay, private_outer_overlay_id); - if *overlay == private_outer_overlay_id { - //log_info!("SKIPPED"); + for (_, store) in self.stores.clone().iter() { + let store_inner_overlay_id = store.inner_overlay(); + + // log_info!( + // "TRYING OVERLAY {} {}", + // store_inner_overlay_id, + // private_inner_overlay_id + // ); + if store_inner_overlay_id == private_inner_overlay_id { + //log_info!("SKIPPED PRIVATE"); continue; // we skip the private store, as we already loaded it } - let store_inner_overlay_id = store.inner_overlay(); for e in events { if e.overlay == store_inner_overlay_id { @@ -958,7 +1146,7 @@ impl Verifier { let commit = e.event.open(store, store.id(), branch_id, branch_secret)?; - self.verify_commit(commit, Arc::clone(store))?; + self.verify_commit(commit, &branch_id.clone(), store.id(), Arc::clone(store))?; } else { // log_info!( // "SKIPPED wrong overlay {} {}", @@ -974,7 +1162,7 @@ impl Verifier { // .map(|(o, s)| (o.clone(), s.get_store_repo().clone())) // .collect(); // for (overlay, store_repo) in list { - // if overlay == private_outer_overlay_id { + // if overlay == private_inner_overlay_id { // continue; // // we skip the private store, as we already loaded it // } @@ -983,14 +1171,33 @@ impl Verifier { // finally, ingest the signer_caps. for signer_cap in postponed_signer_caps { - self.verify_commit(signer_cap, Arc::clone(&private_store))?; + self.verify_commit( + signer_cap, + private_user_branch.as_ref().unwrap(), + private_store.id(), + Arc::clone(&private_store), + )?; } Ok(()) } + fn display(heads: &Vec) -> String { + let mut ret = String::new(); + if heads.len() == 0 { + ret = "0".to_string(); + } + for head in heads { + ret.push_str(&format!("{} ", head.id)); + } + ret + } + pub async fn send_outbox(&mut self) -> Result<(), NgError> { - let events: Vec = self.take_events_from_outbox()?; + let events: Vec = self.take_events_from_outbox().unwrap_or(vec![]); + if events.len() == 0 { + return Ok(()); + } let broker = BROKER.read().await; let user = self.config.user_priv_key.to_pub(); let remote = self @@ -1001,52 +1208,64 @@ impl Verifier { // for all the events, check that they are valid (topic exists, current_heads match with event) let mut need_replay = false; + let mut events_to_replay = Vec::with_capacity(events.len()); let mut branch_heads: HashMap> = HashMap::new(); - for e in events.iter() { + for e in events { match self.topics.get(&(e.overlay, *e.event.topic_id())) { Some((repo_id, branch_id)) => match self.repos.get(repo_id) { Some(repo) => match repo.branches.get(branch_id) { Some(branch) => { let commit = e.event.open_with_info(repo, branch)?; let acks = commit.acks(); - match branch_heads.insert(*branch_id, vec![commit.reference().unwrap()]) - { + match branch_heads.get(branch_id) { Some(previous_heads) => { - if previous_heads != acks { - need_replay = true; - break; + if *previous_heads != acks { + // skip event, as it is outdated. + continue; + } else { + branch_heads + .insert(*branch_id, vec![commit.reference().unwrap()]); } } None => { if acks != branch.current_heads { - need_replay = true; - break; + // skip event, as it is outdated. + continue; + } else { + branch_heads + .insert(*branch_id, vec![commit.reference().unwrap()]); } } } } None => { + log_info!("REPLAY BRANCH NOT FOUND {}", branch_id); need_replay = true; - break; } }, None => { + log_info!("REPLAY REPO NOT FOUND {}", repo_id); need_replay = true; - break; } }, None => { + log_info!( + "REPLAY TOPIC NOT FOUND {} IN OVERLAY {}", + e.event.topic_id(), + e.overlay + ); need_replay = true; - break; } } + events_to_replay.push(e); } log_info!("NEED REPLAY {need_replay}"); if need_replay { - self.load_from_credentials_and_events(&events)?; + self.load_from_credentials_and_outbox(&events_to_replay)?; + log_info!("REPLAY DONE"); } - - for e in events { + log_info!("SENDING {} EVENTS FOR OUTBOX", events_to_replay.len()); + for e in events_to_replay { self.send_event(e.event, &broker, &user, &remote, e.overlay) .await?; } @@ -1063,7 +1282,7 @@ impl Verifier { let mut path = path.clone(); std::fs::create_dir_all(path.clone()).unwrap(); path.push(format!("lastseq{}", self.peer_id.to_hash_string())); - log_debug!("last_seq path {}", path.display()); + //log_debug!("last_seq path {}", path.display()); let file = read(path.clone()); let (mut file_save, val) = match file { @@ -1191,10 +1410,8 @@ impl Verifier { fn add_repo_(&mut self, repo: Repo) -> &Repo { for (branch_id, info) in repo.branches.iter() { - //log_info!("LOADING BRANCH: {}", branch_id); let overlay_id: OverlayId = repo.store.inner_overlay(); let topic_id = info.topic.clone(); - //log_info!("LOADING TOPIC: {} {}", overlay_id, topic_id); let repo_id = repo.id.clone(); let branch_id = branch_id.clone(); let res = self diff --git a/ng-wallet/src/lib.rs b/ng-wallet/src/lib.rs index edacfb7..ed2c04a 100644 --- a/ng-wallet/src/lib.rs +++ b/ng-wallet/src/lib.rs @@ -586,7 +586,7 @@ pub async fn create_wallet_second_step_v0( let mut site = SiteV0::create_personal(params.user_privkey.clone(), verifier) .await .map_err(|e| { - log_err!("{e}"); + log_err!("create_personal failed with {e}"); NgWalletError::InternalError })?;