|
|
|
@ -17,6 +17,7 @@ use futures::SinkExt; |
|
|
|
|
use ng_net::actor::SoS; |
|
|
|
|
use ng_net::broker::{Broker, BROKER}; |
|
|
|
|
use ng_repo::block_storage::store_max_value_size; |
|
|
|
|
use ng_repo::file::ReadFile; |
|
|
|
|
use ng_repo::log::*; |
|
|
|
|
use ng_repo::object::Object; |
|
|
|
|
use ng_repo::repo::BranchInfo; |
|
|
|
@ -153,6 +154,30 @@ impl Verifier { |
|
|
|
|
Ok(file.reference().unwrap()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub(crate) async fn put_all_blocks_of_file( |
|
|
|
|
&self, |
|
|
|
|
file_ref: &ObjectRef, |
|
|
|
|
repo_id: &RepoId, |
|
|
|
|
store_repo: &StoreRepo, |
|
|
|
|
) -> Result<(), NgError> { |
|
|
|
|
let repo = self.get_repo(&repo_id, &store_repo)?; |
|
|
|
|
// check that the referenced object exists locally.
|
|
|
|
|
repo.store.has(&file_ref.id)?; |
|
|
|
|
// we send all the blocks to the broker.
|
|
|
|
|
let file = RandomAccessFile::open( |
|
|
|
|
file_ref.id.clone(), |
|
|
|
|
file_ref.key.clone(), |
|
|
|
|
Arc::clone(&repo.store), |
|
|
|
|
)?; |
|
|
|
|
let blocks = file.get_all_blocks_ids()?; |
|
|
|
|
let found = self.has_blocks(blocks, repo).await?; |
|
|
|
|
for block_id in found.missing() { |
|
|
|
|
let block = repo.store.get(block_id)?; |
|
|
|
|
self.put_blocks(vec![block], repo).await?; |
|
|
|
|
} |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub(crate) async fn push_app_response(&mut self, branch: &BranchId, response: AppResponse) { |
|
|
|
|
// log_info!(
|
|
|
|
|
// "push_app_response {} {:?}",
|
|
|
|
@ -433,23 +458,23 @@ impl Verifier { |
|
|
|
|
self.stores.insert(overlay_id, store); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub(crate) fn update_current_heads( |
|
|
|
|
&mut self, |
|
|
|
|
repo_id: &RepoId, |
|
|
|
|
branch_id: &BranchId, |
|
|
|
|
current_heads: Vec<ObjectRef>, |
|
|
|
|
) -> Result<(), VerifierError> { |
|
|
|
|
let repo = self |
|
|
|
|
.repos |
|
|
|
|
.get_mut(repo_id) |
|
|
|
|
.ok_or(VerifierError::RepoNotFound)?; |
|
|
|
|
let branch = repo |
|
|
|
|
.branches |
|
|
|
|
.get_mut(branch_id) |
|
|
|
|
.ok_or(VerifierError::BranchNotFound)?; |
|
|
|
|
branch.current_heads = current_heads; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
// pub(crate) fn update_current_heads(
|
|
|
|
|
// &mut self,
|
|
|
|
|
// repo_id: &RepoId,
|
|
|
|
|
// branch_id: &BranchId,
|
|
|
|
|
// current_heads: Vec<ObjectRef>,
|
|
|
|
|
// ) -> Result<(), VerifierError> {
|
|
|
|
|
// let repo = self
|
|
|
|
|
// .repos
|
|
|
|
|
// .get_mut(repo_id)
|
|
|
|
|
// .ok_or(VerifierError::RepoNotFound)?;
|
|
|
|
|
// let branch = repo
|
|
|
|
|
// .branches
|
|
|
|
|
// .get_mut(branch_id)
|
|
|
|
|
// .ok_or(VerifierError::BranchNotFound)?;
|
|
|
|
|
// branch.current_heads = current_heads;
|
|
|
|
|
// Ok(())
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
pub(crate) async fn new_event( |
|
|
|
|
&mut self, |
|
|
|
@ -491,8 +516,12 @@ impl Verifier { |
|
|
|
|
let repo = self.get_repo(&repo_id, store_repo)?; |
|
|
|
|
|
|
|
|
|
let event = Event::new(&publisher, seq_num, commit, additional_blocks, repo)?; |
|
|
|
|
self.send_or_save_event_to_outbox(event, repo.store.inner_overlay()) |
|
|
|
|
.await?; |
|
|
|
|
self.send_or_save_event_to_outbox( |
|
|
|
|
commit.reference().unwrap(), |
|
|
|
|
event, |
|
|
|
|
repo.store.inner_overlay(), |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -507,8 +536,12 @@ impl Verifier { |
|
|
|
|
let seq_num = self.last_seq_num; |
|
|
|
|
|
|
|
|
|
let event = Event::new(&publisher, seq_num, commit, additional_blocks, repo)?; |
|
|
|
|
self.send_or_save_event_to_outbox(event, repo.store.inner_overlay()) |
|
|
|
|
.await?; |
|
|
|
|
self.send_or_save_event_to_outbox( |
|
|
|
|
commit.reference().unwrap(), |
|
|
|
|
event, |
|
|
|
|
repo.store.inner_overlay(), |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -688,11 +721,20 @@ impl Verifier { |
|
|
|
|
|
|
|
|
|
async fn send_or_save_event_to_outbox<'a>( |
|
|
|
|
&'a mut self, |
|
|
|
|
commit_ref: ObjectRef, |
|
|
|
|
event: Event, |
|
|
|
|
overlay: OverlayId, |
|
|
|
|
) -> Result<(), NgError> { |
|
|
|
|
//log_info!("========== EVENT {:03}: {}", event.seq_num(), event);
|
|
|
|
|
|
|
|
|
|
let (repo_id, branch_id) = self |
|
|
|
|
.topics |
|
|
|
|
.get(&(overlay, *event.topic_id())) |
|
|
|
|
.ok_or(NgError::TopicNotFound)? |
|
|
|
|
.to_owned(); |
|
|
|
|
|
|
|
|
|
self.update_branch_current_head(&repo_id, &branch_id, commit_ref); |
|
|
|
|
|
|
|
|
|
if self.connected_server_id.is_some() { |
|
|
|
|
// send the event to the server already
|
|
|
|
|
let broker = BROKER.read().await; |
|
|
|
@ -917,6 +959,7 @@ impl Verifier { |
|
|
|
|
topic.known_heads(), |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -938,6 +981,21 @@ impl Verifier { |
|
|
|
|
// );
|
|
|
|
|
if as_publisher && !pin_status.is_topic_subscribed_as_publisher(topic_id) { |
|
|
|
|
need_sub = true; |
|
|
|
|
} else { |
|
|
|
|
for topic in pin_status.topics() { |
|
|
|
|
if topic.topic_id() == topic_id { |
|
|
|
|
self.do_sync_req_if_needed( |
|
|
|
|
broker, |
|
|
|
|
user, |
|
|
|
|
remote, |
|
|
|
|
branch, |
|
|
|
|
repo_id, |
|
|
|
|
topic.known_heads(), |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
_ => return Err(NgError::InvalidResponse), |
|
|
|
@ -1088,15 +1146,32 @@ impl Verifier { |
|
|
|
|
let res = res.await; |
|
|
|
|
if res.is_ok() { |
|
|
|
|
let commit_ref = commit.reference().unwrap(); |
|
|
|
|
if let Some(repo) = self.repos.get_mut(repo_id) { |
|
|
|
|
repo.update_branch_current_head(&branch_id, commit_ref); |
|
|
|
|
} |
|
|
|
|
self.update_branch_current_head(repo_id, branch_id, commit_ref); |
|
|
|
|
Ok(()) |
|
|
|
|
} else { |
|
|
|
|
res |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn update_branch_current_head( |
|
|
|
|
&mut self, |
|
|
|
|
repo_id: &RepoId, |
|
|
|
|
branch: &BranchId, |
|
|
|
|
commit_ref: ObjectRef, |
|
|
|
|
) { |
|
|
|
|
if let Some(repo) = self.repos.get_mut(repo_id) { |
|
|
|
|
let new_heads = repo.update_branch_current_head(branch, commit_ref); |
|
|
|
|
if new_heads.is_none() { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
let new_heads = new_heads.unwrap(); |
|
|
|
|
log_info!("NEW HEADS {} {:?}", branch, new_heads); |
|
|
|
|
if let Some(user_storage) = self.user_storage_if_persistent() { |
|
|
|
|
let _ = user_storage.update_branch_current_head(repo_id, branch, new_heads); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn user_storage_if_persistent(&self) -> Option<Arc<Box<dyn UserStorage>>> { |
|
|
|
|
if self.is_persistent() { |
|
|
|
|
if let Some(us) = self.user_storage.as_ref() { |
|
|
|
@ -1776,6 +1851,41 @@ impl Verifier { |
|
|
|
|
} |
|
|
|
|
log_info!("SENDING {} EVENTS FOR OUTBOX", events_to_replay.len()); |
|
|
|
|
for e in events_to_replay { |
|
|
|
|
let files = e.event.file_ids(); |
|
|
|
|
log_info!("HAS FILE {:?}", files); |
|
|
|
|
if !files.is_empty() { |
|
|
|
|
let (repo_id, branch_id) = self |
|
|
|
|
.topics |
|
|
|
|
.get(&(e.overlay, *e.event.topic_id())) |
|
|
|
|
.ok_or(NgError::TopicNotFound)? |
|
|
|
|
.to_owned(); |
|
|
|
|
|
|
|
|
|
let repo = self |
|
|
|
|
.repos |
|
|
|
|
.get(&repo_id) |
|
|
|
|
.ok_or(VerifierError::RepoNotFound)?; |
|
|
|
|
|
|
|
|
|
let branch = repo.branch(&branch_id)?; |
|
|
|
|
|
|
|
|
|
let commit = e.event.open_without_body( |
|
|
|
|
&repo.store, |
|
|
|
|
&repo_id, |
|
|
|
|
&branch_id, |
|
|
|
|
&branch.read_cap.key, |
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
let store_repo = repo.store.get_store_repo().clone(); |
|
|
|
|
|
|
|
|
|
self.open_branch_(&repo_id, &branch_id, true, &broker, &user, &remote, true) |
|
|
|
|
.await?; |
|
|
|
|
|
|
|
|
|
for file in commit.files() { |
|
|
|
|
log_info!("PUT FILE {:?}", file.id); |
|
|
|
|
self.put_all_blocks_of_file(&file, &repo_id, &store_repo) |
|
|
|
|
.await?; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
self.send_event(e.event, &broker, &user, &remote, e.overlay) |
|
|
|
|
.await?; |
|
|
|
|
} |
|
|
|
|