diff --git a/ng-net/src/app_protocol.rs b/ng-net/src/app_protocol.rs index c8af1cc..dfd92a8 100644 --- a/ng-net/src/app_protocol.rs +++ b/ng-net/src/app_protocol.rs @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize}; use ng_repo::errors::NgError; use ng_repo::types::*; -use ng_repo::utils::{decode_id, decode_sym_key}; +use ng_repo::utils::{decode_id, decode_key, decode_sym_key}; use crate::types::*; @@ -23,6 +23,15 @@ lazy_static! { #[doc(hidden)] static ref RE_FILE_READ_CAP: Regex = Regex::new(r"^did:ng:j:([A-Za-z0-9-_%.]*):k:([A-Za-z0-9-_%.]*)$").unwrap(); + #[doc(hidden)] + static ref RE_REPO: Regex = + Regex::new(r"^did:ng:o:([A-Za-z0-9-_%.]*):v:([A-Za-z0-9-_%.]*)$").unwrap(); + #[doc(hidden)] + static ref RE_BRANCH: Regex = + Regex::new(r"^did:ng:o:([A-Za-z0-9-_%.]*):v:([A-Za-z0-9-_%.]*):b:([A-Za-z0-9-_%.]*)$").unwrap(); + #[doc(hidden)] + static ref RE_NAMED_BRANCH_OR_COMMIT: Regex = + Regex::new(r"^did:ng:o:([A-Za-z0-9-_%.]*):v:([A-Za-z0-9-_%.]*):a:([A-Za-z0-9-_%.]*)$").unwrap(); //TODO: allow international chars. disallow digit as first char } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -68,6 +77,21 @@ pub enum TargetBranchV0 { Commits(Vec), // only possible if access to their branch is given. must belong to the same branch. } +impl TargetBranchV0 { + pub fn is_valid_for_sparql_update(&self) -> bool { + match self { + Self::Commits(_) => false, + _ => true, + } + } + pub fn branch_id(&self) -> &BranchId { + match self { + Self::BranchId(id) => id, + _ => panic!("not a TargetBranchV0::BranchId"), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum NuriTargetV0 { UserSite, // targets the whole data set of the user @@ -81,8 +105,33 @@ pub enum NuriTargetV0 { Group(String), // shortname of a Group Repo(RepoId), + + None, } +impl NuriTargetV0 { + pub fn is_valid_for_sparql_update(&self) -> bool { + match self { + Self::UserSite | Self::AllDialogs | Self::AllGroups => false, + _ => true, + } + } + pub fn is_repo_id(&self) -> bool { + match self { + Self::Repo(_) => true, + _ => false, + } + } + pub fn repo_id(&self) -> &RepoId { + match self { + Self::Repo(id) => id, + _ => panic!("not a NuriTargetV0::Repo"), + } + } +} + +const DID_PREFIX: &str = "did:ng"; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NuriV0 { pub identity: Option, // None for personal identity @@ -99,6 +148,61 @@ pub struct NuriV0 { } impl NuriV0 { + pub fn commit_graph_name(commit_id: &ObjectId, overlay_id: &OverlayId) -> String { + format!("{DID_PREFIX}:c:{commit_id}:v:{overlay_id}") + } + + pub fn commit_graph_name_from_base64(commit_base64: &String, overlay_id: &OverlayId) -> String { + format!("{DID_PREFIX}:c:{commit_base64}:v:{overlay_id}") + } + + pub fn repo_graph_name(repo_id: &RepoId, overlay_id: &OverlayId) -> String { + format!("{DID_PREFIX}:o:{repo_id}:v:{overlay_id}") + } + + pub fn overlay_id(overlay_id: &OverlayId) -> String { + format!("{DID_PREFIX}:v:{overlay_id}") + } + + pub fn topic_id(topic_id: &TopicId) -> String { + format!("{DID_PREFIX}:h:{topic_id}") + } + + pub fn branch_id(branch_id: &BranchId) -> String { + format!("{DID_PREFIX}:b:{branch_id}") + } + + pub fn branch_id_from_base64(branch_base64: &String) -> String { + format!("{DID_PREFIX}:b:{branch_base64}") + } + + pub fn token(token: &Digest) -> String { + format!("{DID_PREFIX}:n:{token}") + } + + pub fn is_branch_identifier(&self) -> bool { + self.locator.is_empty() + && self.topic.is_none() + && self.access.is_empty() + && self.overlay.as_ref().map_or(false, |o| o.is_outer()) + && self + 
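+            // NOTE (sketch, not part of the original patch): the regexes added at the
+            // top of this file accept Nuris of these shapes, each capture being a
+            // base64url-encoded key or ID:
+            //   did:ng:o:<repo>:v:<overlay>               RE_REPO
+            //   did:ng:o:<repo>:v:<overlay>:b:<branch>    RE_BRANCH
+            //   did:ng:o:<repo>:v:<overlay>:a:<name>      RE_NAMED_BRANCH_OR_COMMIT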
.branch + .as_ref() + .map_or(true, |b| b.is_valid_for_sparql_update()) + && self.object.is_none() + && !self.entire_store + && self.target.is_repo_id() + } + + pub fn is_valid_for_sparql_update(&self) -> bool { + self.object.is_none() + && self.entire_store == false + && self.target.is_valid_for_sparql_update() + && self + .branch + .as_ref() + .map_or(true, |b| b.is_valid_for_sparql_update()) + } pub fn new_repo_target_from_string(repo_id_string: String) -> Result { let repo_id: RepoId = repo_id_string.as_str().try_into()?; Ok(Self { @@ -140,8 +244,8 @@ impl NuriV0 { locator: vec![], } } - pub fn new_from(from: String) -> Result { - let c = RE_FILE_READ_CAP.captures(&from); + pub fn new_from(from: &String) -> Result { + let c = RE_FILE_READ_CAP.captures(from); if c.is_some() && c.as_ref().unwrap().get(1).is_some() @@ -164,7 +268,58 @@ impl NuriV0 { locator: vec![], }) } else { - Err(NgError::InvalidNuri) + let c = RE_REPO.captures(from); + + if c.is_some() + && c.as_ref().unwrap().get(1).is_some() + && c.as_ref().unwrap().get(2).is_some() + { + let cap = c.unwrap(); + let o = cap.get(1).unwrap().as_str(); + let v = cap.get(2).unwrap().as_str(); + let repo_id = decode_key(o)?; + let overlay_id = decode_id(v)?; + Ok(Self { + identity: None, + target: NuriTargetV0::Repo(repo_id), + entire_store: false, + object: None, + branch: None, + overlay: Some(OverlayLink::Outer(overlay_id)), + access: vec![], + topic: None, + locator: vec![], + }) + } else { + let c = RE_BRANCH.captures(from); + + if c.is_some() + && c.as_ref().unwrap().get(1).is_some() + && c.as_ref().unwrap().get(2).is_some() + && c.as_ref().unwrap().get(3).is_some() + { + let cap = c.unwrap(); + let o = cap.get(1).unwrap().as_str(); + let v = cap.get(2).unwrap().as_str(); + let b = cap.get(3).unwrap().as_str(); + let repo_id = decode_key(o)?; + let overlay_id = decode_id(v)?; + let branch_id = decode_key(b)?; + Ok(Self { + identity: None, + target: NuriTargetV0::Repo(repo_id), + entire_store: false, + object: None, + branch: Some(TargetBranchV0::BranchId(branch_id)), + overlay: Some(OverlayLink::Outer(overlay_id)), + access: vec![], + topic: None, + locator: vec![], + }) + } else { + Err(NgError::InvalidNuri) + } + } } } } @@ -321,8 +476,12 @@ pub enum DocQuery { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct GraphUpdate { - add: Vec, - remove: Vec, + // serialization of Vec + #[serde(with = "serde_bytes")] + pub inserts: Vec, + // serialization of Vec + #[serde(with = "serde_bytes")] + pub removes: Vec, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -405,9 +564,12 @@ pub enum DiscretePatch { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct GraphPatch { - /// oxigraph::model::GroundQuad serialized to n-quads with oxrdfio - pub adds: Vec, - pub removes: Vec, + // serialization of Vec + #[serde(with = "serde_bytes")] + pub inserts: Vec, + // serialization of Vec + #[serde(with = "serde_bytes")] + pub removes: Vec, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -426,7 +588,9 @@ pub enum DiscreteState { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct GraphState { - pub tuples: Vec, + // serialization of Vec + #[serde(with = "serde_bytes")] + pub triples: Vec, } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/ng-net/src/types.rs b/ng-net/src/types.rs index fbb97fb..c7b33b3 100644 --- a/ng-net/src/types.rs +++ b/ng-net/src/types.rs @@ -1346,6 +1346,21 @@ pub enum OverlayLink { Public(PubKey), } +impl OverlayLink { + pub fn is_outer(&self) -> bool { + match self { + Self::Outer(_) 
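+            // NOTE (assumption, mirroring TargetBranchV0::branch_id() and
+            // NuriTargetV0::repo_id() above): these accessors panic on a variant
+            // mismatch, so callers are expected to test the predicate first, e.g.
+            //   if link.is_outer() { let id = link.outer(); /* ... */ }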
=> true,
+            _ => false,
+        }
+    }
+    pub fn outer(&self) -> &Digest {
+        match self {
+            Self::Outer(o) => o,
+            _ => panic!("not an outer overlay ID"),
+        }
+    }
+}
+
 /// Overlay session ID
 ///
 /// It is a pubkey used for signing all OverlayMessage sent by the peer.
diff --git a/ng-oxigraph/src/oxigraph/mod.rs b/ng-oxigraph/src/oxigraph/mod.rs
index 57a6bd0..a62babb 100644
--- a/ng-oxigraph/src/oxigraph/mod.rs
+++ b/ng-oxigraph/src/oxigraph/mod.rs
@@ -3,3 +3,16 @@ pub mod model;
 pub mod sparql;
 mod storage;
 pub mod store;
+
+pub mod storage_ng {
+    pub use super::storage::numeric_encoder;
+    pub use super::storage::ADDED_IN_MAIN;
+    pub use super::storage::ADDED_IN_OTHER;
+    pub use super::storage::BRANCH_PREFIX;
+    pub use super::storage::COMMIT_HAS_GRAPH;
+    pub use super::storage::COMMIT_PREFIX;
+    pub use super::storage::COMMIT_SKIP_NO_GRAPH;
+    pub use super::storage::REMOVED_IN_MAIN;
+    pub use super::storage::REMOVED_IN_OTHER;
+    pub use super::storage::REPO_IN_MAIN;
+}
diff --git a/ng-oxigraph/src/oxigraph/sparql/update.rs b/ng-oxigraph/src/oxigraph/sparql/update.rs
index 3ee9c8a..95143c0 100644
--- a/ng-oxigraph/src/oxigraph/sparql/update.rs
+++ b/ng-oxigraph/src/oxigraph/sparql/update.rs
@@ -6,7 +6,7 @@
 use crate::oxigraph::sparql::eval::{EncodedTuple, SimpleEvaluator};
 use crate::oxigraph::sparql::http::Client;
 use crate::oxigraph::sparql::{EvaluationError, Update, UpdateOptions};
 use crate::oxigraph::storage::numeric_encoder::{Decoder, EncodedTerm};
-use crate::oxigraph::storage::StorageWriter;
+use crate::oxigraph::storage::CommitWriter;
 use crate::spargebra::algebra::{GraphPattern, GraphTarget};
 use crate::spargebra::term::{
     BlankNode, GraphName, GraphNamePattern, GroundQuad, GroundQuadPattern, GroundSubject,
@@ -23,7 +23,7 @@
 use std::rc::Rc;
 use std::sync::Arc;
 
 pub fn evaluate_update<'a, 'b: 'a>(
-    transaction: &'a mut StorageWriter<'b>,
+    transaction: &'a mut CommitWriter<'b>,
     update: &Update,
     options: &UpdateOptions,
 ) -> Result<(), EvaluationError> {
@@ -40,7 +40,7 @@
 }
 
 struct SimpleUpdateEvaluator<'a, 'b> {
-    transaction: &'a mut StorageWriter<'b>,
+    transaction: &'a mut CommitWriter<'b>,
     base_iri: Option<Rc<Iri<String>>>,
     options: UpdateOptions,
     client: Client,
@@ -200,27 +200,28 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> {
     }
 
     fn eval_clear(&mut self, graph: &GraphTarget, silent: bool) -> Result<(), EvaluationError> {
-        match graph {
-            GraphTarget::NamedNode(graph_name) => {
-                if self
-                    .transaction
-                    .reader()
-                    .contains_named_graph(&graph_name.as_ref().into())?
-                {
-                    Ok(self.transaction.clear_graph(graph_name.into())?)
-                } else if silent {
-                    Ok(())
-                } else {
-                    Err(EvaluationError::GraphDoesNotExist(graph_name.clone()))
-                }
-            }
-            GraphTarget::DefaultGraph => {
-                self.transaction.clear_graph(GraphNameRef::DefaultGraph)?;
-                Ok(())
-            }
-            GraphTarget::NamedGraphs => Ok(self.transaction.clear_all_named_graphs()?),
-            GraphTarget::AllGraphs => Ok(self.transaction.clear_all_graphs()?),
-        }
+        unimplemented!();
+        // match graph {
+        //     GraphTarget::NamedNode(graph_name) => {
+        //         if self
+        //             .transaction
+        //             .reader()
+        //             .contains_named_graph(&graph_name.as_ref().into())?
+        //         {
+        //             Ok(self.transaction.clear_graph(graph_name.into())?)
+ // } else if silent { + // Ok(()) + // } else { + // Err(EvaluationError::GraphDoesNotExist(graph_name.clone())) + // } + // } + // GraphTarget::DefaultGraph => { + // self.transaction.clear_graph(GraphNameRef::DefaultGraph)?; + // Ok(()) + // } + // GraphTarget::NamedGraphs => Ok(self.transaction.clear_all_named_graphs()?), + // GraphTarget::AllGraphs => Ok(self.transaction.clear_all_graphs()?), + // } } fn eval_drop(&mut self, graph: &GraphTarget, silent: bool) -> Result<(), EvaluationError> { diff --git a/ng-oxigraph/src/oxigraph/storage/backend/fallback.rs b/ng-oxigraph/src/oxigraph/storage/backend/fallback.rs index e52ff0a..aff1973 100644 --- a/ng-oxigraph/src/oxigraph/storage/backend/fallback.rs +++ b/ng-oxigraph/src/oxigraph/storage/backend/fallback.rs @@ -1,9 +1,10 @@ //! TODO: This storage is dramatically naive. +use super::super::numeric_encoder::StrHash; use crate::oxigraph::storage::StorageError; use crate::oxigraph::store::CorruptionError; use std::cell::RefCell; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::error::Error; use std::mem::transmute; use std::rc::{Rc, Weak}; @@ -17,9 +18,18 @@ pub struct ColumnFamilyDefinition { } #[derive(Clone)] -pub struct Db(Arc, Vec>>>>); +pub struct Db { + db: Arc, Vec>>>>, + pub past_commits_cache: Arc>>>>, +} impl Db { + pub(crate) fn past_commits_cache( + &self, + ) -> Arc>>>> { + Arc::clone(&self.past_commits_cache) + } + #[allow(clippy::unnecessary_wraps)] pub fn new(column_families: Vec) -> Result { let mut trees = HashMap::new(); @@ -27,13 +37,16 @@ impl Db { trees.insert(ColumnFamily(cf.name), BTreeMap::default()); } trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists. - Ok(Self(Arc::new(RwLock::new(trees)))) + Ok(Self { + db: Arc::new(RwLock::new(trees)), + past_commits_cache: Arc::new(RwLock::new(HashMap::new())), + }) } #[allow(clippy::unwrap_in_result)] pub fn column_family(&self, name: &'static str) -> Result { let column_family = ColumnFamily(name); - if self.0.read().unwrap().contains_key(&column_family) { + if self.db.read().unwrap().contains_key(&column_family) { Ok(column_family) } else { Err(CorruptionError::from_missing_column_family_name(name).into()) @@ -42,7 +55,7 @@ impl Db { #[must_use] pub fn snapshot(&self) -> Reader { - Reader(InnerReader::Simple(Arc::clone(&self.0))) + Reader(InnerReader::Simple(Arc::clone(&self.db))) } #[allow(clippy::unwrap_in_result)] @@ -50,7 +63,22 @@ impl Db { &'b self, f: impl Fn(Transaction<'a>) -> Result, ) -> Result { - f(Transaction(Rc::new(RefCell::new(self.0.write().unwrap())))) + let mut t = Transaction::new(Rc::new(RefCell::new(self.db.write().unwrap()))); + let res = f(t.clone()); + t.rollback(); + res + } + + pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From>( + &'b self, + f: impl Fn(Transaction<'a>) -> Result, + ) -> Result { + let mut t = Transaction::new(Rc::new(RefCell::new(self.db.write().unwrap()))); + let res = f(t.clone()); + if res.is_err() { + t.rollback(); + } + res } } @@ -225,16 +253,29 @@ impl Reader { } } -pub struct Transaction<'a>( - Rc, Vec>>>>>, -); +#[derive(Clone)] +pub struct Transaction<'a> { + db: Rc, Vec>>>>>, + inserts: Rc), Option>>>>, + removes: Rc), Vec>>>, +} + +impl<'a> Transaction<'a> { + fn new( + db: Rc, Vec>>>>>, + ) -> Self { + Transaction { + db, + inserts: Rc::new(RwLock::new(HashMap::new())), + removes: Rc::new(RwLock::new(HashMap::new())), + } + } -impl Transaction<'_> { #[allow(unsafe_code, clippy::useless_transmute)] pub fn 
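+    // NOTE (reading of this patch, not authoritative): in this in-memory backend,
+    // transaction() now always rolls back after running `f` — SPARQL update
+    // evaluation only needs to observe its own staged writes — while
+    // ng_transaction() keeps them unless `f` returned an error. The `inserts` and
+    // `removes` maps above journal, per (column family, key), the original value
+    // that rollback() must restore.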
reader(&self) -> Reader { // SAFETY: This transmute is safe because we take a weak reference and the only Rc reference used is guarded by the lifetime. Reader(InnerReader::Transaction(Rc::downgrade(unsafe { - transmute(&self.0) + transmute(&self.db) }))) } @@ -244,12 +285,42 @@ impl Transaction<'_> { column_family: &ColumnFamily, key: &[u8], ) -> Result { - Ok((*self.0) + Ok((*self.db) .borrow() .get(column_family) .map_or(false, |cf| cf.contains_key(key))) } + fn rollback(&mut self) { + let inserts = self.inserts.read().unwrap(); + for ((column_family, key), val) in inserts.iter() { + if val.is_some() { + //restore original val + self.db + .borrow_mut() + .get_mut(&column_family) + .unwrap() + .insert(key.to_vec(), val.as_ref().unwrap().to_vec()); + } else { + // we remove it + self.db + .borrow_mut() + .get_mut(&column_family) + .unwrap() + .remove(key.into()); + } + } + let removes = self.removes.read().unwrap(); + for ((column_family, key), val) in removes.iter() { + //restore original val + self.db + .borrow_mut() + .get_mut(&column_family) + .unwrap() + .insert(key.to_vec(), val.to_vec()); + } + } + #[allow(clippy::unnecessary_wraps, clippy::unwrap_in_result)] pub fn insert( &mut self, @@ -257,11 +328,22 @@ impl Transaction<'_> { key: &[u8], value: &[u8], ) -> Result<(), StorageError> { - self.0 + let mut previous_val = self + .db .borrow_mut() .get_mut(column_family) .unwrap() .insert(key.into(), value.into()); + let key = (column_family.clone(), key.to_vec()); + let previous_val2 = self.removes.write().unwrap().remove(&key); + if previous_val.is_none() && previous_val2.is_some() { + previous_val = previous_val2; + } + let mut inserts = self.inserts.write().unwrap(); + if !inserts.contains_key(&key) { + inserts.insert(key, previous_val); + } + Ok(()) } @@ -275,11 +357,27 @@ impl Transaction<'_> { #[allow(clippy::unnecessary_wraps, clippy::unwrap_in_result)] pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<(), StorageError> { - self.0 + let mut val = self + .db .borrow_mut() .get_mut(column_family) .unwrap() .remove(key); + let val2 = self + .inserts + .write() + .unwrap() + .remove(&(column_family.clone(), key.to_vec())); + if val2.is_some() { + // we prefer the value in inserts as it may contain the original value after several inserts on the same key. 
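+            // Sketch of the journal invariant (reading of the patch): for a key
+            // absent before the transaction, insert(k, v) records
+            // inserts[(cf, k)] = None; a later remove(k) takes that entry back out
+            // of `inserts` and records nothing in `removes`, since there is no
+            // pre-transaction value to restore, so rollback() leaves k absent again.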
+ val = val2.unwrap(); + } + if let Some(val) = val { + self.removes + .write() + .unwrap() + .insert((column_family.clone(), key.to_vec()), val.to_vec()); + } Ok(()) } } diff --git a/ng-oxigraph/src/oxigraph/storage/backend/oxi_rocksdb.rs b/ng-oxigraph/src/oxigraph/storage/backend/oxi_rocksdb.rs index 9dbe7e3..9908994 100644 --- a/ng-oxigraph/src/oxigraph/storage/backend/oxi_rocksdb.rs +++ b/ng-oxigraph/src/oxigraph/storage/backend/oxi_rocksdb.rs @@ -8,6 +8,7 @@ clippy::unwrap_in_result )] +use super::super::numeric_encoder::StrHash; use crate::oxigraph::storage::error::{CorruptionError, StorageError}; use libc::{c_char, c_void}; use ng_rocksdb::ffi::*; @@ -15,7 +16,7 @@ use rand::random; use std::borrow::Borrow; #[cfg(unix)] use std::cmp::min; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::env::temp_dir; use std::error::Error; use std::ffi::{CStr, CString}; @@ -24,7 +25,7 @@ use std::marker::PhantomData; use std::ops::Deref; use std::path::{Path, PathBuf}; use std::rc::{Rc, Weak}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread::{available_parallelism, yield_now}; use std::{fmt, io, ptr, slice}; @@ -70,6 +71,17 @@ enum DbKind { ReadWrite(Arc), } +impl Db { + pub(crate) fn past_commits_cache( + &self, + ) -> Arc>>>> { + match &self.inner { + DbKind::ReadWrite(rw) => Arc::clone(&rw.past_commits_cache), + _ => panic!("rw not implemented for read only DbKind"), + } + } +} + struct RwDbHandler { db: *mut rocksdb_transactiondb_t, env: UnsafeEnv, @@ -88,6 +100,7 @@ struct RwDbHandler { cf_options: Vec<*mut rocksdb_options_t>, in_memory: bool, path: PathBuf, + past_commits_cache: Arc>>>>, } unsafe impl Send for RwDbHandler {} @@ -317,6 +330,7 @@ impl Db { cf_options, in_memory, path, + past_commits_cache: Arc::new(RwLock::new(HashMap::new())), })), }) } @@ -609,6 +623,88 @@ impl Db { pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From>( &'b self, f: impl Fn(Transaction<'a>) -> Result, + ) -> Result { + if let DbKind::ReadWrite(db) = &self.inner { + loop { + let transaction = unsafe { + let transaction = rocksdb_transaction_begin( + db.db, + db.write_options, + db.transaction_options, + ptr::null_mut(), + ); + assert!( + !transaction.is_null(), + "rocksdb_transaction_begin returned null" + ); + transaction + }; + let (read_options, snapshot) = unsafe { + let options = rocksdb_readoptions_create_copy(db.read_options); + let snapshot = rocksdb_transaction_get_snapshot(transaction); + rocksdb_readoptions_set_snapshot(options, snapshot); + (options, snapshot) + }; + let result = f(Transaction { + inner: Rc::new(transaction), + read_options, + _lifetime: PhantomData, + }); + match result { + Ok(result) => { + unsafe { + let r = + ffi_result!(rocksdb_transaction_rollback_with_status(transaction)); + rocksdb_transaction_destroy(transaction); + rocksdb_readoptions_destroy(read_options); + rocksdb_free(snapshot as *mut c_void); + r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails + } + return Ok(result); + } + Err(e) => { + unsafe { + let r = + ffi_result!(rocksdb_transaction_rollback_with_status(transaction)); + rocksdb_transaction_destroy(transaction); + rocksdb_readoptions_destroy(read_options); + rocksdb_free(snapshot as *mut c_void); + r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails + } + // We look for the root error + let mut error: &(dyn Error + 'static) = &e; + while let Some(e) = error.source() { + error = e; + } + let is_conflict_error = + 
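+                        // NOTE (reading of the patch): as in upstream oxigraph, a
+                        // busy / timed-out / try-again status is treated as a write
+                        // conflict and the closure is retried after yield_now().
+                        // Unlike upstream, the Ok(_) arm above also rolls the RocksDB
+                        // transaction back, presumably because with CommitWriter the
+                        // staged quads are handed to the caller, not committed here.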
error.downcast_ref::().map_or(false, |e| { + e.0.code == rocksdb_status_code_t_rocksdb_status_code_busy + || e.0.code + == rocksdb_status_code_t_rocksdb_status_code_timed_out + || e.0.code + == rocksdb_status_code_t_rocksdb_status_code_try_again + }); + if is_conflict_error { + // We give a chance to the OS to do something else before retrying in order to help avoiding another conflict + yield_now(); + } else { + // We raise the error + return Err(e); + } + } + } + } + } else { + Err( + StorageError::Other("Transaction are only possible on read-write instances".into()) + .into(), + ) + } + } + + pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From>( + &'b self, + f: impl Fn(Transaction<'a>) -> Result, ) -> Result { if let DbKind::ReadWrite(db) = &self.inner { loop { @@ -1292,6 +1388,18 @@ impl Iter { None } } + + pub fn value(&self) -> Option<&[u8]> { + if self.is_valid() { + unsafe { + let mut len = 0; + let val = rocksdb_iter_value(self.inner, &mut len); + Some(slice::from_raw_parts(val.cast(), len)) + } + } else { + None + } + } } pub struct SstFileWriter { diff --git a/ng-oxigraph/src/oxigraph/storage/mod.rs b/ng-oxigraph/src/oxigraph/storage/mod.rs index e448490..1a167bd 100644 --- a/ng-oxigraph/src/oxigraph/storage/mod.rs +++ b/ng-oxigraph/src/oxigraph/storage/mod.rs @@ -1,9 +1,8 @@ #![allow(clippy::same_name_method)] -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] use crate::oxigraph::model::Quad; use crate::oxigraph::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef}; use crate::oxigraph::storage::backend::{Reader, Transaction}; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use crate::oxigraph::storage::binary_encoder::LATEST_STORAGE_VERSION; use crate::oxigraph::storage::binary_encoder::{ decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple, @@ -14,24 +13,26 @@ use crate::oxigraph::storage::binary_encoder::{ pub use crate::oxigraph::storage::error::{ CorruptionError, LoaderError, SerializerError, StorageError, }; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use crate::oxigraph::storage::numeric_encoder::Decoder; use crate::oxigraph::storage::numeric_encoder::{ insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup, }; +use crate::oxrdf::NamedNodeRef; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::collections::VecDeque; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] use std::collections::{HashMap, HashSet}; use std::error::Error; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +use std::io::Read; +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::mem::{swap, take}; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::path::{Path, PathBuf}; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +use std::sync::Arc; +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::sync::Mutex; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::{io, thread}; mod backend; @@ -47,20 +48,26 @@ const OSPG_CF: &str = "ospg"; const GSPO_CF: &str = "gspo"; const GPOS_CF: &str = "gpos"; const GOSP_CF: &str = "gosp"; -const DSPO_CF: &str = "dspo"; +const DSPO_CF: &str = "dspo"; //TODO: remove all the DXXX as we don't use the 
default graph anymore const DPOS_CF: &str = "dpos"; const DOSP_CF: &str = "dosp"; -const GRAPHS_CF: &str = "graphs"; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +const HEADS_CF: &str = "heads"; +const PAST_CF: &str = "past"; +const REMOVED_CF: &str = "removed"; +const BRANCHES_CF: &str = "branches"; +const STORES_CF: &str = "stores"; +const NAMES_CF: &str = "names"; +//const GRAPHS_CF: &str = "graphs"; +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] const DEFAULT_CF: &str = "default"; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000; /// Low level storage primitives #[derive(Clone)] pub struct Storage { db: Db, - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] default_cf: ColumnFamily, id2str_cf: ColumnFamily, spog_cf: ColumnFamily, @@ -72,7 +79,13 @@ pub struct Storage { dspo_cf: ColumnFamily, dpos_cf: ColumnFamily, dosp_cf: ColumnFamily, - graphs_cf: ColumnFamily, + //graphs_cf: ColumnFamily, + heads_cf: ColumnFamily, + past_cf: ColumnFamily, + removed_cf: ColumnFamily, + branches_cf: ColumnFamily, + stores_cf: ColumnFamily, + names_cf: ColumnFamily, } impl Storage { @@ -80,7 +93,7 @@ impl Storage { Self::setup(Db::new(Self::column_families())?) } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn open(path: &Path, key: Option<[u8; 32]>) -> Result { Self::setup(Db::open_read_write( Some(path), @@ -110,7 +123,7 @@ impl Storage { // )?) // } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn open_read_only(path: &Path, key: Option<[u8; 32]>) -> Result { Self::setup(Db::open_read_only(path, Self::column_families(), key)?) } @@ -177,18 +190,54 @@ impl Storage { min_prefix_size: 0, // There are small literals... 
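+                // NOTE (reading of this patch): the six new column families below
+                // replace GRAPHS_CF; key layouts, as used further down in this file
+                // (16-byte big-endian StrHash segments):
+                //   HEADS_CF    topic ++ overlay ++ head commit          (no value)
+                //   PAST_CF     commit ++ parent -> 1-byte skip flag     (COMMIT_*)
+                //   REMOVED_CF  S,P,O terms ++ graph ++ removing commit  (no value)
+                //   BRANCHES_CF prefix byte ++ branch-or-token ++ overlay
+                //   STORES_CF   overlay ++ graph iri                     (no value)
+                //   NAMES_CF    graph iri ++ utf-8 name -> value bytes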
unordered_writes: false, }, + // ColumnFamilyDefinition { + // name: GRAPHS_CF, + // use_iter: true, + // min_prefix_size: 17, // named or blank node start + // unordered_writes: false, + // }, ColumnFamilyDefinition { - name: GRAPHS_CF, + name: HEADS_CF, use_iter: true, - min_prefix_size: 17, // named or blank node start + min_prefix_size: 32, + unordered_writes: false, + }, + ColumnFamilyDefinition { + name: PAST_CF, + use_iter: true, + min_prefix_size: 16, + unordered_writes: false, + }, + ColumnFamilyDefinition { + name: REMOVED_CF, + use_iter: true, + min_prefix_size: 17, unordered_writes: false, }, + ColumnFamilyDefinition { + name: BRANCHES_CF, + use_iter: false, + min_prefix_size: 33, + unordered_writes: true, + }, + ColumnFamilyDefinition { + name: STORES_CF, + use_iter: true, + min_prefix_size: 16, + unordered_writes: false, + }, + ColumnFamilyDefinition { + name: NAMES_CF, + use_iter: false, + min_prefix_size: 16, + unordered_writes: true, + }, ] } fn setup(db: Db) -> Result { let this = Self { - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] default_cf: db.column_family(DEFAULT_CF)?, id2str_cf: db.column_family(ID2STR_CF)?, spog_cf: db.column_family(SPOG_CF)?, @@ -200,40 +249,46 @@ impl Storage { dspo_cf: db.column_family(DSPO_CF)?, dpos_cf: db.column_family(DPOS_CF)?, dosp_cf: db.column_family(DOSP_CF)?, - graphs_cf: db.column_family(GRAPHS_CF)?, + //graphs_cf: db.column_family(GRAPHS_CF)?, + heads_cf: db.column_family(HEADS_CF)?, + past_cf: db.column_family(PAST_CF)?, + removed_cf: db.column_family(REMOVED_CF)?, + branches_cf: db.column_family(BRANCHES_CF)?, + stores_cf: db.column_family(STORES_CF)?, + names_cf: db.column_family(NAMES_CF)?, db, }; - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] this.migrate()?; Ok(this) } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] fn migrate(&self) -> Result<(), StorageError> { let mut version = self.ensure_version()?; - if version == 0 { - // We migrate to v1 - let mut graph_names = HashSet::new(); - for quad in self.snapshot().quads() { - let quad = quad?; - if !quad.graph_name.is_default_graph() { - graph_names.insert(quad.graph_name); - } - } - let mut graph_names = graph_names - .into_iter() - .map(|g| encode_term(&g)) - .collect::>(); - graph_names.sort_unstable(); - let mut stt_file = self.db.new_sst_file()?; - for k in graph_names { - stt_file.insert_empty(&k)?; - } - self.db - .insert_stt_files(&[(&self.graphs_cf, stt_file.finish()?)])?; - version = 1; - self.update_version(version)?; - } + // if version == 0 { + // // We migrate to v1 + // let mut graph_names = HashSet::new(); + // for quad in self.snapshot().quads() { + // let quad = quad?; + // if !quad.graph_name.is_default_graph() { + // graph_names.insert(quad.graph_name); + // } + // } + // let mut graph_names = graph_names + // .into_iter() + // .map(|g| encode_term(&g)) + // .collect::>(); + // graph_names.sort_unstable(); + // let mut stt_file = self.db.new_sst_file()?; + // for k in graph_names { + // stt_file.insert_empty(&k)?; + // } + // self.db + // .insert_stt_files(&[(&self.graphs_cf, stt_file.finish()?)])?; + // version = 1; + // self.update_version(version)?; + // } match version { _ if version < LATEST_STORAGE_VERSION => Err(CorruptionError::msg(format!( @@ -248,7 +303,7 @@ impl Storage { } } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + 
#[cfg(all(not(target_family = "wasm"), not(docsrs)))] fn ensure_version(&self) -> Result { Ok( if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? { @@ -262,7 +317,7 @@ impl Storage { ) } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] fn update_version(&self, version: u64) -> Result<(), StorageError> { self.db .insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?; @@ -276,11 +331,11 @@ impl Storage { } } - pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From>( + pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From>( &'b self, f: impl Fn(StorageWriter<'a>) -> Result, ) -> Result { - self.db.transaction(|transaction| { + self.db.ng_transaction(|transaction| { f(StorageWriter { buffer: Vec::new(), transaction, @@ -289,12 +344,26 @@ impl Storage { }) } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From>( + &'b self, + f: impl Fn(CommitWriter<'a>) -> Result, + ) -> Result { + self.db.transaction(|transaction| { + f(CommitWriter { + inserts: HashSet::new(), + removes: HashSet::new(), + transaction, + storage: self, + }) + }) + } + + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn flush(&self) -> Result<(), StorageError> { self.db.flush() } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn compact(&self) -> Result<(), StorageError> { self.db.compact(&self.default_cf)?; self.db.compact(&self.gspo_cf)?; @@ -309,7 +378,7 @@ impl Storage { self.db.compact(&self.id2str_cf) } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> { self.db.backup(target_directory) } @@ -320,7 +389,187 @@ pub struct StorageReader { storage: Storage, } +// fn encode_one_hash(sh: &StrHash) -> &[u8] { +// &sh.to_be_bytes() +// } + +fn encode_two_hashes(sh1: &StrHash, sh2: &StrHash) -> Vec { + let mut vec = Vec::with_capacity(32); + vec.extend_from_slice(&sh1.to_be_bytes()); + vec.extend_from_slice(&sh2.to_be_bytes()); + vec +} + +fn encode_three_hashes(sh1: &StrHash, sh2: &StrHash, sh3: &StrHash) -> Vec { + let mut vec = Vec::with_capacity(48); + vec.extend_from_slice(&sh1.to_be_bytes()); + vec.extend_from_slice(&sh2.to_be_bytes()); + vec.extend_from_slice(&sh3.to_be_bytes()); + vec +} + +/*impl Iterator for DecodingGraphIterator { + type Item = Result; + + fn next(&mut self) -> Option { + if let Err(e) = self.iter.status() { + return Some(Err(e)); + } + let term: Result = decode_term(self.iter.key()?); + self.iter.next(); + Some(term) + } +} */ + impl StorageReader { + pub fn ng_get_heads( + &self, + topic: &StrHash, + overlay: &StrHash, + ) -> Result, StorageError> { + let prefix = encode_two_hashes(topic, overlay); + let mut iter = self.reader.scan_prefix(&self.storage.heads_cf, &prefix)?; + let mut set: HashSet = HashSet::new(); + while let Some(key) = iter.key() { + let mut buffer = [0; 16]; + (&key[32..48]).read_exact(&mut buffer)?; + set.insert(StrHash::from_be_bytes(buffer)); + iter.next(); + } + if let Err(e) = iter.status() { + return Err(e); + } + Ok(set) + } + + pub fn ng_get_removed( + &self, + subject: &EncodedTerm, + predicate: &EncodedTerm, + object: &EncodedTerm, + commit: &StrHash, + ) -> Result, StorageError> { + let mut prefix = Vec::with_capacity(3 * WRITTEN_TERM_MAX_SIZE + 16); + write_term(&mut prefix, subject); + 
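+        // NOTE (reading of the patch): the scan prefix is the S, P, O terms followed
+        // by the 16-byte hash of the graph (commit) the triple lives in; the trailing
+        // 16 bytes of every matching key name one commit that removed the triple.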
write_term(&mut prefix, predicate); + write_term(&mut prefix, object); + prefix.extend_from_slice(&commit.to_be_bytes()); + + let mut iter = self.reader.scan_prefix(&self.storage.removed_cf, &prefix)?; + let mut set: HashSet = HashSet::new(); + let prefix_len = prefix.len(); + while let Some(key) = iter.key() { + let mut buffer = [0; 16]; + (&key[prefix_len..prefix_len + 16]).read_exact(&mut buffer)?; + set.insert(StrHash::from_be_bytes(buffer)); + iter.next(); + } + if let Err(e) = iter.status() { + return Err(e); + } + Ok(set) + } + + fn ng_get_past(&self, commit: &StrHash) -> Result, bool)>, StorageError> { + let mut res = Vec::with_capacity(1); + let mut iter = self + .reader + .scan_prefix(&self.storage.past_cf, &commit.to_be_bytes())?; + let mut skip = false; + while let Some(key) = iter.key() { + let mut buffer = [0; 16]; + (&key[16..32]).read_exact(&mut buffer)?; + res.push(StrHash::from_be_bytes(buffer)); + if !skip && iter.value().unwrap()[0] == COMMIT_SKIP_NO_GRAPH { + skip = true; + } + iter.next(); + } + if let Err(e) = iter.status() { + return Err(e); + } + if res.is_empty() { + Ok(None) + } else { + Ok(Some((res, skip))) + } + } + + fn aggregate_causal_past( + &self, + aggregate: &mut HashMap, + current: StrHash, + cache: &HashMap>>, + ) -> Result<(), StorageError> { + if aggregate.contains_key(¤t) { + return Ok(()); + } + + if let Some(found_in_cache) = cache.get(¤t) { + aggregate.extend(found_in_cache.iter().map(|c| (*c, false))); + } else { + if let Some((past, skip)) = self.ng_get_past(¤t)? { + aggregate.insert(current, skip); + + for next in past { + self.aggregate_causal_past(aggregate, next, cache)?; + } + } else { + // we add the last one (that doesnt have past) as it must be the first commit in branch that hold content + aggregate.insert(current, false); + } + } + Ok(()) + } + + pub fn past_for_heads( + &self, + heads: &HashSet, + ) -> Result>, StorageError> { + let mut res: HashSet = HashSet::new(); + let mut missing: Vec<&StrHash> = Vec::new(); + let mut ready: Vec> = Vec::new(); + { + let past_commits_cache = self.storage.db.past_commits_cache(); + let cache = past_commits_cache.read().unwrap(); + for head in heads { + if let Some(past) = cache.get(head) { + if heads.len() == 1 { + return Ok(Arc::clone(past)); + } + res.extend(past.iter()); + } else { + missing.push(head); + } + } + + for head in missing.iter() { + let mut aggregate: HashMap = HashMap::with_capacity(1); + + self.aggregate_causal_past(&mut aggregate, **head, &cache)?; + + ready.push(HashSet::from_iter( + aggregate + .into_iter() + .filter_map(|(c, skip)| (!skip).then_some(c)), + )); + } + } + + let past_commits_cache = self.storage.db.past_commits_cache(); + let mut cache = past_commits_cache.write().unwrap(); + + for (head, past) in missing.into_iter().zip(ready) { + let past = cache.entry(*head).or_insert(Arc::new(past)); + if heads.len() == 1 { + return Ok(Arc::clone(past)); + } + res.extend(past.iter()); + } + + Ok(Arc::new(res)) + } + pub fn len(&self) -> Result { Ok(self.reader.len(&self.storage.gspo_cf)? + self.reader.len(&self.storage.dspo_cf)?) 
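+        // NOTE (reading of the patch): past_for_heads() above memoizes, per head,
+        // an Arc'd HashSet of its whole causal past; commits flagged
+        // COMMIT_SKIP_NO_GRAPH are traversed but filtered out of the result, and
+        // when a single cached head is queried the shared Arc is returned without
+        // copying the set.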
} @@ -525,6 +774,40 @@ impl StorageReader { }) } + pub fn quads_for_subject_predicate_object_heads( + &self, + subject: &EncodedTerm, + predicate: &EncodedTerm, + object: &EncodedTerm, + heads: &HashSet, + at_current_heads: bool, + ) -> Result, StorageError> { + let past = self.past_for_heads(heads)?; + + let iter = self.ng_spog_quads(&encode_term_triple(subject, predicate, object)); + + Ok(HashSet::from_iter(iter.filter_map(|q| match q { + Err(_) => None, + Ok((quad, value)) => { + if let EncodedTerm::NamedNode { iri_id } = quad.graph_name { + if past.contains(&iri_id) { + if is_added(value) { + return Some(iri_id); + } else if is_removed(value) && !at_current_heads { + let removed_in = self + .ng_get_removed(subject, predicate, object, &iri_id) + .ok()?; + if removed_in.is_disjoint(&past) { + return Some(iri_id); + } + } + } + } + None + } + }))) + } + fn quads_for_subject_object_graph( &self, subject: &EncodedTerm, @@ -575,15 +858,51 @@ impl StorageReader { }) } - pub fn named_graphs(&self) -> DecodingGraphIterator { - DecodingGraphIterator { - iter: self.reader.iter(&self.storage.graphs_cf).unwrap(), // TODO: propagate error? - } + // pub fn named_graphs(&self) -> DecodingGraphIterator { + // DecodingGraphIterator { + // iter: self.reader.iter(&self.storage.graphs_cf).unwrap(), // TODO: propagate error? + // } + // } + + // pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> Result { + // self.reader + // .contains_key(&self.storage.graphs_cf, &encode_term(graph_name)) + // } + + fn ng_spog_quads(&self, prefix: &[u8]) -> DecodingNgQuadIterator { + self.ng_inner_quads(&self.storage.spog_cf, prefix, QuadEncoding::Spog) } - pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> Result { - self.reader - .contains_key(&self.storage.graphs_cf, &encode_term(graph_name)) + fn ng_posg_quads(&self, prefix: &[u8]) -> DecodingNgQuadIterator { + self.ng_inner_quads(&self.storage.posg_cf, prefix, QuadEncoding::Posg) + } + + fn ng_ospg_quads(&self, prefix: &[u8]) -> DecodingNgQuadIterator { + self.ng_inner_quads(&self.storage.ospg_cf, prefix, QuadEncoding::Ospg) + } + + fn ng_gspo_quads(&self, prefix: &[u8]) -> DecodingNgQuadIterator { + self.ng_inner_quads(&self.storage.gspo_cf, prefix, QuadEncoding::Gspo) + } + + fn ng_gpos_quads(&self, prefix: &[u8]) -> DecodingNgQuadIterator { + self.ng_inner_quads(&self.storage.gpos_cf, prefix, QuadEncoding::Gpos) + } + + fn ng_gosp_quads(&self, prefix: &[u8]) -> DecodingNgQuadIterator { + self.ng_inner_quads(&self.storage.gosp_cf, prefix, QuadEncoding::Gosp) + } + + fn ng_inner_quads( + &self, + column_family: &ColumnFamily, + prefix: &[u8], + encoding: QuadEncoding, + ) -> DecodingNgQuadIterator { + DecodingNgQuadIterator { + iter: self.reader.scan_prefix(column_family, prefix).unwrap(), // TODO: propagate error? + encoding, + } } fn spog_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { @@ -634,7 +953,7 @@ impl StorageReader { } } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn get_str(&self, key: &StrHash) -> Result, StorageError> { Ok(self .storage @@ -645,7 +964,7 @@ impl StorageReader { .map_err(CorruptionError::new)?) } - #[cfg(any(target_family = "wasm",docsrs))] + #[cfg(any(target_family = "wasm", docsrs))] pub fn get_str(&self, key: &StrHash) -> Result, StorageError> { Ok(self .reader @@ -655,21 +974,21 @@ impl StorageReader { .map_err(CorruptionError::new)?) 
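+        // NOTE (reading of the patch): in quads_for_subject_predicate_object_heads()
+        // above, a quad is visible at the given heads when its graph (a commit id)
+        // lies in the causal past and either its value byte marks it as added, or it
+        // is marked removed while `at_current_heads` is false and no commit that
+        // removed it is itself part of that past (removed_in.is_disjoint(&past)).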
} - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn contains_str(&self, key: &StrHash) -> Result { self.storage .db .contains_key(&self.storage.id2str_cf, &key.to_be_bytes()) } - #[cfg(any(target_family = "wasm",docsrs))] + #[cfg(any(target_family = "wasm", docsrs))] pub fn contains_str(&self, key: &StrHash) -> Result { self.reader .contains_key(&self.storage.id2str_cf, &key.to_be_bytes()) } /// Validates that all the storage invariants held in the data - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn validate(&self) -> Result<(), StorageError> { // triples let dspo_size = self.dspo_quads(&[]).count(); @@ -705,7 +1024,7 @@ impl StorageReader { || gspo_size != self.ospg_quads(&[]).count() { return Err(CorruptionError::new( - "Not the same number of triples in dspo, dpos and dosp", + "Not the same number of quads in gspo, gpos, gosp, spog, posg, and ospg", ) .into()); } @@ -767,21 +1086,21 @@ impl StorageReader { )? { return Err(CorruptionError::new("Quad in gspo and not in ospg").into()); } - if !self - .storage - .db - .contains_key(&self.storage.graphs_cf, &encode_term(&gspo.graph_name))? - { - return Err( - CorruptionError::new("Quad graph name in gspo and not in graphs").into(), - ); - } + // if !self + // .storage + // .db + // .contains_key(&self.storage.graphs_cf, &encode_term(&gspo.graph_name))? + // { + // return Err( + // CorruptionError::new("Quad graph name in gspo and not in graphs").into(), + // ); + // } } Ok(()) } /// Validates that all the storage invariants held in the data - #[cfg(any(target_family = "wasm",docsrs))] + #[cfg(any(target_family = "wasm", docsrs))] #[allow(clippy::unused_self, clippy::unnecessary_wraps)] pub fn validate(&self) -> Result<(), StorageError> { Ok(()) // TODO @@ -841,6 +1160,33 @@ impl Iterator for DecodingQuadIterator { } } +pub struct DecodingNgQuadIterator { + iter: Iter, + encoding: QuadEncoding, +} + +impl Iterator for DecodingNgQuadIterator { + type Item = Result<(EncodedQuad, u8), StorageError>; + + fn next(&mut self) -> Option { + if let Err(e) = self.iter.status() { + return Some(Err(e)); + } + let term = self.encoding.decode(self.iter.key()?); + match term { + Ok(term) => { + let val = self.iter.value()?[0]; + self.iter.next(); + Some(Ok((term, val))) + } + Err(e) => { + self.iter.next(); + Some(Err(e)) + } + } + } +} + pub struct DecodingGraphIterator { iter: Iter, } @@ -852,7 +1198,7 @@ impl Iterator for DecodingGraphIterator { if let Err(e) = self.iter.status() { return Some(Err(e)); } - let term = decode_term(self.iter.key()?); + let term: Result = decode_term(self.iter.key()?); self.iter.next(); Some(term) } @@ -864,12 +1210,127 @@ impl StrLookup for StorageReader { } } +pub struct CommitWriter<'a> { + inserts: HashSet, + removes: HashSet, + transaction: Transaction<'a>, + storage: &'a Storage, +} + +impl<'a> CommitWriter<'a> { + pub fn reader(&self) -> StorageReader { + StorageReader { + reader: self.transaction.reader(), + storage: self.storage.clone(), + } + } + + pub fn get_update(self) -> (HashSet, HashSet) { + (self.inserts, self.removes) + } + + pub fn insert(&mut self, quad: QuadRef<'_>) -> Result { + if quad.graph_name.is_default_graph() { + Err(StorageError::Other( + "NextGraph cannot insert triples in default graph".into(), + )) + } else { + let quad = quad.into_owned(); + Ok(self.inserts.insert(quad)) + } + } + + pub fn insert_named_graph( + &mut self, + _graph_name: 
NamedOrBlankNodeRef<'_>, + ) -> Result { + Err(StorageError::Other( + "NextGraph cannot insert named graph".into(), + )) + } + + pub fn remove(&mut self, quad: QuadRef<'_>) -> Result { + if quad.graph_name.is_default_graph() { + Err(StorageError::Other( + "NextGraph cannot remove triples in default graph".into(), + )) + } else { + let quad = quad.into_owned(); + Ok(self.removes.insert(quad)) + } + } + + pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<(), StorageError> { + if graph_name.is_default_graph() { + Err(StorageError::Other( + "NextGraph cannot clear the default graph".into(), + )) + } else { + unimplemented!(); + } + } + + pub fn clear_all_named_graphs(&mut self) -> Result<(), StorageError> { + unimplemented!(); + } + + pub fn clear_all_graphs(&mut self) -> Result<(), StorageError> { + unimplemented!(); + } + + pub fn remove_named_graph( + &mut self, + _graph_name: NamedOrBlankNodeRef<'_>, + ) -> Result { + unimplemented!(); + } + + pub fn remove_all_named_graphs(&mut self) -> Result<(), StorageError> { + unimplemented!(); + } + + pub fn clear(&mut self) -> Result<(), StorageError> { + unimplemented!(); + } +} + pub struct StorageWriter<'a> { buffer: Vec, transaction: Transaction<'a>, storage: &'a Storage, } +pub const ADDED_IN_MAIN: u8 = 3; +pub const ADDED_IN_OTHER: u8 = 2; +pub const REMOVED_IN_MAIN: u8 = 1; +pub const REMOVED_IN_OTHER: u8 = 0; +pub const REPO_IN_MAIN: u8 = 4; +pub const COMMIT_SKIP_NO_GRAPH: u8 = 1; +pub const COMMIT_HAS_GRAPH: u8 = 0; + +const MASK_ADDED: u8 = 2; +const MASK_REMOVED: u8 = 6; + +pub const BRANCH_PREFIX: u8 = 0; +const TOKEN_PREFIX: u8 = 1; + +pub const COMMIT_PREFIX: u8 = 1; + +#[inline] +fn is_added(val: u8) -> bool { + (val & MASK_ADDED) == 1 +} + +#[inline] +fn is_removed(val: u8) -> bool { + (val & MASK_REMOVED) == 0 +} + +#[inline] +fn is_added_in_main(val: u8) -> bool { + val == ADDED_IN_MAIN +} + impl<'a> StorageWriter<'a> { pub fn reader(&self) -> StorageReader { StorageReader { @@ -878,113 +1339,344 @@ impl<'a> StorageWriter<'a> { } } - pub fn insert(&mut self, quad: QuadRef<'_>) -> Result { - let encoded = quad.into(); - self.buffer.clear(); - let result = if quad.graph_name.is_default_graph() { - write_spo_quad(&mut self.buffer, &encoded); - if self - .transaction - .contains_key_for_update(&self.storage.dspo_cf, &self.buffer)? 
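+    // NOTE (assumption, flagged for review): with ADDED_IN_MAIN = 3 (0b11) and
+    // ADDED_IN_OTHER = 2 (0b10), `val & MASK_ADDED` above evaluates to 0 or 2 and
+    // never to 1, so is_added() as written cannot return true; the intended check
+    // is presumably `(val & MASK_ADDED) != 0`. is_removed() is consistent:
+    // REMOVED_IN_MAIN (1) and REMOVED_IN_OTHER (0) are the only values whose
+    // MASK_REMOVED (0b110) bits are all clear.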
- { - false + pub fn named_commit_or_branch( + &mut self, + ov_graph_name: NamedNodeRef<'_>, + name: &String, + // if None: remove + value: &Option>, + ) -> Result<(), StorageError> { + let encoded: EncodedTerm = ov_graph_name.into(); + if value.is_some() { + self.insert_term(ov_graph_name.into(), &encoded)?; + } + if let EncodedTerm::NamedNode { iri_id } = encoded { + let mut key = Vec::with_capacity(16 + name.len()); + key.extend_from_slice(&iri_id.to_be_bytes()); + key.extend_from_slice(name.as_bytes()); + if value.is_none() { + self.transaction.remove(&self.storage.names_cf, &key)?; } else { self.transaction - .insert_empty(&self.storage.dspo_cf, &self.buffer)?; - - self.buffer.clear(); - write_pos_quad(&mut self.buffer, &encoded); - self.transaction - .insert_empty(&self.storage.dpos_cf, &self.buffer)?; - - self.buffer.clear(); - write_osp_quad(&mut self.buffer, &encoded); - self.transaction - .insert_empty(&self.storage.dosp_cf, &self.buffer)?; - - self.insert_term(quad.subject.into(), &encoded.subject)?; - self.insert_term(quad.predicate.into(), &encoded.predicate)?; - self.insert_term(quad.object, &encoded.object)?; - true + .insert(&self.storage.names_cf, &key, value.as_ref().unwrap())?; } } else { - write_spog_quad(&mut self.buffer, &encoded); - if self - .transaction - .contains_key_for_update(&self.storage.spog_cf, &self.buffer)? - { - false + panic!("should be an EncodedTerm::NamedNode"); + } + Ok(()) + } + + pub fn doc_in_store( + &mut self, + graph_name: NamedNodeRef<'_>, + overlay: &StrHash, + remove: bool, + ) -> Result<(), StorageError> { + let encoded: EncodedTerm = graph_name.into(); + if !remove { + self.insert_term(graph_name.into(), &encoded)?; + } else { + // TODO: remove term? self.insert_term(graph_name.into(), &encoded)?; + } + if let EncodedTerm::NamedNode { iri_id } = encoded { + let mut key = Vec::with_capacity(32); + key.extend_from_slice(&overlay.to_be_bytes()); + key.extend_from_slice(&iri_id.to_be_bytes()); + if remove { + self.transaction.remove(&self.storage.stores_cf, &key)?; } else { self.transaction - .insert_empty(&self.storage.spog_cf, &self.buffer)?; + .insert_empty(&self.storage.stores_cf, &key)?; + } + } else { + panic!("should be an EncodedTerm::NamedNode"); + } + Ok(()) + } - self.buffer.clear(); - write_posg_quad(&mut self.buffer, &encoded); - self.transaction - .insert_empty(&self.storage.posg_cf, &self.buffer)?; + pub fn update_branch_and_token( + &mut self, + overlay_encoded: &StrHash, + branch_encoded: &StrHash, + topic_encoded: &StrHash, + token_encoded: &StrHash, + ) -> Result<(), StorageError> { + let mut key = Vec::with_capacity(33); + key.push(BRANCH_PREFIX); + key.extend_from_slice(&branch_encoded.to_be_bytes()); + key.extend_from_slice(&overlay_encoded.to_be_bytes()); + + let topic = topic_encoded.to_be_bytes(); + + let reader = self.transaction.reader(); + if match reader.get(&self.storage.branches_cf, &key)? 
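+        // NOTE (reading of the patch): BRANCHES_CF keeps two entries per branch,
+        //   [BRANCH_PREFIX, branch(16), overlay(16)] -> topic(16)
+        //   [TOKEN_PREFIX,  token(16),  overlay(16)] -> topic(16) ++ branch(16)
+        // and each write below is skipped when the stored value already matches.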
{ + Some(val) => val.to_vec() != topic.to_vec(), + None => true, + } { + self.transaction + .insert(&self.storage.branches_cf, &key, &topic)?; + } - self.buffer.clear(); - write_ospg_quad(&mut self.buffer, &encoded); - self.transaction - .insert_empty(&self.storage.ospg_cf, &self.buffer)?; + key.clear(); + key.push(TOKEN_PREFIX); + key.extend_from_slice(&token_encoded.to_be_bytes()); + key.extend_from_slice(&overlay_encoded.to_be_bytes()); - self.buffer.clear(); - write_gspo_quad(&mut self.buffer, &encoded); - self.transaction - .insert_empty(&self.storage.gspo_cf, &self.buffer)?; + let mut token = Vec::with_capacity(32); + token.extend_from_slice(&topic_encoded.to_be_bytes()); + token.extend_from_slice(&branch_encoded.to_be_bytes()); - self.buffer.clear(); - write_gpos_quad(&mut self.buffer, &encoded); - self.transaction - .insert_empty(&self.storage.gpos_cf, &self.buffer)?; + if match reader.get(&self.storage.branches_cf, &key)? { + Some(val) => val.to_vec() != token, + None => true, + } { + self.transaction + .insert(&self.storage.branches_cf, &key, &token)?; + } + Ok(()) + } - self.buffer.clear(); - write_gosp_quad(&mut self.buffer, &encoded); - self.transaction - .insert_empty(&self.storage.gosp_cf, &self.buffer)?; + pub fn ng_update_heads( + &mut self, + topic: &StrHash, + overlay: &StrHash, + commit: &StrHash, + direct_causal_past: &HashSet, + ) -> Result<(), StorageError> { + let mut buffer = Vec::with_capacity(48); + buffer.extend_from_slice(&topic.to_be_bytes()); + buffer.extend_from_slice(&overlay.to_be_bytes()); + + for removing in direct_causal_past { + buffer.truncate(32); + buffer.extend_from_slice(&removing.to_be_bytes()); + self.transaction.remove(&self.storage.heads_cf, &buffer)? + } - self.insert_term(quad.subject.into(), &encoded.subject)?; - self.insert_term(quad.predicate.into(), &encoded.predicate)?; - self.insert_term(quad.object, &encoded.object)?; + buffer.truncate(32); + buffer.extend_from_slice(&commit.to_be_bytes()); + self.transaction + .insert_empty(&self.storage.heads_cf, &buffer)?; - self.buffer.clear(); - write_term(&mut self.buffer, &encoded.graph_name); - if !self - .transaction - .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)? - { - self.transaction - .insert_empty(&self.storage.graphs_cf, &self.buffer)?; - self.insert_graph_name(quad.graph_name, &encoded.graph_name)?; - } - true - } - }; - Ok(result) + Ok(()) } - pub fn insert_named_graph( + pub fn ng_update_past( &mut self, - graph_name: NamedOrBlankNodeRef<'_>, - ) -> Result { - let encoded_graph_name = graph_name.into(); + commit: &StrHash, + direct_causal_past: &HashSet, + skip_has_no_graph: bool, + ) -> Result<(), StorageError> { + let mut buffer = Vec::with_capacity(32); + buffer.extend_from_slice(&commit.to_be_bytes()); + + let value = [if skip_has_no_graph { + COMMIT_SKIP_NO_GRAPH + } else { + COMMIT_HAS_GRAPH + }]; + + for adding in direct_causal_past { + buffer.truncate(16); + buffer.extend_from_slice(&adding.to_be_bytes()); + self.transaction + .insert(&self.storage.past_cf, &buffer, &value)? 
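+            // NOTE (reading of the patch): together with ng_update_heads() above,
+            // this keeps the commit DAG incremental: HEADS_CF drops the direct
+            // causal past and inserts the new head, while PAST_CF records one
+            // (commit, parent) row per parent, whose 1-byte value marks whether the
+            // commit carries graph content (COMMIT_HAS_GRAPH) or can be skipped
+            // during traversal (COMMIT_SKIP_NO_GRAPH).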
+ } + + Ok(()) + } + + pub fn ng_remove(&mut self, quad: &EncodedQuad, commit: &StrHash) -> Result<(), StorageError> { + let mut key = Vec::with_capacity(3 * WRITTEN_TERM_MAX_SIZE + 2 * 16); + write_term(&mut key, &quad.subject); + write_term(&mut key, &quad.predicate); + write_term(&mut key, &quad.object); + if let EncodedTerm::NamedNode { iri_id } = quad.graph_name { + key.extend_from_slice(&iri_id.to_be_bytes()); + key.extend_from_slice(&commit.to_be_bytes()); + self.transaction + .insert_empty(&self.storage.removed_cf, &key) + } else { + Err(CorruptionError::msg("invalid quad").into()) + } + } + pub fn ng_insert(&mut self, quad: QuadRef<'_>, value: u8) -> Result<(), StorageError> { + let encoded = quad.into(); + if self.ng_insert_encoded(&encoded, value)? { + self.insert_term(quad.subject.into(), &encoded.subject)?; + self.insert_term(quad.predicate.into(), &encoded.predicate)?; + self.insert_term(quad.object, &encoded.object)?; + + // self.buffer.clear(); + // write_term(&mut self.buffer, &encoded.graph_name); + // if !self + // .transaction + // .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)? + // { + // self.transaction + // .insert_empty(&self.storage.graphs_cf, &self.buffer)?; + self.insert_graph_name(quad.graph_name, &encoded.graph_name)?; + //} + } + Ok(()) + } + + pub fn ng_insert_encoded( + &mut self, + encoded: &EncodedQuad, + value: u8, + ) -> Result { + let value = [value]; self.buffer.clear(); - write_term(&mut self.buffer, &encoded_graph_name); + write_spog_quad(&mut self.buffer, encoded); let result = if self .transaction - .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)? + .contains_key_for_update(&self.storage.spog_cf, &self.buffer)? { false } else { self.transaction - .insert_empty(&self.storage.graphs_cf, &self.buffer)?; - self.insert_term(graph_name.into(), &encoded_graph_name)?; + .insert(&self.storage.spog_cf, &self.buffer, &value)?; + + self.buffer.clear(); + write_posg_quad(&mut self.buffer, encoded); + self.transaction + .insert(&self.storage.posg_cf, &self.buffer, &value)?; + + self.buffer.clear(); + write_ospg_quad(&mut self.buffer, encoded); + self.transaction + .insert(&self.storage.ospg_cf, &self.buffer, &value)?; + + self.buffer.clear(); + write_gspo_quad(&mut self.buffer, encoded); + self.transaction + .insert(&self.storage.gspo_cf, &self.buffer, &value)?; + + self.buffer.clear(); + write_gpos_quad(&mut self.buffer, encoded); + self.transaction + .insert(&self.storage.gpos_cf, &self.buffer, &value)?; + + self.buffer.clear(); + write_gosp_quad(&mut self.buffer, encoded); + self.transaction + .insert(&self.storage.gosp_cf, &self.buffer, &value)?; + true }; Ok(result) } + // pub fn insert(&mut self, quad: QuadRef<'_>) -> Result { + // let encoded = quad.into(); + // self.buffer.clear(); + // let result = if quad.graph_name.is_default_graph() { + // write_spo_quad(&mut self.buffer, &encoded); + // if self + // .transaction + // .contains_key_for_update(&self.storage.dspo_cf, &self.buffer)? 
+ // { + // false + // } else { + // self.transaction + // .insert_empty(&self.storage.dspo_cf, &self.buffer)?; + + // self.buffer.clear(); + // write_pos_quad(&mut self.buffer, &encoded); + // self.transaction + // .insert_empty(&self.storage.dpos_cf, &self.buffer)?; + + // self.buffer.clear(); + // write_osp_quad(&mut self.buffer, &encoded); + // self.transaction + // .insert_empty(&self.storage.dosp_cf, &self.buffer)?; + + // self.insert_term(quad.subject.into(), &encoded.subject)?; + // self.insert_term(quad.predicate.into(), &encoded.predicate)?; + // self.insert_term(quad.object, &encoded.object)?; + // true + // } + // } else { + // write_spog_quad(&mut self.buffer, &encoded); + // if self + // .transaction + // .contains_key_for_update(&self.storage.spog_cf, &self.buffer)? + // { + // false + // } else { + // self.transaction + // .insert_empty(&self.storage.spog_cf, &self.buffer)?; + + // self.buffer.clear(); + // write_posg_quad(&mut self.buffer, &encoded); + // self.transaction + // .insert_empty(&self.storage.posg_cf, &self.buffer)?; + + // self.buffer.clear(); + // write_ospg_quad(&mut self.buffer, &encoded); + // self.transaction + // .insert_empty(&self.storage.ospg_cf, &self.buffer)?; + + // self.buffer.clear(); + // write_gspo_quad(&mut self.buffer, &encoded); + // self.transaction + // .insert_empty(&self.storage.gspo_cf, &self.buffer)?; + + // self.buffer.clear(); + // write_gpos_quad(&mut self.buffer, &encoded); + // self.transaction + // .insert_empty(&self.storage.gpos_cf, &self.buffer)?; + + // self.buffer.clear(); + // write_gosp_quad(&mut self.buffer, &encoded); + // self.transaction + // .insert_empty(&self.storage.gosp_cf, &self.buffer)?; + + // self.insert_term(quad.subject.into(), &encoded.subject)?; + // self.insert_term(quad.predicate.into(), &encoded.predicate)?; + // self.insert_term(quad.object, &encoded.object)?; + + // self.buffer.clear(); + // write_term(&mut self.buffer, &encoded.graph_name); + // if !self + // .transaction + // .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)? + // { + // self.transaction + // .insert_empty(&self.storage.graphs_cf, &self.buffer)?; + // self.insert_graph_name(quad.graph_name, &encoded.graph_name)?; + // } + // true + // } + // }; + // Ok(result) + // } + + pub fn insert_named_graph( + &mut self, + graph_name: NamedOrBlankNodeRef<'_>, + ) -> Result { + unimplemented!(); + // let encoded_graph_name = graph_name.into(); + + // self.buffer.clear(); + // write_term(&mut self.buffer, &encoded_graph_name); + // let result = if self + // .transaction + // .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)? 
+ // { + // false + // } else { + // self.transaction + // .insert_empty(&self.storage.graphs_cf, &self.buffer)?; + // self.insert_term(graph_name.into(), &encoded_graph_name)?; + // true + // }; + // Ok(result) + } + fn insert_term( &mut self, term: TermRef<'_>, @@ -1005,7 +1697,7 @@ impl<'a> StorageWriter<'a> { } } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> { if self .storage @@ -1021,7 +1713,7 @@ impl<'a> StorageWriter<'a> { ) } - #[cfg(any(target_family = "wasm",docsrs))] + #[cfg(any(target_family = "wasm", docsrs))] fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> { self.transaction.insert( &self.storage.id2str_cf, @@ -1102,24 +1794,25 @@ impl<'a> StorageWriter<'a> { } pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<(), StorageError> { - if graph_name.is_default_graph() { - for quad in self.reader().quads_for_graph(&EncodedTerm::DefaultGraph) { - self.remove_encoded(&quad?)?; - } - } else { - self.buffer.clear(); - write_term(&mut self.buffer, &graph_name.into()); - if self - .transaction - .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)? - { - // The condition is useful to lock the graph itself and ensure no quad is inserted at the same time - for quad in self.reader().quads_for_graph(&graph_name.into()) { - self.remove_encoded(&quad?)?; - } - } - } - Ok(()) + unimplemented!(); + // if graph_name.is_default_graph() { + // for quad in self.reader().quads_for_graph(&EncodedTerm::DefaultGraph) { + // self.remove_encoded(&quad?)?; + // } + // } else { + // self.buffer.clear(); + // write_term(&mut self.buffer, &graph_name.into()); + // if self + // .transaction + // .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)? + // { + // // The condition is useful to lock the graph itself and ensure no quad is inserted at the same time + // for quad in self.reader().quads_for_graph(&graph_name.into()) { + // self.remove_encoded(&quad?)?; + // } + // } + // } + // Ok(()) } pub fn clear_all_named_graphs(&mut self) -> Result<(), StorageError> { @@ -1147,38 +1840,40 @@ impl<'a> StorageWriter<'a> { &mut self, graph_name: &EncodedTerm, ) -> Result { - self.buffer.clear(); - write_term(&mut self.buffer, graph_name); - let result = if self - .transaction - .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)? - { - // The condition is done ASAP to lock the graph itself - for quad in self.reader().quads_for_graph(graph_name) { - self.remove_encoded(&quad?)?; - } - self.buffer.clear(); - write_term(&mut self.buffer, graph_name); - self.transaction - .remove(&self.storage.graphs_cf, &self.buffer)?; - true - } else { - false - }; - Ok(result) + unimplemented!(); + // self.buffer.clear(); + // write_term(&mut self.buffer, graph_name); + // let result = if self + // .transaction + // .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)? 
+ // { + // // The condition is done ASAP to lock the graph itself + // for quad in self.reader().quads_for_graph(graph_name) { + // self.remove_encoded(&quad?)?; + // } + // self.buffer.clear(); + // write_term(&mut self.buffer, graph_name); + // self.transaction + // .remove(&self.storage.graphs_cf, &self.buffer)?; + // true + // } else { + // false + // }; + // Ok(result) } pub fn remove_all_named_graphs(&mut self) -> Result<(), StorageError> { - for graph_name in self.reader().named_graphs() { - self.remove_encoded_named_graph(&graph_name?)?; - } - Ok(()) + unimplemented!(); + // for graph_name in self.reader().named_graphs() { + // self.remove_encoded_named_graph(&graph_name?)?; + // } + // Ok(()) } pub fn clear(&mut self) -> Result<(), StorageError> { - for graph_name in self.reader().named_graphs() { - self.remove_encoded_named_graph(&graph_name?)?; - } + // for graph_name in self.reader().named_graphs() { + // self.remove_encoded_named_graph(&graph_name?)?; + // } for quad in self.reader().quads() { self.remove_encoded(&quad?)?; } @@ -1186,7 +1881,7 @@ impl<'a> StorageWriter<'a> { } } -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] #[must_use] pub struct StorageBulkLoader { storage: Storage, @@ -1195,7 +1890,7 @@ pub struct StorageBulkLoader { max_memory_size: Option, } -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] impl StorageBulkLoader { pub fn new(storage: Storage) -> Self { Self { @@ -1326,7 +2021,7 @@ impl StorageBulkLoader { } } -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] struct FileBulkLoader<'a> { storage: &'a Storage, id2str: HashMap>, @@ -1335,7 +2030,7 @@ struct FileBulkLoader<'a> { graphs: HashSet, } -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] impl<'a> FileBulkLoader<'a> { fn new(storage: &'a Storage, batch_size: usize) -> Self { Self { @@ -1438,10 +2133,10 @@ impl<'a> FileBulkLoader<'a> { } if !self.quads.is_empty() { - to_load.push(( - &self.storage.graphs_cf, - self.build_sst_for_keys(self.graphs.iter().map(encode_term))?, - )); + // to_load.push(( + // &self.storage.graphs_cf, + // self.build_sst_for_keys(self.graphs.iter().map(encode_term))?, + // )); self.graphs.clear(); to_load.push(( @@ -1541,7 +2236,7 @@ impl<'a> FileBulkLoader<'a> { } } -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] fn map_thread_result(result: thread::Result) -> io::Result { result.map_err(|e| { io::Error::new( diff --git a/ng-oxigraph/src/oxigraph/store.rs b/ng-oxigraph/src/oxigraph/store.rs index acc4b42..e915516 100644 --- a/ng-oxigraph/src/oxigraph/store.rs +++ b/ng-oxigraph/src/oxigraph/store.rs @@ -25,7 +25,7 @@ //! }; //! # Result::<_, Box>::Ok(()) //! 
``` -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use super::io::RdfParseError; use super::io::{RdfFormat, RdfParser, RdfSerializer}; use super::model::*; @@ -33,16 +33,17 @@ use super::sparql::{ evaluate_query, evaluate_update, EvaluationError, Query, QueryExplanation, QueryOptions, QueryResults, Update, UpdateOptions, }; -use super::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm}; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +use super::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm, StrHash}; +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use super::storage::StorageBulkLoader; use super::storage::{ ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter, }; pub use super::storage::{CorruptionError, LoaderError, SerializerError, StorageError}; +use std::collections::HashSet; use std::error::Error; use std::io::{Read, Write}; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::path::Path; use std::{fmt, str}; @@ -100,14 +101,14 @@ impl Store { /// Only one read-write [`Store`] can exist at the same time. /// If you want to have extra [`Store`] instance opened on a same data /// use [`Store::open_read_only`]. - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn open(path: impl AsRef) -> Result { Ok(Self { storage: Storage::open(path.as_ref(), None)?, }) } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn open_with_key(path: impl AsRef, key: [u8; 32]) -> Result { Ok(Self { storage: Storage::open(path.as_ref(), Some(key))?, @@ -155,7 +156,7 @@ impl Store { /// Opens a read-only [`Store`] from disk. /// /// Opening as read-only while having an other process writing the database is undefined behavior. - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn open_read_only( path: impl AsRef, key: Option<[u8; 32]>, @@ -408,11 +409,12 @@ impl Store { /// })?; /// # Result::<_, Box>::Ok(()) /// ``` - pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From>( + fn transaction<'a, 'b: 'a, T, E: Error + 'static + From>( &'b self, f: impl Fn(Transaction<'a>) -> Result, ) -> Result { - self.storage.transaction(|writer| f(Transaction { writer })) + self.storage + .ng_transaction(|writer| f(Transaction { writer })) } /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/). @@ -433,11 +435,11 @@ impl Store { /// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?); /// # Result::<_, Box>::Ok(()) /// ``` - pub fn update( + pub fn ng_update( &self, update: impl TryInto>, - ) -> Result<(), EvaluationError> { - self.update_opt(update, UpdateOptions::default()) + ) -> Result<(HashSet, HashSet), EvaluationError> { + self.ng_update_opt(update, UpdateOptions::default()) } /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/) with some options. 
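The hunk below is the core of the `Store` change: `update_opt` becomes `ng_update_opt`, which still evaluates the SPARQL update inside a storage transaction but now returns the sets of quads the update inserted and removed (collected via `t.get_update()`) instead of a bare `()`. A minimal sketch of the intended call pattern follows (crate path as used elsewhere in this diff; the `run_update` helper and its logging are illustrative, while `ng_update` and its return type are as defined here):

```rust
use ng_oxigraph::oxigraph::store::Store;

// Illustrative caller: run a SPARQL update and receive the effective
// change set instead of an opaque commit. This mirrors the way
// process_sparql_update (added later in this diff) consumes ng_update.
fn run_update(store: &Store, query: &str) -> Result<(), String> {
    let (inserts, removes) = store.ng_update(query).map_err(|e| e.to_string())?;
    if inserts.is_empty() && removes.is_empty() {
        // No-op update: nothing to wrap in an AsyncTransaction commit.
        return Ok(());
    }
    // The verifier splits these quads per target branch and turns each
    // split into one AsyncTransaction commit, skipping branches where the
    // user has no publisher capability.
    eprintln!("{} inserts, {} removes", inserts.len(), removes.len());
    Ok(())
}
```

Surfacing the change set is what allows the verifier to validate the target named graphs and the user's publisher status before any triple is materialized.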
@@ -457,15 +459,44 @@ impl Store { /// )?; /// # Result::<_, Box>::Ok(()) /// ``` - pub fn update_opt( + pub fn ng_update_opt( &self, update: impl TryInto>, options: impl Into, - ) -> Result<(), EvaluationError> { + ) -> Result<(HashSet, HashSet), EvaluationError> { let update = update.try_into().map_err(Into::into)?; let options = options.into(); + self.storage.transaction(|mut t| { + evaluate_update(&mut t, &update, &options)?; + Ok(t.get_update()) + }) + } + + /// INTERNAL FOR NG + // pub fn ng_update_opt( + // &self, + // update: impl TryInto>, + // options: impl Into, + // ) -> Result<(), EvaluationError> { + // let update = update.try_into().map_err(Into::into)?; + // let options = options.into(); + // self.storage + // .ng_transaction(|mut t| evaluate_update(&mut t, &update, &options)) + // } + + // pub fn ng_update( + // &self, + // update: impl TryInto>, + // ) -> Result<(), EvaluationError> { + // self.ng_update_opt(update, UpdateOptions::default()) + // } + #[doc(hidden)] + pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From>( + &'b self, + f: impl Fn(Transaction<'a>) -> Result, + ) -> Result { self.storage - .transaction(|mut t| evaluate_update(&mut t, &update, &options)) + .ng_transaction(|writer| f(Transaction { writer })) } /// Loads a RDF file under into the store. @@ -506,6 +537,7 @@ impl Store { parser: impl Into, read: impl Read, ) -> Result<(), LoaderError> { + unimplemented!(); let quads = parser .into() .rename_blank_nodes() @@ -627,23 +659,23 @@ impl Store { /// assert!(store.contains(quad)?); /// # Result::<_, Box>::Ok(()) /// ``` - pub fn insert<'a>(&self, quad: impl Into>) -> Result { - let quad = quad.into(); - self.transaction(|mut t| t.insert(quad)) - } + // pub fn insert<'a>(&self, quad: impl Into>) -> Result { + // let quad = quad.into(); + // self.transaction(|mut t| t.insert(quad)) + // } - /// Adds atomically a set of quads to this store. - /// - ///
- /// - /// This operation uses a memory heavy transaction internally, use the [`bulk_loader`](Store::bulk_loader) if you plan to add tens of millions of triples.
- pub fn extend( - &self, - quads: impl IntoIterator>, - ) -> Result<(), StorageError> { - let quads = quads.into_iter().map(Into::into).collect::>(); - self.transaction(move |mut t| t.extend(&quads)) - } + // /// Adds atomically a set of quads to this store. + // /// + // ///
+ // /// + // /// This operation uses a memory heavy transaction internally, use the [`bulk_loader`](Store::bulk_loader) if you plan to add tens of millions of triples.
+ // pub fn extend( + // &self, + // quads: impl IntoIterator>, + // ) -> Result<(), StorageError> { + // let quads = quads.into_iter().map(Into::into).collect::>(); + // self.transaction(move |mut t| t.extend(&quads)) + // } /// Removes a quad from this store. /// @@ -666,6 +698,7 @@ impl Store { /// # Result::<_, Box>::Ok(()) /// ``` pub fn remove<'a>(&self, quad: impl Into>) -> Result { + unimplemented!(); let quad = quad.into(); self.transaction(move |mut t| t.remove(quad)) } @@ -805,13 +838,13 @@ impl Store { /// ); /// # Result::<_, Box>::Ok(()) /// ``` - pub fn named_graphs(&self) -> GraphNameIter { - let reader = self.storage.snapshot(); - GraphNameIter { - iter: reader.named_graphs(), - reader, - } - } + // pub fn named_graphs(&self) -> GraphNameIter { + // let reader = self.storage.snapshot(); + // GraphNameIter { + // iter: reader.named_graphs(), + // reader, + // } + // } /// Checks if the store contains a given graph /// @@ -826,13 +859,13 @@ impl Store { /// assert!(store.contains_named_graph(&ex)?); /// # Result::<_, Box>::Ok(()) /// ``` - pub fn contains_named_graph<'a>( - &self, - graph_name: impl Into>, - ) -> Result { - let graph_name = EncodedTerm::from(graph_name.into()); - self.storage.snapshot().contains_named_graph(&graph_name) - } + // pub fn contains_named_graph<'a>( + // &self, + // graph_name: impl Into>, + // ) -> Result { + // let graph_name = EncodedTerm::from(graph_name.into()); + // self.storage.snapshot().contains_named_graph(&graph_name) + // } /// Inserts a graph into this store. /// @@ -853,13 +886,13 @@ impl Store { /// ); /// # Result::<_, Box>::Ok(()) /// ``` - pub fn insert_named_graph<'a>( - &self, - graph_name: impl Into>, - ) -> Result { - let graph_name = graph_name.into(); - self.transaction(|mut t| t.insert_named_graph(graph_name)) - } + // pub fn insert_named_graph<'a>( + // &self, + // graph_name: impl Into>, + // ) -> Result { + // let graph_name = graph_name.into(); + // self.transaction(|mut t| t.insert_named_graph(graph_name)) + // } /// Clears a graph from this store. /// @@ -939,7 +972,7 @@ impl Store { /// Flushes all buffers and ensures that all writes are saved on disk. /// /// Flushes are automatically done using background threads but might lag a little bit. - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn flush(&self) -> Result<(), StorageError> { self.storage.flush() } @@ -949,7 +982,7 @@ impl Store { /// Useful to call after a batch upload or another similar operation. /// ///
Can take hours on huge databases.
- #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn optimize(&self) -> Result<(), StorageError> { self.storage.compact() } @@ -972,7 +1005,7 @@ impl Store { /// This allows cheap regular backups. /// /// If you want to move your data to another RDF storage system, you should have a look at the [`Store::dump_to_write`] function instead. - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn backup(&self, target_directory: impl AsRef) -> Result<(), StorageError> { self.storage.backup(target_directory.as_ref()) } @@ -999,7 +1032,7 @@ impl Store { /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?); /// # Result::<_, Box>::Ok(()) /// ``` - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] pub fn bulk_loader(&self) -> BulkLoader { BulkLoader { storage: StorageBulkLoader::new(self.storage.clone()), @@ -1205,25 +1238,25 @@ impl<'a> Transaction<'a> { /// })?; /// # Result::<_, EvaluationError>::Ok(()) /// ``` - pub fn update( - &mut self, - update: impl TryInto>, - ) -> Result<(), EvaluationError> { - self.update_opt(update, UpdateOptions::default()) - } + // pub fn update( + // &mut self, + // update: impl TryInto>, + // ) -> Result<(), EvaluationError> { + // self.update_opt(update, UpdateOptions::default()) + // } - /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/) with some options. - pub fn update_opt( - &mut self, - update: impl TryInto>, - options: impl Into, - ) -> Result<(), EvaluationError> { - evaluate_update( - &mut self.writer, - &update.try_into().map_err(Into::into)?, - &options.into(), - ) - } + // /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/) with some options. + // pub fn update_opt( + // &mut self, + // update: impl TryInto>, + // options: impl Into, + // ) -> Result<(), EvaluationError> { + // evaluate_update( + // &mut self.writer, + // &update.try_into().map_err(Into::into)?, + // &options.into(), + // ) + // } /// Loads a RDF file into the store. /// @@ -1261,16 +1294,16 @@ impl<'a> Transaction<'a> { /// assert!(store.contains(QuadRef::new(ex, ex, ex, NamedNodeRef::new("http://example.com/g2")?))?); /// # Result::<_, Box>::Ok(()) /// ``` - pub fn load_from_read( - &mut self, - parser: impl Into, - read: impl Read, - ) -> Result<(), LoaderError> { - for quad in parser.into().rename_blank_nodes().parse_read(read) { - self.insert(quad?.as_ref())?; - } - Ok(()) - } + // pub fn load_from_read( + // &mut self, + // parser: impl Into, + // read: impl Read, + // ) -> Result<(), LoaderError> { + // for quad in parser.into().rename_blank_nodes().parse_read(read) { + // self.insert(quad?.as_ref())?; + // } + // Ok(()) + // } /// Loads a graph file (i.e. triples) into the store. 
/// @@ -1298,27 +1331,27 @@ impl<'a> Transaction<'a> { /// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?); /// # Result::<_,oxigraph::store::LoaderError>::Ok(()) /// ``` - #[deprecated(note = "use Transaction.load_from_read instead", since = "0.4.0")] - pub fn load_graph( - &mut self, - read: impl Read, - format: impl Into, - to_graph_name: impl Into, - base_iri: Option<&str>, - ) -> Result<(), LoaderError> { - let mut parser = RdfParser::from_format(format.into()) - .without_named_graphs() - .with_default_graph(to_graph_name); - if let Some(base_iri) = base_iri { - parser = parser - .with_base_iri(base_iri) - .map_err(|e| LoaderError::InvalidBaseIri { - iri: base_iri.into(), - error: e, - })?; - } - self.load_from_read(parser, read) - } + // #[deprecated(note = "use Transaction.load_from_read instead", since = "0.4.0")] + // pub fn load_graph( + // &mut self, + // read: impl Read, + // format: impl Into, + // to_graph_name: impl Into, + // base_iri: Option<&str>, + // ) -> Result<(), LoaderError> { + // let mut parser = RdfParser::from_format(format.into()) + // .without_named_graphs() + // .with_default_graph(to_graph_name); + // if let Some(base_iri) = base_iri { + // parser = parser + // .with_base_iri(base_iri) + // .map_err(|e| LoaderError::InvalidBaseIri { + // iri: base_iri.into(), + // error: e, + // })?; + // } + // self.load_from_read(parser, read) + // } /// Loads a dataset file (i.e. quads) into the store. /// @@ -1342,23 +1375,91 @@ impl<'a> Transaction<'a> { /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?); /// # Result::<_,oxigraph::store::LoaderError>::Ok(()) /// ``` - #[deprecated(note = "use Transaction.load_from_read instead", since = "0.4.0")] - pub fn load_dataset( + // #[deprecated(note = "use Transaction.load_from_read instead", since = "0.4.0")] + // pub fn load_dataset( + // &mut self, + // read: impl Read, + // format: impl Into, + // base_iri: Option<&str>, + // ) -> Result<(), LoaderError> { + // let mut parser = RdfParser::from_format(format.into()); + // if let Some(base_iri) = base_iri { + // parser = parser + // .with_base_iri(base_iri) + // .map_err(|e| LoaderError::InvalidBaseIri { + // iri: base_iri.into(), + // error: e, + // })?; + // } + // self.load_from_read(parser, read) + // } + + pub fn ng_get_heads( + &self, + topic: &StrHash, + overlay: &StrHash, + ) -> Result, StorageError> { + self.writer.reader().ng_get_heads(topic, overlay) + } + + pub fn ng_get_reader(&self) -> StorageReader { + self.writer.reader() + } + + pub fn update_heads( &mut self, - read: impl Read, - format: impl Into, - base_iri: Option<&str>, - ) -> Result<(), LoaderError> { - let mut parser = RdfParser::from_format(format.into()); - if let Some(base_iri) = base_iri { - parser = parser - .with_base_iri(base_iri) - .map_err(|e| LoaderError::InvalidBaseIri { - iri: base_iri.into(), - error: e, - })?; - } - self.load_from_read(parser, read) + topic: &StrHash, + overlay: &StrHash, + commit: &StrHash, + direct_causal_past: &HashSet, + ) -> Result<(), StorageError> { + self.writer + .ng_update_heads(topic, overlay, commit, direct_causal_past) + } + + pub fn update_past( + &mut self, + commit: &StrHash, + direct_causal_past: &HashSet, + skip_has_no_graph: bool, + ) -> Result<(), StorageError> { + self.writer + .ng_update_past(commit, direct_causal_past, skip_has_no_graph) + } + + pub fn update_branch_and_token( + &mut self, + overlay_encoded: &StrHash, + branch_encoded: &StrHash, + topic_encoded: &StrHash, + token_encoded: &StrHash, + ) -> 
Result<(), StorageError> { + self.writer.update_branch_and_token( + overlay_encoded, + branch_encoded, + topic_encoded, + token_encoded, + ) + } + + pub fn doc_in_store( + &mut self, + graph_name: NamedNodeRef<'_>, + overlay: &StrHash, + remove: bool, + ) -> Result<(), StorageError> { + self.writer.doc_in_store(graph_name, overlay, remove) + } + + pub fn named_commit_or_branch( + &mut self, + ov_graph_name: NamedNodeRef<'_>, + name: &String, + // if None: remove + value: &Option>, + ) -> Result<(), StorageError> { + self.writer + .named_commit_or_branch(ov_graph_name, name, value) } /// Adds a quad to this store. @@ -1378,21 +1479,37 @@ impl<'a> Transaction<'a> { /// assert!(store.contains(quad)?); /// # Result::<_,oxigraph::store::StorageError>::Ok(()) /// ``` - pub fn insert<'b>(&mut self, quad: impl Into>) -> Result { - self.writer.insert(quad.into()) + pub fn insert<'b>( + &mut self, + quad: impl Into>, + value: u8, + ) -> Result<(), StorageError> { + self.writer.ng_insert(quad.into(), value) } - /// Adds a set of quads to this store. - pub fn extend<'b>( + pub fn insert_encoded( &mut self, - quads: impl IntoIterator>>, - ) -> Result<(), StorageError> { - for quad in quads { - self.writer.insert(quad.into())?; - } - Ok(()) + encoded: &EncodedQuad, + value: u8, + ) -> Result { + self.writer.ng_insert_encoded(encoded, value) + } + + pub fn ng_remove(&mut self, quad: &EncodedQuad, commit: &StrHash) -> Result<(), StorageError> { + self.writer.ng_remove(quad, commit) } + // /// Adds a set of quads to this store. + // pub fn extend<'b>( + // &mut self, + // quads: impl IntoIterator>>, + // ) -> Result<(), StorageError> { + // for quad in quads { + // self.writer.insert(quad.into())?; + // } + // Ok(()) + // } + /// Removes a quad from this store. /// /// Returns `true` if the quad was in the store and has been removed. @@ -1417,23 +1534,23 @@ impl<'a> Transaction<'a> { } /// Returns all the store named graphs. - pub fn named_graphs(&self) -> GraphNameIter { - let reader = self.writer.reader(); - GraphNameIter { - iter: reader.named_graphs(), - reader, - } - } + // pub fn named_graphs(&self) -> GraphNameIter { + // let reader = self.writer.reader(); + // GraphNameIter { + // iter: reader.named_graphs(), + // reader, + // } + // } /// Checks if the store contains a given graph. - pub fn contains_named_graph<'b>( - &self, - graph_name: impl Into>, - ) -> Result { - self.writer - .reader() - .contains_named_graph(&EncodedTerm::from(graph_name.into())) - } + // pub fn contains_named_graph<'b>( + // &self, + // graph_name: impl Into>, + // ) -> Result { + // self.writer + // .reader() + // .contains_named_graph(&EncodedTerm::from(graph_name.into())) + // } /// Inserts a graph into this store. /// @@ -1453,12 +1570,12 @@ impl<'a> Transaction<'a> { /// ); /// # Result::<_,oxigraph::store::StorageError>::Ok(()) /// ``` - pub fn insert_named_graph<'b>( - &mut self, - graph_name: impl Into>, - ) -> Result { - self.writer.insert_named_graph(graph_name.into()) - } + // pub fn insert_named_graph<'b>( + // &mut self, + // graph_name: impl Into>, + // ) -> Result { + // self.writer.insert_named_graph(graph_name.into()) + // } /// Clears a graph from this store. 
/// @@ -1617,14 +1734,14 @@ impl Iterator for GraphNameIter { /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?); /// # Result::<_, Box>::Ok(()) /// ``` -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] #[must_use] pub struct BulkLoader { storage: StorageBulkLoader, on_parse_error: Option Result<(), RdfParseError>>>, } -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] impl BulkLoader { /// Sets the maximal number of threads to be used by the bulk loader per operation. /// @@ -1981,17 +2098,17 @@ mod tests { ]; let store = Store::new()?; - for t in &default_quads { - assert!(store.insert(t)?); - } - assert!(!store.insert(&default_quad)?); - - assert!(store.remove(&default_quad)?); - assert!(!store.remove(&default_quad)?); - assert!(store.insert(&named_quad)?); - assert!(!store.insert(&named_quad)?); - assert!(store.insert(&default_quad)?); - assert!(!store.insert(&default_quad)?); + // for t in &default_quads { + // assert!(store.insert(t)?); + // } + // assert!(!store.insert(&default_quad)?); + + // assert!(store.remove(&default_quad)?); + // assert!(!store.remove(&default_quad)?); + // assert!(store.insert(&named_quad)?); + // assert!(!store.insert(&named_quad)?); + // assert!(store.insert(&default_quad)?); + // assert!(!store.insert(&default_quad)?); assert_eq!(store.len()?, 4); assert_eq!(store.iter().collect::, _>>()?, all_quads); diff --git a/ng-oxigraph/src/oxrdf/named_node.rs b/ng-oxigraph/src/oxrdf/named_node.rs index 41a5516..09ec666 100644 --- a/ng-oxigraph/src/oxrdf/named_node.rs +++ b/ng-oxigraph/src/oxrdf/named_node.rs @@ -51,6 +51,10 @@ impl NamedNode { self.iri } + pub fn as_string(&self) -> &String { + &self.iri + } + #[inline] pub fn as_ref(&self) -> NamedNodeRef<'_> { NamedNodeRef::new_unchecked(&self.iri) diff --git a/ng-repo/src/commit.rs b/ng-repo/src/commit.rs index 62c114d..2f059cd 100644 --- a/ng-repo/src/commit.rs +++ b/ng-repo/src/commit.rs @@ -593,6 +593,20 @@ impl Commit { res } + pub fn direct_causal_past_ids(&self) -> HashSet { + let mut res: HashSet = HashSet::with_capacity(1); + match self { + Commit::V0(c) => match &c.header { + Some(CommitHeader::V0(header_v0)) => { + res.extend(header_v0.acks.iter()); + res.extend(header_v0.nacks.iter()); + } + _ => {} + }, + }; + res + } + // /// Get seq // pub fn seq(&self) -> u64 { // match self { diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 8776820..9aa30b1 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -328,6 +328,24 @@ pub enum VerifierError { DoubleBranchSubscription, InvalidCommit, LocallyConnected, + InvalidTriple, + InvalidNamedGraph, + OxigraphError(String), + CannotRemoveTriplesWhenNewBranch, +} + +impl Error for VerifierError {} + +impl core::fmt::Display for VerifierError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl From for VerifierError { + fn from(_e: serde_bare::error::Error) -> Self { + VerifierError::SerializationError + } } impl From for VerifierError { diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs index 912de8d..c481492 100644 --- a/ng-repo/src/types.rs +++ b/ng-repo/src/types.rs @@ -442,6 +442,11 @@ impl BlockRef { pub fn nuri(&self) -> String { format!(":j:{}:k:{}", self.id, self.key) } + + pub fn tokenize(&self) -> Digest { + let ser = serde_bare::to_vec(self).unwrap(); + Digest::Blake3Digest32(*blake3::hash(&ser).as_bytes()) + } } impl From for (BlockId, 
BlockKey) { @@ -1360,6 +1365,15 @@ pub enum BranchType { //Unknown, // only used temporarily when loading a branch info from commits (Branch commit, then AddBranch commit) } +impl BranchType { + pub fn is_main(&self) -> bool { + match self { + Self::Main => true, + _ => false, + } + } +} + impl fmt::Display for BranchType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( diff --git a/ng-sdk-js/app-node/index.js b/ng-sdk-js/app-node/index.js index 82a55ce..dde8331 100644 --- a/ng-sdk-js/app-node/index.js +++ b/ng-sdk-js/app-node/index.js @@ -21,8 +21,8 @@ let config = { ng.init_headless(config).then( async() => { let session_id; try { - let user_id = await ng.admin_create_user(config); - console.log("user created: ",user_id); + //let user_id = await ng.admin_create_user(config); + //console.log("user created: ",user_id); let other_user_id = "AJQ5gCLoXXjalC9diTDCvxxWu5ZQUcYWEE821nhVRMcE"; @@ -30,17 +30,28 @@ ng.init_headless(config).then( async() => { session_id = session.session_id; console.log(session); - let sparql_result = await ng.sparql_query(session.session_id, "SELECT * WHERE { ?s ?p ?o }"); + //await ng.sparql_update(session.session_id, "INSERT DATA { }"); + //await ng.sparql_update(session.session_id, "INSERT { ?s } WHERE { ?s } "); + + //await ng.sparql_update(session.session_id, "INSERT DATA { . }"); + + let sparql_result = await ng.sparql_query(session.session_id, "SELECT ?a WHERE { ?a _:abc. _:abc }"); console.log(sparql_result); + for (const q of sparql_result.results.bindings) { + console.log(q); + } + + sparql_result = await ng.sparql_query(session.session_id, "SELECT ?s ?a WHERE { ?s ?a }"); + console.log(sparql_result); + for (const q of sparql_result.results.bindings) { + console.log(q); + } let quads = await ng.sparql_query(session.session_id, "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); for (const q of quads) { console.log(q.subject.toString(), q.predicate.toString(), q.object.toString(), q.graph.toString()) } - let result = await ng.sparql_update(session.session_id, "INSERT DATA { }"); - console.log(result); - let file_nuri = await ng.file_put_to_private_store(session.session_id,"LICENSE-MIT","text/plain"); console.log(file_nuri); diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index 264fc7e..ca74c6d 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -639,7 +639,7 @@ pub async fn file_get_from_private_store( let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) .map_err(|_| "Deserialization error of session_id".to_string())?; - let nuri = NuriV0::new_from(nuri).map_err(|_| "Deserialization error of Nuri".to_string())?; + let nuri = NuriV0::new_from(&nuri).map_err(|_| "Deserialization error of Nuri".to_string())?; let mut request = AppRequest::new(AppRequestCommandV0::FileGet, nuri.clone(), None); request.set_session_id(session_id); diff --git a/ng-verifier/src/commits/mod.rs b/ng-verifier/src/commits/mod.rs index e3e3bbe..dfc9709 100644 --- a/ng-verifier/src/commits/mod.rs +++ b/ng-verifier/src/commits/mod.rs @@ -9,6 +9,8 @@ //! 
Verifiers for each Commit type +pub mod transaction; + use std::collections::HashMap; use std::sync::Arc; @@ -84,10 +86,10 @@ impl CommitVerifier for RootBranch { //TODO: deal with quorum_type (verify signature) let user_priv = verifier.user_privkey(); - let user_id = user_priv.to_pub(); + let user_id = verifier.user_id(); let repo_write_cap_secret = if store.is_private() { Some(SymKey::nil()) - } else if let Some(pos) = root_branch.owners.iter().position(|&o| o == user_id) { + } else if let Some(pos) = root_branch.owners.iter().position(|o| o == user_id) { let cryptobox = &root_branch.owners_write_cap[pos]; Some(RootBranch::decrypt_write_cap(user_priv, cryptobox)?) } else { diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs new file mode 100644 index 0000000..002820e --- /dev/null +++ b/ng-verifier/src/commits/transaction.rs @@ -0,0 +1,550 @@ +// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +//! Verifiers for AsyncTransaction Commit + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use ng_oxigraph::oxigraph::storage_ng::numeric_encoder::{EncodedQuad, EncodedTerm}; +use ng_oxigraph::oxigraph::storage_ng::*; +use serde::{Deserialize, Serialize}; + +use ng_net::app_protocol::{NuriV0, TargetBranchV0}; +use ng_oxigraph::oxrdf::{GraphName, GraphNameRef, NamedNode, Quad, Triple, TripleRef}; +use ng_repo::errors::VerifierError; +use ng_repo::store::Store; +use ng_repo::types::*; + +use crate::verifier::Verifier; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GraphTransaction { + pub inserts: Vec, + pub removes: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum DiscreteTransaction { + /// A yrs::Update + #[serde(with = "serde_bytes")] + YMap(Vec), + #[serde(with = "serde_bytes")] + YXml(Vec), + #[serde(with = "serde_bytes")] + YText(Vec), + /// An automerge::Patch + #[serde(with = "serde_bytes")] + Automerge(Vec), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TransactionBody { + graph: Option, + discrete: Option, +} + +struct BranchUpdateInfo { + branch_id: BranchId, + branch_is_main: bool, + repo_id: RepoId, + topic_id: TopicId, + token: Digest, + overlay_id: OverlayId, + previous_heads: HashSet, + commit_id: ObjectId, + transaction: GraphTransaction, +} + +impl Verifier { + pub(crate) fn add_doc( + &self, + repo_id: &RepoId, + overlay_id: &OverlayId, + ) -> Result<(), VerifierError> { + self.doc_in_store(repo_id, overlay_id, false) + } + + pub(crate) fn remove_doc( + &self, + repo_id: &RepoId, + overlay_id: &OverlayId, + ) -> Result<(), VerifierError> { + self.doc_in_store(repo_id, overlay_id, true) + } + + fn doc_in_store( + &self, + repo_id: &RepoId, + overlay_id: &OverlayId, + remove: bool, + ) -> Result<(), VerifierError> { + let ov_graphname = NamedNode::new_unchecked(NuriV0::repo_graph_name(repo_id, overlay_id)); + + let overlay_encoded = numeric_encoder::StrHash::new(&NuriV0::overlay_id(overlay_id)); + + self.graph_dataset + .as_ref() + .unwrap() + .ng_transaction( + move |mut transaction| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> { + transaction.doc_in_store(ov_graphname.as_ref(), &overlay_encoded, remove) + }, + ) + .map_err(|e| 
VerifierError::OxigraphError(e.to_string())) + } + + pub(crate) fn add_named_commit( + &self, + repo_id: &RepoId, + overlay_id: &OverlayId, + name: String, + commit_id: ObjectId, + ) -> Result<(), VerifierError> { + self.named_commit_or_branch( + repo_id, + overlay_id, + name, + false, + Some(format!("{commit_id}")), + ) + } + + pub(crate) fn add_named_branch( + &self, + repo_id: &RepoId, + overlay_id: &OverlayId, + name: String, + branch_id: BranchId, + ) -> Result<(), VerifierError> { + self.named_commit_or_branch( + repo_id, + overlay_id, + name, + true, + Some(format!("{branch_id}")), + ) + } + + pub(crate) fn remove_named( + &self, + repo_id: &RepoId, + overlay_id: &OverlayId, + name: String, + ) -> Result<(), VerifierError> { + self.named_commit_or_branch(repo_id, overlay_id, name, false, None) + } + + fn named_commit_or_branch( + &self, + repo_id: &RepoId, + overlay_id: &OverlayId, + name: String, + is_branch: bool, + base64_id: Option, + ) -> Result<(), VerifierError> { + let ov_graphname = NamedNode::new_unchecked(NuriV0::repo_graph_name(repo_id, overlay_id)); + + let value = if base64_id.is_none() { + None + } else { + if is_branch { + let overlay_encoded = + numeric_encoder::StrHash::new(&NuriV0::overlay_id(overlay_id)); + let branch_encoded = numeric_encoder::StrHash::new(&NuriV0::branch_id_from_base64( + base64_id.as_ref().unwrap(), + )); + let mut buffer = Vec::with_capacity(33); + buffer.push(BRANCH_PREFIX); + buffer.extend_from_slice(&branch_encoded.to_be_bytes()); + buffer.extend_from_slice(&overlay_encoded.to_be_bytes()); + Some(buffer) + } else { + let commit_name = + NuriV0::commit_graph_name_from_base64(base64_id.as_ref().unwrap(), overlay_id); + let commit_encoded = numeric_encoder::StrHash::new(&commit_name); + let mut buffer = Vec::with_capacity(17); + buffer.push(COMMIT_PREFIX); + buffer.extend_from_slice(&commit_encoded.to_be_bytes()); + Some(buffer) + } + }; + + self.graph_dataset + .as_ref() + .unwrap() + .ng_transaction( + move |mut transaction: ng_oxigraph::oxigraph::store::Transaction<'_>| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> { + transaction.named_commit_or_branch(ov_graphname.as_ref(), &name, &value) + }, + ) + .map_err(|e| VerifierError::OxigraphError(e.to_string())) + } + + pub(crate) async fn verify_async_transaction( + &mut self, + transaction: &Transaction, + commit: &Commit, + branch_id: &BranchId, + repo_id: &RepoId, + store: Arc, + ) -> Result<(), VerifierError> { + let Transaction::V0(v0) = transaction; + let mut body: TransactionBody = serde_bare::from_slice(&v0)?; + + let repo = self.get_repo(repo_id, store.get_store_repo())?; + + let branch = repo.branch(branch_id)?; + + if body.graph.is_some() { + let info = BranchUpdateInfo { + branch_id: *branch_id, + branch_is_main: branch.branch_type.is_main(), + repo_id: *repo_id, + topic_id: branch.topic, + token: branch.read_cap.tokenize(), + overlay_id: store.overlay_id, + previous_heads: commit.direct_causal_past_ids(), + commit_id: commit.id().unwrap(), + transaction: body.graph.take().unwrap(), + }; + self.update_graph(&[info]) + } else { + Ok(()) + } + //TODO: discrete update + } + + fn find_branch_and_repo_for_quad( + &self, + quad: &Quad, + branches: &mut HashMap, + nuri_branches: &mut HashMap, + ) -> Result<(BranchId, bool), VerifierError> { + match &quad.graph_name { + GraphName::NamedNode(named_node) => { + let graph_name = named_node.as_string(); + if let Some(branch_found) = nuri_branches.get(graph_name) { + return Ok(branch_found.clone()); + } + let nuri = 
NuriV0::new_from(graph_name)?; + if !nuri.is_branch_identifier() { + return Err(VerifierError::InvalidNamedGraph); + } + let store = self + .get_store_by_overlay_id(&OverlayId::Outer(*nuri.overlay.unwrap().outer()))?; + let repo = self.get_repo(nuri.target.repo_id(), store.get_store_repo())?; + let (branch_id, is_publisher, is_main, topic_id, token) = match nuri.branch { + None => { + let b = repo.main_branch().ok_or(VerifierError::BranchNotFound)?; + ( + b.id, + b.topic_priv_key.is_some(), + true, + b.topic, + b.read_cap.tokenize(), + ) + } + Some(TargetBranchV0::BranchId(id)) => { + let b = repo.branch(&id)?; + //TODO: deal with named branch that is also the main branch + ( + id, + b.topic_priv_key.is_some(), + false, + b.topic, + b.read_cap.tokenize(), + ) + } + _ => unimplemented!(), + }; + let _ = branches.entry(branch_id).or_insert(( + store.get_store_repo().clone(), + repo.id, + is_main, + topic_id, + token, + store.overlay_id, + )); + let _ = nuri_branches + .entry(graph_name.clone()) + .or_insert((branch_id, is_publisher)); + Ok((branch_id, is_publisher)) + } + _ => Err(VerifierError::InvalidNamedGraph), + } + } + + async fn prepare_sparql_update( + &mut self, + inserts: Vec, + removes: Vec, + ) -> Result<(), VerifierError> { + // options when not a publisher on the repo: + // - skip + // - TODO: abort (the whole transaction) + // - TODO: inbox (sent to inbox of document for a suggested update) + // for now we just do skip, without giving option to user + let mut inserts_map: HashMap> = HashMap::with_capacity(1); + let mut removes_map: HashMap> = HashMap::with_capacity(1); + let mut branches: HashMap = + HashMap::with_capacity(1); + let mut nuri_branches: HashMap = HashMap::with_capacity(1); + let mut inserts_len = inserts.len(); + let mut removes_len = removes.len(); + for insert in inserts { + let (branch_id, is_publisher) = + self.find_branch_and_repo_for_quad(&insert, &mut branches, &mut nuri_branches)?; + if !is_publisher { + continue; + } + let set = inserts_map.entry(branch_id).or_insert_with(|| { + let set = HashSet::with_capacity(inserts_len); + inserts_len = 1; + set + }); + set.insert(insert.into()); + } + for remove in removes { + let (branch_id, is_publisher) = + self.find_branch_and_repo_for_quad(&remove, &mut branches, &mut nuri_branches)?; + if !is_publisher { + continue; + } + let set = removes_map.entry(branch_id).or_insert_with(|| { + let set = HashSet::with_capacity(removes_len); + removes_len = 1; + set + }); + set.insert(remove.into()); + } + + let mut updates = Vec::with_capacity(branches.len()); + + for (branch_id, (store_repo, repo_id, branch_is_main, topic_id, token, overlay_id)) in + branches + { + let graph_transac = GraphTransaction { + inserts: Vec::from_iter(inserts_map.remove(&branch_id).unwrap_or(HashSet::new())), + removes: Vec::from_iter(removes_map.remove(&branch_id).unwrap_or(HashSet::new())), + }; + + let mut transac = TransactionBody { + graph: Some(graph_transac), + discrete: None, + }; + + let transaction_commit_body = + CommitBodyV0::AsyncTransaction(Transaction::V0(serde_bare::to_vec(&transac)?)); + + let commit = self + .new_transaction_commit( + transaction_commit_body, + &repo_id, + &branch_id, + &store_repo, + vec![], //TODO deps + vec![], + ) + .await?; + + let graph_update = transac.graph.take().unwrap(); + + let info = BranchUpdateInfo { + branch_id, + branch_is_main, + repo_id, + topic_id, + token, + overlay_id, + previous_heads: commit.direct_causal_past_ids(), + commit_id: commit.id().unwrap(), + transaction: graph_update, + }; 
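+            // `info` now carries everything update_graph() needs to materialize
+            // this branch's change set: the new commit id, its direct causal past
+            // (the branch's previous heads), and the branch/topic/token/overlay
+            // identifiers that are hashed into graph names and storage keys there.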
+ updates.push(info); + } + self.update_graph(&updates) + } + + fn update_graph(&mut self, updates: &[BranchUpdateInfo]) -> Result<(), VerifierError> { + self.graph_dataset + .as_ref() + .unwrap() + .ng_transaction( + move |mut transaction| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> { + let reader = transaction.ng_get_reader(); + + for update in updates { + let commit_name = + NuriV0::commit_graph_name(&update.commit_id, &update.overlay_id); + let commit_encoded = numeric_encoder::StrHash::new(&commit_name); + + let cv_graphname = NamedNode::new_unchecked(commit_name); + let cv_graphname_ref = GraphNameRef::NamedNode((&cv_graphname).into()); + let ov_main = if update.branch_is_main { + let ov_graphname = NamedNode::new_unchecked(NuriV0::repo_graph_name( + &update.repo_id, + &update.overlay_id, + )); + Some(ov_graphname) + } else { + None + }; + let value = if update.branch_is_main { + ADDED_IN_MAIN + } else { + ADDED_IN_OTHER + }; + for triple in update.transaction.inserts.iter() { + let triple_ref: TripleRef = triple.into(); + let quad_ref = triple_ref.in_graph(cv_graphname_ref); + transaction.insert(quad_ref, value)?; + if let Some(ov_graphname) = ov_main.as_ref() { + let ov_graphname_ref = GraphNameRef::NamedNode(ov_graphname.into()); + let triple_ref: TripleRef = triple.into(); + let quad_ref = triple_ref.in_graph(ov_graphname_ref); + transaction.insert(quad_ref, REPO_IN_MAIN)?; + } + } + + let topic_encoded = + numeric_encoder::StrHash::new(&NuriV0::topic_id(&update.topic_id)); + let overlay_encoded = + numeric_encoder::StrHash::new(&NuriV0::overlay_id(&update.overlay_id)); + + let branch_encoded = + numeric_encoder::StrHash::new(&NuriV0::branch_id(&update.branch_id)); + let token_encoded = + numeric_encoder::StrHash::new(&NuriV0::token(&update.token)); + + transaction.update_branch_and_token( + &overlay_encoded, + &branch_encoded, + &topic_encoded, + &token_encoded, + )?; + + let direct_causal_past_encoded: HashSet = + HashSet::from_iter(update.previous_heads.iter().map(|commit_id| { + numeric_encoder::StrHash::new(&NuriV0::commit_graph_name( + commit_id, + &update.overlay_id, + )) + })); + + let current_heads = + reader.ng_get_heads(&topic_encoded, &overlay_encoded)?; + + transaction.update_heads( + &topic_encoded, + &overlay_encoded, + &commit_encoded, + &direct_causal_past_encoded, + )?; + + if !direct_causal_past_encoded.is_empty() { + // adding past + transaction.update_past( + &commit_encoded, + &direct_causal_past_encoded, + false, + )?; + } + + if !update.transaction.removes.is_empty() { + if current_heads.is_empty() { + return Err(ng_oxigraph::oxigraph::store::StorageError::Other( + Box::new(VerifierError::CannotRemoveTriplesWhenNewBranch), + )); + } + + let at_current_heads = current_heads != direct_causal_past_encoded; + // if not, we need to base ourselves on the materialized state of the direct_causal_past of the commit + + let value = if update.branch_is_main { + REMOVED_IN_MAIN + } else { + REMOVED_IN_OTHER + }; + + for remove in update.transaction.removes.iter() { + let encoded_subject = remove.subject.as_ref().into(); + let encoded_predicate = remove.predicate.as_ref().into(); + let encoded_object = remove.object.as_ref().into(); + let observed_adds = reader + .quads_for_subject_predicate_object_heads( + &encoded_subject, + &encoded_predicate, + &encoded_object, + &direct_causal_past_encoded, + at_current_heads, + )?; + for removing in observed_adds { + let graph_encoded = EncodedTerm::NamedNode { iri_id: removing }; + let quad_encoded = 
EncodedQuad::new( + encoded_subject.clone(), + encoded_predicate.clone(), + encoded_object.clone(), + graph_encoded, + ); + transaction.insert_encoded(&quad_encoded, value)?; + transaction.ng_remove(&quad_encoded, &commit_encoded)?; + } + if let Some(ov_graphname) = ov_main.as_ref() { + let should_remove_ov_triples = at_current_heads || { + reader + .quads_for_subject_predicate_object_heads( + &encoded_subject, + &encoded_predicate, + &encoded_object, + ¤t_heads, + true, + )? + .is_empty() + }; + if should_remove_ov_triples { + let ov_graphname_ref = + GraphNameRef::NamedNode(ov_graphname.into()); + let triple_ref: TripleRef = remove.into(); + let quad_ref = triple_ref.in_graph(ov_graphname_ref); + transaction.remove(quad_ref)?; + } + } + } + } + } + + Ok(()) + }, + ) + .map_err(|e| VerifierError::OxigraphError(e.to_string())) + } + + pub(crate) async fn process_sparql_update( + &mut self, + _nuri: &NuriV0, + query: &String, + ) -> Result<(), String> { + let store = self.graph_dataset.as_ref().unwrap(); + //TODO: use nuri to set some default dataset in oxigraph + let res = store.ng_update(query); + match res { + Err(e) => Err(e.to_string()), + Ok((inserts, removes)) => { + if inserts.is_empty() && removes.is_empty() { + Ok(()) + } else { + self.prepare_sparql_update(Vec::from_iter(inserts), Vec::from_iter(removes)) + .await + .map_err(|e| e.to_string()) + } + } + } + } +} diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index 7c186d2..4f7b318 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -229,19 +229,20 @@ impl Verifier { } } AppFetchContentV0::WriteQuery => { - if let Some(AppRequestPayload::V0(AppRequestPayloadV0::Query(DocQuery::V0( - query, - )))) = payload + if !nuri.is_valid_for_sparql_update() { + return Err(NgError::InvalidNuri); + } + return if let Some(AppRequestPayload::V0(AppRequestPayloadV0::Query( + DocQuery::V0(query), + ))) = payload { - let store = self.graph_dataset.as_ref().unwrap(); - let res = store.update(&query); - return Ok(match res { - Err(e) => AppResponse::error(e.to_string()), + Ok(match self.process_sparql_update(&nuri, &query).await { + Err(e) => AppResponse::error(e), Ok(_) => AppResponse::ok(), - }); + }) } else { - return Err(NgError::InvalidPayload); - } + Err(NgError::InvalidPayload) + }; } _ => unimplemented!(), }, diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 6777ea4..8069637 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -13,11 +13,11 @@ use core::fmt; use std::cmp::max; use std::collections::BTreeMap; use std::collections::HashSet; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::fs::create_dir_all; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::fs::{read, File, OpenOptions}; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use std::io::Write; use std::{collections::HashMap, sync::Arc}; @@ -58,7 +58,7 @@ use ng_net::{ }; use crate::commits::*; -#[cfg(all(not(target_family = "wasm"),not(docsrs)))] +#[cfg(all(not(target_family = "wasm"), not(docsrs)))] use crate::rocksdb_user_storage::RocksDbUserStorage; use crate::types::*; use crate::user_storage::InMemoryUserStorage; @@ -79,6 +79,7 @@ use crate::user_storage::UserStorage; pub struct Verifier { pub(crate) config: VerifierConfig, + user_id: UserId, pub 
connected_broker: BrokerPeerId, pub(crate) graph_dataset: Option, pub(crate) user_storage: Option>>, @@ -129,6 +130,10 @@ impl Verifier { &self.config.user_priv_key } + pub(crate) fn user_id(&self) -> &UserId { + &self.user_id + } + pub fn private_store_id(&self) -> &RepoId { self.config.private_store_id.as_ref().unwrap() } @@ -140,11 +145,11 @@ impl Verifier { } pub async fn close(&self) { - log_debug!("VERIFIER CLOSED {}", self.user_privkey().to_pub()); + log_debug!("VERIFIER CLOSED {}", self.user_id()); BROKER .write() .await - .close_peer_connection_x(None, Some(self.config.user_priv_key.to_pub())) + .close_peer_connection_x(None, Some(self.user_id().clone())) .await; } @@ -270,17 +275,20 @@ impl Verifier { let (peer_priv_key, peer_id) = generate_keypair(); let block_storage = Arc::new(std::sync::RwLock::new(HashMapBlockStorage::new())) as Arc>; + let user_priv_key = PrivKey::random_ed(); + let user_id = user_priv_key.to_pub(); Verifier { config: VerifierConfig { config_type: VerifierConfigType::Memory, user_master_key: [0; 32], peer_priv_key, - user_priv_key: PrivKey::random_ed(), + user_priv_key, private_store_read_cap: None, private_store_id: None, protected_store_id: None, public_store_id: None, }, + user_id, connected_broker: BrokerPeerId::None, graph_dataset: None, user_storage: None, @@ -447,7 +455,7 @@ impl Verifier { } #[allow(dead_code)] - fn get_store(&self, store_repo: &StoreRepo) -> Result, VerifierError> { + pub(crate) fn get_store(&self, store_repo: &StoreRepo) -> Result, VerifierError> { let overlay_id = store_repo.overlay_id_for_storage_purpose(); let store = self .stores @@ -604,7 +612,7 @@ impl Verifier { let branch = repo.branch(branch_id)?; let commit = Commit::new_with_body_and_save( self.user_privkey(), - &self.user_privkey().to_pub(), + self.user_id(), *branch_id, QuorumType::NoSigning, deps, @@ -628,6 +636,44 @@ impl Verifier { .await } + pub(crate) async fn new_transaction_commit( + &mut self, + commit_body: CommitBodyV0, + repo_id: &RepoId, + branch_id: &BranchId, + store_repo: &StoreRepo, + deps: Vec, + files: Vec, + ) -> Result { + let commit = { + let repo = self.get_repo(repo_id, &store_repo)?; + let branch = repo.branch(branch_id)?; + let commit = Commit::new_with_body_and_save( + self.user_privkey(), + self.user_id(), + *branch_id, + QuorumType::NoSigning, + deps, + vec![], + branch.current_heads.clone(), + vec![], + files, + vec![], + vec![], + CommitBody::V0(commit_body), + 0, + &repo.store, + )?; + commit + }; + //log_info!("{}", commit); + + self.new_event(&commit, &vec![], *repo_id, store_repo) + .await?; + + Ok(commit) + } + #[allow(dead_code)] pub(crate) async fn new_commit_simple( &mut self, @@ -714,7 +760,7 @@ impl Verifier { } Ok(res) } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] VerifierConfigType::RocksDb(path) => { let mut path = path.clone(); path.push(format!("outbox{}", self.peer_id.to_hash_string())); @@ -779,7 +825,7 @@ impl Verifier { // send the event to the server already let connected_broker = self.connected_broker.clone(); let broker = BROKER.read().await; - let user = self.config.user_priv_key.to_pub(); + let user = self.user_id().clone(); self.send_event(event, &broker, &Some(user), &connected_broker, overlay) .await?; } else { @@ -794,7 +840,7 @@ impl Verifier { serde_bare::to_vec(&e)?, )?; } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] VerifierConfigType::RocksDb(path) => { let mut path = 
path.clone(); std::fs::create_dir_all(path.clone()).unwrap(); @@ -843,7 +889,7 @@ impl Verifier { } } let connected_broker = self.connected_broker.clone(); - let user = self.config.user_priv_key.to_pub(); + let user = self.user_id().clone(); let broker = BROKER.read().await; //log_info!("looping on branches {:?}", branches); for (repo, branch, publisher) in branches { @@ -885,7 +931,7 @@ impl Verifier { let res = self.send_outbox().await; log_info!("SENDING EVENTS FROM OUTBOX RETURNED: {:?}", res); - let user = self.config.user_priv_key.to_pub(); + let user = self.user_id().clone(); let broker = BROKER.read().await; log_info!("looping on branches {:?}", branches); for (repo, branch, publisher) in branches { @@ -924,7 +970,7 @@ impl Verifier { return Ok(()); } - let user = self.config.user_priv_key.to_pub(); + let user = self.user_id().clone(); let connected_broker = self.connected_broker.clone(); self.open_branch_( repo_id, @@ -942,7 +988,7 @@ impl Verifier { let overlay = repo.store.overlay_for_read_on_client_protocol(); let broker = BROKER.read().await; - let user = self.config.user_priv_key.to_pub(); + let user = self.user_id().clone(); let remote = self.connected_broker.connected_or_err()?; let msg = BlocksPut::V0(BlocksPutV0 { @@ -963,7 +1009,7 @@ impl Verifier { let overlay = repo.store.overlay_for_read_on_client_protocol(); let broker = BROKER.read().await; - let user = self.config.user_priv_key.to_pub(); + let user = self.user_id().clone(); let remote = self.connected_broker.connected_or_err()?; let msg = BlocksExist::V0(BlocksExistV0 { @@ -1238,6 +1284,9 @@ impl Verifier { CommitBodyV0::StoreUpdate(a) => a.verify(commit, self, branch_id, repo_id, store), CommitBodyV0::AddSignerCap(a) => a.verify(commit, self, branch_id, repo_id, store), CommitBodyV0::AddFile(a) => a.verify(commit, self, branch_id, repo_id, store), + CommitBodyV0::AsyncTransaction(a) => { + Box::pin(self.verify_async_transaction(a, commit, branch_id, repo_id, store)) + } _ => { log_err!("unimplemented verifier {}", commit); return Err(VerifierError::NotImplemented); @@ -1351,6 +1400,15 @@ impl Verifier { repo_ref } + pub(crate) fn get_store_by_overlay_id( + &self, + id: &OverlayId, + ) -> Result, VerifierError> { + Ok(Arc::clone( + self.stores.get(id).ok_or(VerifierError::StoreNotFound)?, + )) + } + async fn bootstrap(&mut self) -> Result<(), NgError> { if let Err(e) = self.bootstrap_from_remote().await { log_warn!("bootstrap_from_remote failed with {}", e); @@ -1635,7 +1693,7 @@ impl Verifier { let overlay = repo.store.overlay_for_read_on_client_protocol(); let broker = BROKER.read().await; - let user = Some(self.config.user_priv_key.to_pub()); + let user = Some(self.user_id().clone()); let remote = &self.connected_broker; match repo.store.has(id) { @@ -1666,7 +1724,7 @@ impl Verifier { async fn bootstrap_from_remote(&mut self) -> Result<(), NgError> { if self.need_bootstrap() { let broker = BROKER.read().await; - let user = Some(self.config.user_priv_key.to_pub()); + let user = Some(self.user_id().clone()); self.connected_broker.is_direct_or_err()?; let private_store_id = self.config.private_store_id.to_owned().unwrap(); @@ -1857,7 +1915,7 @@ impl Verifier { return Ok(()); } let broker = BROKER.read().await; - let user = Some(self.config.user_priv_key.to_pub()); + let user = Some(self.user_id().clone()); self.connected_broker.connected_or_err()?; let remote = self.connected_broker.clone(); @@ -1968,7 +2026,7 @@ impl Verifier { let res = (js.last_seq_function)(self.peer_id, qty)?; self.max_reserved_seq_num = res + 
qty as u64; } - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] VerifierConfigType::RocksDb(path) => { let mut path = path.clone(); std::fs::create_dir_all(path.clone()).unwrap(); @@ -2028,7 +2086,7 @@ impl Verifier { Some(Box::new(InMemoryUserStorage::new()) as Box), Some(block_storage), ), - #[cfg(all(not(target_family = "wasm"),not(docsrs)))] + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] VerifierConfigType::RocksDb(path) => { let mut path_oxi = path.clone(); path_oxi.push("graph"); @@ -2059,6 +2117,7 @@ impl Verifier { }; let peer_id = config.peer_priv_key.to_pub(); let mut verif = Verifier { + user_id: config.user_priv_key.to_pub(), config, connected_broker: BrokerPeerId::None, graph_dataset: graph,