Compare commits

..

No commits in common. '263df00e93f0c6177d8817ee8f66fcd721a42b56' and '9fbeccc3f57b5af4c28f2669977a3ac88485be71' have entirely different histories.

  1. 52
      Cargo.lock
  2. 2
      ng-oxigraph/src/oxigraph/storage/backend/fallback.rs
  3. 2
      ng-oxigraph/src/oxigraph/storage/backend/oxi_rocksdb.rs
  4. 2
      ng-oxigraph/src/oxigraph/storage/mod.rs
  5. 4
      ng-oxigraph/src/oxigraph/store.rs
  6. 2
      ng-repo/Cargo.toml
  7. 14
      ng-repo/src/branch.rs
  8. 23
      ng-repo/src/types.rs
  9. 2
      ng-verifier/Cargo.toml
  10. 22
      ng-verifier/src/commits/transaction.rs
  11. 12
      ng-verifier/src/verifier.rs

52
Cargo.lock generated

@ -562,6 +562,15 @@ dependencies = [
"syn 2.0.58", "syn 2.0.58",
] ]
[[package]]
name = "bit-vec"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "bit_field" name = "bit_field"
version = "0.10.2" version = "0.10.2"
@ -661,6 +670,17 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "bloomfilter"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b64d54e47a7f4fd723f082e8f11429f3df6ba8adaeca355a76556f9f0602bbcf"
dependencies = [
"bit-vec",
"getrandom 0.2.10",
"siphasher 1.0.1",
]
[[package]] [[package]]
name = "brotli" name = "brotli"
version = "3.3.4" version = "3.3.4"
@ -1723,12 +1743,6 @@ dependencies = [
"getrandom 0.2.10", "getrandom 0.2.10",
] ]
[[package]]
name = "fastrange-rs"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e90a1392cd6ec5ebe42ccaf251f2b7ba6be654c377f05c913f3898bfb2172512"
[[package]] [[package]]
name = "fdeflate" name = "fdeflate"
version = "0.3.0" version = "0.3.0"
@ -3449,6 +3463,7 @@ version = "0.1.0-preview.1"
dependencies = [ dependencies = [
"base64-url", "base64-url",
"blake3", "blake3",
"bloomfilter",
"chacha20", "chacha20",
"crypto_box", "crypto_box",
"current_platform", "current_platform",
@ -3463,7 +3478,6 @@ dependencies = [
"once_cell", "once_cell",
"os_info", "os_info",
"rand 0.7.3", "rand 0.7.3",
"sbbf-rs-safe",
"serde", "serde",
"serde_bare", "serde_bare",
"serde_bytes", "serde_bytes",
@ -3552,6 +3566,7 @@ dependencies = [
"async-std", "async-std",
"async-trait", "async-trait",
"automerge", "automerge",
"bloomfilter",
"either", "either",
"futures", "futures",
"getrandom 0.2.10", "getrandom 0.2.10",
@ -3560,7 +3575,6 @@ dependencies = [
"ng-repo", "ng-repo",
"ng-storage-rocksdb", "ng-storage-rocksdb",
"rand 0.7.3", "rand 0.7.3",
"sbbf-rs-safe",
"serde", "serde",
"serde_bare", "serde_bare",
"serde_bytes", "serde_bytes",
@ -4890,25 +4904,6 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "sbbf-rs"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5525db49c7719816ac719ea8ffd0d0b4586db1a3f5d3e7751593230dacc642fd"
dependencies = [
"cpufeatures",
"fastrange-rs",
]
[[package]]
name = "sbbf-rs-safe"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9902ffeb2cff3f8c072c60c7d526ac9560fc9a66fe1dfc3c240eba5e2151ba3c"
dependencies = [
"sbbf-rs",
]
[[package]] [[package]]
name = "schannel" name = "schannel"
version = "0.1.22" version = "0.1.22"
@ -5247,6 +5242,9 @@ name = "siphasher"
version = "1.0.1" version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "sized-chunks" name = "sized-chunks"

@ -81,7 +81,7 @@ impl Db {
pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
mut f: impl FnMut(Transaction<'a>) -> Result<T, E>, f: impl Fn(Transaction<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
let mut t = Transaction::new(Rc::new(RefCell::new(self.db.write().unwrap()))); let mut t = Transaction::new(Rc::new(RefCell::new(self.db.write().unwrap())));
let res = f(t.clone()); let res = f(t.clone());

@ -714,7 +714,7 @@ impl Db {
pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
mut f: impl FnMut(Transaction<'a>) -> Result<T, E>, f: impl Fn(Transaction<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
if let DbKind::ReadWrite(db) = &self.inner { if let DbKind::ReadWrite(db) = &self.inner {
loop { loop {

@ -378,7 +378,7 @@ impl Storage {
pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
mut f: impl FnMut(StorageWriter<'a>) -> Result<T, E>, f: impl Fn(StorageWriter<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
self.db.ng_transaction(|transaction| { self.db.ng_transaction(|transaction| {
f(StorageWriter { f(StorageWriter {

@ -439,7 +439,7 @@ impl Store {
/// ``` /// ```
fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
mut f: impl FnMut(Transaction<'a>) -> Result<T, E>, f: impl Fn(Transaction<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
self.storage self.storage
.ng_transaction(|writer| f(Transaction { writer })) .ng_transaction(|writer| f(Transaction { writer }))
@ -524,7 +524,7 @@ impl Store {
#[doc(hidden)] #[doc(hidden)]
pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
mut f: impl FnMut(Transaction<'a>) -> Result<T, E>, f: impl Fn(Transaction<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
self.storage self.storage
.ng_transaction(|writer| f(Transaction { writer })) .ng_transaction(|writer| f(Transaction { writer }))

@ -34,7 +34,7 @@ rand = { version = "0.7", features = ["getrandom"] }
blake3 = "1.3.1" blake3 = "1.3.1"
chacha20 = "0.9.0" chacha20 = "0.9.0"
ed25519-dalek = "1.0.1" ed25519-dalek = "1.0.1"
sbbf-rs-safe = "0.3.2" bloomfilter = { version = "1.0.13", features = ["random","serde"] }
curve25519-dalek = "3.2.0" curve25519-dalek = "3.2.0"
threshold_crypto = "0.4.0" threshold_crypto = "0.4.0"
crypto_box = { version = "0.8.2", features = ["seal"] } crypto_box = { version = "0.8.2", features = ["seal"] }

@ -13,7 +13,7 @@ use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt; use std::fmt;
use sbbf_rs_safe::Filter; use bloomfilter::Bloom;
use zeroize::Zeroize; use zeroize::Zeroize;
use crate::errors::*; use crate::errors::*;
@ -187,7 +187,7 @@ impl Branch {
missing: &mut Option<&mut HashSet<ObjectId>>, missing: &mut Option<&mut HashSet<ObjectId>>,
future: Option<ObjectId>, future: Option<ObjectId>,
theirs_found: &mut Option<&mut HashSet<ObjectId>>, theirs_found: &mut Option<&mut HashSet<ObjectId>>,
theirs_filter: &Option<Filter>, theirs_filter: &Option<Bloom<ObjectId>>,
) -> Result<(), ObjectParseError> { ) -> Result<(), ObjectParseError> {
let id = cobj.id(); let id = cobj.id();
@ -195,8 +195,7 @@ impl Branch {
// load deps, stop at the root(including it in visited) or if this is a commit object from known_heads // load deps, stop at the root(including it in visited) or if this is a commit object from known_heads
let found_in_filter = if let Some(filter) = theirs_filter { let found_in_filter = if let Some(filter) = theirs_filter {
let hash = id.get_hash(); filter.check(&id)
filter.contains_hash(hash)
} else { } else {
false false
}; };
@ -277,7 +276,7 @@ impl Branch {
// we silently discard any load error on the known_heads as the responder might not know them (yet). // we silently discard any load error on the known_heads as the responder might not know them (yet).
} }
// log_debug!("their causal past \n{}", Dag(&theirs)); //log_debug!("their causal past \n{}", Dag(&theirs));
let mut visited = HashMap::new(); let mut visited = HashMap::new();
@ -285,7 +284,8 @@ impl Branch {
let filter = if let Some(filter) = known_commits.as_ref() { let filter = if let Some(filter) = known_commits.as_ref() {
Some( Some(
filter.filter(), //.map_err(|_| ObjectParseError::FilterDeserializationError)?, serde_bare::from_slice(filter.filter())
.map_err(|_| ObjectParseError::FilterDeserializationError)?,
) )
} else { } else {
None None
@ -309,7 +309,7 @@ impl Branch {
// we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally. // we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally.
} }
// log_debug!("what we have here \n{}", Dag(&visited)); //log_debug!("what we have here \n{}", Dag(&visited));
// now ordering to respect causal partial order. // now ordering to respect causal partial order.
let mut next_generations = HashSet::new(); let mut next_generations = HashSet::new();

@ -13,11 +13,9 @@
use core::fmt; use core::fmt;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::hash_map::DefaultHasher; use std::hash::Hash;
use std::hash::{Hash, Hasher};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use sbbf_rs_safe::Filter;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use threshold_crypto::serde_impl::SerdeSecret; use threshold_crypto::serde_impl::SerdeSecret;
use zeroize::{Zeroize, ZeroizeOnDrop}; use zeroize::{Zeroize, ZeroizeOnDrop};
@ -72,16 +70,6 @@ impl Digest {
Self::Blake3Digest32(o) => o, Self::Blake3Digest32(o) => o,
} }
} }
/// returns a hash that is consistent across platforms (32/64 bits. important for WASM32 compatibility with the rest)
/// see https://www.reddit.com/r/rust/comments/fwpki6/a_debugging_mystery_hashing_slices_in_wasm_works/
pub fn get_hash(&self) -> u64 {
let mut hasher = DefaultHasher::new();
let ser = serde_bare::to_vec(&self).unwrap();
for e in ser {
e.hash(&mut hasher);
}
hasher.finish()
}
} }
impl fmt::Display for Digest { impl fmt::Display for Digest {
@ -385,16 +373,11 @@ pub enum BloomFilter {
} }
impl BloomFilter { impl BloomFilter {
pub fn filter(&self) -> Filter { pub fn filter(&self) -> &Vec<u8> {
match self { match self {
Self::V0(v0) => Filter::from_bytes(&v0.f).unwrap(), Self::V0(v0) => &v0.f,
} }
} }
pub fn from_filter(filter: &Filter) -> Self {
BloomFilter::V0(BloomFilterV0 {
f: filter.as_bytes().to_vec(),
})
}
} }
// //

@ -31,7 +31,7 @@ async-trait = "0.1.64"
async-std = { version = "1.12.0", features = [ "attributes", "unstable" ] } async-std = { version = "1.12.0", features = [ "attributes", "unstable" ] }
automerge = "0.5.9" automerge = "0.5.9"
yrs = "0.18.2" yrs = "0.18.2"
sbbf-rs-safe = "0.3.2" bloomfilter = { version = "1.0.13", features = ["random","serde"] }
ng-repo = { path = "../ng-repo", version = "0.1.0-preview.1" } ng-repo = { path = "../ng-repo", version = "0.1.0-preview.1" }
ng-net = { path = "../ng-net", version = "0.1.0-preview.1" } ng-net = { path = "../ng-net", version = "0.1.0-preview.1" }
ng-oxigraph = { path = "../ng-oxigraph", version = "0.4.0-alpha.7-ngpreview6" } ng-oxigraph = { path = "../ng-oxigraph", version = "0.4.0-alpha.7-ngpreview6" }

@ -404,11 +404,8 @@ impl Verifier {
self.update_graph(updates).await self.update_graph(updates).await
} }
async fn update_graph( async fn update_graph(&mut self, updates: Vec<BranchUpdateInfo>) -> Result<(), VerifierError> {
&mut self, let updates_ref = &updates;
mut updates: Vec<BranchUpdateInfo>,
) -> Result<(), VerifierError> {
let updates_ref = &mut updates;
let res = self let res = self
.graph_dataset .graph_dataset
.as_ref() .as_ref()
@ -417,7 +414,7 @@ impl Verifier {
move |mut transaction| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> { move |mut transaction| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> {
let reader = transaction.ng_get_reader(); let reader = transaction.ng_get_reader();
for update in updates_ref.iter_mut() { for update in updates_ref {
let commit_name = let commit_name =
NuriV0::commit_graph_name(&update.commit_id, &update.overlay_id); NuriV0::commit_graph_name(&update.commit_id, &update.overlay_id);
let commit_encoded = numeric_encoder::StrHash::new(&commit_name); let commit_encoded = numeric_encoder::StrHash::new(&commit_name);
@ -508,8 +505,8 @@ impl Verifier {
} else { } else {
REMOVED_IN_OTHER REMOVED_IN_OTHER
}; };
let mut to_remove_from_removes: HashSet<usize> = HashSet::new();
for (pos, remove) in update.transaction.removes.iter().enumerate() { for remove in update.transaction.removes.iter() {
let encoded_subject = remove.subject.as_ref().into(); let encoded_subject = remove.subject.as_ref().into();
let encoded_predicate = remove.predicate.as_ref().into(); let encoded_predicate = remove.predicate.as_ref().into();
let encoded_object = remove.object.as_ref().into(); let encoded_object = remove.object.as_ref().into();
@ -551,19 +548,12 @@ impl Verifier {
let triple_ref: TripleRef = remove.into(); let triple_ref: TripleRef = remove.into();
let quad_ref = triple_ref.in_graph(ov_graphname_ref); let quad_ref = triple_ref.in_graph(ov_graphname_ref);
transaction.remove(quad_ref)?; transaction.remove(quad_ref)?;
} else {
to_remove_from_removes.insert(pos);
} }
} }
} }
let mut idx: usize = 0;
update.transaction.removes.retain(|_| {
let retain = !to_remove_from_removes.remove(&idx);
idx += 1;
retain
});
} }
} }
Ok(()) Ok(())
}, },
) )

@ -23,9 +23,9 @@ use std::{collections::HashMap, sync::Arc};
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::sync::{Mutex, RwLockReadGuard}; use async_std::sync::{Mutex, RwLockReadGuard};
use bloomfilter::Bloom;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
use sbbf_rs_safe::Filter;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use web_time::SystemTime; use web_time::SystemTime;
@ -1585,7 +1585,6 @@ impl Verifier {
let mut theirs_found = HashSet::new(); let mut theirs_found = HashSet::new();
let mut visited = HashMap::new(); let mut visited = HashMap::new();
for our in ours_set.iter() { for our in ours_set.iter() {
//log_info!("OUR HEADS {}", our);
if let Ok(cobj) = Object::load(*our, None, &repo.store) { if let Ok(cobj) = Object::load(*our, None, &repo.store) {
let _ = Branch::load_causal_past( let _ = Branch::load_causal_past(
&cobj, &cobj,
@ -1612,11 +1611,14 @@ impl Verifier {
// prepare bloom filter // prepare bloom filter
let expected_elements = let expected_elements =
remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr); remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr);
let mut filter = Filter::new(27, expected_elements as usize); let mut filter =
Bloom::<ObjectId>::new_for_fp_rate(expected_elements as usize, 0.01);
for commit_id in visited.keys() { for commit_id in visited.keys() {
filter.insert_hash(commit_id.get_hash()); filter.set(commit_id);
} }
Some(BloomFilter::from_filter(&filter)) Some(BloomFilter::V0(BloomFilterV0 {
f: serde_bare::to_vec(&filter)?,
}))
} }
}; };

Loading…
Cancel
Save