Compare commits

..

2 Commits

  1. 52
      Cargo.lock
  2. 2
      ng-oxigraph/src/oxigraph/storage/backend/fallback.rs
  3. 2
      ng-oxigraph/src/oxigraph/storage/backend/oxi_rocksdb.rs
  4. 2
      ng-oxigraph/src/oxigraph/storage/mod.rs
  5. 4
      ng-oxigraph/src/oxigraph/store.rs
  6. 2
      ng-repo/Cargo.toml
  7. 10
      ng-repo/src/branch.rs
  8. 23
      ng-repo/src/types.rs
  9. 2
      ng-verifier/Cargo.toml
  10. 22
      ng-verifier/src/commits/transaction.rs
  11. 12
      ng-verifier/src/verifier.rs

52
Cargo.lock generated

@ -562,15 +562,6 @@ dependencies = [
"syn 2.0.58", "syn 2.0.58",
] ]
[[package]]
name = "bit-vec"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "bit_field" name = "bit_field"
version = "0.10.2" version = "0.10.2"
@ -670,17 +661,6 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "bloomfilter"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b64d54e47a7f4fd723f082e8f11429f3df6ba8adaeca355a76556f9f0602bbcf"
dependencies = [
"bit-vec",
"getrandom 0.2.10",
"siphasher 1.0.1",
]
[[package]] [[package]]
name = "brotli" name = "brotli"
version = "3.3.4" version = "3.3.4"
@ -1743,6 +1723,12 @@ dependencies = [
"getrandom 0.2.10", "getrandom 0.2.10",
] ]
[[package]]
name = "fastrange-rs"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e90a1392cd6ec5ebe42ccaf251f2b7ba6be654c377f05c913f3898bfb2172512"
[[package]] [[package]]
name = "fdeflate" name = "fdeflate"
version = "0.3.0" version = "0.3.0"
@ -3463,7 +3449,6 @@ version = "0.1.0-preview.1"
dependencies = [ dependencies = [
"base64-url", "base64-url",
"blake3", "blake3",
"bloomfilter",
"chacha20", "chacha20",
"crypto_box", "crypto_box",
"current_platform", "current_platform",
@ -3478,6 +3463,7 @@ dependencies = [
"once_cell", "once_cell",
"os_info", "os_info",
"rand 0.7.3", "rand 0.7.3",
"sbbf-rs-safe",
"serde", "serde",
"serde_bare", "serde_bare",
"serde_bytes", "serde_bytes",
@ -3566,7 +3552,6 @@ dependencies = [
"async-std", "async-std",
"async-trait", "async-trait",
"automerge", "automerge",
"bloomfilter",
"either", "either",
"futures", "futures",
"getrandom 0.2.10", "getrandom 0.2.10",
@ -3575,6 +3560,7 @@ dependencies = [
"ng-repo", "ng-repo",
"ng-storage-rocksdb", "ng-storage-rocksdb",
"rand 0.7.3", "rand 0.7.3",
"sbbf-rs-safe",
"serde", "serde",
"serde_bare", "serde_bare",
"serde_bytes", "serde_bytes",
@ -4904,6 +4890,25 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "sbbf-rs"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5525db49c7719816ac719ea8ffd0d0b4586db1a3f5d3e7751593230dacc642fd"
dependencies = [
"cpufeatures",
"fastrange-rs",
]
[[package]]
name = "sbbf-rs-safe"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9902ffeb2cff3f8c072c60c7d526ac9560fc9a66fe1dfc3c240eba5e2151ba3c"
dependencies = [
"sbbf-rs",
]
[[package]] [[package]]
name = "schannel" name = "schannel"
version = "0.1.22" version = "0.1.22"
@ -5242,9 +5247,6 @@ name = "siphasher"
version = "1.0.1" version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "sized-chunks" name = "sized-chunks"

@ -81,7 +81,7 @@ impl Db {
pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
f: impl Fn(Transaction<'a>) -> Result<T, E>, mut f: impl FnMut(Transaction<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
let mut t = Transaction::new(Rc::new(RefCell::new(self.db.write().unwrap()))); let mut t = Transaction::new(Rc::new(RefCell::new(self.db.write().unwrap())));
let res = f(t.clone()); let res = f(t.clone());

@ -714,7 +714,7 @@ impl Db {
pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
f: impl Fn(Transaction<'a>) -> Result<T, E>, mut f: impl FnMut(Transaction<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
if let DbKind::ReadWrite(db) = &self.inner { if let DbKind::ReadWrite(db) = &self.inner {
loop { loop {

@ -378,7 +378,7 @@ impl Storage {
pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
f: impl Fn(StorageWriter<'a>) -> Result<T, E>, mut f: impl FnMut(StorageWriter<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
self.db.ng_transaction(|transaction| { self.db.ng_transaction(|transaction| {
f(StorageWriter { f(StorageWriter {

@ -439,7 +439,7 @@ impl Store {
/// ``` /// ```
fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
f: impl Fn(Transaction<'a>) -> Result<T, E>, mut f: impl FnMut(Transaction<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
self.storage self.storage
.ng_transaction(|writer| f(Transaction { writer })) .ng_transaction(|writer| f(Transaction { writer }))
@ -524,7 +524,7 @@ impl Store {
#[doc(hidden)] #[doc(hidden)]
pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>( pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self, &'b self,
f: impl Fn(Transaction<'a>) -> Result<T, E>, mut f: impl FnMut(Transaction<'a>) -> Result<T, E>,
) -> Result<T, E> { ) -> Result<T, E> {
self.storage self.storage
.ng_transaction(|writer| f(Transaction { writer })) .ng_transaction(|writer| f(Transaction { writer }))

@ -34,7 +34,7 @@ rand = { version = "0.7", features = ["getrandom"] }
blake3 = "1.3.1" blake3 = "1.3.1"
chacha20 = "0.9.0" chacha20 = "0.9.0"
ed25519-dalek = "1.0.1" ed25519-dalek = "1.0.1"
bloomfilter = { version = "1.0.13", features = ["random","serde"] } sbbf-rs-safe = "0.3.2"
curve25519-dalek = "3.2.0" curve25519-dalek = "3.2.0"
threshold_crypto = "0.4.0" threshold_crypto = "0.4.0"
crypto_box = { version = "0.8.2", features = ["seal"] } crypto_box = { version = "0.8.2", features = ["seal"] }

@ -13,7 +13,7 @@ use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt; use std::fmt;
use bloomfilter::Bloom; use sbbf_rs_safe::Filter;
use zeroize::Zeroize; use zeroize::Zeroize;
use crate::errors::*; use crate::errors::*;
@ -187,7 +187,7 @@ impl Branch {
missing: &mut Option<&mut HashSet<ObjectId>>, missing: &mut Option<&mut HashSet<ObjectId>>,
future: Option<ObjectId>, future: Option<ObjectId>,
theirs_found: &mut Option<&mut HashSet<ObjectId>>, theirs_found: &mut Option<&mut HashSet<ObjectId>>,
theirs_filter: &Option<Bloom<ObjectId>>, theirs_filter: &Option<Filter>,
) -> Result<(), ObjectParseError> { ) -> Result<(), ObjectParseError> {
let id = cobj.id(); let id = cobj.id();
@ -195,7 +195,8 @@ impl Branch {
// load deps, stop at the root(including it in visited) or if this is a commit object from known_heads // load deps, stop at the root(including it in visited) or if this is a commit object from known_heads
let found_in_filter = if let Some(filter) = theirs_filter { let found_in_filter = if let Some(filter) = theirs_filter {
filter.check(&id) let hash = id.get_hash();
filter.contains_hash(hash)
} else { } else {
false false
}; };
@ -284,8 +285,7 @@ impl Branch {
let filter = if let Some(filter) = known_commits.as_ref() { let filter = if let Some(filter) = known_commits.as_ref() {
Some( Some(
serde_bare::from_slice(filter.filter()) filter.filter(), //.map_err(|_| ObjectParseError::FilterDeserializationError)?,
.map_err(|_| ObjectParseError::FilterDeserializationError)?,
) )
} else { } else {
None None

@ -13,9 +13,11 @@
use core::fmt; use core::fmt;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::hash::Hash; use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use sbbf_rs_safe::Filter;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use threshold_crypto::serde_impl::SerdeSecret; use threshold_crypto::serde_impl::SerdeSecret;
use zeroize::{Zeroize, ZeroizeOnDrop}; use zeroize::{Zeroize, ZeroizeOnDrop};
@ -70,6 +72,16 @@ impl Digest {
Self::Blake3Digest32(o) => o, Self::Blake3Digest32(o) => o,
} }
} }
/// returns a hash that is consistent across platforms (32/64 bits. important for WASM32 compatibility with the rest)
/// see https://www.reddit.com/r/rust/comments/fwpki6/a_debugging_mystery_hashing_slices_in_wasm_works/
pub fn get_hash(&self) -> u64 {
let mut hasher = DefaultHasher::new();
let ser = serde_bare::to_vec(&self).unwrap();
for e in ser {
e.hash(&mut hasher);
}
hasher.finish()
}
} }
impl fmt::Display for Digest { impl fmt::Display for Digest {
@ -373,10 +385,15 @@ pub enum BloomFilter {
} }
impl BloomFilter { impl BloomFilter {
pub fn filter(&self) -> &Vec<u8> { pub fn filter(&self) -> Filter {
match self { match self {
Self::V0(v0) => &v0.f, Self::V0(v0) => Filter::from_bytes(&v0.f).unwrap(),
}
} }
pub fn from_filter(filter: &Filter) -> Self {
BloomFilter::V0(BloomFilterV0 {
f: filter.as_bytes().to_vec(),
})
} }
} }

@ -31,7 +31,7 @@ async-trait = "0.1.64"
async-std = { version = "1.12.0", features = [ "attributes", "unstable" ] } async-std = { version = "1.12.0", features = [ "attributes", "unstable" ] }
automerge = "0.5.9" automerge = "0.5.9"
yrs = "0.18.2" yrs = "0.18.2"
bloomfilter = { version = "1.0.13", features = ["random","serde"] } sbbf-rs-safe = "0.3.2"
ng-repo = { path = "../ng-repo", version = "0.1.0-preview.1" } ng-repo = { path = "../ng-repo", version = "0.1.0-preview.1" }
ng-net = { path = "../ng-net", version = "0.1.0-preview.1" } ng-net = { path = "../ng-net", version = "0.1.0-preview.1" }
ng-oxigraph = { path = "../ng-oxigraph", version = "0.4.0-alpha.7-ngpreview6" } ng-oxigraph = { path = "../ng-oxigraph", version = "0.4.0-alpha.7-ngpreview6" }

@ -404,8 +404,11 @@ impl Verifier {
self.update_graph(updates).await self.update_graph(updates).await
} }
async fn update_graph(&mut self, updates: Vec<BranchUpdateInfo>) -> Result<(), VerifierError> { async fn update_graph(
let updates_ref = &updates; &mut self,
mut updates: Vec<BranchUpdateInfo>,
) -> Result<(), VerifierError> {
let updates_ref = &mut updates;
let res = self let res = self
.graph_dataset .graph_dataset
.as_ref() .as_ref()
@ -414,7 +417,7 @@ impl Verifier {
move |mut transaction| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> { move |mut transaction| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> {
let reader = transaction.ng_get_reader(); let reader = transaction.ng_get_reader();
for update in updates_ref { for update in updates_ref.iter_mut() {
let commit_name = let commit_name =
NuriV0::commit_graph_name(&update.commit_id, &update.overlay_id); NuriV0::commit_graph_name(&update.commit_id, &update.overlay_id);
let commit_encoded = numeric_encoder::StrHash::new(&commit_name); let commit_encoded = numeric_encoder::StrHash::new(&commit_name);
@ -505,8 +508,8 @@ impl Verifier {
} else { } else {
REMOVED_IN_OTHER REMOVED_IN_OTHER
}; };
let mut to_remove_from_removes: HashSet<usize> = HashSet::new();
for remove in update.transaction.removes.iter() { for (pos, remove) in update.transaction.removes.iter().enumerate() {
let encoded_subject = remove.subject.as_ref().into(); let encoded_subject = remove.subject.as_ref().into();
let encoded_predicate = remove.predicate.as_ref().into(); let encoded_predicate = remove.predicate.as_ref().into();
let encoded_object = remove.object.as_ref().into(); let encoded_object = remove.object.as_ref().into();
@ -548,12 +551,19 @@ impl Verifier {
let triple_ref: TripleRef = remove.into(); let triple_ref: TripleRef = remove.into();
let quad_ref = triple_ref.in_graph(ov_graphname_ref); let quad_ref = triple_ref.in_graph(ov_graphname_ref);
transaction.remove(quad_ref)?; transaction.remove(quad_ref)?;
} else {
to_remove_from_removes.insert(pos);
} }
} }
} }
let mut idx: usize = 0;
update.transaction.removes.retain(|_| {
let retain = !to_remove_from_removes.remove(&idx);
idx += 1;
retain
});
} }
} }
Ok(()) Ok(())
}, },
) )

@ -23,9 +23,9 @@ use std::{collections::HashMap, sync::Arc};
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::sync::{Mutex, RwLockReadGuard}; use async_std::sync::{Mutex, RwLockReadGuard};
use bloomfilter::Bloom;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
use sbbf_rs_safe::Filter;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use web_time::SystemTime; use web_time::SystemTime;
@ -1585,6 +1585,7 @@ impl Verifier {
let mut theirs_found = HashSet::new(); let mut theirs_found = HashSet::new();
let mut visited = HashMap::new(); let mut visited = HashMap::new();
for our in ours_set.iter() { for our in ours_set.iter() {
//log_info!("OUR HEADS {}", our);
if let Ok(cobj) = Object::load(*our, None, &repo.store) { if let Ok(cobj) = Object::load(*our, None, &repo.store) {
let _ = Branch::load_causal_past( let _ = Branch::load_causal_past(
&cobj, &cobj,
@ -1611,14 +1612,11 @@ impl Verifier {
// prepare bloom filter // prepare bloom filter
let expected_elements = let expected_elements =
remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr); remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr);
let mut filter = let mut filter = Filter::new(27, expected_elements as usize);
Bloom::<ObjectId>::new_for_fp_rate(expected_elements as usize, 0.01);
for commit_id in visited.keys() { for commit_id in visited.keys() {
filter.set(commit_id); filter.insert_hash(commit_id.get_hash());
} }
Some(BloomFilter::V0(BloomFilterV0 { Some(BloomFilter::from_filter(&filter))
f: serde_bare::to_vec(&filter)?,
}))
} }
}; };

Loading…
Cancel
Save