removed recursion in load_causal_past

master
Niko PLP 4 months ago
parent 5155dc208b
commit a983205baa
  1. 177
      ng-repo/src/branch.rs
  2. 187
      ng-repo/src/repo.rs
  3. 34
      ng-verifier/src/verifier.rs

@ -180,64 +180,79 @@ impl Branch {
/// optionally collecting the missing objects/blocks that couldn't be found locally on the way,
/// and also optionally, collecting the commits of `theirs` found on the way
pub fn load_causal_past(
cobj: &Object,
recursor: &mut Vec<(ObjectId, Option<ObjectId>)>,
store: &Store,
theirs: &HashSet<ObjectId>,
visited: &mut HashMap<ObjectId, DagNode>,
missing: &mut Option<&mut HashSet<ObjectId>>,
future: Option<ObjectId>,
theirs_found: &mut Option<&mut HashSet<ObjectId>>,
theirs_filter: &Option<Filter>,
) -> Result<(), ObjectParseError> {
let id = cobj.id();
// check if this commit object is present in theirs or has already been visited in the current walk
// load deps, stop at the root(including it in visited) or if this is a commit object from known_heads
let found_in_filter = if let Some(filter) = theirs_filter {
let hash = id.get_hash();
filter.contains_hash(hash)
} else {
false
};
if !found_in_filter && !theirs.contains(&id) {
if let Some(past) = visited.get_mut(&id) {
// we update the future
if let Some(f) = future {
past.future.insert(f);
while let Some((id, future)) = recursor.pop() {
match Object::load(id, None, store) {
Ok(cobj) => {
let id = cobj.id();
// check if this commit object is present in theirs or has already been visited in the current walk
// load deps, stop at the root(including it in visited) or if this is a commit object from known_heads
let found_in_filter = if let Some(filter) = theirs_filter {
let hash = id.get_hash();
filter.contains_hash(hash)
} else {
false
};
if !found_in_filter && !theirs.contains(&id) {
if let Some(past) = visited.get_mut(&id) {
// we update the future
if let Some(f) = future {
past.future.insert(f);
}
} else {
let mut new_node_to_insert = DagNode::new();
if let Some(f) = future {
new_node_to_insert.future.insert(f);
}
let pasts = cobj.acks_and_nacks();
new_node_to_insert.past.extend(pasts.iter().cloned());
visited.insert(id, new_node_to_insert);
recursor.extend(pasts.into_iter().map(|past_id| (past_id, Some(id))));
// for past_id in pasts {
// match Object::load(past_id, None, store) {
// Ok(o) => {
// Self::load_causal_past(
// recursor,
// store,
// theirs,
// visited,
// missing,
// theirs_found,
// theirs_filter,
// )?;
// }
// Err(ObjectParseError::MissingBlocks(blocks)) => {
// missing.as_mut().map(|m| m.extend(blocks));
// }
// Err(e) => return Err(e),
// }
// }
}
} else if theirs_found.is_some() {
theirs_found.as_mut().unwrap().insert(id);
}
}
} else {
let mut new_node_to_insert = DagNode::new();
if let Some(f) = future {
new_node_to_insert.future.insert(f);
Err(ObjectParseError::MissingBlocks(blocks)) => {
if future.is_some() {
missing.as_mut().map(|m| m.extend(blocks));
}
}
let pasts = cobj.acks_and_nacks();
new_node_to_insert.past.extend(pasts.iter().cloned());
visited.insert(id, new_node_to_insert);
for past_id in pasts {
match Object::load(past_id, None, store) {
Ok(o) => {
Self::load_causal_past(
&o,
store,
theirs,
visited,
missing,
Some(id),
theirs_found,
theirs_filter,
)?;
}
Err(ObjectParseError::MissingBlocks(blocks)) => {
missing.as_mut().map(|m| m.extend(blocks));
}
Err(e) => return Err(e),
Err(e) => {
if future.is_some() {
return Err(e);
}
}
}
} else if theirs_found.is_some() {
theirs_found.as_mut().unwrap().insert(id);
}
Ok(())
}
@ -260,22 +275,20 @@ impl Branch {
// their commits
let mut theirs: HashMap<ObjectId, DagNode> = HashMap::new();
//
let mut recursor: Vec<(ObjectId, Option<ObjectId>)> =
known_heads.iter().map(|h| (h.clone(), None)).collect();
// collect causal past of known_heads
for id in known_heads {
if let Ok(cobj) = Object::load(*id, None, store) {
Self::load_causal_past(
&cobj,
store,
&HashSet::new(),
&mut theirs,
&mut None,
None,
&mut None,
&None,
)?;
}
// we silently discard any load error on the known_heads as the responder might not know them (yet).
}
// we silently discard any load error on the known_heads as the responder might not know them (yet).
Self::load_causal_past(
&mut recursor,
store,
&HashSet::new(),
&mut theirs,
&mut None,
&mut None,
&None,
)?;
// log_debug!("their causal past \n{}", Dag(&theirs));
@ -291,23 +304,35 @@ impl Branch {
None
};
let mut recursor: Vec<(ObjectId, Option<ObjectId>)> =
target_heads.map(|h| (h.clone(), None)).collect();
// collect all commits reachable from target_heads
// up to the root or until encountering a commit from theirs
for id in target_heads {
if let Ok(cobj) = Object::load(id, None, store) {
Self::load_causal_past(
&cobj,
store,
&theirs,
&mut visited,
&mut None,
None,
&mut None,
&filter,
)?;
}
// we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally.
}
// we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally.
Self::load_causal_past(
&mut recursor,
store,
&theirs,
&mut visited,
&mut None,
&mut None,
&filter,
)?;
// for id in target_heads {
// if let Ok(cobj) = Object::load(id, None, store) {
// Self::load_causal_past(
// &cobj,
// store,
// &theirs,
// &mut visited,
// &mut None,
// None,
// &mut None,
// &filter,
// )?;
// }
// }
// log_debug!("what we have here \n{}", Dag(&visited));

@ -191,91 +191,95 @@ impl Repo {
fn load_causal_past(
&self,
cobj: &Commit,
recursor: &mut Vec<(BlockRef, Option<ObjectId>)>,
visited: &mut HashMap<ObjectId, (HashSet<ObjectId>, CommitInfo)>,
future: Option<ObjectId>,
) -> Result<Option<ObjectId>, VerifierError> {
let mut root = None;
let id = cobj.id().unwrap();
if let Some((future_set, _)) = visited.get_mut(&id) {
// we update the future
if let Some(f) = future {
future_set.insert(f);
}
} else {
let commit_type = cobj.get_type().unwrap();
let acks = cobj.acks();
// for a in acks.iter() {
// log_debug!("ACKS of {} {}", id.to_string(), a.id.to_string());
// }
let (past, real_acks, next_future) = match commit_type {
CommitType::SyncSignature => {
assert_eq!(acks.len(), 1);
let dep = cobj.deps();
assert_eq!(dep.len(), 1);
let mut current_commit = dep[0].clone();
let sign_ref = cobj.get_signature_reference().unwrap();
let real_acks;
let mut future = id;
loop {
let o = Commit::load(current_commit.clone(), &self.store, true)?;
let deps = o.deps();
let commit_info = CommitInfo {
past: deps.iter().map(|r| r.id.clone()).collect(),
key: o.key().unwrap(),
signature: Some(sign_ref.clone()),
author: self.get_user_string(o.author()),
timestamp: o.timestamp(),
final_consistency: o.final_consistency(),
commit_type: o.get_type().unwrap(),
branch: None,
x: 0,
y: 0,
};
let id = o.id().unwrap();
visited.insert(id, ([future].into(), commit_info));
future = id;
if id == acks[0].id {
real_acks = o.acks();
break;
}
assert_eq!(deps.len(), 1);
current_commit = deps[0].clone();
while let Some((next_ref, future)) = recursor.pop() {
if let Ok(cobj) = Commit::load(next_ref, &self.store, true) {
let id = cobj.id().unwrap();
if let Some((future_set, _)) = visited.get_mut(&id) {
// we update the future
if let Some(f) = future {
future_set.insert(f);
}
(vec![dep[0].id], real_acks, future)
}
CommitType::AsyncSignature => {
let past: Vec<ObjectId> = acks.iter().map(|r| r.id.clone()).collect();
for p in past.iter() {
visited.get_mut(p).unwrap().1.signature =
Some(cobj.get_signature_reference().unwrap());
} else {
let commit_type = cobj.get_type().unwrap();
let acks = cobj.acks();
// for a in acks.iter() {
// log_debug!("ACKS of {} {}", id.to_string(), a.id.to_string());
// }
let (past, real_acks, next_future) = match commit_type {
CommitType::SyncSignature => {
assert_eq!(acks.len(), 1);
let dep = cobj.deps();
assert_eq!(dep.len(), 1);
let mut current_commit = dep[0].clone();
let sign_ref = cobj.get_signature_reference().unwrap();
let real_acks;
let mut future = id;
loop {
let o = Commit::load(current_commit.clone(), &self.store, true)?;
let deps = o.deps();
let commit_info = CommitInfo {
past: deps.iter().map(|r| r.id.clone()).collect(),
key: o.key().unwrap(),
signature: Some(sign_ref.clone()),
author: self.get_user_string(o.author()),
timestamp: o.timestamp(),
final_consistency: o.final_consistency(),
commit_type: o.get_type().unwrap(),
branch: None,
x: 0,
y: 0,
};
let id = o.id().unwrap();
visited.insert(id, ([future].into(), commit_info));
future = id;
if id == acks[0].id {
real_acks = o.acks();
break;
}
assert_eq!(deps.len(), 1);
current_commit = deps[0].clone();
}
(vec![dep[0].id], real_acks, future)
}
CommitType::AsyncSignature => {
let past: Vec<ObjectId> = acks.iter().map(|r| r.id.clone()).collect();
for p in past.iter() {
visited.get_mut(p).unwrap().1.signature =
Some(cobj.get_signature_reference().unwrap());
}
(past, acks, id)
}
_ => (acks.iter().map(|r| r.id.clone()).collect(), acks, id),
};
let commit_info = CommitInfo {
past,
key: cobj.key().unwrap(),
signature: None,
author: self.get_user_string(cobj.author()),
timestamp: cobj.timestamp(),
final_consistency: cobj.final_consistency(),
commit_type,
branch: None,
x: 0,
y: 0,
};
visited.insert(id, (future.map_or([].into(), |f| [f].into()), commit_info));
if real_acks.is_empty() && root.is_none() {
root = Some(next_future);
}
(past, acks, id)
}
_ => (acks.iter().map(|r| r.id.clone()).collect(), acks, id),
};
let commit_info = CommitInfo {
past,
key: cobj.key().unwrap(),
signature: None,
author: self.get_user_string(cobj.author()),
timestamp: cobj.timestamp(),
final_consistency: cobj.final_consistency(),
commit_type,
branch: None,
x: 0,
y: 0,
};
visited.insert(id, (future.map_or([].into(), |f| [f].into()), commit_info));
if real_acks.is_empty() {
root = Some(next_future);
}
for past_ref in real_acks {
let o = Commit::load(past_ref, &self.store, true)?;
if let Some(r) = self.load_causal_past(&o, visited, Some(next_future))? {
root = Some(r);
recursor.extend(real_acks.into_iter().map(|br| (br, Some(next_future))));
// for past_ref in real_acks {
// let o = Commit::load(past_ref, &self.store, true)?;
// if let Some(r) = self.load_causal_past(&o, visited, Some(next_future))? {
// root = Some(r);
// }
// }
}
}
}
@ -458,15 +462,22 @@ impl Repo {
// }
let mut visited = HashMap::new();
let mut root = None;
for id in heads {
if let Ok(cobj) = Commit::load(id.clone(), &self.store, true) {
let r = self.load_causal_past(&cobj, &mut visited, None)?;
//log_debug!("ROOT? {:?}", r.map(|rr| rr.to_string()));
if r.is_some() {
root = r;
}
}
let mut recursor: Vec<(BlockRef, Option<ObjectId>)> =
heads.iter().map(|h| (h.clone(), None)).collect();
let r = self.load_causal_past(&mut recursor, &mut visited)?;
if r.is_some() {
root = r;
}
// for id in heads {
// if let Ok(cobj) = Commit::load(id.clone(), &self.store, true) {
// let r = self.load_causal_past(&cobj, &mut visited, None)?;
// //log_debug!("ROOT? {:?}", r.map(|rr| rr.to_string()));
// if r.is_some() {
// root = r;
// }
// }
// }
// for h in visited.keys() {
// log_debug!("VISITED {}", h);
// }

@ -1609,21 +1609,25 @@ impl Verifier {
let mut theirs_found = HashSet::new();
let mut visited = HashMap::new();
for our in ours_set.iter() {
//log_info!("OUR HEADS {}", our);
if let Ok(cobj) = Object::load(*our, None, &repo.store) {
let _ = Branch::load_causal_past(
&cobj,
&repo.store,
&theirs,
&mut visited,
&mut None,
None,
&mut Some(&mut theirs_found),
&None,
);
}
}
let mut recursor: Vec<(ObjectId, Option<ObjectId>)> =
ours_set.iter().map(|h| (h.clone(), None)).collect();
let _ = Branch::load_causal_past(
&mut recursor,
&repo.store,
&theirs,
&mut visited,
&mut None,
&mut Some(&mut theirs_found),
&None,
);
// for our in ours_set.iter() {
// //log_info!("OUR HEADS {}", our);
// if let Ok(cobj) = Object::load(*our, None, &repo.store) {
// let _ =
// }
// }
let theirs_not_found: Vec<ObjectId> =
theirs.difference(&theirs_found).cloned().collect();

Loading…
Cancel
Save