switched to rust-bloom-filter as bloomfilter-rs is not portable across arch

pull/19/head
Niko PLP 7 months ago
parent 9e108733e6
commit 48fd6806b8
  1. 80
      Cargo.lock
  2. 4
      nextgraph/src/local_broker.rs
  3. 2
      ng-repo/Cargo.toml
  4. 74
      ng-repo/src/branch.rs
  5. 3
      ng-repo/src/errors.rs
  6. 24
      ng-repo/src/types.rs
  7. 2
      ng-verifier/Cargo.toml
  8. 29
      ng-verifier/src/verifier.rs

80
Cargo.lock generated

@ -564,6 +564,15 @@ dependencies = [
"syn 2.0.58", "syn 2.0.58",
] ]
[[package]]
name = "bit-vec"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "bit_field" name = "bit_field"
version = "0.10.2" version = "0.10.2"
@ -657,6 +666,17 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "bloomfilter"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b64d54e47a7f4fd723f082e8f11429f3df6ba8adaeca355a76556f9f0602bbcf"
dependencies = [
"bit-vec",
"getrandom 0.2.10",
"siphasher 1.0.1",
]
[[package]] [[package]]
name = "brotli" name = "brotli"
version = "3.3.4" version = "3.3.4"
@ -1215,17 +1235,6 @@ dependencies = [
"cipher", "cipher",
] ]
[[package]]
name = "cuckoofilter"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b810a8449931679f64cd7eef1bbd0fa315801b6d5d9cdc1ace2804d6529eee18"
dependencies = [
"byteorder",
"fnv",
"rand 0.7.3",
]
[[package]] [[package]]
name = "current_platform" name = "current_platform"
version = "0.2.0" version = "0.2.0"
@ -1598,24 +1607,6 @@ dependencies = [
"synstructure", "synstructure",
] ]
[[package]]
name = "fastbloom-rs"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f04c5562065adb6d23e97f342cb73a73ed94ebfdb8d96a92a85d6fb93f1cdfed"
dependencies = [
"cuckoofilter",
"fastmurmur3",
"xorfilter-rs",
"xxhash-rust",
]
[[package]]
name = "fastmurmur3"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d922f481ae01f2a3f1fff7b9e0e789f18f0c755a38ec983a3e6f37762cdcc2a2"
[[package]] [[package]]
name = "fastrand" name = "fastrand"
version = "1.9.0" version = "1.9.0"
@ -3268,13 +3259,13 @@ version = "0.1.0"
dependencies = [ dependencies = [
"base64-url", "base64-url",
"blake3", "blake3",
"bloomfilter",
"chacha20", "chacha20",
"crypto_box", "crypto_box",
"current_platform", "current_platform",
"curve25519-dalek 3.2.0", "curve25519-dalek 3.2.0",
"debug_print", "debug_print",
"ed25519-dalek", "ed25519-dalek",
"fastbloom-rs",
"futures", "futures",
"getrandom 0.2.10", "getrandom 0.2.10",
"gloo-timers", "gloo-timers",
@ -3335,8 +3326,8 @@ dependencies = [
"async-std", "async-std",
"async-trait", "async-trait",
"automerge", "automerge",
"bloomfilter",
"either", "either",
"fastbloom-rs",
"futures", "futures",
"getrandom 0.2.10", "getrandom 0.2.10",
"ng-net", "ng-net",
@ -3739,7 +3730,7 @@ dependencies = [
"rocksdb", "rocksdb",
"sha1", "sha1",
"sha2 0.10.7", "sha2 0.10.7",
"siphasher", "siphasher 0.3.10",
"sparesults", "sparesults",
"spargebra", "spargebra",
"sparopt", "sparopt",
@ -4055,7 +4046,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7"
dependencies = [ dependencies = [
"siphasher", "siphasher 0.3.10",
] ]
[[package]] [[package]]
@ -4064,7 +4055,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096"
dependencies = [ dependencies = [
"siphasher", "siphasher 0.3.10",
] ]
[[package]] [[package]]
@ -4990,6 +4981,15 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "siphasher"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "sized-chunks" name = "sized-chunks"
version = "0.6.5" version = "0.6.5"
@ -6826,12 +6826,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "xorfilter-rs"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47f9da296a88b6bc150b896d17770a62d4dc6f63ecf0ed10a9c08a1cb3d12f24"
[[package]] [[package]]
name = "xsalsa20poly1305" name = "xsalsa20poly1305"
version = "0.9.1" version = "0.9.1"
@ -6845,12 +6839,6 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "xxhash-rust"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "735a71d46c4d68d71d4b24d03fdc2b98e38cea81730595801db779c04fe80d70"
[[package]] [[package]]
name = "yrs" name = "yrs"
version = "0.18.2" version = "0.18.2"

@ -1455,6 +1455,10 @@ pub async fn user_connect_with_device_info(
if let Err(e) = if let Err(e) =
session.verifier.connection_opened(server_key).await session.verifier.connection_opened(server_key).await
{ {
log_err!(
"got error while processing opened connection {:?}",
e
);
Broker::close_all_connections().await; Broker::close_all_connections().await;
tried.as_mut().unwrap().3 = Some(e.to_string()); tried.as_mut().unwrap().3 = Some(e.to_string());
} }

@ -33,7 +33,7 @@ rand = { version = "0.7", features = ["getrandom"] }
blake3 = "1.3.1" blake3 = "1.3.1"
chacha20 = "0.9.0" chacha20 = "0.9.0"
ed25519-dalek = "1.0.1" ed25519-dalek = "1.0.1"
fastbloom-rs = "0.5.3" bloomfilter = { version = "1.0.13", features = ["random","serde"] }
curve25519-dalek = "3.2.0" curve25519-dalek = "3.2.0"
threshold_crypto = "0.4.0" threshold_crypto = "0.4.0"
crypto_box = { version = "0.8.2", features = ["seal"] } crypto_box = { version = "0.8.2", features = ["seal"] }

@ -13,7 +13,7 @@ use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt; use std::fmt;
use fastbloom_rs::{BloomFilter as Filter, Membership}; use bloomfilter::Bloom;
use zeroize::Zeroize; use zeroize::Zeroize;
use crate::errors::*; use crate::errors::*;
@ -51,6 +51,7 @@ impl BranchV0 {
#[derive(Debug)] #[derive(Debug)]
pub struct DagNode { pub struct DagNode {
pub future: HashSet<ObjectId>, pub future: HashSet<ObjectId>,
pub past: HashSet<ObjectId>,
} }
struct Dag<'a>(&'a HashMap<Digest, DagNode>); struct Dag<'a>(&'a HashMap<Digest, DagNode>);
@ -58,7 +59,7 @@ struct Dag<'a>(&'a HashMap<Digest, DagNode>);
impl fmt::Display for DagNode { impl fmt::Display for DagNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for fu in self.future.iter() { for fu in self.future.iter() {
write!(f, "{}", fu)?; write!(f, "{} ", fu)?;
} }
Ok(()) Ok(())
} }
@ -77,15 +78,28 @@ impl DagNode {
fn new() -> Self { fn new() -> Self {
Self { Self {
future: HashSet::new(), future: HashSet::new(),
past: HashSet::new(),
} }
} }
fn collapse(id: &ObjectId, dag: &HashMap<ObjectId, DagNode>) -> Vec<ObjectId> { fn collapse(
let mut res = vec![*id]; id: &ObjectId,
dag: &HashMap<ObjectId, DagNode>,
already_in: &mut HashSet<ObjectId>,
) -> Vec<ObjectId> {
let this = dag.get(id).unwrap(); let this = dag.get(id).unwrap();
for child in this.future.iter() {
res.append(&mut Self::collapse(child, dag)); if this.past.len() > 1 && !this.past.is_subset(already_in) {
// we postpone it
// log_debug!("postponed {}", id);
vec![]
} else {
let mut res = vec![*id];
already_in.insert(*id);
for child in this.future.iter() {
res.append(&mut Self::collapse(child, dag, already_in));
}
res
} }
res
} }
} }
@ -165,7 +179,7 @@ impl Branch {
missing: &mut Option<&mut HashSet<ObjectId>>, missing: &mut Option<&mut HashSet<ObjectId>>,
future: Option<ObjectId>, future: Option<ObjectId>,
theirs_found: &mut Option<&mut HashSet<ObjectId>>, theirs_found: &mut Option<&mut HashSet<ObjectId>>,
theirs_filter: &Option<Filter>, theirs_filter: &Option<Bloom<ObjectId>>,
) -> Result<(), ObjectParseError> { ) -> Result<(), ObjectParseError> {
let id = cobj.id(); let id = cobj.id();
@ -173,7 +187,7 @@ impl Branch {
// load deps, stop at the root(including it in visited) or if this is a commit object from known_heads // load deps, stop at the root(including it in visited) or if this is a commit object from known_heads
let found_in_filter = if let Some(filter) = theirs_filter { let found_in_filter = if let Some(filter) = theirs_filter {
filter.contains(id.slice()) filter.check(&id)
} else { } else {
false false
}; };
@ -185,12 +199,14 @@ impl Branch {
past.future.insert(f); past.future.insert(f);
} }
} else { } else {
let mut insert = DagNode::new(); let mut new_node_to_insert = DagNode::new();
if let Some(f) = future { if let Some(f) = future {
insert.future.insert(f); new_node_to_insert.future.insert(f);
} }
visited.insert(id, insert); let pasts = cobj.acks_and_nacks();
for past_id in cobj.acks_and_nacks() { new_node_to_insert.past.extend(pasts.iter().cloned());
visited.insert(id, new_node_to_insert);
for past_id in pasts {
match Object::load(past_id, None, store) { match Object::load(past_id, None, store) {
Ok(o) => { Ok(o) => {
Self::load_causal_past( Self::load_causal_past(
@ -258,9 +274,14 @@ impl Branch {
let theirs: HashSet<ObjectId> = theirs.keys().into_iter().cloned().collect(); let theirs: HashSet<ObjectId> = theirs.keys().into_iter().cloned().collect();
let filter = known_commits let filter = if let Some(filter) = known_commits.as_ref() {
.as_ref() Some(
.map(|their_filter| Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k)); serde_bare::from_slice(filter.filter())
.map_err(|_| ObjectParseError::FilterDeserializationError)?,
)
} else {
None
};
// collect all commits reachable from target_heads // collect all commits reachable from target_heads
// up to the root or until encountering a commit from theirs // up to the root or until encountering a commit from theirs
@ -292,15 +313,22 @@ impl Branch {
let all = HashSet::from_iter(visited.keys()); let all = HashSet::from_iter(visited.keys());
let first_generation = all.difference(&next_generations); let first_generation = all.difference(&next_generations);
let mut result = Vec::with_capacity(visited.len()); let mut already_in: HashSet<ObjectId> = HashSet::new();
let sub_dag_to_send_size = visited.len();
let mut result = Vec::with_capacity(sub_dag_to_send_size);
for first in first_generation { for first in first_generation {
result.append(&mut DagNode::collapse(first, &visited)); result.append(&mut DagNode::collapse(first, &visited, &mut already_in));
} }
// #[cfg(debug_assertions)] if result.len() != sub_dag_to_send_size || already_in.len() != sub_dag_to_send_size {
// for _res in result.iter() { return Err(ObjectParseError::MalformedDag);
// log_debug!("sending missing commit {}", _res); }
// }
#[cfg(debug_assertions)]
for _res in result.iter() {
log_debug!("sending missing commit {}", _res);
}
Ok(result) Ok(result)
} }
@ -310,7 +338,7 @@ impl Branch {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
//use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Membership}; //use use bloomfilter::Bloom;
use crate::branch::*; use crate::branch::*;

@ -192,6 +192,9 @@ pub enum ObjectParseError {
ObjectDeserializeError, ObjectDeserializeError,
MissingHeaderBlocks((Object, Vec<BlockId>)), MissingHeaderBlocks((Object, Vec<BlockId>)),
MalformedDag,
FilterDeserializationError,
} }
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]

@ -338,24 +338,24 @@ impl fmt::Display for RelTime {
/// Bloom filter (variable size) /// Bloom filter (variable size)
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct BloomFilter { pub struct BloomFilterV0 {
/// Number of hash functions
pub k: u32,
/// Filter /// Filter
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
pub f: Vec<u8>, pub f: Vec<u8>,
} }
/// Bloom filter (128 B) #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
/// pub enum BloomFilter {
/// (m=1024; k=7; p=0.01; n=107) V0(BloomFilterV0),
pub type BloomFilter128 = [[u8; 32]; 4]; }
/// Bloom filter (1 KiB) impl BloomFilter {
/// pub fn filter(&self) -> &Vec<u8> {
/// (m=8192; k=7; p=0.01; n=855) match self {
pub type BloomFilter1K = [[u8; 32]; 32]; Self::V0(v0) => &v0.f,
}
}
}
// //
// REPOSITORY TYPES // REPOSITORY TYPES

@ -31,7 +31,7 @@ async-std = { version = "1.12.0", features = [ "attributes", "unstable" ] }
oxigraph = { git = "https://git.nextgraph.org/NextGraph/oxigraph.git", branch="main" } oxigraph = { git = "https://git.nextgraph.org/NextGraph/oxigraph.git", branch="main" }
automerge = "0.5.9" automerge = "0.5.9"
yrs = "0.18.2" yrs = "0.18.2"
fastbloom-rs = "0.5.3" bloomfilter = { version = "1.0.13", features = ["random","serde"] }
ng-repo = { path = "../ng-repo", version = "0.1.0" } ng-repo = { path = "../ng-repo", version = "0.1.0" }
ng-net = { path = "../ng-net", version = "0.1.0" } ng-net = { path = "../ng-net", version = "0.1.0" }

@ -21,7 +21,7 @@ use std::{collections::HashMap, sync::Arc};
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::sync::{Mutex, RwLockReadGuard}; use async_std::sync::{Mutex, RwLockReadGuard};
use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Hashes, Membership}; use bloomfilter::Bloom;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
use ng_repo::object::Object; use ng_repo::object::Object;
@ -800,6 +800,7 @@ impl Verifier {
pub async fn connection_opened(&mut self, peer: DirectPeerId) -> Result<(), NgError> { pub async fn connection_opened(&mut self, peer: DirectPeerId) -> Result<(), NgError> {
self.connected_server_id = Some(peer); self.connected_server_id = Some(peer);
log_info!("CONNECTION ESTABLISHED WITH peer {}", peer);
if let Err(e) = self.bootstrap().await { if let Err(e) = self.bootstrap().await {
self.connected_server_id = None; self.connected_server_id = None;
return Err(e); return Err(e);
@ -820,10 +821,18 @@ impl Verifier {
let user = self.config.user_priv_key.to_pub(); let user = self.config.user_priv_key.to_pub();
let broker = BROKER.read().await; let broker = BROKER.read().await;
//log_info!("looping on branches {:?}", branches);
for (repo, branch, publisher) in branches { for (repo, branch, publisher) in branches {
let _ = self //log_info!("open_branch_ repo {} branch {}", repo, branch);
let _e = self
.open_branch_(&repo, &branch, publisher, &broker, &user, &peer, false) .open_branch_(&repo, &branch, publisher, &broker, &user, &peer, false)
.await; .await;
// log_info!(
// "END OF open_branch_ repo {} branch {} with {:?}",
// repo,
// branch,
// _e
// );
// discarding error. // discarding error.
} }
Ok(()) Ok(())
@ -991,6 +1000,7 @@ impl Verifier {
// ); // );
if as_publisher && !pin_status.is_topic_subscribed_as_publisher(topic_id) { if as_publisher && !pin_status.is_topic_subscribed_as_publisher(topic_id) {
need_sub = true; need_sub = true;
//log_info!("need_sub forced to true");
} else { } else {
for topic in pin_status.topics() { for topic in pin_status.topics() {
if topic.topic_id() == topic_id { if topic.topic_id() == topic_id {
@ -1298,6 +1308,7 @@ impl Verifier {
remote_commits_nbr: u64, remote_commits_nbr: u64,
) -> Result<(), NgError> { ) -> Result<(), NgError> {
let (store, msg, branch_secret) = { let (store, msg, branch_secret) = {
//log_info!("do_sync_req_if_needed for branch {}", branch_id);
if remote_commits_nbr == 0 || remote_heads.is_empty() { if remote_commits_nbr == 0 || remote_heads.is_empty() {
log_info!("branch is new on the broker. doing nothing"); log_info!("branch is new on the broker. doing nothing");
return Ok(()); return Ok(());
@ -1350,16 +1361,14 @@ impl Verifier {
// prepare bloom filter // prepare bloom filter
let expected_elements = let expected_elements =
remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr); remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr);
let mut config = FilterBuilder::new(expected_elements, 0.01); let mut filter =
config.enable_repeat_insert(false); Bloom::<ObjectId>::new_for_fp_rate(expected_elements as usize, 0.01);
let mut filter = Filter::new(config);
for commit_id in visited.keys() { for commit_id in visited.keys() {
filter.add(commit_id.slice()); filter.set(commit_id);
} }
Some(BloomFilter { Some(BloomFilter::V0(BloomFilterV0 {
k: filter.hashes(), f: serde_bare::to_vec(&filter)?,
f: filter.get_u8_array().to_vec(), }))
})
} }
}; };

Loading…
Cancel
Save