diff --git a/Cargo.lock b/Cargo.lock index 6be6da1..fbc2779 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -564,6 +564,15 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +dependencies = [ + "serde", +] + [[package]] name = "bit_field" version = "0.10.2" @@ -657,6 +666,17 @@ dependencies = [ "log", ] +[[package]] +name = "bloomfilter" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b64d54e47a7f4fd723f082e8f11429f3df6ba8adaeca355a76556f9f0602bbcf" +dependencies = [ + "bit-vec", + "getrandom 0.2.10", + "siphasher 1.0.1", +] + [[package]] name = "brotli" version = "3.3.4" @@ -1215,17 +1235,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "cuckoofilter" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b810a8449931679f64cd7eef1bbd0fa315801b6d5d9cdc1ace2804d6529eee18" -dependencies = [ - "byteorder", - "fnv", - "rand 0.7.3", -] - [[package]] name = "current_platform" version = "0.2.0" @@ -1598,24 +1607,6 @@ dependencies = [ "synstructure", ] -[[package]] -name = "fastbloom-rs" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f04c5562065adb6d23e97f342cb73a73ed94ebfdb8d96a92a85d6fb93f1cdfed" -dependencies = [ - "cuckoofilter", - "fastmurmur3", - "xorfilter-rs", - "xxhash-rust", -] - -[[package]] -name = "fastmurmur3" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d922f481ae01f2a3f1fff7b9e0e789f18f0c755a38ec983a3e6f37762cdcc2a2" - [[package]] name = "fastrand" version = "1.9.0" @@ -3268,13 +3259,13 @@ version = "0.1.0" dependencies = [ "base64-url", "blake3", + "bloomfilter", "chacha20", "crypto_box", "current_platform", "curve25519-dalek 3.2.0", "debug_print", "ed25519-dalek", - "fastbloom-rs", "futures", "getrandom 0.2.10", "gloo-timers", @@ -3335,8 +3326,8 @@ dependencies = [ "async-std", "async-trait", "automerge", + "bloomfilter", "either", - "fastbloom-rs", "futures", "getrandom 0.2.10", "ng-net", @@ -3739,7 +3730,7 @@ dependencies = [ "rocksdb", "sha1", "sha2 0.10.7", - "siphasher", + "siphasher 0.3.10", "sparesults", "spargebra", "sparopt", @@ -4055,7 +4046,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" dependencies = [ - "siphasher", + "siphasher 0.3.10", ] [[package]] @@ -4064,7 +4055,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" dependencies = [ - "siphasher", + "siphasher 0.3.10", ] [[package]] @@ -4990,6 +4981,15 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +dependencies = [ + "serde", +] + [[package]] name = "sized-chunks" version = "0.6.5" @@ -6826,12 +6826,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "xorfilter-rs" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47f9da296a88b6bc150b896d17770a62d4dc6f63ecf0ed10a9c08a1cb3d12f24" - [[package]] name = "xsalsa20poly1305" version = "0.9.1" @@ -6845,12 +6839,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "xxhash-rust" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "735a71d46c4d68d71d4b24d03fdc2b98e38cea81730595801db779c04fe80d70" - [[package]] name = "yrs" version = "0.18.2" diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index 2e6210b..197e042 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -1455,6 +1455,10 @@ pub async fn user_connect_with_device_info( if let Err(e) = session.verifier.connection_opened(server_key).await { + log_err!( + "got error while processing opened connection {:?}", + e + ); Broker::close_all_connections().await; tried.as_mut().unwrap().3 = Some(e.to_string()); } diff --git a/ng-repo/Cargo.toml b/ng-repo/Cargo.toml index b5b2e61..dd8118a 100644 --- a/ng-repo/Cargo.toml +++ b/ng-repo/Cargo.toml @@ -33,7 +33,7 @@ rand = { version = "0.7", features = ["getrandom"] } blake3 = "1.3.1" chacha20 = "0.9.0" ed25519-dalek = "1.0.1" -fastbloom-rs = "0.5.3" +bloomfilter = { version = "1.0.13", features = ["random","serde"] } curve25519-dalek = "3.2.0" threshold_crypto = "0.4.0" crypto_box = { version = "0.8.2", features = ["seal"] } diff --git a/ng-repo/src/branch.rs b/ng-repo/src/branch.rs index 16f30cb..ab709c9 100644 --- a/ng-repo/src/branch.rs +++ b/ng-repo/src/branch.rs @@ -13,7 +13,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt; -use fastbloom_rs::{BloomFilter as Filter, Membership}; +use bloomfilter::Bloom; use zeroize::Zeroize; use crate::errors::*; @@ -51,6 +51,7 @@ impl BranchV0 { #[derive(Debug)] pub struct DagNode { pub future: HashSet, + pub past: HashSet, } struct Dag<'a>(&'a HashMap); @@ -58,7 +59,7 @@ struct Dag<'a>(&'a HashMap); impl fmt::Display for DagNode { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { for fu in self.future.iter() { - write!(f, "{}", fu)?; + write!(f, "{} ", fu)?; } Ok(()) } @@ -77,15 +78,28 @@ impl DagNode { fn new() -> Self { Self { future: HashSet::new(), + past: HashSet::new(), } } - fn collapse(id: &ObjectId, dag: &HashMap) -> Vec { - let mut res = vec![*id]; + fn collapse( + id: &ObjectId, + dag: &HashMap, + already_in: &mut HashSet, + ) -> Vec { let this = dag.get(id).unwrap(); - for child in this.future.iter() { - res.append(&mut Self::collapse(child, dag)); + + if this.past.len() > 1 && !this.past.is_subset(already_in) { + // we postpone it + // log_debug!("postponed {}", id); + vec![] + } else { + let mut res = vec![*id]; + already_in.insert(*id); + for child in this.future.iter() { + res.append(&mut Self::collapse(child, dag, already_in)); + } + res } - res } } @@ -165,7 +179,7 @@ impl Branch { missing: &mut Option<&mut HashSet>, future: Option, theirs_found: &mut Option<&mut HashSet>, - theirs_filter: &Option, + theirs_filter: &Option>, ) -> Result<(), ObjectParseError> { let id = cobj.id(); @@ -173,7 +187,7 @@ impl Branch { // load deps, stop at the root(including it in visited) or if this is a commit object from known_heads let found_in_filter = if let Some(filter) = theirs_filter { - filter.contains(id.slice()) + filter.check(&id) } else { false }; @@ -185,12 +199,14 @@ impl Branch { past.future.insert(f); } } else { - let mut insert = DagNode::new(); + let mut new_node_to_insert = DagNode::new(); if let Some(f) = future { - insert.future.insert(f); + new_node_to_insert.future.insert(f); } - visited.insert(id, insert); - for past_id in cobj.acks_and_nacks() { + let pasts = cobj.acks_and_nacks(); + new_node_to_insert.past.extend(pasts.iter().cloned()); + visited.insert(id, new_node_to_insert); + for past_id in pasts { match Object::load(past_id, None, store) { Ok(o) => { Self::load_causal_past( @@ -258,9 +274,14 @@ impl Branch { let theirs: HashSet = theirs.keys().into_iter().cloned().collect(); - let filter = known_commits - .as_ref() - .map(|their_filter| Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k)); + let filter = if let Some(filter) = known_commits.as_ref() { + Some( + serde_bare::from_slice(filter.filter()) + .map_err(|_| ObjectParseError::FilterDeserializationError)?, + ) + } else { + None + }; // collect all commits reachable from target_heads // up to the root or until encountering a commit from theirs @@ -292,15 +313,22 @@ impl Branch { let all = HashSet::from_iter(visited.keys()); let first_generation = all.difference(&next_generations); - let mut result = Vec::with_capacity(visited.len()); + let mut already_in: HashSet = HashSet::new(); + + let sub_dag_to_send_size = visited.len(); + let mut result = Vec::with_capacity(sub_dag_to_send_size); for first in first_generation { - result.append(&mut DagNode::collapse(first, &visited)); + result.append(&mut DagNode::collapse(first, &visited, &mut already_in)); } - // #[cfg(debug_assertions)] - // for _res in result.iter() { - // log_debug!("sending missing commit {}", _res); - // } + if result.len() != sub_dag_to_send_size || already_in.len() != sub_dag_to_send_size { + return Err(ObjectParseError::MalformedDag); + } + + #[cfg(debug_assertions)] + for _res in result.iter() { + log_debug!("sending missing commit {}", _res); + } Ok(result) } @@ -310,7 +338,7 @@ impl Branch { #[cfg(test)] mod test { - //use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Membership}; + //use use bloomfilter::Bloom; use crate::branch::*; diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index f3c26d5..259a78e 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -192,6 +192,9 @@ pub enum ObjectParseError { ObjectDeserializeError, MissingHeaderBlocks((Object, Vec)), + + MalformedDag, + FilterDeserializationError, } #[derive(Debug, PartialEq, Eq, Clone)] diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs index e97c87b..b17d056 100644 --- a/ng-repo/src/types.rs +++ b/ng-repo/src/types.rs @@ -338,24 +338,24 @@ impl fmt::Display for RelTime { /// Bloom filter (variable size) #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct BloomFilter { - /// Number of hash functions - pub k: u32, - +pub struct BloomFilterV0 { /// Filter #[serde(with = "serde_bytes")] pub f: Vec, } -/// Bloom filter (128 B) -/// -/// (m=1024; k=7; p=0.01; n=107) -pub type BloomFilter128 = [[u8; 32]; 4]; +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum BloomFilter { + V0(BloomFilterV0), +} -/// Bloom filter (1 KiB) -/// -/// (m=8192; k=7; p=0.01; n=855) -pub type BloomFilter1K = [[u8; 32]; 32]; +impl BloomFilter { + pub fn filter(&self) -> &Vec { + match self { + Self::V0(v0) => &v0.f, + } + } +} // // REPOSITORY TYPES diff --git a/ng-verifier/Cargo.toml b/ng-verifier/Cargo.toml index d8fece7..c63581c 100644 --- a/ng-verifier/Cargo.toml +++ b/ng-verifier/Cargo.toml @@ -31,7 +31,7 @@ async-std = { version = "1.12.0", features = [ "attributes", "unstable" ] } oxigraph = { git = "https://git.nextgraph.org/NextGraph/oxigraph.git", branch="main" } automerge = "0.5.9" yrs = "0.18.2" -fastbloom-rs = "0.5.3" +bloomfilter = { version = "1.0.13", features = ["random","serde"] } ng-repo = { path = "../ng-repo", version = "0.1.0" } ng-net = { path = "../ng-net", version = "0.1.0" } diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 7083510..7855391 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -21,7 +21,7 @@ use std::{collections::HashMap, sync::Arc}; use async_std::stream::StreamExt; use async_std::sync::{Mutex, RwLockReadGuard}; -use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Hashes, Membership}; +use bloomfilter::Bloom; use futures::channel::mpsc; use futures::SinkExt; use ng_repo::object::Object; @@ -800,6 +800,7 @@ impl Verifier { pub async fn connection_opened(&mut self, peer: DirectPeerId) -> Result<(), NgError> { self.connected_server_id = Some(peer); + log_info!("CONNECTION ESTABLISHED WITH peer {}", peer); if let Err(e) = self.bootstrap().await { self.connected_server_id = None; return Err(e); @@ -820,10 +821,18 @@ impl Verifier { let user = self.config.user_priv_key.to_pub(); let broker = BROKER.read().await; + //log_info!("looping on branches {:?}", branches); for (repo, branch, publisher) in branches { - let _ = self + //log_info!("open_branch_ repo {} branch {}", repo, branch); + let _e = self .open_branch_(&repo, &branch, publisher, &broker, &user, &peer, false) .await; + // log_info!( + // "END OF open_branch_ repo {} branch {} with {:?}", + // repo, + // branch, + // _e + // ); // discarding error. } Ok(()) @@ -991,6 +1000,7 @@ impl Verifier { // ); if as_publisher && !pin_status.is_topic_subscribed_as_publisher(topic_id) { need_sub = true; + //log_info!("need_sub forced to true"); } else { for topic in pin_status.topics() { if topic.topic_id() == topic_id { @@ -1298,6 +1308,7 @@ impl Verifier { remote_commits_nbr: u64, ) -> Result<(), NgError> { let (store, msg, branch_secret) = { + //log_info!("do_sync_req_if_needed for branch {}", branch_id); if remote_commits_nbr == 0 || remote_heads.is_empty() { log_info!("branch is new on the broker. doing nothing"); return Ok(()); @@ -1350,16 +1361,14 @@ impl Verifier { // prepare bloom filter let expected_elements = remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr); - let mut config = FilterBuilder::new(expected_elements, 0.01); - config.enable_repeat_insert(false); - let mut filter = Filter::new(config); + let mut filter = + Bloom::::new_for_fp_rate(expected_elements as usize, 0.01); for commit_id in visited.keys() { - filter.add(commit_id.slice()); + filter.set(commit_id); } - Some(BloomFilter { - k: filter.hashes(), - f: filter.get_u8_array().to_vec(), - }) + Some(BloomFilter::V0(BloomFilterV0 { + f: serde_bare::to_vec(&filter)?, + })) } };