SPARQL Update CRDT

pull/19/head
Niko PLP 6 months ago
parent d4fdf2139f
commit 74c49d43e6
18 changed files (changed lines in parentheses):

  1. ng-net/src/app_protocol.rs (182)
  2. ng-net/src/types.rs (15)
  3. ng-oxigraph/src/oxigraph/mod.rs (13)
  4. ng-oxigraph/src/oxigraph/sparql/update.rs (49)
  5. ng-oxigraph/src/oxigraph/storage/backend/fallback.rs (126)
  6. ng-oxigraph/src/oxigraph/storage/backend/oxi_rocksdb.rs (112)
  7. ng-oxigraph/src/oxigraph/storage/mod.rs (1017)
  8. ng-oxigraph/src/oxigraph/store.rs (419)
  9. ng-oxigraph/src/oxrdf/named_node.rs (4)
 10. ng-repo/src/commit.rs (14)
 11. ng-repo/src/errors.rs (18)
 12. ng-repo/src/types.rs (14)
 13. ng-sdk-js/app-node/index.js (23)
 14. ng-sdk-js/src/lib.rs (2)
 15. ng-verifier/src/commits/mod.rs (6)
 16. ng-verifier/src/commits/transaction.rs (550)
 17. ng-verifier/src/request_processor.rs (21)
 18. ng-verifier/src/verifier.rs (87)

@@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
 use ng_repo::errors::NgError;
 use ng_repo::types::*;
-use ng_repo::utils::{decode_id, decode_sym_key};
+use ng_repo::utils::{decode_id, decode_key, decode_sym_key};
 use crate::types::*;
@@ -23,6 +23,15 @@ lazy_static! {
     #[doc(hidden)]
     static ref RE_FILE_READ_CAP: Regex =
         Regex::new(r"^did:ng:j:([A-Za-z0-9-_%.]*):k:([A-Za-z0-9-_%.]*)$").unwrap();
+    #[doc(hidden)]
+    static ref RE_REPO: Regex =
+        Regex::new(r"^did:ng:o:([A-Za-z0-9-_%.]*):v:([A-Za-z0-9-_%.]*)$").unwrap();
+    #[doc(hidden)]
+    static ref RE_BRANCH: Regex =
+        Regex::new(r"^did:ng:o:([A-Za-z0-9-_%.]*):v:([A-Za-z0-9-_%.]*):b:([A-Za-z0-9-_%.]*)$").unwrap();
+    #[doc(hidden)]
+    static ref RE_NAMED_BRANCH_OR_COMMIT: Regex =
+        Regex::new(r"^did:ng:o:([A-Za-z0-9-_%.]*):v:([A-Za-z0-9-_%.]*):a:([A-Za-z0-9-_%.]*)$").unwrap(); //TODO: allow international chars. disallow digit as first char
 }

 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -68,6 +77,21 @@ pub enum TargetBranchV0 {
     Commits(Vec<ObjectId>), // only possible if access to their branch is given. must belong to the same branch.
 }

+impl TargetBranchV0 {
+    pub fn is_valid_for_sparql_update(&self) -> bool {
+        match self {
+            Self::Commits(_) => false,
+            _ => true,
+        }
+    }
+    pub fn branch_id(&self) -> &BranchId {
+        match self {
+            Self::BranchId(id) => id,
+            _ => panic!("not a TargetBranchV0::BranchId"),
+        }
+    }
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum NuriTargetV0 {
     UserSite, // targets the whole data set of the user
@@ -81,7 +105,32 @@ pub enum NuriTargetV0 {
     Group(String), // shortname of a Group
     Repo(RepoId),
+    None,
 }

+impl NuriTargetV0 {
+    pub fn is_valid_for_sparql_update(&self) -> bool {
+        match self {
+            Self::UserSite | Self::AllDialogs | Self::AllGroups => false,
+            _ => true,
+        }
+    }
+    pub fn is_repo_id(&self) -> bool {
+        match self {
+            Self::Repo(_) => true,
+            _ => false,
+        }
+    }
+    pub fn repo_id(&self) -> &RepoId {
+        match self {
+            Self::Repo(id) => id,
+            _ => panic!("not a NuriTargetV0::Repo"),
+        }
+    }
+}
+
+const DID_PREFIX: &str = "did:ng";
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct NuriV0 {
@@ -99,6 +148,61 @@ pub struct NuriV0 {
 }

 impl NuriV0 {
+    pub fn commit_graph_name(commit_id: &ObjectId, overlay_id: &OverlayId) -> String {
+        format!("{DID_PREFIX}:c:{commit_id}:v:{overlay_id}")
+    }
+
+    pub fn commit_graph_name_from_base64(commit_base64: &String, overlay_id: &OverlayId) -> String {
+        format!("{DID_PREFIX}:c:{commit_base64}:v:{overlay_id}")
+    }
+
+    pub fn repo_graph_name(repo_id: &RepoId, overlay_id: &OverlayId) -> String {
+        format!("{DID_PREFIX}:o:{repo_id}:v:{overlay_id}")
+    }
+
+    pub fn overlay_id(overlay_id: &OverlayId) -> String {
+        format!("{DID_PREFIX}:v:{overlay_id}")
+    }
+
+    pub fn topic_id(topic_id: &TopicId) -> String {
+        format!("{DID_PREFIX}:h:{topic_id}")
+    }
+
+    pub fn branch_id(branch_id: &BranchId) -> String {
+        format!("{DID_PREFIX}:b:{branch_id}")
+    }
+
+    pub fn branch_id_from_base64(branch_base64: &String) -> String {
+        format!("{DID_PREFIX}:b:{branch_base64}")
+    }
+
+    pub fn token(token: &Digest) -> String {
+        format!("{DID_PREFIX}:n:{token}")
+    }
+
+    pub fn is_branch_identifier(&self) -> bool {
+        self.locator.is_empty()
+            && self.topic.is_none()
+            && self.access.is_empty()
+            && self.overlay.as_ref().map_or(false, |o| o.is_outer())
+            && self
+                .branch
+                .as_ref()
+                .map_or(true, |b| b.is_valid_for_sparql_update())
+            && self.object.is_none()
+            && !self.entire_store
+            && self.target.is_repo_id()
+    }
+
+    pub fn is_valid_for_sparql_update(&self) -> bool {
+        self.object.is_none()
+            && self.entire_store == false
+            && self.target.is_valid_for_sparql_update()
+            && self
+                .branch
+                .as_ref()
+                .map_or(true, |b| b.is_valid_for_sparql_update())
+    }
+
     pub fn new_repo_target_from_string(repo_id_string: String) -> Result<Self, NgError> {
         let repo_id: RepoId = repo_id_string.as_str().try_into()?;
         Ok(Self {
@@ -140,8 +244,8 @@ impl NuriV0 {
             locator: vec![],
         }
     }

-    pub fn new_from(from: String) -> Result<Self, NgError> {
-        let c = RE_FILE_READ_CAP.captures(&from);
+    pub fn new_from(from: &String) -> Result<Self, NgError> {
+        let c = RE_FILE_READ_CAP.captures(from);

         if c.is_some()
             && c.as_ref().unwrap().get(1).is_some()
@@ -163,11 +267,62 @@ impl NuriV0 {
                 topic: None,
                 locator: vec![],
             })
         } else {
+            let c = RE_REPO.captures(from);
+
+            if c.is_some()
+                && c.as_ref().unwrap().get(1).is_some()
+                && c.as_ref().unwrap().get(2).is_some()
+            {
+                let cap = c.unwrap();
+                let o = cap.get(1).unwrap().as_str();
+                let v = cap.get(2).unwrap().as_str();
+                let repo_id = decode_key(o)?;
+                let overlay_id = decode_id(v)?;
+                Ok(Self {
+                    identity: None,
+                    target: NuriTargetV0::Repo(repo_id),
+                    entire_store: false,
+                    object: None,
+                    branch: None,
+                    overlay: Some(OverlayLink::Outer(overlay_id)),
+                    access: vec![],
+                    topic: None,
+                    locator: vec![],
+                })
+            } else {
+                let c = RE_BRANCH.captures(from);
+
+                if c.is_some()
+                    && c.as_ref().unwrap().get(1).is_some()
+                    && c.as_ref().unwrap().get(2).is_some()
+                    && c.as_ref().unwrap().get(3).is_some()
+                {
+                    let cap = c.unwrap();
+                    let o = cap.get(1).unwrap().as_str();
+                    let v = cap.get(2).unwrap().as_str();
+                    let b = cap.get(3).unwrap().as_str();
+                    let repo_id = decode_key(o)?;
+                    let overlay_id = decode_id(v)?;
+                    let branch_id = decode_key(b)?;
+                    Ok(Self {
+                        identity: None,
+                        target: NuriTargetV0::Repo(repo_id),
+                        entire_store: false,
+                        object: None,
+                        branch: Some(TargetBranchV0::BranchId(branch_id)),
+                        overlay: Some(OverlayLink::Outer(overlay_id)),
+                        access: vec![],
+                        topic: None,
+                        locator: vec![],
+                    })
+                } else {
                     Err(NgError::InvalidNuri)
+                }
+            }
         }
     }
 }

 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum AppRequestCommandV0 {
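
Taken together, `new_from` now recognizes three DID shapes: a file read capability (`did:ng:j:<id>:k:<key>`), a repo in an outer overlay (`did:ng:o:<repoid>:v:<overlayid>`), and a branch of such a repo (`...:b:<branchid>`). A minimal sketch of how a caller might dispatch on the parsed result (the helper and its labels are illustrative, not part of the commit):

    // Illustrative helper, not in the diff: classify a Nuri string.
    // Assumes the ids are valid base64url-encoded keys/ids.
    fn classify(nuri: &String) -> Result<&'static str, NgError> {
        let parsed = NuriV0::new_from(nuri)?;
        Ok(if parsed.branch.is_some() {
            "branch of a repo in an outer overlay" // did:ng:o:..:v:..:b:..
        } else if parsed.target.is_repo_id() {
            "repo in an outer overlay" // did:ng:o:..:v:..
        } else {
            "file read capability" // did:ng:j:..:k:..
        })
    }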
@@ -321,8 +476,12 @@ pub enum DocQuery {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct GraphUpdate {
-    add: Vec<String>,
-    remove: Vec<String>,
+    // serialization of Vec<Quad>
+    #[serde(with = "serde_bytes")]
+    pub inserts: Vec<u8>,
+    // serialization of Vec<Quad>
+    #[serde(with = "serde_bytes")]
+    pub removes: Vec<u8>,
 }
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -405,9 +564,12 @@ pub enum DiscretePatch {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct GraphPatch {
-    /// oxigraph::model::GroundQuad serialized to n-quads with oxrdfio
-    pub adds: Vec<String>,
-    pub removes: Vec<String>,
+    // serialization of Vec<Triple>
+    #[serde(with = "serde_bytes")]
+    pub inserts: Vec<u8>,
+    // serialization of Vec<Triple>
+    #[serde(with = "serde_bytes")]
+    pub removes: Vec<u8>,
 }
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -426,7 +588,9 @@ pub enum DiscreteState {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct GraphState {
-    pub tuples: Vec<String>,
+    // serialization of Vec<Triple>
+    #[serde(with = "serde_bytes")]
+    pub triples: Vec<u8>,
 }

 #[derive(Clone, Debug, Serialize, Deserialize)]
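
The `GraphUpdate`, `GraphPatch` and `GraphState` payloads thus switch from vectors of N-Quads strings to opaque byte blobs, each documented as the serialization of a `Vec<Quad>` or `Vec<Triple>`. A sketch of producing such a blob, assuming the project's usual serde_bare encoding (the diff itself only says "serialization of Vec<Triple>"; serde_bare is inferred from its use elsewhere in this commit, e.g. `TransactionBody` in ng-verifier):

    use ng_oxigraph::oxrdf::{NamedNode, Triple};

    let triples: Vec<Triple> = vec![Triple::new(
        NamedNode::new_unchecked("did:ng:z"),
        NamedNode::new_unchecked("did:ng:j"),
        NamedNode::new_unchecked("did:ng:m"),
    )];
    let patch = GraphPatch {
        inserts: serde_bare::to_vec(&triples).unwrap(),
        removes: serde_bare::to_vec(&Vec::<Triple>::new()).unwrap(),
    };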

@@ -1346,6 +1346,21 @@ pub enum OverlayLink {
     Public(PubKey),
 }

+impl OverlayLink {
+    pub fn is_outer(&self) -> bool {
+        match self {
+            Self::Outer(_) => true,
+            _ => false,
+        }
+    }
+    pub fn outer(&self) -> &Digest {
+        match self {
+            Self::Outer(o) => o,
+            _ => panic!("not an outer overlay ID"),
+        }
+    }
+}
+
 /// Overlay session ID
 ///
 /// It is a pubkey used for signing all OverlayMessage sent by the peer.

@@ -3,3 +3,16 @@ pub mod model;
 pub mod sparql;
 mod storage;
 pub mod store;
+
+pub mod storage_ng {
+    pub use super::storage::numeric_encoder;
+    pub use super::storage::ADDED_IN_MAIN;
+    pub use super::storage::ADDED_IN_OTHER;
+    pub use super::storage::BRANCH_PREFIX;
+    pub use super::storage::COMMIT_HAS_GRAPH;
+    pub use super::storage::COMMIT_PREFIX;
+    pub use super::storage::COMMIT_SKIP_NO_GRAPH;
+    pub use super::storage::REMOVED_IN_MAIN;
+    pub use super::storage::REMOVED_IN_OTHER;
+    pub use super::storage::REPO_IN_MAIN;
+}

@@ -6,7 +6,7 @@ use crate::oxigraph::sparql::eval::{EncodedTuple, SimpleEvaluator};
 use crate::oxigraph::sparql::http::Client;
 use crate::oxigraph::sparql::{EvaluationError, Update, UpdateOptions};
 use crate::oxigraph::storage::numeric_encoder::{Decoder, EncodedTerm};
-use crate::oxigraph::storage::StorageWriter;
+use crate::oxigraph::storage::CommitWriter;
 use crate::spargebra::algebra::{GraphPattern, GraphTarget};
 use crate::spargebra::term::{
     BlankNode, GraphName, GraphNamePattern, GroundQuad, GroundQuadPattern, GroundSubject,
@@ -23,7 +23,7 @@ use std::rc::Rc;
 use std::sync::Arc;

 pub fn evaluate_update<'a, 'b: 'a>(
-    transaction: &'a mut StorageWriter<'b>,
+    transaction: &'a mut CommitWriter<'b>,
     update: &Update,
     options: &UpdateOptions,
 ) -> Result<(), EvaluationError> {
@@ -40,7 +40,7 @@ pub fn evaluate_update<'a, 'b: 'a>(
 }

 struct SimpleUpdateEvaluator<'a, 'b> {
-    transaction: &'a mut StorageWriter<'b>,
+    transaction: &'a mut CommitWriter<'b>,
     base_iri: Option<Rc<Iri<String>>>,
     options: UpdateOptions,
     client: Client,
@@ -200,27 +200,28 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> {
     }

     fn eval_clear(&mut self, graph: &GraphTarget, silent: bool) -> Result<(), EvaluationError> {
-        match graph {
-            GraphTarget::NamedNode(graph_name) => {
-                if self
-                    .transaction
-                    .reader()
-                    .contains_named_graph(&graph_name.as_ref().into())?
-                {
-                    Ok(self.transaction.clear_graph(graph_name.into())?)
-                } else if silent {
-                    Ok(())
-                } else {
-                    Err(EvaluationError::GraphDoesNotExist(graph_name.clone()))
-                }
-            }
-            GraphTarget::DefaultGraph => {
-                self.transaction.clear_graph(GraphNameRef::DefaultGraph)?;
-                Ok(())
-            }
-            GraphTarget::NamedGraphs => Ok(self.transaction.clear_all_named_graphs()?),
-            GraphTarget::AllGraphs => Ok(self.transaction.clear_all_graphs()?),
-        }
+        unimplemented!();
+        // match graph {
+        //     GraphTarget::NamedNode(graph_name) => {
+        //         if self
+        //             .transaction
+        //             .reader()
+        //             .contains_named_graph(&graph_name.as_ref().into())?
+        //         {
+        //             Ok(self.transaction.clear_graph(graph_name.into())?)
+        //         } else if silent {
+        //             Ok(())
+        //         } else {
+        //             Err(EvaluationError::GraphDoesNotExist(graph_name.clone()))
+        //         }
+        //     }
+        //     GraphTarget::DefaultGraph => {
+        //         self.transaction.clear_graph(GraphNameRef::DefaultGraph)?;
+        //         Ok(())
+        //     }
+        //     GraphTarget::NamedGraphs => Ok(self.transaction.clear_all_named_graphs()?),
+        //     GraphTarget::AllGraphs => Ok(self.transaction.clear_all_graphs()?),
+        // }
     }

     fn eval_drop(&mut self, graph: &GraphTarget, silent: bool) -> Result<(), EvaluationError> {

@@ -1,9 +1,10 @@
 //! TODO: This storage is dramatically naive.

+use super::super::numeric_encoder::StrHash;
 use crate::oxigraph::storage::StorageError;
 use crate::oxigraph::store::CorruptionError;
 use std::cell::RefCell;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::error::Error;
 use std::mem::transmute;
 use std::rc::{Rc, Weak};
@@ -17,9 +18,18 @@ pub struct ColumnFamilyDefinition {
 }

 #[derive(Clone)]
-pub struct Db(Arc<RwLock<HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>);
+pub struct Db {
+    db: Arc<RwLock<HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>,
+    pub past_commits_cache: Arc<RwLock<HashMap<StrHash, Arc<HashSet<StrHash>>>>>,
+}

 impl Db {
+    pub(crate) fn past_commits_cache(
+        &self,
+    ) -> Arc<RwLock<HashMap<StrHash, Arc<HashSet<StrHash>>>>> {
+        Arc::clone(&self.past_commits_cache)
+    }
+
     #[allow(clippy::unnecessary_wraps)]
     pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self, StorageError> {
         let mut trees = HashMap::new();
@@ -27,13 +37,16 @@ impl Db {
             trees.insert(ColumnFamily(cf.name), BTreeMap::default());
         }
         trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists.
-        Ok(Self(Arc::new(RwLock::new(trees))))
+        Ok(Self {
+            db: Arc::new(RwLock::new(trees)),
+            past_commits_cache: Arc::new(RwLock::new(HashMap::new())),
+        })
     }

     #[allow(clippy::unwrap_in_result)]
     pub fn column_family(&self, name: &'static str) -> Result<ColumnFamily, StorageError> {
         let column_family = ColumnFamily(name);
-        if self.0.read().unwrap().contains_key(&column_family) {
+        if self.db.read().unwrap().contains_key(&column_family) {
             Ok(column_family)
         } else {
             Err(CorruptionError::from_missing_column_family_name(name).into())
@@ -42,7 +55,7 @@ impl Db {
     #[must_use]
     pub fn snapshot(&self) -> Reader {
-        Reader(InnerReader::Simple(Arc::clone(&self.0)))
+        Reader(InnerReader::Simple(Arc::clone(&self.db)))
     }

     #[allow(clippy::unwrap_in_result)]
@@ -50,7 +63,22 @@ impl Db {
         &'b self,
         f: impl Fn(Transaction<'a>) -> Result<T, E>,
     ) -> Result<T, E> {
-        f(Transaction(Rc::new(RefCell::new(self.0.write().unwrap()))))
+        let mut t = Transaction::new(Rc::new(RefCell::new(self.db.write().unwrap())));
+        let res = f(t.clone());
+        t.rollback();
+        res
+    }
+
+    pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
+        &'b self,
+        f: impl Fn(Transaction<'a>) -> Result<T, E>,
+    ) -> Result<T, E> {
+        let mut t = Transaction::new(Rc::new(RefCell::new(self.db.write().unwrap())));
+        let res = f(t.clone());
+        if res.is_err() {
+            t.rollback();
+        }
+        res
     }
 }
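
With this change the fallback backend gets two transaction flavors: `transaction()` always rolls back (a dry-run used to collect the effects of an update without committing), while `ng_transaction()` commits on success and rolls back only on error. A sketch of the observable difference (assuming a `Reader::get` accessor as in the existing backend API):

    let db = Db::new(vec![])?;
    let cf = db.column_family("default")?;

    // transaction(): effects are always rolled back (dry-run).
    db.transaction(|mut t| -> Result<(), StorageError> { t.insert(&cf, b"k", b"v") })?;
    assert!(db.snapshot().get(&cf, b"k")?.is_none());

    // ng_transaction(): effects persist unless the closure returns Err.
    db.ng_transaction(|mut t| -> Result<(), StorageError> { t.insert(&cf, b"k", b"v") })?;
    assert!(db.snapshot().get(&cf, b"k")?.is_some());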
@@ -225,16 +253,29 @@ impl Reader {
     }
 }

-pub struct Transaction<'a>(
-    Rc<RefCell<RwLockWriteGuard<'a, HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>>,
-);
+#[derive(Clone)]
+pub struct Transaction<'a> {
+    db: Rc<RefCell<RwLockWriteGuard<'a, HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>>,
+    inserts: Rc<RwLock<HashMap<(ColumnFamily, Vec<u8>), Option<Vec<u8>>>>>,
+    removes: Rc<RwLock<HashMap<(ColumnFamily, Vec<u8>), Vec<u8>>>>,
+}
+
+impl<'a> Transaction<'a> {
+    fn new(
+        db: Rc<RefCell<RwLockWriteGuard<'a, HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>>,
+    ) -> Self {
+        Transaction {
+            db,
+            inserts: Rc::new(RwLock::new(HashMap::new())),
+            removes: Rc::new(RwLock::new(HashMap::new())),
+        }
+    }
+}

 impl Transaction<'_> {
     #[allow(unsafe_code, clippy::useless_transmute)]
     pub fn reader(&self) -> Reader {
         // SAFETY: This transmute is safe because we take a weak reference and the only Rc reference used is guarded by the lifetime.
         Reader(InnerReader::Transaction(Rc::downgrade(unsafe {
-            transmute(&self.0)
+            transmute(&self.db)
         })))
     }
@ -244,12 +285,42 @@ impl Transaction<'_> {
column_family: &ColumnFamily, column_family: &ColumnFamily,
key: &[u8], key: &[u8],
) -> Result<bool, StorageError> { ) -> Result<bool, StorageError> {
Ok((*self.0) Ok((*self.db)
.borrow() .borrow()
.get(column_family) .get(column_family)
.map_or(false, |cf| cf.contains_key(key))) .map_or(false, |cf| cf.contains_key(key)))
} }
+    fn rollback(&mut self) {
+        let inserts = self.inserts.read().unwrap();
+        for ((column_family, key), val) in inserts.iter() {
+            if val.is_some() {
+                //restore original val
+                self.db
+                    .borrow_mut()
+                    .get_mut(&column_family)
+                    .unwrap()
+                    .insert(key.to_vec(), val.as_ref().unwrap().to_vec());
+            } else {
+                // we remove it
+                self.db
+                    .borrow_mut()
+                    .get_mut(&column_family)
+                    .unwrap()
+                    .remove(key.into());
+            }
+        }
+        let removes = self.removes.read().unwrap();
+        for ((column_family, key), val) in removes.iter() {
+            //restore original val
+            self.db
+                .borrow_mut()
+                .get_mut(&column_family)
+                .unwrap()
+                .insert(key.to_vec(), val.to_vec());
+        }
+    }
     #[allow(clippy::unnecessary_wraps, clippy::unwrap_in_result)]
     pub fn insert(
         &mut self,
@@ -257,11 +328,22 @@ impl Transaction<'_> {
         key: &[u8],
         value: &[u8],
     ) -> Result<(), StorageError> {
-        self.0
+        let mut previous_val = self
+            .db
             .borrow_mut()
             .get_mut(column_family)
             .unwrap()
             .insert(key.into(), value.into());
+        let key = (column_family.clone(), key.to_vec());
+        let previous_val2 = self.removes.write().unwrap().remove(&key);
+        if previous_val.is_none() && previous_val2.is_some() {
+            previous_val = previous_val2;
+        }
+        let mut inserts = self.inserts.write().unwrap();
+        if !inserts.contains_key(&key) {
+            inserts.insert(key, previous_val);
+        }
         Ok(())
     }
@@ -275,11 +357,27 @@ impl Transaction<'_> {
     #[allow(clippy::unnecessary_wraps, clippy::unwrap_in_result)]
     pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<(), StorageError> {
-        self.0
+        let mut val = self
+            .db
             .borrow_mut()
             .get_mut(column_family)
             .unwrap()
             .remove(key);
+        let val2 = self
+            .inserts
+            .write()
+            .unwrap()
+            .remove(&(column_family.clone(), key.to_vec()));
+        if val2.is_some() {
+            // we prefer the value in inserts as it may contain the original value after several inserts on the same key.
+            val = val2.unwrap();
+        }
+        if let Some(val) = val {
+            self.removes
+                .write()
+                .unwrap()
+                .insert((column_family.clone(), key.to_vec()), val.to_vec());
+        }
         Ok(())
     }
 }
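
The bookkeeping invariant behind `rollback()`: `inserts` maps each touched (column family, key) to the value that existed before the transaction's first write (`None` if the key was absent), `removes` keeps the overwritten value of deleted keys, and rollback replays both maps. Only the first write per key is recorded, so later writes cannot clobber the original value. A commented walk-through, continuing the `db`/`cf` sketch above:

    // key "k" holds "old" before the transaction starts
    db.transaction(|mut t| -> Result<(), StorageError> {
        t.insert(&cf, b"k", b"a")?; // inserts[(cf,"k")] = Some("old")
        t.insert(&cf, b"k", b"b")?; // key already tracked: "old" is kept
        t.remove(&cf, b"k")?;       // removes[(cf,"k")] = "old" (taken back from inserts)
        Ok(())
    })?; // rollback() restores "k" -> "old"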

@@ -8,6 +8,7 @@
     clippy::unwrap_in_result
 )]

+use super::super::numeric_encoder::StrHash;
 use crate::oxigraph::storage::error::{CorruptionError, StorageError};
 use libc::{c_char, c_void};
 use ng_rocksdb::ffi::*;
@@ -15,7 +16,7 @@ use rand::random;
 use std::borrow::Borrow;
 #[cfg(unix)]
 use std::cmp::min;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::env::temp_dir;
 use std::error::Error;
 use std::ffi::{CStr, CString};
@@ -24,7 +25,7 @@ use std::marker::PhantomData;
 use std::ops::Deref;
 use std::path::{Path, PathBuf};
 use std::rc::{Rc, Weak};
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 use std::thread::{available_parallelism, yield_now};
 use std::{fmt, io, ptr, slice};
@@ -70,6 +71,17 @@ enum DbKind {
     ReadWrite(Arc<RwDbHandler>),
 }

+impl Db {
+    pub(crate) fn past_commits_cache(
+        &self,
+    ) -> Arc<RwLock<HashMap<StrHash, Arc<HashSet<StrHash>>>>> {
+        match &self.inner {
+            DbKind::ReadWrite(rw) => Arc::clone(&rw.past_commits_cache),
+            _ => panic!("rw not implemented for read only DbKind"),
+        }
+    }
+}
+
 struct RwDbHandler {
     db: *mut rocksdb_transactiondb_t,
     env: UnsafeEnv,
@@ -88,6 +100,7 @@ struct RwDbHandler {
     cf_options: Vec<*mut rocksdb_options_t>,
     in_memory: bool,
     path: PathBuf,
+    past_commits_cache: Arc<RwLock<HashMap<StrHash, Arc<HashSet<StrHash>>>>>,
 }

 unsafe impl Send for RwDbHandler {}
@@ -317,6 +330,7 @@ impl Db {
                     cf_options,
                     in_memory,
                     path,
+                    past_commits_cache: Arc::new(RwLock::new(HashMap::new())),
                 })),
             })
         }
@@ -609,6 +623,88 @@ impl Db {
     pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
         &'b self,
         f: impl Fn(Transaction<'a>) -> Result<T, E>,
+    ) -> Result<T, E> {
+        if let DbKind::ReadWrite(db) = &self.inner {
+            loop {
+                let transaction = unsafe {
+                    let transaction = rocksdb_transaction_begin(
+                        db.db,
+                        db.write_options,
+                        db.transaction_options,
+                        ptr::null_mut(),
+                    );
+                    assert!(
+                        !transaction.is_null(),
+                        "rocksdb_transaction_begin returned null"
+                    );
+                    transaction
+                };
+                let (read_options, snapshot) = unsafe {
+                    let options = rocksdb_readoptions_create_copy(db.read_options);
+                    let snapshot = rocksdb_transaction_get_snapshot(transaction);
+                    rocksdb_readoptions_set_snapshot(options, snapshot);
+                    (options, snapshot)
+                };
+                let result = f(Transaction {
+                    inner: Rc::new(transaction),
+                    read_options,
+                    _lifetime: PhantomData,
+                });
+                match result {
+                    Ok(result) => {
+                        unsafe {
+                            let r =
+                                ffi_result!(rocksdb_transaction_rollback_with_status(transaction));
+                            rocksdb_transaction_destroy(transaction);
+                            rocksdb_readoptions_destroy(read_options);
+                            rocksdb_free(snapshot as *mut c_void);
+                            r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails
+                        }
+                        return Ok(result);
+                    }
+                    Err(e) => {
+                        unsafe {
+                            let r =
+                                ffi_result!(rocksdb_transaction_rollback_with_status(transaction));
+                            rocksdb_transaction_destroy(transaction);
+                            rocksdb_readoptions_destroy(read_options);
+                            rocksdb_free(snapshot as *mut c_void);
+                            r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails
+                        }
+                        // We look for the root error
+                        let mut error: &(dyn Error + 'static) = &e;
+                        while let Some(e) = error.source() {
+                            error = e;
+                        }
+                        let is_conflict_error =
+                            error.downcast_ref::<ErrorStatus>().map_or(false, |e| {
+                                e.0.code == rocksdb_status_code_t_rocksdb_status_code_busy
+                                    || e.0.code
+                                        == rocksdb_status_code_t_rocksdb_status_code_timed_out
+                                    || e.0.code
+                                        == rocksdb_status_code_t_rocksdb_status_code_try_again
+                            });
+                        if is_conflict_error {
+                            // We give a chance to the OS to do something else before retrying in order to help avoiding another conflict
+                            yield_now();
+                        } else {
+                            // We raise the error
+                            return Err(e);
+                        }
+                    }
+                }
+            }
+        } else {
+            Err(
+                StorageError::Other("Transaction are only possible on read-write instances".into())
+                    .into(),
+            )
+        }
+    }
+
+    pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
+        &'b self,
+        f: impl Fn(Transaction<'a>) -> Result<T, E>,
     ) -> Result<T, E> {
         if let DbKind::ReadWrite(db) = &self.inner {
             loop {
@@ -1292,6 +1388,18 @@ impl Iter {
             None
         }
     }
+
+    pub fn value(&self) -> Option<&[u8]> {
+        if self.is_valid() {
+            unsafe {
+                let mut len = 0;
+                let val = rocksdb_iter_value(self.inner, &mut len);
+                Some(slice::from_raw_parts(val.cast(), len))
+            }
+        } else {
+            None
+        }
+    }
 }

 pub struct SstFileWriter {

File diff suppressed because it is too large (ng-oxigraph/src/oxigraph/storage/mod.rs, 1017 changed lines).

@@ -33,13 +33,14 @@ use super::sparql::{
     evaluate_query, evaluate_update, EvaluationError, Query, QueryExplanation, QueryOptions,
     QueryResults, Update, UpdateOptions,
 };
-use super::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
+use super::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm, StrHash};
 #[cfg(all(not(target_family = "wasm"), not(docsrs)))]
 use super::storage::StorageBulkLoader;
 use super::storage::{
     ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter,
 };
 pub use super::storage::{CorruptionError, LoaderError, SerializerError, StorageError};
+use std::collections::HashSet;
 use std::error::Error;
 use std::io::{Read, Write};
 #[cfg(all(not(target_family = "wasm"), not(docsrs)))]
@@ -408,11 +409,12 @@ impl Store {
     ///     })?;
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
-    pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
+    fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
         &'b self,
         f: impl Fn(Transaction<'a>) -> Result<T, E>,
     ) -> Result<T, E> {
-        self.storage.transaction(|writer| f(Transaction { writer }))
+        self.storage
+            .ng_transaction(|writer| f(Transaction { writer }))
     }
     /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/).
@@ -433,11 +435,11 @@ impl Store {
     /// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?);
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
-    pub fn update(
+    pub fn ng_update(
         &self,
         update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
-    ) -> Result<(), EvaluationError> {
-        self.update_opt(update, UpdateOptions::default())
+    ) -> Result<(HashSet<Quad>, HashSet<Quad>), EvaluationError> {
+        self.ng_update_opt(update, UpdateOptions::default())
     }
     /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/) with some options.
@@ -457,15 +459,44 @@ impl Store {
     ///     )?;
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
-    pub fn update_opt(
+    pub fn ng_update_opt(
         &self,
         update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
         options: impl Into<UpdateOptions>,
-    ) -> Result<(), EvaluationError> {
+    ) -> Result<(HashSet<Quad>, HashSet<Quad>), EvaluationError> {
         let update = update.try_into().map_err(Into::into)?;
         let options = options.into();
+        self.storage.transaction(|mut t| {
+            evaluate_update(&mut t, &update, &options)?;
+            Ok(t.get_update())
+        })
+    }
+
+    /// INTERNAL FOR NG
+    // pub fn ng_update_opt(
+    //     &self,
+    //     update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
+    //     options: impl Into<UpdateOptions>,
+    // ) -> Result<(), EvaluationError> {
+    //     let update = update.try_into().map_err(Into::into)?;
+    //     let options = options.into();
+    //     self.storage
+    //         .ng_transaction(|mut t| evaluate_update(&mut t, &update, &options))
+    // }
+
+    // pub fn ng_update(
+    //     &self,
+    //     update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
+    // ) -> Result<(), EvaluationError> {
+    //     self.ng_update_opt(update, UpdateOptions::default())
+    // }
+
+    #[doc(hidden)]
+    pub fn ng_transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
+        &'b self,
+        f: impl Fn(Transaction<'a>) -> Result<T, E>,
+    ) -> Result<T, E> {
         self.storage
-            .transaction(|mut t| evaluate_update(&mut t, &update, &options))
+            .ng_transaction(|writer| f(Transaction { writer }))
     }
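
`ng_update` therefore no longer commits anything: it evaluates the SPARQL update inside the dry-run `storage.transaction` and hands the would-be effects back to the caller as two `HashSet<Quad>`s via `t.get_update()`. A sketch of the resulting flow (in-memory `Store::new()`, as in the oxigraph API):

    use ng_oxigraph::oxigraph::store::Store;

    let store = Store::new()?;
    let (inserts, removes) = store.ng_update(
        "INSERT DATA { <did:ng:z> <did:ng:j> <did:ng:m> }",
    )?;
    assert!(removes.is_empty());
    // Nothing was committed; the caller (the verifier) turns the sets into a
    // transaction commit and applies it through ng_transaction().
    assert_eq!(store.len()?, 0);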
     /// Loads a RDF file under into the store.
@@ -506,6 +537,7 @@ impl Store {
         parser: impl Into<RdfParser>,
         read: impl Read,
     ) -> Result<(), LoaderError> {
+        unimplemented!();
         let quads = parser
             .into()
             .rename_blank_nodes()
@@ -627,23 +659,23 @@ impl Store {
     /// assert!(store.contains(quad)?);
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
-    pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
-        let quad = quad.into();
-        self.transaction(|mut t| t.insert(quad))
-    }
-
-    /// Adds atomically a set of quads to this store.
-    ///
-    /// <div class="warning">
-    ///
-    /// This operation uses a memory heavy transaction internally, use the [`bulk_loader`](Store::bulk_loader) if you plan to add ten of millions of triples.</div>
-    pub fn extend(
-        &self,
-        quads: impl IntoIterator<Item = impl Into<Quad>>,
-    ) -> Result<(), StorageError> {
-        let quads = quads.into_iter().map(Into::into).collect::<Vec<_>>();
-        self.transaction(move |mut t| t.extend(&quads))
-    }
+    // pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
+    //     let quad = quad.into();
+    //     self.transaction(|mut t| t.insert(quad))
+    // }
+
+    // /// Adds atomically a set of quads to this store.
+    // ///
+    // /// <div class="warning">
+    // ///
+    // /// This operation uses a memory heavy transaction internally, use the [`bulk_loader`](Store::bulk_loader) if you plan to add ten of millions of triples.</div>
+    // pub fn extend(
+    //     &self,
+    //     quads: impl IntoIterator<Item = impl Into<Quad>>,
+    // ) -> Result<(), StorageError> {
+    //     let quads = quads.into_iter().map(Into::into).collect::<Vec<_>>();
+    //     self.transaction(move |mut t| t.extend(&quads))
+    // }

     /// Removes a quad from this store.
     ///
@@ -666,6 +698,7 @@ impl Store {
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
     pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
+        unimplemented!();
         let quad = quad.into();
         self.transaction(move |mut t| t.remove(quad))
     }
@@ -805,13 +838,13 @@ impl Store {
     ///     );
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
-    pub fn named_graphs(&self) -> GraphNameIter {
-        let reader = self.storage.snapshot();
-        GraphNameIter {
-            iter: reader.named_graphs(),
-            reader,
-        }
-    }
+    // pub fn named_graphs(&self) -> GraphNameIter {
+    //     let reader = self.storage.snapshot();
+    //     GraphNameIter {
+    //         iter: reader.named_graphs(),
+    //         reader,
+    //     }
+    // }

     /// Checks if the store contains a given graph
     ///
@@ -826,13 +859,13 @@ impl Store {
     /// assert!(store.contains_named_graph(&ex)?);
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
-    pub fn contains_named_graph<'a>(
-        &self,
-        graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
-    ) -> Result<bool, StorageError> {
-        let graph_name = EncodedTerm::from(graph_name.into());
-        self.storage.snapshot().contains_named_graph(&graph_name)
-    }
+    // pub fn contains_named_graph<'a>(
+    //     &self,
+    //     graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
+    // ) -> Result<bool, StorageError> {
+    //     let graph_name = EncodedTerm::from(graph_name.into());
+    //     self.storage.snapshot().contains_named_graph(&graph_name)
+    // }

     /// Inserts a graph into this store.
     ///
@@ -853,13 +886,13 @@ impl Store {
     ///     );
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
-    pub fn insert_named_graph<'a>(
-        &self,
-        graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
-    ) -> Result<bool, StorageError> {
-        let graph_name = graph_name.into();
-        self.transaction(|mut t| t.insert_named_graph(graph_name))
-    }
+    // pub fn insert_named_graph<'a>(
+    //     &self,
+    //     graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
+    // ) -> Result<bool, StorageError> {
+    //     let graph_name = graph_name.into();
+    //     self.transaction(|mut t| t.insert_named_graph(graph_name))
+    // }

     /// Clears a graph from this store.
     ///
@@ -1205,25 +1238,25 @@ impl<'a> Transaction<'a> {
     ///     })?;
     /// # Result::<_, EvaluationError>::Ok(())
     /// ```
-    pub fn update(
-        &mut self,
-        update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
-    ) -> Result<(), EvaluationError> {
-        self.update_opt(update, UpdateOptions::default())
-    }
-
-    /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/) with some options.
-    pub fn update_opt(
-        &mut self,
-        update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
-        options: impl Into<UpdateOptions>,
-    ) -> Result<(), EvaluationError> {
-        evaluate_update(
-            &mut self.writer,
-            &update.try_into().map_err(Into::into)?,
-            &options.into(),
-        )
-    }
+    // pub fn update(
+    //     &mut self,
+    //     update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
+    // ) -> Result<(), EvaluationError> {
+    //     self.update_opt(update, UpdateOptions::default())
+    // }
+
+    // /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/) with some options.
+    // pub fn update_opt(
+    //     &mut self,
+    //     update: impl TryInto<Update, Error = impl Into<EvaluationError>>,
+    //     options: impl Into<UpdateOptions>,
+    // ) -> Result<(), EvaluationError> {
+    //     evaluate_update(
+    //         &mut self.writer,
+    //         &update.try_into().map_err(Into::into)?,
+    //         &options.into(),
+    //     )
+    // }

     /// Loads a RDF file into the store.
     ///
@@ -1261,16 +1294,16 @@ impl<'a> Transaction<'a> {
     /// assert!(store.contains(QuadRef::new(ex, ex, ex, NamedNodeRef::new("http://example.com/g2")?))?);
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
-    pub fn load_from_read(
-        &mut self,
-        parser: impl Into<RdfParser>,
-        read: impl Read,
-    ) -> Result<(), LoaderError> {
-        for quad in parser.into().rename_blank_nodes().parse_read(read) {
-            self.insert(quad?.as_ref())?;
-        }
-        Ok(())
-    }
+    // pub fn load_from_read(
+    //     &mut self,
+    //     parser: impl Into<RdfParser>,
+    //     read: impl Read,
+    // ) -> Result<(), LoaderError> {
+    //     for quad in parser.into().rename_blank_nodes().parse_read(read) {
+    //         self.insert(quad?.as_ref())?;
+    //     }
+    //     Ok(())
+    // }

     /// Loads a graph file (i.e. triples) into the store.
     ///
@@ -1298,27 +1331,27 @@ impl<'a> Transaction<'a> {
     /// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?);
     /// # Result::<_,oxigraph::store::LoaderError>::Ok(())
     /// ```
-    #[deprecated(note = "use Transaction.load_from_read instead", since = "0.4.0")]
-    pub fn load_graph(
-        &mut self,
-        read: impl Read,
-        format: impl Into<RdfFormat>,
-        to_graph_name: impl Into<GraphName>,
-        base_iri: Option<&str>,
-    ) -> Result<(), LoaderError> {
-        let mut parser = RdfParser::from_format(format.into())
-            .without_named_graphs()
-            .with_default_graph(to_graph_name);
-        if let Some(base_iri) = base_iri {
-            parser = parser
-                .with_base_iri(base_iri)
-                .map_err(|e| LoaderError::InvalidBaseIri {
-                    iri: base_iri.into(),
-                    error: e,
-                })?;
-        }
-        self.load_from_read(parser, read)
-    }
+    // #[deprecated(note = "use Transaction.load_from_read instead", since = "0.4.0")]
+    // pub fn load_graph(
+    //     &mut self,
+    //     read: impl Read,
+    //     format: impl Into<RdfFormat>,
+    //     to_graph_name: impl Into<GraphName>,
+    //     base_iri: Option<&str>,
+    // ) -> Result<(), LoaderError> {
+    //     let mut parser = RdfParser::from_format(format.into())
+    //         .without_named_graphs()
+    //         .with_default_graph(to_graph_name);
+    //     if let Some(base_iri) = base_iri {
+    //         parser = parser
+    //             .with_base_iri(base_iri)
+    //             .map_err(|e| LoaderError::InvalidBaseIri {
+    //                 iri: base_iri.into(),
+    //                 error: e,
+    //             })?;
+    //     }
+    //     self.load_from_read(parser, read)
+    // }

     /// Loads a dataset file (i.e. quads) into the store.
     ///
@@ -1342,23 +1375,91 @@ impl<'a> Transaction<'a> {
     /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
     /// # Result::<_,oxigraph::store::LoaderError>::Ok(())
     /// ```
-    #[deprecated(note = "use Transaction.load_from_read instead", since = "0.4.0")]
-    pub fn load_dataset(
-        &mut self,
-        read: impl Read,
-        format: impl Into<RdfFormat>,
-        base_iri: Option<&str>,
-    ) -> Result<(), LoaderError> {
-        let mut parser = RdfParser::from_format(format.into());
-        if let Some(base_iri) = base_iri {
-            parser = parser
-                .with_base_iri(base_iri)
-                .map_err(|e| LoaderError::InvalidBaseIri {
-                    iri: base_iri.into(),
-                    error: e,
-                })?;
-        }
-        self.load_from_read(parser, read)
-    }
+    // #[deprecated(note = "use Transaction.load_from_read instead", since = "0.4.0")]
+    // pub fn load_dataset(
+    //     &mut self,
+    //     read: impl Read,
+    //     format: impl Into<RdfFormat>,
+    //     base_iri: Option<&str>,
+    // ) -> Result<(), LoaderError> {
+    //     let mut parser = RdfParser::from_format(format.into());
+    //     if let Some(base_iri) = base_iri {
+    //         parser = parser
+    //             .with_base_iri(base_iri)
+    //             .map_err(|e| LoaderError::InvalidBaseIri {
+    //                 iri: base_iri.into(),
+    //                 error: e,
+    //             })?;
+    //     }
+    //     self.load_from_read(parser, read)
+    // }
+
+    pub fn ng_get_heads(
+        &self,
+        topic: &StrHash,
+        overlay: &StrHash,
+    ) -> Result<HashSet<StrHash>, StorageError> {
+        self.writer.reader().ng_get_heads(topic, overlay)
+    }
+
+    pub fn ng_get_reader(&self) -> StorageReader {
+        self.writer.reader()
+    }
+
+    pub fn update_heads(
+        &mut self,
+        topic: &StrHash,
+        overlay: &StrHash,
+        commit: &StrHash,
+        direct_causal_past: &HashSet<StrHash>,
+    ) -> Result<(), StorageError> {
+        self.writer
+            .ng_update_heads(topic, overlay, commit, direct_causal_past)
+    }
+
+    pub fn update_past(
+        &mut self,
+        commit: &StrHash,
+        direct_causal_past: &HashSet<StrHash>,
+        skip_has_no_graph: bool,
+    ) -> Result<(), StorageError> {
+        self.writer
+            .ng_update_past(commit, direct_causal_past, skip_has_no_graph)
+    }
+
+    pub fn update_branch_and_token(
+        &mut self,
+        overlay_encoded: &StrHash,
+        branch_encoded: &StrHash,
+        topic_encoded: &StrHash,
+        token_encoded: &StrHash,
+    ) -> Result<(), StorageError> {
+        self.writer.update_branch_and_token(
+            overlay_encoded,
+            branch_encoded,
+            topic_encoded,
+            token_encoded,
+        )
+    }
+
+    pub fn doc_in_store(
+        &mut self,
+        graph_name: NamedNodeRef<'_>,
+        overlay: &StrHash,
+        remove: bool,
+    ) -> Result<(), StorageError> {
+        self.writer.doc_in_store(graph_name, overlay, remove)
+    }
+
+    pub fn named_commit_or_branch(
+        &mut self,
+        ov_graph_name: NamedNodeRef<'_>,
+        name: &String,
+        // if None: remove
+        value: &Option<Vec<u8>>,
+    ) -> Result<(), StorageError> {
+        self.writer
+            .named_commit_or_branch(ov_graph_name, name, value)
+    }

     /// Adds a quad to this store.
@@ -1378,21 +1479,37 @@ impl<'a> Transaction<'a> {
     /// assert!(store.contains(quad)?);
     /// # Result::<_,oxigraph::store::StorageError>::Ok(())
     /// ```
-    pub fn insert<'b>(&mut self, quad: impl Into<QuadRef<'b>>) -> Result<bool, StorageError> {
-        self.writer.insert(quad.into())
-    }
-
-    /// Adds a set of quads to this store.
-    pub fn extend<'b>(
-        &mut self,
-        quads: impl IntoIterator<Item = impl Into<QuadRef<'b>>>,
-    ) -> Result<(), StorageError> {
-        for quad in quads {
-            self.writer.insert(quad.into())?;
-        }
-        Ok(())
-    }
+    pub fn insert<'b>(
+        &mut self,
+        quad: impl Into<QuadRef<'b>>,
+        value: u8,
+    ) -> Result<(), StorageError> {
+        self.writer.ng_insert(quad.into(), value)
+    }
+
+    pub fn insert_encoded(
+        &mut self,
+        encoded: &EncodedQuad,
+        value: u8,
+    ) -> Result<bool, StorageError> {
+        self.writer.ng_insert_encoded(encoded, value)
+    }
+
+    pub fn ng_remove(&mut self, quad: &EncodedQuad, commit: &StrHash) -> Result<(), StorageError> {
+        self.writer.ng_remove(quad, commit)
+    }
+
+    // /// Adds a set of quads to this store.
+    // pub fn extend<'b>(
+    //     &mut self,
+    //     quads: impl IntoIterator<Item = impl Into<QuadRef<'b>>>,
+    // ) -> Result<(), StorageError> {
+    //     for quad in quads {
+    //         self.writer.insert(quad.into())?;
+    //     }
+    //     Ok(())
+    // }

     /// Removes a quad from this store.
     ///
     /// Returns `true` if the quad was in the store and has been removed.
@@ -1417,23 +1534,23 @@ impl<'a> Transaction<'a> {
     }

     /// Returns all the store named graphs.
-    pub fn named_graphs(&self) -> GraphNameIter {
-        let reader = self.writer.reader();
-        GraphNameIter {
-            iter: reader.named_graphs(),
-            reader,
-        }
-    }
+    // pub fn named_graphs(&self) -> GraphNameIter {
+    //     let reader = self.writer.reader();
+    //     GraphNameIter {
+    //         iter: reader.named_graphs(),
+    //         reader,
+    //     }
+    // }

     /// Checks if the store contains a given graph.
-    pub fn contains_named_graph<'b>(
-        &self,
-        graph_name: impl Into<NamedOrBlankNodeRef<'b>>,
-    ) -> Result<bool, StorageError> {
-        self.writer
-            .reader()
-            .contains_named_graph(&EncodedTerm::from(graph_name.into()))
-    }
+    // pub fn contains_named_graph<'b>(
+    //     &self,
+    //     graph_name: impl Into<NamedOrBlankNodeRef<'b>>,
+    // ) -> Result<bool, StorageError> {
+    //     self.writer
+    //         .reader()
+    //         .contains_named_graph(&EncodedTerm::from(graph_name.into()))
+    // }

     /// Inserts a graph into this store.
     ///
@@ -1453,12 +1570,12 @@ impl<'a> Transaction<'a> {
     ///     );
     /// # Result::<_,oxigraph::store::StorageError>::Ok(())
     /// ```
-    pub fn insert_named_graph<'b>(
-        &mut self,
-        graph_name: impl Into<NamedOrBlankNodeRef<'b>>,
-    ) -> Result<bool, StorageError> {
-        self.writer.insert_named_graph(graph_name.into())
-    }
+    // pub fn insert_named_graph<'b>(
+    //     &mut self,
+    //     graph_name: impl Into<NamedOrBlankNodeRef<'b>>,
+    // ) -> Result<bool, StorageError> {
+    //     self.writer.insert_named_graph(graph_name.into())
+    // }

     /// Clears a graph from this store.
     ///
@@ -1981,17 +2098,17 @@ mod tests {
         ];

         let store = Store::new()?;
-        for t in &default_quads {
-            assert!(store.insert(t)?);
-        }
-
-        assert!(!store.insert(&default_quad)?);
-        assert!(store.remove(&default_quad)?);
-        assert!(!store.remove(&default_quad)?);
-        assert!(store.insert(&named_quad)?);
-        assert!(!store.insert(&named_quad)?);
-        assert!(store.insert(&default_quad)?);
-        assert!(!store.insert(&default_quad)?);
+        // for t in &default_quads {
+        //     assert!(store.insert(t)?);
+        // }
+
+        // assert!(!store.insert(&default_quad)?);
+        // assert!(store.remove(&default_quad)?);
+        // assert!(!store.remove(&default_quad)?);
+        // assert!(store.insert(&named_quad)?);
+        // assert!(!store.insert(&named_quad)?);
+        // assert!(store.insert(&default_quad)?);
+        // assert!(!store.insert(&default_quad)?);
         assert_eq!(store.len()?, 4);
         assert_eq!(store.iter().collect::<Result<Vec<_>, _>>()?, all_quads);

@@ -51,6 +51,10 @@ impl NamedNode {
         self.iri
     }

+    pub fn as_string(&self) -> &String {
+        &self.iri
+    }
+
     #[inline]
     pub fn as_ref(&self) -> NamedNodeRef<'_> {
         NamedNodeRef::new_unchecked(&self.iri)

@@ -593,6 +593,20 @@ impl Commit {
         res
     }

+    pub fn direct_causal_past_ids(&self) -> HashSet<ObjectId> {
+        let mut res: HashSet<ObjectId> = HashSet::with_capacity(1);
+        match self {
+            Commit::V0(c) => match &c.header {
+                Some(CommitHeader::V0(header_v0)) => {
+                    res.extend(header_v0.acks.iter());
+                    res.extend(header_v0.nacks.iter());
+                }
+                _ => {}
+            },
+        };
+        res
+    }
+
     // /// Get seq
     // pub fn seq(&self) -> u64 {
     //     match self {

@@ -328,6 +328,24 @@ pub enum VerifierError {
     DoubleBranchSubscription,
     InvalidCommit,
     LocallyConnected,
+    InvalidTriple,
+    InvalidNamedGraph,
+    OxigraphError(String),
+    CannotRemoveTriplesWhenNewBranch,
 }

+impl Error for VerifierError {}
+
+impl core::fmt::Display for VerifierError {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl From<serde_bare::error::Error> for VerifierError {
+    fn from(_e: serde_bare::error::Error) -> Self {
+        VerifierError::SerializationError
+    }
+}
+
 impl From<NgError> for VerifierError {

@@ -442,6 +442,11 @@ impl BlockRef {
     pub fn nuri(&self) -> String {
         format!(":j:{}:k:{}", self.id, self.key)
     }
+
+    pub fn tokenize(&self) -> Digest {
+        let ser = serde_bare::to_vec(self).unwrap();
+        Digest::Blake3Digest32(*blake3::hash(&ser).as_bytes())
+    }
 }

 impl From<BlockRef> for (BlockId, BlockKey) {
@@ -1360,6 +1365,15 @@ pub enum BranchType {
     //Unknown, // only used temporarily when loading a branch info from commits (Branch commit, then AddBranch commit)
 }

+impl BranchType {
+    pub fn is_main(&self) -> bool {
+        match self {
+            Self::Main => true,
+            _ => false,
+        }
+    }
+}
+
 impl fmt::Display for BranchType {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(
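
`tokenize` turns a `BlockRef` (here, a branch's read capability) into a BLAKE3 digest of its serde_bare bytes; combined with `NuriV0::token` this is what the verifier uses as the branch token. A small usage sketch (assuming a `read_cap: BlockRef` in scope):

    let token: Digest = read_cap.tokenize(); // Blake3Digest32 over serde_bare bytes
    let token_nuri = NuriV0::token(&token);  // "did:ng:n:<digest>"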

@@ -21,8 +21,8 @@ let config = {
 ng.init_headless(config).then( async() => {
     let session_id;
     try {
-        let user_id = await ng.admin_create_user(config);
-        console.log("user created: ",user_id);
+        //let user_id = await ng.admin_create_user(config);
+        //console.log("user created: ",user_id);

         let other_user_id = "AJQ5gCLoXXjalC9diTDCvxxWu5ZQUcYWEE821nhVRMcE";
@@ -30,17 +30,28 @@ ng.init_headless(config).then( async() => {
         session_id = session.session_id;
         console.log(session);

-        let sparql_result = await ng.sparql_query(session.session_id, "SELECT * WHERE { ?s ?p ?o }");
+        //await ng.sparql_update(session.session_id, "INSERT DATA { <did:ng:t:AJQ5gCLoXXjalC9diTDCvxxWu5ZQUcYWEE821nhVRMcE> <did:ng:i> <did:ng:j> }");
+        //await ng.sparql_update(session.session_id, "INSERT { ?s <did:ng:i> <did:ng:k> } WHERE { ?s <did:ng:i> <did:ng:j> } ");
+        //await ng.sparql_update(session.session_id, "INSERT DATA { <did:ng:z> <did:ng:j> <did:ng:t:BJQ5gCLoXXjalC9diTDCvxxWu5ZQUcYWEE821nhVRMcE>. <did:ng:t:BJQ5gCLoXXjalC9diTDCvxxWu5ZQUcYWEE821nhVRMcE> <did:ng:m> <did:ng:n> }");
+        let sparql_result = await ng.sparql_query(session.session_id, "SELECT ?a WHERE { ?a <did:ng:j> _:abc. _:abc <did:ng:m> <did:ng:n> }");
         console.log(sparql_result);
+        for (const q of sparql_result.results.bindings) {
+            console.log(q);
+        }
+
+        sparql_result = await ng.sparql_query(session.session_id, "SELECT ?s ?a WHERE { ?s <did:ng:i> ?a }");
+        console.log(sparql_result);
+        for (const q of sparql_result.results.bindings) {
+            console.log(q);
+        }

         let quads = await ng.sparql_query(session.session_id, "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }");
         for (const q of quads) {
             console.log(q.subject.toString(), q.predicate.toString(), q.object.toString(), q.graph.toString())
         }
+
+        let result = await ng.sparql_update(session.session_id, "INSERT DATA { <http://example.com> <http://example.com> <http://example.com> }");
+        console.log(result);

         let file_nuri = await ng.file_put_to_private_store(session.session_id,"LICENSE-MIT","text/plain");
         console.log(file_nuri);

@@ -639,7 +639,7 @@ pub async fn file_get_from_private_store(
     let session_id: u64 = serde_wasm_bindgen::from_value::<u64>(session_id)
         .map_err(|_| "Deserialization error of session_id".to_string())?;
-    let nuri = NuriV0::new_from(nuri).map_err(|_| "Deserialization error of Nuri".to_string())?;
+    let nuri = NuriV0::new_from(&nuri).map_err(|_| "Deserialization error of Nuri".to_string())?;

     let mut request = AppRequest::new(AppRequestCommandV0::FileGet, nuri.clone(), None);
     request.set_session_id(session_id);

@@ -9,6 +9,8 @@
 //! Verifiers for each Commit type

+pub mod transaction;
+
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -84,10 +86,10 @@ impl CommitVerifier for RootBranch {
                 //TODO: deal with quorum_type (verify signature)

                 let user_priv = verifier.user_privkey();
-                let user_id = user_priv.to_pub();
+                let user_id = verifier.user_id();
                 let repo_write_cap_secret = if store.is_private() {
                     Some(SymKey::nil())
-                } else if let Some(pos) = root_branch.owners.iter().position(|&o| o == user_id) {
+                } else if let Some(pos) = root_branch.owners.iter().position(|o| o == user_id) {
                     let cryptobox = &root_branch.owners_write_cap[pos];
                     Some(RootBranch::decrypt_write_cap(user_priv, cryptobox)?)
                 } else {

@@ -0,0 +1,550 @@
+// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
+// All rights reserved.
+// Licensed under the Apache License, Version 2.0
+// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
+// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
+// at your option. All files in the project carrying such
+// notice may not be copied, modified, or distributed except
+// according to those terms.
+
+//! Verifiers for AsyncTransaction Commit
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use ng_oxigraph::oxigraph::storage_ng::numeric_encoder::{EncodedQuad, EncodedTerm};
+use ng_oxigraph::oxigraph::storage_ng::*;
+use serde::{Deserialize, Serialize};
+
+use ng_net::app_protocol::{NuriV0, TargetBranchV0};
+use ng_oxigraph::oxrdf::{GraphName, GraphNameRef, NamedNode, Quad, Triple, TripleRef};
+use ng_repo::errors::VerifierError;
+use ng_repo::store::Store;
+use ng_repo::types::*;
+
+use crate::verifier::Verifier;
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct GraphTransaction {
+    pub inserts: Vec<Triple>,
+    pub removes: Vec<Triple>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum DiscreteTransaction {
+    /// A yrs::Update
+    #[serde(with = "serde_bytes")]
+    YMap(Vec<u8>),
+    #[serde(with = "serde_bytes")]
+    YXml(Vec<u8>),
+    #[serde(with = "serde_bytes")]
+    YText(Vec<u8>),
+    /// An automerge::Patch
+    #[serde(with = "serde_bytes")]
+    Automerge(Vec<u8>),
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct TransactionBody {
+    graph: Option<GraphTransaction>,
+    discrete: Option<DiscreteTransaction>,
+}
+
+struct BranchUpdateInfo {
+    branch_id: BranchId,
+    branch_is_main: bool,
+    repo_id: RepoId,
+    topic_id: TopicId,
+    token: Digest,
+    overlay_id: OverlayId,
+    previous_heads: HashSet<ObjectId>,
+    commit_id: ObjectId,
+    transaction: GraphTransaction,
+}
+
+impl Verifier {
+    pub(crate) fn add_doc(
+        &self,
+        repo_id: &RepoId,
+        overlay_id: &OverlayId,
+    ) -> Result<(), VerifierError> {
+        self.doc_in_store(repo_id, overlay_id, false)
+    }
+
+    pub(crate) fn remove_doc(
+        &self,
+        repo_id: &RepoId,
+        overlay_id: &OverlayId,
+    ) -> Result<(), VerifierError> {
+        self.doc_in_store(repo_id, overlay_id, true)
+    }
+
+    fn doc_in_store(
+        &self,
+        repo_id: &RepoId,
+        overlay_id: &OverlayId,
+        remove: bool,
+    ) -> Result<(), VerifierError> {
+        let ov_graphname = NamedNode::new_unchecked(NuriV0::repo_graph_name(repo_id, overlay_id));
+        let overlay_encoded = numeric_encoder::StrHash::new(&NuriV0::overlay_id(overlay_id));
+        self.graph_dataset
+            .as_ref()
+            .unwrap()
+            .ng_transaction(
+                move |mut transaction| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> {
+                    transaction.doc_in_store(ov_graphname.as_ref(), &overlay_encoded, remove)
+                },
+            )
+            .map_err(|e| VerifierError::OxigraphError(e.to_string()))
+    }
+    pub(crate) fn add_named_commit(
+        &self,
+        repo_id: &RepoId,
+        overlay_id: &OverlayId,
+        name: String,
+        commit_id: ObjectId,
+    ) -> Result<(), VerifierError> {
+        self.named_commit_or_branch(
+            repo_id,
+            overlay_id,
+            name,
+            false,
+            Some(format!("{commit_id}")),
+        )
+    }
+
+    pub(crate) fn add_named_branch(
+        &self,
+        repo_id: &RepoId,
+        overlay_id: &OverlayId,
+        name: String,
+        branch_id: BranchId,
+    ) -> Result<(), VerifierError> {
+        self.named_commit_or_branch(
+            repo_id,
+            overlay_id,
+            name,
+            true,
+            Some(format!("{branch_id}")),
+        )
+    }
+
+    pub(crate) fn remove_named(
+        &self,
+        repo_id: &RepoId,
+        overlay_id: &OverlayId,
+        name: String,
+    ) -> Result<(), VerifierError> {
+        self.named_commit_or_branch(repo_id, overlay_id, name, false, None)
+    }
+
+    fn named_commit_or_branch(
+        &self,
+        repo_id: &RepoId,
+        overlay_id: &OverlayId,
+        name: String,
+        is_branch: bool,
+        base64_id: Option<String>,
+    ) -> Result<(), VerifierError> {
+        let ov_graphname = NamedNode::new_unchecked(NuriV0::repo_graph_name(repo_id, overlay_id));
+        let value = if base64_id.is_none() {
+            None
+        } else {
+            if is_branch {
+                let overlay_encoded =
+                    numeric_encoder::StrHash::new(&NuriV0::overlay_id(overlay_id));
+                let branch_encoded = numeric_encoder::StrHash::new(&NuriV0::branch_id_from_base64(
+                    base64_id.as_ref().unwrap(),
+                ));
+                let mut buffer = Vec::with_capacity(33);
+                buffer.push(BRANCH_PREFIX);
+                buffer.extend_from_slice(&branch_encoded.to_be_bytes());
+                buffer.extend_from_slice(&overlay_encoded.to_be_bytes());
+                Some(buffer)
+            } else {
+                let commit_name =
+                    NuriV0::commit_graph_name_from_base64(base64_id.as_ref().unwrap(), overlay_id);
+                let commit_encoded = numeric_encoder::StrHash::new(&commit_name);
+                let mut buffer = Vec::with_capacity(17);
+                buffer.push(COMMIT_PREFIX);
+                buffer.extend_from_slice(&commit_encoded.to_be_bytes());
+                Some(buffer)
+            }
+        };
+        self.graph_dataset
+            .as_ref()
+            .unwrap()
+            .ng_transaction(
+                move |mut transaction: ng_oxigraph::oxigraph::store::Transaction<'_>| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> {
+                    transaction.named_commit_or_branch(ov_graphname.as_ref(), &name, &value)
+                },
+            )
+            .map_err(|e| VerifierError::OxigraphError(e.to_string()))
+    }
pub(crate) async fn verify_async_transaction(
&mut self,
transaction: &Transaction,
commit: &Commit,
branch_id: &BranchId,
repo_id: &RepoId,
store: Arc<Store>,
) -> Result<(), VerifierError> {
let Transaction::V0(v0) = transaction;
let mut body: TransactionBody = serde_bare::from_slice(&v0)?;
let repo = self.get_repo(repo_id, store.get_store_repo())?;
let branch = repo.branch(branch_id)?;
if body.graph.is_some() {
let info = BranchUpdateInfo {
branch_id: *branch_id,
branch_is_main: branch.branch_type.is_main(),
repo_id: *repo_id,
topic_id: branch.topic,
token: branch.read_cap.tokenize(),
overlay_id: store.overlay_id,
previous_heads: commit.direct_causal_past_ids(),
commit_id: commit.id().unwrap(),
transaction: body.graph.take().unwrap(),
};
self.update_graph(&[info])
} else {
Ok(())
}
//TODO: discrete update
}
fn find_branch_and_repo_for_quad(
&self,
quad: &Quad,
branches: &mut HashMap<BranchId, (StoreRepo, RepoId, bool, TopicId, Digest, OverlayId)>,
nuri_branches: &mut HashMap<String, (BranchId, bool)>,
) -> Result<(BranchId, bool), VerifierError> {
match &quad.graph_name {
GraphName::NamedNode(named_node) => {
let graph_name = named_node.as_string();
if let Some(branch_found) = nuri_branches.get(graph_name) {
return Ok(branch_found.clone());
}
let nuri = NuriV0::new_from(graph_name)?;
if !nuri.is_branch_identifier() {
return Err(VerifierError::InvalidNamedGraph);
}
let store = self
.get_store_by_overlay_id(&OverlayId::Outer(*nuri.overlay.unwrap().outer()))?;
let repo = self.get_repo(nuri.target.repo_id(), store.get_store_repo())?;
let (branch_id, is_publisher, is_main, topic_id, token) = match nuri.branch {
None => {
let b = repo.main_branch().ok_or(VerifierError::BranchNotFound)?;
(
b.id,
b.topic_priv_key.is_some(),
true,
b.topic,
b.read_cap.tokenize(),
)
}
Some(TargetBranchV0::BranchId(id)) => {
let b = repo.branch(&id)?;
//TODO: deal with named branch that is also the main branch
(
id,
b.topic_priv_key.is_some(),
false,
b.topic,
b.read_cap.tokenize(),
)
}
_ => unimplemented!(),
};
let _ = branches.entry(branch_id).or_insert((
store.get_store_repo().clone(),
repo.id,
is_main,
topic_id,
token,
store.overlay_id,
));
let _ = nuri_branches
.entry(graph_name.clone())
.or_insert((branch_id, is_publisher));
Ok((branch_id, is_publisher))
}
_ => Err(VerifierError::InvalidNamedGraph),
}
}
async fn prepare_sparql_update(
&mut self,
inserts: Vec<Quad>,
removes: Vec<Quad>,
) -> Result<(), VerifierError> {
// options when the user is not a publisher on the repo:
// - skip
// - TODO: abort (the whole transaction)
// - TODO: inbox (sent to the document's inbox as a suggested update)
// for now we just skip, without giving the user the option
let mut inserts_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1);
let mut removes_map: HashMap<BranchId, HashSet<Triple>> = HashMap::with_capacity(1);
let mut branches: HashMap<BranchId, (StoreRepo, RepoId, bool, TopicId, Digest, OverlayId)> =
HashMap::with_capacity(1);
let mut nuri_branches: HashMap<String, (BranchId, bool)> = HashMap::with_capacity(1);
let mut inserts_len = inserts.len();
let mut removes_len = removes.len();
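// capacity note: the first branch encountered gets a set sized for the whole
// batch; any later branches start at capacity 1 (see or_insert_with below)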
for insert in inserts {
let (branch_id, is_publisher) =
self.find_branch_and_repo_for_quad(&insert, &mut branches, &mut nuri_branches)?;
if !is_publisher {
continue;
}
let set = inserts_map.entry(branch_id).or_insert_with(|| {
let set = HashSet::with_capacity(inserts_len);
inserts_len = 1;
set
});
set.insert(insert.into());
}
for remove in removes {
let (branch_id, is_publisher) =
self.find_branch_and_repo_for_quad(&remove, &mut branches, &mut nuri_branches)?;
if !is_publisher {
continue;
}
let set = removes_map.entry(branch_id).or_insert_with(|| {
let set = HashSet::with_capacity(removes_len);
removes_len = 1;
set
});
set.insert(remove.into());
}
let mut updates = Vec::with_capacity(branches.len());
for (branch_id, (store_repo, repo_id, branch_is_main, topic_id, token, overlay_id)) in
branches
{
let graph_transac = GraphTransaction {
inserts: Vec::from_iter(inserts_map.remove(&branch_id).unwrap_or(HashSet::new())),
removes: Vec::from_iter(removes_map.remove(&branch_id).unwrap_or(HashSet::new())),
};
let mut transac = TransactionBody {
graph: Some(graph_transac),
discrete: None,
};
let transaction_commit_body =
CommitBodyV0::AsyncTransaction(Transaction::V0(serde_bare::to_vec(&transac)?));
let commit = self
.new_transaction_commit(
transaction_commit_body,
&repo_id,
&branch_id,
&store_repo,
vec![], //TODO deps
vec![],
)
.await?;
let graph_update = transac.graph.take().unwrap();
let info = BranchUpdateInfo {
branch_id,
branch_is_main,
repo_id,
topic_id,
token,
overlay_id,
previous_heads: commit.direct_causal_past_ids(),
commit_id: commit.id().unwrap(),
transaction: graph_update,
};
updates.push(info);
}
self.update_graph(&updates)
}
fn update_graph(&mut self, updates: &[BranchUpdateInfo]) -> Result<(), VerifierError> {
self.graph_dataset
.as_ref()
.unwrap()
.ng_transaction(
move |mut transaction| -> Result<(), ng_oxigraph::oxigraph::store::StorageError> {
let reader = transaction.ng_get_reader();
for update in updates {
let commit_name =
NuriV0::commit_graph_name(&update.commit_id, &update.overlay_id);
let commit_encoded = numeric_encoder::StrHash::new(&commit_name);
let cv_graphname = NamedNode::new_unchecked(commit_name);
let cv_graphname_ref = GraphNameRef::NamedNode((&cv_graphname).into());
let ov_main = if update.branch_is_main {
let ov_graphname = NamedNode::new_unchecked(NuriV0::repo_graph_name(
&update.repo_id,
&update.overlay_id,
));
Some(ov_graphname)
} else {
None
};
let value = if update.branch_is_main {
ADDED_IN_MAIN
} else {
ADDED_IN_OTHER
};
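// each inserted triple goes into the commit's own graph, tagged ADDED_IN_MAIN
// or ADDED_IN_OTHER; on the main branch it is also materialized in the
// overlay (repo) graph, tagged REPO_IN_MAIN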
for triple in update.transaction.inserts.iter() {
let triple_ref: TripleRef = triple.into();
let quad_ref = triple_ref.in_graph(cv_graphname_ref);
transaction.insert(quad_ref, value)?;
if let Some(ov_graphname) = ov_main.as_ref() {
let ov_graphname_ref = GraphNameRef::NamedNode(ov_graphname.into());
let triple_ref: TripleRef = triple.into();
let quad_ref = triple_ref.in_graph(ov_graphname_ref);
transaction.insert(quad_ref, REPO_IN_MAIN)?;
}
}
let topic_encoded =
numeric_encoder::StrHash::new(&NuriV0::topic_id(&update.topic_id));
let overlay_encoded =
numeric_encoder::StrHash::new(&NuriV0::overlay_id(&update.overlay_id));
let branch_encoded =
numeric_encoder::StrHash::new(&NuriV0::branch_id(&update.branch_id));
let token_encoded =
numeric_encoder::StrHash::new(&NuriV0::token(&update.token));
transaction.update_branch_and_token(
&overlay_encoded,
&branch_encoded,
&topic_encoded,
&token_encoded,
)?;
let direct_causal_past_encoded: HashSet<numeric_encoder::StrHash> =
HashSet::from_iter(update.previous_heads.iter().map(|commit_id| {
numeric_encoder::StrHash::new(&NuriV0::commit_graph_name(
commit_id,
&update.overlay_id,
))
}));
let current_heads =
reader.ng_get_heads(&topic_encoded, &overlay_encoded)?;
transaction.update_heads(
&topic_encoded,
&overlay_encoded,
&commit_encoded,
&direct_causal_past_encoded,
)?;
if !direct_causal_past_encoded.is_empty() {
// adding past
transaction.update_past(
&commit_encoded,
&direct_causal_past_encoded,
false,
)?;
}
if !update.transaction.removes.is_empty() {
if current_heads.is_empty() {
return Err(ng_oxigraph::oxigraph::store::StorageError::Other(
Box::new(VerifierError::CannotRemoveTriplesWhenNewBranch),
));
}
let at_current_heads = current_heads == direct_causal_past_encoded;
// if not, we need to base ourselves on the materialized state at the direct_causal_past of the commit
let value = if update.branch_is_main {
REMOVED_IN_MAIN
} else {
REMOVED_IN_OTHER
};
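// observed-remove: look up the commit graphs where this triple was added, as
// seen from the commit's direct causal past, and remove those exact quads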
for remove in update.transaction.removes.iter() {
let encoded_subject = remove.subject.as_ref().into();
let encoded_predicate = remove.predicate.as_ref().into();
let encoded_object = remove.object.as_ref().into();
let observed_adds = reader
.quads_for_subject_predicate_object_heads(
&encoded_subject,
&encoded_predicate,
&encoded_object,
&direct_causal_past_encoded,
at_current_heads,
)?;
for removing in observed_adds {
let graph_encoded = EncodedTerm::NamedNode { iri_id: removing };
let quad_encoded = EncodedQuad::new(
encoded_subject.clone(),
encoded_predicate.clone(),
encoded_object.clone(),
graph_encoded,
);
transaction.insert_encoded(&quad_encoded, value)?;
transaction.ng_remove(&quad_encoded, &commit_encoded)?;
}
if let Some(ov_graphname) = ov_main.as_ref() {
let should_remove_ov_triples = at_current_heads || {
reader
.quads_for_subject_predicate_object_heads(
&encoded_subject,
&encoded_predicate,
&encoded_object,
&current_heads,
true,
)?
.is_empty()
};
if should_remove_ov_triples {
let ov_graphname_ref =
GraphNameRef::NamedNode(ov_graphname.into());
let triple_ref: TripleRef = remove.into();
let quad_ref = triple_ref.in_graph(ov_graphname_ref);
transaction.remove(quad_ref)?;
}
}
}
}
}
Ok(())
},
)
.map_err(|e| VerifierError::OxigraphError(e.to_string()))
}
pub(crate) async fn process_sparql_update(
&mut self,
_nuri: &NuriV0,
query: &String,
) -> Result<(), String> {
let store = self.graph_dataset.as_ref().unwrap();
//TODO: use nuri to set some default dataset in oxigraph
let res = store.ng_update(query);
match res {
Err(e) => Err(e.to_string()),
Ok((inserts, removes)) => {
if inserts.is_empty() && removes.is_empty() {
Ok(())
} else {
self.prepare_sparql_update(Vec::from_iter(inserts), Vec::from_iter(removes))
.await
.map_err(|e| e.to_string())
}
}
}
}
}

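As a sanity check on the payload shape, here is a minimal sketch of how a graph-only TransactionBody round-trips through serde_bare, the encoding carried inside Transaction::V0 above. This is hypothetical same-module test code (TransactionBody's fields are private to transaction.rs) and the IRIs are placeholders:

#[cfg(test)]
mod tests {
    use super::*;
    use ng_oxigraph::oxrdf::{NamedNode, Triple};

    #[test]
    fn transaction_body_roundtrip() {
        // one inserted triple, no removals, no discrete (Yjs/Automerge) part
        let body = TransactionBody {
            graph: Some(GraphTransaction {
                inserts: vec![Triple::new(
                    NamedNode::new_unchecked("did:ng:t:s"),
                    NamedNode::new_unchecked("did:ng:t:p"),
                    NamedNode::new_unchecked("did:ng:t:o"),
                )],
                removes: vec![],
            }),
            discrete: None,
        };
        // serde_bare is the wire format carried inside Transaction::V0
        let encoded = serde_bare::to_vec(&body).unwrap();
        let decoded: TransactionBody = serde_bare::from_slice(&encoded).unwrap();
        assert!(decoded.graph.is_some() && decoded.discrete.is_none());
    }
}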
ng-verifier/src/request_processor.rs

@@ -229,19 +229,20 @@ impl Verifier {
                 }
             }
             AppFetchContentV0::WriteQuery => {
-                if let Some(AppRequestPayload::V0(AppRequestPayloadV0::Query(DocQuery::V0(
-                    query,
-                )))) = payload
+                if !nuri.is_valid_for_sparql_update() {
+                    return Err(NgError::InvalidNuri);
+                }
+                return if let Some(AppRequestPayload::V0(AppRequestPayloadV0::Query(
+                    DocQuery::V0(query),
+                ))) = payload
                 {
-                    let store = self.graph_dataset.as_ref().unwrap();
-                    let res = store.update(&query);
-                    return Ok(match res {
-                        Err(e) => AppResponse::error(e.to_string()),
+                    Ok(match self.process_sparql_update(&nuri, &query).await {
+                        Err(e) => AppResponse::error(e),
                         Ok(_) => AppResponse::ok(),
-                    });
+                    })
                 } else {
-                    return Err(NgError::InvalidPayload);
-                }
+                    Err(NgError::InvalidPayload)
+                };
             }
             _ => unimplemented!(),
         },

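For context on the hunk above: the WriteQuery arm now validates the nuri first, then expects a DocQuery::V0 payload carrying the raw SPARQL UPDATE string. A hedged sketch of constructing such a payload, assuming these types are exported from ng_net::app_protocol as the imports in transaction.rs suggest:

use ng_net::app_protocol::{AppRequestPayload, AppRequestPayloadV0, DocQuery};

// wrap a SPARQL UPDATE string the way the WriteQuery arm expects it
fn write_query_payload(sparql: &str) -> AppRequestPayload {
    AppRequestPayload::V0(AppRequestPayloadV0::Query(DocQuery::V0(
        sparql.to_string(),
    )))
}

The update itself is then routed through process_sparql_update, which skips branches where the user is not a publisher (see prepare_sparql_update above).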
ng-verifier/src/verifier.rs

@@ -79,6 +79,7 @@ use crate::user_storage::UserStorage;
 pub struct Verifier {
     pub(crate) config: VerifierConfig,
+    user_id: UserId,
     pub connected_broker: BrokerPeerId,
     pub(crate) graph_dataset: Option<ng_oxigraph::oxigraph::store::Store>,
     pub(crate) user_storage: Option<Arc<Box<dyn UserStorage>>>,

@@ -129,6 +130,10 @@ impl Verifier {
         &self.config.user_priv_key
     }
+    pub(crate) fn user_id(&self) -> &UserId {
+        &self.user_id
+    }
+
     pub fn private_store_id(&self) -> &RepoId {
         self.config.private_store_id.as_ref().unwrap()
     }

@@ -140,11 +145,11 @@
     }
     pub async fn close(&self) {
-        log_debug!("VERIFIER CLOSED {}", self.user_privkey().to_pub());
+        log_debug!("VERIFIER CLOSED {}", self.user_id());
         BROKER
             .write()
             .await
-            .close_peer_connection_x(None, Some(self.config.user_priv_key.to_pub()))
+            .close_peer_connection_x(None, Some(self.user_id().clone()))
             .await;
     }

@@ -270,17 +275,20 @@
         let (peer_priv_key, peer_id) = generate_keypair();
         let block_storage = Arc::new(std::sync::RwLock::new(HashMapBlockStorage::new()))
             as Arc<std::sync::RwLock<dyn BlockStorage + Send + Sync>>;
+        let user_priv_key = PrivKey::random_ed();
+        let user_id = user_priv_key.to_pub();
         Verifier {
             config: VerifierConfig {
                 config_type: VerifierConfigType::Memory,
                 user_master_key: [0; 32],
                 peer_priv_key,
-                user_priv_key: PrivKey::random_ed(),
+                user_priv_key,
                 private_store_read_cap: None,
                 private_store_id: None,
                 protected_store_id: None,
                 public_store_id: None,
             },
+            user_id,
             connected_broker: BrokerPeerId::None,
             graph_dataset: None,
             user_storage: None,

@@ -447,7 +455,7 @@
     }
     #[allow(dead_code)]
-    fn get_store(&self, store_repo: &StoreRepo) -> Result<Arc<Store>, VerifierError> {
+    pub(crate) fn get_store(&self, store_repo: &StoreRepo) -> Result<Arc<Store>, VerifierError> {
         let overlay_id = store_repo.overlay_id_for_storage_purpose();
         let store = self
             .stores

@@ -604,7 +612,7 @@
         let branch = repo.branch(branch_id)?;
         let commit = Commit::new_with_body_and_save(
             self.user_privkey(),
-            &self.user_privkey().to_pub(),
+            self.user_id(),
             *branch_id,
             QuorumType::NoSigning,
             deps,

@@ -628,6 +636,44 @@
         .await
     }
+
+    pub(crate) async fn new_transaction_commit(
+        &mut self,
+        commit_body: CommitBodyV0,
+        repo_id: &RepoId,
+        branch_id: &BranchId,
+        store_repo: &StoreRepo,
+        deps: Vec<ObjectRef>,
+        files: Vec<ObjectRef>,
+    ) -> Result<Commit, NgError> {
+        let commit = {
+            let repo = self.get_repo(repo_id, &store_repo)?;
+            let branch = repo.branch(branch_id)?;
+            let commit = Commit::new_with_body_and_save(
+                self.user_privkey(),
+                self.user_id(),
+                *branch_id,
+                QuorumType::NoSigning,
+                deps,
+                vec![],
+                branch.current_heads.clone(),
+                vec![],
+                files,
+                vec![],
+                vec![],
+                CommitBody::V0(commit_body),
+                0,
+                &repo.store,
+            )?;
+            commit
+        };
+        //log_info!("{}", commit);
+        self.new_event(&commit, &vec![], *repo_id, store_repo)
+            .await?;
+        Ok(commit)
+    }
+
     #[allow(dead_code)]
     pub(crate) async fn new_commit_simple(
         &mut self,

@@ -779,7 +825,7 @@
             // send the event to the server already
             let connected_broker = self.connected_broker.clone();
             let broker = BROKER.read().await;
-            let user = self.config.user_priv_key.to_pub();
+            let user = self.user_id().clone();
             self.send_event(event, &broker, &Some(user), &connected_broker, overlay)
                 .await?;
         } else {

@@ -843,7 +889,7 @@
             }
         }
         let connected_broker = self.connected_broker.clone();
-        let user = self.config.user_priv_key.to_pub();
+        let user = self.user_id().clone();
         let broker = BROKER.read().await;
         //log_info!("looping on branches {:?}", branches);
         for (repo, branch, publisher) in branches {

@@ -885,7 +931,7 @@
         let res = self.send_outbox().await;
         log_info!("SENDING EVENTS FROM OUTBOX RETURNED: {:?}", res);
-        let user = self.config.user_priv_key.to_pub();
+        let user = self.user_id().clone();
         let broker = BROKER.read().await;
         log_info!("looping on branches {:?}", branches);
         for (repo, branch, publisher) in branches {

@@ -924,7 +970,7 @@
             return Ok(());
         }
-        let user = self.config.user_priv_key.to_pub();
+        let user = self.user_id().clone();
         let connected_broker = self.connected_broker.clone();
         self.open_branch_(
             repo_id,

@@ -942,7 +988,7 @@
         let overlay = repo.store.overlay_for_read_on_client_protocol();
         let broker = BROKER.read().await;
-        let user = self.config.user_priv_key.to_pub();
+        let user = self.user_id().clone();
         let remote = self.connected_broker.connected_or_err()?;
         let msg = BlocksPut::V0(BlocksPutV0 {

@@ -963,7 +1009,7 @@
         let overlay = repo.store.overlay_for_read_on_client_protocol();
         let broker = BROKER.read().await;
-        let user = self.config.user_priv_key.to_pub();
+        let user = self.user_id().clone();
         let remote = self.connected_broker.connected_or_err()?;
         let msg = BlocksExist::V0(BlocksExistV0 {

@@ -1238,6 +1284,9 @@
             CommitBodyV0::StoreUpdate(a) => a.verify(commit, self, branch_id, repo_id, store),
             CommitBodyV0::AddSignerCap(a) => a.verify(commit, self, branch_id, repo_id, store),
             CommitBodyV0::AddFile(a) => a.verify(commit, self, branch_id, repo_id, store),
+            CommitBodyV0::AsyncTransaction(a) => {
+                Box::pin(self.verify_async_transaction(a, commit, branch_id, repo_id, store))
+            }
             _ => {
                 log_err!("unimplemented verifier {}", commit);
                 return Err(VerifierError::NotImplemented);

@@ -1351,6 +1400,15 @@
         repo_ref
     }
+
+    pub(crate) fn get_store_by_overlay_id(
+        &self,
+        id: &OverlayId,
+    ) -> Result<Arc<Store>, VerifierError> {
+        Ok(Arc::clone(
+            self.stores.get(id).ok_or(VerifierError::StoreNotFound)?,
+        ))
+    }
+
     async fn bootstrap(&mut self) -> Result<(), NgError> {
         if let Err(e) = self.bootstrap_from_remote().await {
             log_warn!("bootstrap_from_remote failed with {}", e);

@@ -1635,7 +1693,7 @@
         let overlay = repo.store.overlay_for_read_on_client_protocol();
         let broker = BROKER.read().await;
-        let user = Some(self.config.user_priv_key.to_pub());
+        let user = Some(self.user_id().clone());
         let remote = &self.connected_broker;
         match repo.store.has(id) {

@@ -1666,7 +1724,7 @@
     async fn bootstrap_from_remote(&mut self) -> Result<(), NgError> {
         if self.need_bootstrap() {
             let broker = BROKER.read().await;
-            let user = Some(self.config.user_priv_key.to_pub());
+            let user = Some(self.user_id().clone());
             self.connected_broker.is_direct_or_err()?;
             let private_store_id = self.config.private_store_id.to_owned().unwrap();

@@ -1857,7 +1915,7 @@
             return Ok(());
         }
         let broker = BROKER.read().await;
-        let user = Some(self.config.user_priv_key.to_pub());
+        let user = Some(self.user_id().clone());
         self.connected_broker.connected_or_err()?;
         let remote = self.connected_broker.clone();

@@ -2059,6 +2117,7 @@
         };
         let peer_id = config.peer_priv_key.to_pub();
         let mut verif = Verifier {
+            user_id: config.user_priv_key.to_pub(),
             config,
             connected_broker: BrokerPeerId::None,
             graph_dataset: graph,

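Taken together, the commit implements observed-remove (OR-set) semantics for triples: update_graph tags every insert with the commit graph that added it, and a removal deletes only the (triple, commit) pairs that were causally observed, so a concurrent re-insert survives. A self-contained toy model of that rule, illustrative only and not the repo's API:

use std::collections::{HashMap, HashSet};

// Toy observed-remove set over (triple, commit) pairs. In update_graph above,
// every insert is tagged with the commit graph that added it, and a removal
// deletes only the insertions observed in the commit's causal past.
type Triple = (&'static str, &'static str, &'static str);
type CommitId = u64;

#[derive(Default)]
struct OrSet {
    added: HashMap<Triple, HashSet<CommitId>>,
}

impl OrSet {
    fn insert(&mut self, t: Triple, commit: CommitId) {
        self.added.entry(t).or_default().insert(commit);
    }

    // remove only the insertions observed at the given commits;
    // concurrent re-insertions (other commits) survive
    fn remove_observed(&mut self, t: Triple, observed: &HashSet<CommitId>) {
        if let Some(tags) = self.added.get_mut(&t) {
            tags.retain(|c| !observed.contains(c));
            if tags.is_empty() {
                self.added.remove(&t);
            }
        }
    }

    fn contains(&self, t: &Triple) -> bool {
        self.added.contains_key(t)
    }
}

fn main() {
    let t = ("s", "p", "o");
    let mut set = OrSet::default();
    set.insert(t, 1); // added by commit 1
    set.insert(t, 2); // concurrently added by commit 2
    // a remove whose causal past saw only commit 1 leaves commit 2's add alive
    set.remove_observed(t, &HashSet::from([1]));
    assert!(set.contains(&t));
}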