diff --git a/ng-app/src-tauri/src/lib.rs b/ng-app/src-tauri/src/lib.rs index 30398fc..cc4fce7 100644 --- a/ng-app/src-tauri/src/lib.rs +++ b/ng-app/src-tauri/src/lib.rs @@ -309,7 +309,7 @@ async fn upload_chunk( nuri: NuriV0, app: tauri::AppHandle, ) -> Result { - log_debug!("upload_chunk {:?}", chunk); + //log_debug!("upload_chunk {:?}", chunk); let request = AppRequest::V0(AppRequestV0 { command: AppRequestCommandV0::FilePut, @@ -336,6 +336,13 @@ async fn cancel_stream(stream_id: &str) -> Result<(), String> { #[tauri::command(rename_all = "snake_case")] async fn disconnections_subscribe(app: tauri::AppHandle) -> Result<(), String> { + let path = app + .path() + .resolve("", BaseDirectory::AppLocalData) + .map_err(|_| NgError::SerializationError) + .unwrap(); + init_local_broker(Box::new(move || LocalBrokerConfig::BasePath(path.clone()))).await; + let main_window = app.get_window("main").unwrap(); let reader = nextgraph::local_broker::take_disconnections_receiver() diff --git a/ng-app/src/App.svelte b/ng-app/src/App.svelte index b9ab6c6..abb5612 100644 --- a/ng-app/src/App.svelte +++ b/ng-app/src/App.svelte @@ -106,10 +106,13 @@ } else { // ON WEB CLIENTS window.addEventListener("storage", async (event) => { + console.log("localStorage event", event); if (event.storageArea != localStorage) return; if (event.key === "ng_wallets") { + console.log("localStorage", JSON.stringify($wallets)); await ng.wallets_reload(); wallets.set(await ng.get_wallets()); + console.log("localStorage after", JSON.stringify($wallets)); } }); wallets.set(await ng.get_wallets()); diff --git a/ng-app/src/api.ts b/ng-app/src/api.ts index 929b698..328367f 100644 --- a/ng-app/src/api.ts +++ b/ng-app/src/api.ts @@ -124,6 +124,10 @@ const handler = { let callback = args[2]; let unlisten = await getCurrent().listen(stream_id, (event) => { + //console.log(event.payload); + if (event.payload.V0.FileBinary) { + event.payload.V0.FileBinary = Uint8Array.from(event.payload.V0.FileBinary); + } callback(event.payload).then(()=> {}) }) await tauri.invoke("app_request_stream",{session_id, stream_id, request}); diff --git a/ng-app/src/lib/Login.svelte b/ng-app/src/lib/Login.svelte index c32e15c..d217085 100644 --- a/ng-app/src/lib/Login.svelte +++ b/ng-app/src/lib/Login.svelte @@ -321,7 +321,7 @@ Do you trust this device?
Save my wallet on this deviceYes, save my wallet on this device

diff --git a/ng-app/src/lib/Test.svelte b/ng-app/src/lib/Test.svelte index e36f832..5acf47a 100644 --- a/ng-app/src/lib/Test.svelte +++ b/ng-app/src/lib/Test.svelte @@ -216,7 +216,7 @@

{:else} -
+
diff --git a/ng-app/src/store.ts b/ng-app/src/store.ts index e97771e..c6ab2d9 100644 --- a/ng-app/src/store.ts +++ b/ng-app/src/store.ts @@ -7,7 +7,7 @@ // notice may not be copied, modified, or distributed except // according to those terms. -import { writable, readonly, derived, get } from "svelte/store"; +import { writable, readable, readonly, derived, get } from "svelte/store"; import ng from "./api"; let all_branches = {}; @@ -124,17 +124,43 @@ export const reconnect = async function() { console.error(e) } } +// export const disconnections_subscribe = async function() { +// let disconnections_unsub = await ng.disconnections_subscribe(async (user_id) => { +// console.log("DISCONNECTION FOR USER", user_id); +// connections.update((c) => { +// c[user_id].error = "ConnectionError"; +// c[user_id].since = new Date(); +// return c; +// }); +// }); +// } +let disconnections_unsub; + export const disconnections_subscribe = async function() { - let disconnections_unsub = await ng.disconnections_subscribe(async (user_id) => { - console.log("DISCONNECTION FOR USER", user_id); - connections.update((c) => { - c[user_id].error = "ConnectionError"; - c[user_id].since = new Date(); - return c; + if (!disconnections_unsub) { + await ng.disconnections_subscribe(async (user_id) => { + console.log("DISCONNECTION FOR USER", user_id); + connections.update((c) => { + c[user_id].error = "ConnectionError"; + c[user_id].since = new Date(); + return c; + }); }); - }); + disconnections_unsub = true; + } } + + + +readable(false, function start(set) { + + + return function stop() { + disconnections_unsub(); + }; +}); + can_connect.subscribe(async (value) => { if (value[0] && value[0].wallet && value[1]) { diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index d75cce7..d9b73bc 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -318,7 +318,7 @@ impl IServerBroker for ServerBroker { let topic = self.storage.save_event(overlay, event, user_id)?; log_debug!( - "DISPATCH EVENt {} {} {:?}", + "DISPATCH EVENT {} {} {:?}", overlay, topic, self.local_subscriptions diff --git a/ng-net/src/types.rs b/ng-net/src/types.rs index 77e2dba..01b971a 100644 --- a/ng-net/src/types.rs +++ b/ng-net/src/types.rs @@ -2932,6 +2932,11 @@ impl RepoPinStatus { } } } + pub fn topics(&self) -> &Vec { + match self { + Self::V0(v0) => &v0.topics, + } + } } /// Request subscription to a `Topic` of an already opened or pinned Repo diff --git a/ng-repo/src/event.rs b/ng-repo/src/event.rs index 1167e4e..bb15f43 100644 --- a/ng-repo/src/event.rs +++ b/ng-repo/src/event.rs @@ -91,6 +91,12 @@ impl Event { } } + pub fn file_ids(&self) -> &Vec { + match self { + Event::V0(v0) => &v0.content.file_ids, + } + } + pub fn publisher(&self) -> &PeerId { match self { Event::V0(v0) => &v0.content.publisher, @@ -127,7 +133,19 @@ impl Event { branch_secret: &ReadCapSecret, ) -> Result { match self { - Self::V0(v0) => v0.open(store, repo_id, branch_id, branch_secret), + Self::V0(v0) => v0.open(store, repo_id, branch_id, branch_secret, true), + } + } + + pub fn open_without_body( + &self, + store: &Store, + repo_id: &RepoId, + branch_id: &BranchId, + branch_secret: &ReadCapSecret, + ) -> Result { + match self { + Self::V0(v0) => v0.open(store, repo_id, branch_id, branch_secret, false), } } @@ -256,7 +274,13 @@ impl EventV0 { /// returns the Commit object and optional list of additional block IDs. /// Those blocks have been added to the storage of store of repo so they can be retrieved. pub fn open_with_info(&self, repo: &Repo, branch: &BranchInfo) -> Result { - self.open(&repo.store, &repo.id, &branch.id, &branch.read_cap.key) + self.open( + &repo.store, + &repo.id, + &branch.id, + &branch.read_cap.key, + true, + ) } pub fn open( @@ -265,6 +289,7 @@ impl EventV0 { repo_id: &RepoId, branch_id: &BranchId, branch_secret: &ReadCapSecret, + with_body: bool, ) -> Result { // verifying event signatures self.verify()?; @@ -287,6 +312,6 @@ impl EventV0 { } } let commit_ref = ObjectRef::from_id_key(first_id.unwrap(), commit_key); - Ok(Commit::load(commit_ref, &store, true)?) + Ok(Commit::load(commit_ref, &store, with_body)?) } } diff --git a/ng-repo/src/repo.rs b/ng-repo/src/repo.rs index 08cba10..666ff30 100644 --- a/ng-repo/src/repo.rs +++ b/ng-repo/src/repo.rs @@ -163,13 +163,19 @@ impl Repo { Self::new_with_member(&pub_key, &pub_key, perms, OverlayId::dummy(), store) } - pub fn update_branch_current_head(&mut self, branch: &BranchId, commit_ref: ObjectRef) { + pub fn update_branch_current_head( + &mut self, + branch: &BranchId, + commit_ref: ObjectRef, + ) -> Option> { //log_info!("from branch {} HEAD UPDATED TO {}", branch, commit_ref.id); if let Some(branch) = self.branches.get_mut(branch) { // FIXME: this is very wrong. the DAG is not always linear branch.current_heads = vec![commit_ref]; - - //TODO: if userstorage: save current_heads to user storage + // we return the new current heads + Some(branch.current_heads.to_vec()) + } else { + None } } diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index 4df91ee..457f519 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -577,7 +577,7 @@ pub async fn upload_chunk( js_chunk: JsValue, js_nuri: JsValue, ) -> Result { - log_debug!("upload_chunk {:?}", js_nuri); + //log_debug!("upload_chunk {:?}", js_nuri); let session_id: u64 = serde_wasm_bindgen::from_value::(js_session_id) .map_err(|_| "Deserialization error of session_id".to_string())?; let upload_id: u32 = serde_wasm_bindgen::from_value::(js_upload_id) diff --git a/ng-verifier/src/commits/mod.rs b/ng-verifier/src/commits/mod.rs index efbfc43..59314bf 100644 --- a/ng-verifier/src/commits/mod.rs +++ b/ng-verifier/src/commits/mod.rs @@ -442,11 +442,11 @@ impl CommitVerifier for AddFile { nuri: refe.nuri(), reference: refe, }; - verifier - .user_storage - .as_ref() - .unwrap() - .branch_add_file(*branch_id, filename.clone())?; + verifier.user_storage.as_ref().unwrap().branch_add_file( + commit.id().unwrap(), + *branch_id, + filename.clone(), + )?; verifier .push_app_response(branch_id, AppResponse::V0(AppResponseV0::File(filename))) .await; diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index 240f288..d621e6f 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -160,20 +160,11 @@ impl AppRequestCommandV0 { let (repo_id, branch, store_repo) = Self::open_for_target(verifier, &nuri.target, true).await?; //log_info!("GOT ADD FILE {:?}", add); - let repo = verifier.get_repo(&repo_id, &store_repo)?; - // check that the referenced object exists locally. - repo.store.has(&add.object.id)?; - // we send all the blocks to the broker. - let file = RandomAccessFile::open( - add.object.id.clone(), - add.object.key.clone(), - Arc::clone(&repo.store), - )?; - let blocks = file.get_all_blocks_ids()?; - let found = verifier.has_blocks(blocks, repo).await?; - for block_id in found.missing() { - let block = repo.store.get(block_id)?; - verifier.put_blocks(vec![block], repo).await?; + + if verifier.connected_server_id.is_some() { + verifier + .put_all_blocks_of_file(&add.object, &repo_id, &store_repo) + .await?; } let add_file_commit_body = CommitBodyV0::AddFile(AddFile::V0(AddFileV0 { diff --git a/ng-verifier/src/rocksdb_user_storage.rs b/ng-verifier/src/rocksdb_user_storage.rs index c7f2ccd..4d89c3e 100644 --- a/ng-verifier/src/rocksdb_user_storage.rs +++ b/ng-verifier/src/rocksdb_user_storage.rs @@ -10,10 +10,12 @@ //! RocksDb Backend for UserStorage trait use crate::types::*; -use crate::user_storage::repo::RepoStorage; +use crate::user_storage::branch::*; +use crate::user_storage::repo::*; use crate::user_storage::*; use either::Either::{Left, Right}; use ng_repo::block_storage::BlockStorage; +use ng_repo::log::*; use ng_repo::repo::{BranchInfo, Repo}; use ng_repo::store::Store; use ng_repo::{errors::StorageError, types::*}; @@ -76,10 +78,31 @@ impl UserStorage for RocksDbUserStorage { RepoStorage::update_signer_cap(signer_cap, &self.user_storage) } - fn branch_add_file(&self, branch: BranchId, file: FileName) -> Result<(), StorageError> { - todo!(); + fn update_branch_current_head( + &self, + repo_id: &RepoId, + branch_id: &BranchId, + new_heads: Vec, + ) -> Result<(), StorageError> { + let branch = BranchStorage::new(branch_id, &self.user_storage)?; + if let Err(e) = branch.replace_current_heads(new_heads) { + log_err!("error while updating branch current head {:?}", e); + Err(e) + } else { + Ok(()) + } + } + + fn branch_add_file( + &self, + commit_id: ObjectId, + branch: BranchId, + file: FileName, + ) -> Result<(), StorageError> { + let branch = BranchStorage::new(&branch, &self.user_storage)?; + branch.add_file(&commit_id, &file) } fn branch_get_all_files(&self, branch: &BranchId) -> Result, StorageError> { - todo!(); + BranchStorage::get_all_files(&branch, &self.user_storage) } } diff --git a/ng-verifier/src/site.rs b/ng-verifier/src/site.rs index dbf7368..f0f1d00 100644 --- a/ng-verifier/src/site.rs +++ b/ng-verifier/src/site.rs @@ -213,7 +213,8 @@ impl SiteV0 { } // update the current_heads - verifier.update_current_heads(&private_repo_id, &user_branch_id, vec![current_head])?; + //verifier.update_current_heads(&private_repo_id, &user_branch_id, vec![current_head])?; + // this is now done in send_or_save_event_to_outbox // sending the additional events verifier diff --git a/ng-verifier/src/user_storage/branch.rs b/ng-verifier/src/user_storage/branch.rs index ed28b66..2fdb933 100644 --- a/ng-verifier/src/user_storage/branch.rs +++ b/ng-verifier/src/user_storage/branch.rs @@ -37,6 +37,8 @@ use ng_repo::types::TopicId; use serde_bare::from_slice; use serde_bare::to_vec; +use crate::types::FileName; + pub struct BranchStorage<'a> { storage: &'a dyn KCVStorage, id: BranchId, @@ -55,16 +57,25 @@ impl<'a> BranchStorage<'a> { const PREFIX_HEADS: u8 = b'h'; + const PREFIX_FILES: u8 = b'f'; + const SUFFIX_FOR_EXIST_CHECK: u8 = Self::TYPE; - pub fn open( + pub fn new( id: &BranchId, storage: &'a dyn KCVStorage, ) -> Result, StorageError> { - let opening = BranchStorage { + Ok(BranchStorage { id: id.clone(), storage, - }; + }) + } + + pub fn open( + id: &BranchId, + storage: &'a dyn KCVStorage, + ) -> Result, StorageError> { + let opening = Self::new(id, storage)?; if !opening.exists() { return Err(StorageError::NotFound); } @@ -172,6 +183,43 @@ impl<'a> BranchStorage<'a> { Ok(res) } + pub fn add_file(&self, commit_id: &ObjectId, file: &FileName) -> Result<(), StorageError> { + self.storage.write_transaction(&mut |tx| { + let branch_id_ser = to_vec(&self.id)?; + let commit_id_ser = to_vec(commit_id)?; + let val = to_vec(file)?; + let mut key = Vec::with_capacity(branch_id_ser.len() + commit_id_ser.len()); + key.append(&mut branch_id_ser.clone()); + key.append(&mut commit_id_ser.clone()); + tx.put(Self::PREFIX_FILES, &key, None, &val, &None)?; + Ok(()) + }) + } + + pub fn get_all_files( + id: &BranchId, + storage: &'a dyn KCVStorage, + ) -> Result, StorageError> { + let size = to_vec(&ObjectId::nil())?.len(); + let key_prefix = to_vec(id).unwrap(); + let key_prefix_len = key_prefix.len(); + let mut res: Vec = vec![]; + let total_size = key_prefix_len + size; + for file in storage.get_all_keys_and_values( + Self::PREFIX_FILES, + total_size, + key_prefix, + None, + &None, + )? { + if file.0.len() == total_size + 1 { + let file: FileName = from_slice(&file.1)?; + res.push(file); + } + } + Ok(res) + } + pub fn exists(&self) -> bool { self.storage .get( @@ -186,6 +234,22 @@ impl<'a> BranchStorage<'a> { &self.id } + pub fn replace_current_heads(&self, new_heads: Vec) -> Result<(), StorageError> { + self.storage.write_transaction(&mut |tx| { + let id_ser = &to_vec(&self.id)?; + let size = to_vec(&ObjectRef::nil())?.len(); + tx.del_all_values(Self::PREFIX_HEADS, id_ser, size, None, &None)?; + for head in new_heads.iter() { + let mut head_ser = to_vec(head)?; + let mut key = Vec::with_capacity(id_ser.len() + head_ser.len()); + key.append(&mut id_ser.clone()); + key.append(&mut head_ser); + tx.put(Self::PREFIX_HEADS, &key, None, &vec![], &None)?; + } + Ok(()) + }) + } + pub fn del(&self) -> Result<(), StorageError> { self.storage.write_transaction(&mut |tx| { let key = &to_vec(&self.id)?; diff --git a/ng-verifier/src/user_storage/storage.rs b/ng-verifier/src/user_storage/storage.rs index 49d6738..9fe06f9 100644 --- a/ng-verifier/src/user_storage/storage.rs +++ b/ng-verifier/src/user_storage/storage.rs @@ -44,9 +44,21 @@ pub trait UserStorage: Send + Sync { fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), StorageError>; - fn branch_add_file(&self, branch: BranchId, file: FileName) -> Result<(), StorageError>; + fn branch_add_file( + &self, + commit_id: ObjectId, + branch: BranchId, + file: FileName, + ) -> Result<(), StorageError>; fn branch_get_all_files(&self, branch: &BranchId) -> Result, StorageError>; + + fn update_branch_current_head( + &self, + repo_id: &RepoId, + branch_id: &BranchId, + new_heads: Vec, + ) -> Result<(), StorageError>; } pub(crate) struct InMemoryUserStorage { @@ -62,7 +74,12 @@ impl InMemoryUserStorage { } impl UserStorage for InMemoryUserStorage { - fn branch_add_file(&self, branch: BranchId, file: FileName) -> Result<(), StorageError> { + fn branch_add_file( + &self, + commit_id: ObjectId, + branch: BranchId, + file: FileName, + ) -> Result<(), StorageError> { let mut lock = self.branch_files.write().unwrap(); let file_list = lock.entry(branch).or_insert_with(|| Vec::with_capacity(1)); file_list.push(file); @@ -104,4 +121,13 @@ impl UserStorage for InMemoryUserStorage { fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), StorageError> { unimplemented!(); } + + fn update_branch_current_head( + &self, + repo_id: &RepoId, + branch_id: &BranchId, + new_heads: Vec, + ) -> Result<(), StorageError> { + unimplemented!(); + } } diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 3b85b5d..c6a43f6 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -17,6 +17,7 @@ use futures::SinkExt; use ng_net::actor::SoS; use ng_net::broker::{Broker, BROKER}; use ng_repo::block_storage::store_max_value_size; +use ng_repo::file::ReadFile; use ng_repo::log::*; use ng_repo::object::Object; use ng_repo::repo::BranchInfo; @@ -153,6 +154,30 @@ impl Verifier { Ok(file.reference().unwrap()) } + pub(crate) async fn put_all_blocks_of_file( + &self, + file_ref: &ObjectRef, + repo_id: &RepoId, + store_repo: &StoreRepo, + ) -> Result<(), NgError> { + let repo = self.get_repo(&repo_id, &store_repo)?; + // check that the referenced object exists locally. + repo.store.has(&file_ref.id)?; + // we send all the blocks to the broker. + let file = RandomAccessFile::open( + file_ref.id.clone(), + file_ref.key.clone(), + Arc::clone(&repo.store), + )?; + let blocks = file.get_all_blocks_ids()?; + let found = self.has_blocks(blocks, repo).await?; + for block_id in found.missing() { + let block = repo.store.get(block_id)?; + self.put_blocks(vec![block], repo).await?; + } + Ok(()) + } + pub(crate) async fn push_app_response(&mut self, branch: &BranchId, response: AppResponse) { // log_info!( // "push_app_response {} {:?}", @@ -433,23 +458,23 @@ impl Verifier { self.stores.insert(overlay_id, store); } - pub(crate) fn update_current_heads( - &mut self, - repo_id: &RepoId, - branch_id: &BranchId, - current_heads: Vec, - ) -> Result<(), VerifierError> { - let repo = self - .repos - .get_mut(repo_id) - .ok_or(VerifierError::RepoNotFound)?; - let branch = repo - .branches - .get_mut(branch_id) - .ok_or(VerifierError::BranchNotFound)?; - branch.current_heads = current_heads; - Ok(()) - } + // pub(crate) fn update_current_heads( + // &mut self, + // repo_id: &RepoId, + // branch_id: &BranchId, + // current_heads: Vec, + // ) -> Result<(), VerifierError> { + // let repo = self + // .repos + // .get_mut(repo_id) + // .ok_or(VerifierError::RepoNotFound)?; + // let branch = repo + // .branches + // .get_mut(branch_id) + // .ok_or(VerifierError::BranchNotFound)?; + // branch.current_heads = current_heads; + // Ok(()) + // } pub(crate) async fn new_event( &mut self, @@ -491,8 +516,12 @@ impl Verifier { let repo = self.get_repo(&repo_id, store_repo)?; let event = Event::new(&publisher, seq_num, commit, additional_blocks, repo)?; - self.send_or_save_event_to_outbox(event, repo.store.inner_overlay()) - .await?; + self.send_or_save_event_to_outbox( + commit.reference().unwrap(), + event, + repo.store.inner_overlay(), + ) + .await?; Ok(()) } @@ -507,8 +536,12 @@ impl Verifier { let seq_num = self.last_seq_num; let event = Event::new(&publisher, seq_num, commit, additional_blocks, repo)?; - self.send_or_save_event_to_outbox(event, repo.store.inner_overlay()) - .await?; + self.send_or_save_event_to_outbox( + commit.reference().unwrap(), + event, + repo.store.inner_overlay(), + ) + .await?; Ok(()) } @@ -688,11 +721,20 @@ impl Verifier { async fn send_or_save_event_to_outbox<'a>( &'a mut self, + commit_ref: ObjectRef, event: Event, overlay: OverlayId, ) -> Result<(), NgError> { //log_info!("========== EVENT {:03}: {}", event.seq_num(), event); + let (repo_id, branch_id) = self + .topics + .get(&(overlay, *event.topic_id())) + .ok_or(NgError::TopicNotFound)? + .to_owned(); + + self.update_branch_current_head(&repo_id, &branch_id, commit_ref); + if self.connected_server_id.is_some() { // send the event to the server already let broker = BROKER.read().await; @@ -917,6 +959,7 @@ impl Verifier { topic.known_heads(), ) .await?; + break; } } } @@ -938,6 +981,21 @@ impl Verifier { // ); if as_publisher && !pin_status.is_topic_subscribed_as_publisher(topic_id) { need_sub = true; + } else { + for topic in pin_status.topics() { + if topic.topic_id() == topic_id { + self.do_sync_req_if_needed( + broker, + user, + remote, + branch, + repo_id, + topic.known_heads(), + ) + .await?; + break; + } + } } } _ => return Err(NgError::InvalidResponse), @@ -1088,15 +1146,32 @@ impl Verifier { let res = res.await; if res.is_ok() { let commit_ref = commit.reference().unwrap(); - if let Some(repo) = self.repos.get_mut(repo_id) { - repo.update_branch_current_head(&branch_id, commit_ref); - } + self.update_branch_current_head(repo_id, branch_id, commit_ref); Ok(()) } else { res } } + fn update_branch_current_head( + &mut self, + repo_id: &RepoId, + branch: &BranchId, + commit_ref: ObjectRef, + ) { + if let Some(repo) = self.repos.get_mut(repo_id) { + let new_heads = repo.update_branch_current_head(branch, commit_ref); + if new_heads.is_none() { + return; + } + let new_heads = new_heads.unwrap(); + log_info!("NEW HEADS {} {:?}", branch, new_heads); + if let Some(user_storage) = self.user_storage_if_persistent() { + let _ = user_storage.update_branch_current_head(repo_id, branch, new_heads); + } + } + } + fn user_storage_if_persistent(&self) -> Option>> { if self.is_persistent() { if let Some(us) = self.user_storage.as_ref() { @@ -1776,6 +1851,41 @@ impl Verifier { } log_info!("SENDING {} EVENTS FOR OUTBOX", events_to_replay.len()); for e in events_to_replay { + let files = e.event.file_ids(); + log_info!("HAS FILE {:?}", files); + if !files.is_empty() { + let (repo_id, branch_id) = self + .topics + .get(&(e.overlay, *e.event.topic_id())) + .ok_or(NgError::TopicNotFound)? + .to_owned(); + + let repo = self + .repos + .get(&repo_id) + .ok_or(VerifierError::RepoNotFound)?; + + let branch = repo.branch(&branch_id)?; + + let commit = e.event.open_without_body( + &repo.store, + &repo_id, + &branch_id, + &branch.read_cap.key, + )?; + + let store_repo = repo.store.get_store_repo().clone(); + + self.open_branch_(&repo_id, &branch_id, true, &broker, &user, &remote, true) + .await?; + + for file in commit.files() { + log_info!("PUT FILE {:?}", file.id); + self.put_all_blocks_of_file(&file, &repo_id, &store_repo) + .await?; + } + } + self.send_event(e.event, &broker, &user, &remote, e.overlay) .await?; }