From 21ad76c7cfc4e68efb5673fbdb2f54d974984000 Mon Sep 17 00:00:00 2001 From: Tpt Date: Tue, 20 Aug 2019 14:49:38 +0200 Subject: [PATCH] Introduces Repository and RepositoryConnection and simplifies API Allows to upgrades to the latest versions of RocksDB and avoid some behaviors that could cause unexpected crashes --- README.md | 11 +- lib/Cargo.toml | 3 +- lib/src/lib.rs | 78 +-- lib/src/model/dataset.rs | 212 -------- lib/src/model/graph.rs | 13 +- lib/src/model/isomorphism.rs | 315 ++++++----- lib/src/model/mod.rs | 4 - lib/src/model/triple.rs | 5 + lib/src/repository.rs | 126 +++++ lib/src/sparql/eval.rs | 288 +++++----- lib/src/sparql/mod.rs | 121 ++--- lib/src/sparql/plan.rs | 8 +- lib/src/store/encoded.rs | 966 --------------------------------- lib/src/store/memory.rs | 424 +++++++++------ lib/src/store/mod.rs | 123 ++++- lib/src/store/rocksdb.rs | 362 +++++++----- lib/tests/sparql_test_cases.rs | 62 ++- server/src/main.rs | 172 +++--- 18 files changed, 1317 insertions(+), 1976 deletions(-) delete mode 100644 lib/src/model/dataset.rs create mode 100644 lib/src/repository.rs delete mode 100644 lib/src/store/encoded.rs diff --git a/README.md b/README.md index 2d9c9410..c2eb0450 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,11 @@ -# Rudf +Rudf is a work in progress graph database implementing the [SPARQL](https://www.w3.org/TR/sparql11-overview/) standard. -This library is a work in progress of a [RDF](https://www.w3.org/RDF/) stack implementation in [Rust](https://www.rust-lang.org). +There is no released version yet. -There is no released version yet but [automated documentation for the master branch is available](https://tpt.github.io/rudf/). +Its goal is to provide a compliant, safe and fast graph database. +It is written in Rust. -Its goal is to provide a compliant, safe and fast implementation of W3C specifications in Rust. - -The `lib` directory contains the Rust library code and the `python` directory a beginning of Python bindings. +The `lib` directory contains the database written as a Rust library. [![Build Status](https://travis-ci.org/Tpt/rudf.svg?branch=master)](https://travis-ci.org/Tpt/rudf) [![dependency status](https://deps.rs/repo/github/Tpt/rudf/status.svg)](https://deps.rs/repo/github/Tpt/rudf) diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 68ae737c..fdc89211 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -17,7 +17,7 @@ travis-ci = { repository = "Tpt/rudf" } [dependencies] lazy_static = "1" -rocksdb = { version = "0.11", optional = true } +rocksdb = { version = "0.12", optional = true } url = "2" uuid = { version = "0.7", features = ["v4"] } bzip2 = "0.3" @@ -32,7 +32,6 @@ regex = "1" rio_api = "0.2" rio_turtle = "0.2" rio_xml = "0.2" -permutohedron = "0.2" [build-dependencies] peg = "0.5" diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 4c5fa47e..98bca791 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,46 +1,50 @@ -//! This crate is a work in progress of implementation of an RDF and SPARQL software stack in Rust. +//! Rudf is a work in progress graph database implementing the [SPARQL](https://www.w3.org/TR/sparql11-overview/) standard. //! -//! Its goal is to provide a compliant, safe and fast implementation of W3C specifications. +//! Its goal is to provide a compliant, safe and fast graph database. //! -//! It currently provides: -//! * Basic RDF data structures in the `model` package -//! * Parsers for XML, Turtle and N-Triples syntaxes in the `rio` package -//! * A memory based and a disk based stores in the `store` package -//! * A work in progress SPARQL implementation in the `sparql` package - -#![warn( - clippy::cast_possible_wrap, - clippy::cast_precision_loss, - clippy::cast_sign_loss, - clippy::default_trait_access, - clippy::empty_enum, - clippy::enum_glob_use, - clippy::expl_impl_clone_on_copy, - clippy::explicit_into_iter_loop, - clippy::filter_map, - clippy::if_not_else, - clippy::inline_always, - clippy::invalid_upcast_comparisons, - clippy::items_after_statements, - clippy::linkedlist, - //TODO match_same_arms, - clippy::maybe_infinite_iter, - clippy::mut_mut, - clippy::needless_continue, - clippy::option_map_unwrap_or, - //TODO option_map_unwrap_or_else, - clippy::pub_enum_variant_names, - clippy::replace_consts, - clippy::result_map_unwrap_or_else, - //TODO single_match_else, - clippy::string_add_assign, - clippy::unicode_not_nfc -)] +//! It currently provides two `Repository` implementation providing [SPARQL 1.0 query](https://www.w3.org/TR/rdf-sparql-query/) capability: +//! * `MemoryRepository`: a simple in memory implementation. +//! * `RocksDbRepository`: a file system implementation based on the [RocksDB](https://rocksdb.org/) key-value store. +//! +//! Usage example with the `MemoryRepository`: +//! +//! ``` +//! use rudf::model::*; +//! use rudf::{Repository, RepositoryConnection, MemoryRepository, Result}; +//! use crate::rudf::sparql::PreparedQuery; +//! use std::str::FromStr; +//! use rudf::sparql::algebra::QueryResult; +//! +//! let repository = MemoryRepository::default(); +//! let connection = repository.connection().unwrap(); +//! +//! // insertion +//! let ex = NamedNode::from_str("http://example.com").unwrap(); +//! let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); +//! connection.insert(&quad); +//! +//! // quad filter +//! let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); +//! assert_eq!(vec![quad], results.unwrap()); +//! +//! // SPARQL query +//! let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap(); +//! let results = prepared_query.exec().unwrap(); +//! if let QueryResult::Bindings(results) = results { +//! assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); +//! } +//! ``` pub mod model; +mod repository; pub mod rio; pub mod sparql; -pub mod store; +pub(crate) mod store; pub use failure::Error; pub type Result = ::std::result::Result; +pub use crate::store::MemoryRepository; +#[cfg(feature = "rocksdb")] +pub use crate::store::RocksDbRepository; +pub use repository::Repository; +pub use repository::RepositoryConnection; diff --git a/lib/src/model/dataset.rs b/lib/src/model/dataset.rs deleted file mode 100644 index 92a95710..00000000 --- a/lib/src/model/dataset.rs +++ /dev/null @@ -1,212 +0,0 @@ -use crate::model::*; -use crate::Result; - -/// Trait for [RDF graphs](https://www.w3.org/TR/rdf11-concepts/#dfn-graph) -/// -/// This crate currently provides a simple stand-alone in memory implementation of the `Graph` trait. -/// -/// Usage example: -/// ``` -/// use rudf::model::*; -/// use rudf::store::MemoryGraph; -/// use std::str::FromStr; -/// -/// let graph = MemoryGraph::default(); -/// let ex = NamedNode::from_str("http://example.com").unwrap(); -/// let triple = Triple::new(ex.clone(), ex.clone(), ex.clone()); -/// graph.insert(&triple); -/// let results: Vec = graph.triples_for_subject(&ex.into()).unwrap().map(|t| t.unwrap()).collect(); -/// assert_eq!(vec![triple], results); -/// ``` -pub trait Graph { - type TriplesIterator: Iterator>; - type TriplesForSubjectIterator: Iterator>; - type ObjectsForSubjectPredicateIterator: Iterator>; - type PredicatesForSubjectObjectIterator: Iterator>; - type TriplesForPredicateIterator: Iterator>; - type SubjectsForPredicateObjectIterator: Iterator>; - type TriplesForObjectIterator: Iterator>; - - /// Returns all triples contained by the graph - fn iter(&self) -> Result { - self.triples() - } - - /// Returns all triples contained by the graph - fn triples(&self) -> Result; - - fn triples_for_subject( - &self, - subject: &NamedOrBlankNode, - ) -> Result; - - fn objects_for_subject_predicate( - &self, - subject: &NamedOrBlankNode, - predicate: &NamedNode, - ) -> Result; - - fn object_for_subject_predicate( - &self, - subject: &NamedOrBlankNode, - predicate: &NamedNode, - ) -> Result> { - //TODO use transpose when stable - match self - .objects_for_subject_predicate(subject, predicate)? - .nth(0) - { - Some(object) => Ok(Some(object?)), - None => Ok(None), - } - } - - fn predicates_for_subject_object( - &self, - subject: &NamedOrBlankNode, - object: &Term, - ) -> Result; - - fn triples_for_predicate( - &self, - predicate: &NamedNode, - ) -> Result; - - fn subjects_for_predicate_object( - &self, - predicate: &NamedNode, - object: &Term, - ) -> Result; - - fn triples_for_object(&self, object: &Term) -> Result; - - /// Checks if the graph contains the given triple - fn contains(&self, triple: &Triple) -> Result; - - /// Adds a triple to the graph - fn insert(&self, triple: &Triple) -> Result<()>; - - /// Removes a concrete triple from the graph - fn remove(&self, triple: &Triple) -> Result<()>; - - /// Returns the number of triples in this graph - fn len(&self) -> Result; - - /// Checks if this graph contains a triple - fn is_empty(&self) -> Result; -} - -/// Trait for [RDF named graphs](https://www.w3.org/TR/rdf11-concepts/#dfn-named-graph) i.e. RDF graphs identified by an IRI -pub trait NamedGraph: Graph { - fn name(&self) -> &NamedOrBlankNode; -} - -/// Trait for [RDF datasets](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset) -/// -/// This crate currently provides two implementation of the `Dataset` traits: -/// * One in memory: `rudf::store::MemoryDataset` -/// * One disk-based using [RocksDB](https://rocksdb.org/): `rudf::store::RocksDbDataset` -/// -/// Usage example with `rudf::store::MemoryDataset`: -/// ``` -/// use rudf::model::*; -/// use rudf::store::MemoryDataset; -/// use std::str::FromStr; -/// -/// let dataset = MemoryDataset::default(); -/// let default_graph = dataset.default_graph(); -/// let ex = NamedNode::from_str("http://example.com").unwrap(); -/// let triple = Triple::new(ex.clone(), ex.clone(), ex.clone()); -/// default_graph.insert(&triple); -/// let results: Vec = dataset.quads_for_subject(&ex.into()).unwrap().map(|t| t.unwrap()).collect(); -/// assert_eq!(vec![triple.in_graph(None)], results); -/// ``` -/// -/// The implementation backed by RocksDB if disabled by default and requires the `"rocksdb"` feature to be activated. -/// A `RocksDbDataset` could be built using `RocksDbDataset::open` and works just like its in-memory equivalent: -/// ```ignore -/// use rudf::store::RocksDbDataset; -/// let dataset = RocksDbDataset::open("foo.db"); -/// ``` -pub trait Dataset { - type NamedGraph: NamedGraph; - type DefaultGraph: Graph; - type UnionGraph: Graph; - type QuadsIterator: Iterator>; - type QuadsForSubjectIterator: Iterator>; - type QuadsForSubjectPredicateIterator: Iterator>; - type QuadsForSubjectPredicateObjectIterator: Iterator>; - type QuadsForSubjectObjectIterator: Iterator>; - type QuadsForPredicateIterator: Iterator>; - type QuadsForPredicateObjectIterator: Iterator>; - type QuadsForObjectIterator: Iterator>; - - /// Returns an object for a [named graph](https://www.w3.org/TR/rdf11-concepts/#dfn-named-graph) of this dataset. - /// - /// This named graph may be empty if no triple is in the graph yet. - fn named_graph(&self, name: &NamedOrBlankNode) -> Result; - - /// Returns an object for the [default graph](https://www.w3.org/TR/rdf11-concepts/#dfn-default-graph) of this dataset - fn default_graph(&self) -> Self::DefaultGraph; - - /// Returns a graph that is the union of all graphs contained in this dataset, including the default graph - fn union_graph(&self) -> Self::UnionGraph; - - /// Returns all quads contained by the graph - fn iter(&self) -> Result { - self.quads() - } - - /// Returns all quads contained by the graph - fn quads(&self) -> Result; - - fn quads_for_subject( - &self, - subject: &NamedOrBlankNode, - ) -> Result; - - fn quads_for_subject_predicate( - &self, - subject: &NamedOrBlankNode, - predicate: &NamedNode, - ) -> Result; - - fn quads_for_subject_predicate_object( - &self, - subject: &NamedOrBlankNode, - predicate: &NamedNode, - object: &Term, - ) -> Result; - - fn quads_for_subject_object( - &self, - subject: &NamedOrBlankNode, - object: &Term, - ) -> Result; - - fn quads_for_predicate(&self, predicate: &NamedNode) - -> Result; - - fn quads_for_predicate_object( - &self, - predicate: &NamedNode, - object: &Term, - ) -> Result; - - fn quads_for_object(&self, object: &Term) -> Result; - - /// Checks if this dataset contains a given quad - fn contains(&self, quad: &Quad) -> Result; - - /// Adds a quad to this dataset - fn insert(&self, quad: &Quad) -> Result<()>; - - /// Removes a quad from this dataset - fn remove(&self, quad: &Quad) -> Result<()>; - - /// Returns the number of quads in this dataset - fn len(&self) -> Result; - - /// Checks if this dataset contains a quad - fn is_empty(&self) -> Result; -} diff --git a/lib/src/model/graph.rs b/lib/src/model/graph.rs index 5c7eca27..ccb01fd6 100644 --- a/lib/src/model/graph.rs +++ b/lib/src/model/graph.rs @@ -4,7 +4,7 @@ use std::collections::HashSet; use std::fmt; use std::iter::FromIterator; -/// Simple data structure [RDF graphs](https://www.w3.org/TR/rdf11-concepts/#dfn-graph). +/// A simple implementation of [RDF graphs](https://www.w3.org/TR/rdf11-concepts/#dfn-graph). /// /// It is not done to hold big graphs. /// @@ -118,6 +118,8 @@ impl SimpleGraph { } /// Checks if the current graph is [isomorphic](https://www.w3.org/TR/rdf11-concepts/#dfn-graph-isomorphism) with an other one + /// + /// Warning: This algorithm as a worst case complexity in n! pub fn is_isomorphic(&self, other: &SimpleGraph) -> bool { are_graphs_isomorphic(self, other) } @@ -132,6 +134,15 @@ impl IntoIterator for SimpleGraph { } } +impl<'a> IntoIterator for &'a SimpleGraph { + type Item = &'a Triple; + type IntoIter = <&'a HashSet as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.triples.iter() + } +} + impl FromIterator for SimpleGraph { fn from_iter>(iter: I) -> Self { Self { diff --git a/lib/src/model/isomorphism.rs b/lib/src/model/isomorphism.rs index 695f968a..4e9b189b 100644 --- a/lib/src/model/isomorphism.rs +++ b/lib/src/model/isomorphism.rs @@ -1,8 +1,6 @@ use crate::model::*; -use permutohedron::LexicalPermutation; -use std::collections::hash_map::DefaultHasher; -use std::collections::HashSet; -use std::collections::{BTreeSet, HashMap}; +use std::collections::hash_map::{DefaultHasher, RandomState}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::hash::Hash; use std::hash::Hasher; @@ -38,140 +36,176 @@ fn predicate_objects_for_subject<'a>( }) } -fn hash_blank_nodes<'a>( - bnodes: HashSet<&'a BlankNode>, +fn split_hash_buckets<'a>( + bnodes_by_hash: HashMap>, graph: &'a SimpleGraph, + distance: usize, ) -> HashMap> { - let mut bnodes_by_hash = HashMap::default(); - - // NB: we need to sort the triples to have the same hash - for bnode in bnodes { - let mut hasher = DefaultHasher::new(); - - { - let subject = NamedOrBlankNode::from(bnode.clone()); - let mut po_set: BTreeSet = BTreeSet::default(); - for po in predicate_objects_for_subject(graph, &subject) { - match &po.object { - Term::BlankNode(_) => (), - _ => { - po_set.insert(po); + let mut new_bnodes_by_hash = HashMap::default(); + + for (hash, bnodes) in bnodes_by_hash { + if bnodes.len() == 1 { + new_bnodes_by_hash.insert(hash, bnodes); // Nothing to improve + } else { + for bnode in bnodes { + let mut starts = vec![NamedOrBlankNode::from(bnode.clone())]; + for _ in 0..distance { + let mut new_starts = Vec::default(); + for s in starts { + for t in graph.triples_for_subject(&s) { + match t.object() { + Term::NamedNode(t) => new_starts.push(t.clone().into()), + Term::BlankNode(t) => new_starts.push(t.clone().into()), + Term::Literal(_) => (), + } + } + for t in graph.triples_for_object(&s.into()) { + new_starts.push(t.subject().clone()); + } } + starts = new_starts; } - } - for po in po_set { - po.hash(&mut hasher); - } - } - { - let object = Term::from(bnode.clone()); - let mut sp_set: BTreeSet = BTreeSet::default(); - for sp in subject_predicates_for_object(graph, &object) { - match &sp.subject { - NamedOrBlankNode::BlankNode(_) => (), - _ => { - sp_set.insert(sp); + // We do the hashing + let mut hasher = DefaultHasher::default(); + hash.hash(&mut hasher); // We start with the previous hash + + // NB: we need to sort the triples to have the same hash + let mut po_set: BTreeSet = BTreeSet::default(); + for start in &starts { + for po in predicate_objects_for_subject(graph, start) { + match &po.object { + Term::BlankNode(_) => (), + _ => { + po_set.insert(po); + } + } } } - } - for sp in sp_set { - sp.hash(&mut hasher); + for po in &po_set { + po.hash(&mut hasher); + } + + let mut sp_set: BTreeSet = BTreeSet::default(); + let term_starts: Vec<_> = starts.into_iter().map(|t| t.into()).collect(); + for start in &term_starts { + for sp in subject_predicates_for_object(graph, start) { + match &sp.subject { + NamedOrBlankNode::BlankNode(_) => (), + _ => { + sp_set.insert(sp); + } + } + } + } + for sp in &sp_set { + sp.hash(&mut hasher); + } + + new_bnodes_by_hash + .entry(hasher.finish()) + .or_insert_with(Vec::default) + .push(bnode); } } - - bnodes_by_hash - .entry(hasher.finish()) - .or_insert_with(Vec::default) - .push(bnode); } - bnodes_by_hash + new_bnodes_by_hash } fn build_and_check_containment_from_hashes<'a>( - hashes_to_see: &mut Vec<&u64>, - a_bnodes_by_hash: &'a HashMap>, + a_bnodes_by_hash: &mut Vec<(u64, Vec<&'a BlankNode>)>, b_bnodes_by_hash: &'a HashMap>, a_to_b_mapping: &mut HashMap<&'a BlankNode, &'a BlankNode>, - a: &SimpleGraph, - b: &SimpleGraph, + a: &'a SimpleGraph, + b: &'a SimpleGraph, + current_a_nodes: &[&'a BlankNode], + current_b_nodes: &mut BTreeSet<&'a BlankNode>, ) -> bool { - let hash = match hashes_to_see.pop() { - Some(h) => h, - None => return check_is_contained(a_to_b_mapping, a, b), - }; - - let a_nodes = a_bnodes_by_hash - .get(hash) - .map_or(&[] as &[&BlankNode], |v| v.as_slice()); - let b_nodes = b_bnodes_by_hash - .get(hash) - .map_or(&[] as &[&BlankNode], |v| v.as_slice()); - if a_nodes.len() != b_nodes.len() { - return false; - } - if a_nodes.len() == 1 { - // Avoid allocation for len == 1 - a_to_b_mapping.insert(a_nodes[0], b_nodes[0]); - let result = build_and_check_containment_from_hashes( - hashes_to_see, + if let Some((a_node, remaining_a_node)) = current_a_nodes.split_last() { + let b_nodes = current_b_nodes.iter().cloned().collect::>(); + for b_node in b_nodes.into_iter() { + current_b_nodes.remove(b_node); + a_to_b_mapping.insert(a_node, b_node); + if check_is_contained_focused(a_to_b_mapping, a_node, a, b) + && build_and_check_containment_from_hashes( + a_bnodes_by_hash, + b_bnodes_by_hash, + a_to_b_mapping, + a, + b, + remaining_a_node, + current_b_nodes, + ) + { + return true; + } + current_b_nodes.insert(b_node); + } + a_to_b_mapping.remove(a_node); + false + } else { + let (hash, new_a_nodes) = match a_bnodes_by_hash.pop() { + Some(v) => v, + None => return true, + }; + + let mut new_b_nodes = b_bnodes_by_hash + .get(&hash) + .map_or(BTreeSet::default(), |v| v.into_iter().cloned().collect()); + if new_a_nodes.len() != new_b_nodes.len() { + return false; + } + + if new_a_nodes.len() > 10 { + eprintln!("Too big instance, aborting"); + return true; //TODO: Very very very bad + } + + if build_and_check_containment_from_hashes( a_bnodes_by_hash, b_bnodes_by_hash, a_to_b_mapping, a, b, - ); - a_to_b_mapping.remove(a_nodes[0]); - hashes_to_see.push(hash); - result - } else { - // We compute all the rotations of a_nodes and then zip it with b_nodes to have all the possible pairs (a,b) - let mut a_nodes_rotated = a_nodes.to_vec(); - a_nodes_rotated.sort(); - loop { - for (a_node, b_node) in a_nodes_rotated.iter().zip(b_nodes.iter()) { - a_to_b_mapping.insert(a_node, b_node); - } - let result = if build_and_check_containment_from_hashes( - hashes_to_see, - a_bnodes_by_hash, - b_bnodes_by_hash, - a_to_b_mapping, - a, - b, - ) { - Some(true) - } else if a_nodes_rotated.next_permutation() { - None //keep going - } else { - Some(false) // No more permutation - }; - - if let Some(result) = result { - for a_node in &a_nodes_rotated { - a_to_b_mapping.remove(a_node); - } - hashes_to_see.push(hash); - return result; - } + &new_a_nodes, + &mut new_b_nodes, + ) { + true + } else { + a_bnodes_by_hash.push((hash, new_a_nodes)); + false } } } -fn check_is_contained<'a>( +fn check_is_contained_focused<'a>( a_to_b_mapping: &mut HashMap<&'a BlankNode, &'a BlankNode>, - a: &SimpleGraph, - b: &SimpleGraph, + a_bnode_focus: &'a BlankNode, + a: &'a SimpleGraph, + b: &'a SimpleGraph, ) -> bool { - for t_a in a.iter() { - let subject = if let NamedOrBlankNode::BlankNode(s_a) = &t_a.subject() { - a_to_b_mapping[s_a].clone().into() + let a_bnode_subject = a_bnode_focus.clone().into(); + let a_bnode_object = a_bnode_focus.clone().into(); + let ts_a = a + .triples_for_subject(&a_bnode_subject) + .chain(a.triples_for_object(&a_bnode_object)); + for t_a in ts_a { + let subject: NamedOrBlankNode = if let NamedOrBlankNode::BlankNode(s_a) = &t_a.subject() { + if let Some(s_a) = a_to_b_mapping.get(s_a) { + (*s_a).clone().into() + } else { + continue; // We skip for now + } } else { t_a.subject().clone() }; let predicate = t_a.predicate().clone(); - let object = if let Term::BlankNode(o_a) = &t_a.object() { - a_to_b_mapping[o_a].clone().into() + let object: Term = if let Term::BlankNode(o_a) = &t_a.object() { + if let Some(o_a) = a_to_b_mapping.get(o_a) { + (*o_a).clone().into() + } else { + continue; // We skip for now + } } else { t_a.object().clone() }; @@ -183,9 +217,9 @@ fn check_is_contained<'a>( true } -fn graph_blank_nodes(graph: &SimpleGraph) -> HashSet<&BlankNode> { - let mut blank_nodes = HashSet::default(); - for t in graph.iter() { +fn graph_blank_nodes(graph: &SimpleGraph) -> Vec<&BlankNode> { + let mut blank_nodes: HashSet<&BlankNode, RandomState> = HashSet::default(); + for t in graph { if let NamedOrBlankNode::BlankNode(subject) = t.subject() { blank_nodes.insert(subject); } @@ -193,7 +227,7 @@ fn graph_blank_nodes(graph: &SimpleGraph) -> HashSet<&BlankNode> { blank_nodes.insert(object); } } - blank_nodes + blank_nodes.into_iter().collect() } pub fn are_graphs_isomorphic(a: &SimpleGraph, b: &SimpleGraph) -> bool { @@ -201,23 +235,60 @@ pub fn are_graphs_isomorphic(a: &SimpleGraph, b: &SimpleGraph) -> bool { return false; } - let a_bnodes = graph_blank_nodes(a); - let a_bnodes_by_hash = hash_blank_nodes(a_bnodes, a); + // We check containment of everything buts triples with blank nodes + let mut a_bnodes_triples = SimpleGraph::default(); + for t in a { + if t.subject().is_blank_node() || t.object().is_blank_node() { + a_bnodes_triples.insert(t.clone()); + } else if !b.contains(t) { + return false; // Triple in a not in b without blank nodes + } + } + + let mut b_bnodes_triples = SimpleGraph::default(); + for t in b { + if t.subject().is_blank_node() || t.object().is_blank_node() { + b_bnodes_triples.insert(t.clone()); + } else if !a.contains(t) { + return false; // Triple in a not in b without blank nodes + } + } - let b_bnodes = graph_blank_nodes(b); - let b_bnodes_by_hash = hash_blank_nodes(b_bnodes, b); + let mut a_bnodes_by_hash = HashMap::default(); + a_bnodes_by_hash.insert(0, graph_blank_nodes(&a_bnodes_triples)); + let mut b_bnodes_by_hash = HashMap::default(); + b_bnodes_by_hash.insert(0, graph_blank_nodes(&b_bnodes_triples)); - // Hashes should have the same size everywhere - if a_bnodes_by_hash.len() != b_bnodes_by_hash.len() { - return false; + for distance in 0..5 { + let max_size = a_bnodes_by_hash + .values() + .into_iter() + .map(|l| l.len()) + .max() + .unwrap_or(0); + if max_size < 2 { + break; // We only have small buckets + } + + a_bnodes_by_hash = split_hash_buckets(a_bnodes_by_hash, a, distance); + b_bnodes_by_hash = split_hash_buckets(b_bnodes_by_hash, b, distance); + + // Hashes should have the same size + if a_bnodes_by_hash.len() != b_bnodes_by_hash.len() { + return false; + } } + let mut sorted_a_bnodes_by_hash: Vec<_> = a_bnodes_by_hash.into_iter().collect(); + sorted_a_bnodes_by_hash.sort_by(|(_, l1), (_, l2)| l1.len().cmp(&l2.len())); + build_and_check_containment_from_hashes( - &mut a_bnodes_by_hash.keys().collect(), - &a_bnodes_by_hash, + &mut sorted_a_bnodes_by_hash, &b_bnodes_by_hash, &mut HashMap::default(), - a, - b, + &a_bnodes_triples, + &b_bnodes_triples, + &[], + &mut BTreeSet::default(), ) } diff --git a/lib/src/model/mod.rs b/lib/src/model/mod.rs index a58ece23..7704715c 100644 --- a/lib/src/model/mod.rs +++ b/lib/src/model/mod.rs @@ -3,7 +3,6 @@ //! Inspired by [RDFjs](http://rdf.js.org/) and [Apache Commons RDF](http://commons.apache.org/proper/commons-rdf/) mod blank_node; -mod dataset; mod graph; mod isomorphism; mod language_tag; @@ -13,9 +12,6 @@ mod triple; pub mod vocab; pub use crate::model::blank_node::BlankNode; -pub use crate::model::dataset::Dataset; -pub use crate::model::dataset::Graph; -pub use crate::model::dataset::NamedGraph; pub use crate::model::graph::SimpleGraph; pub use crate::model::language_tag::LanguageTag; pub use crate::model::literal::Literal; diff --git a/lib/src/model/triple.rs b/lib/src/model/triple.rs index 58ffd9df..567f8c39 100644 --- a/lib/src/model/triple.rs +++ b/lib/src/model/triple.rs @@ -260,6 +260,11 @@ impl Quad { pub fn graph_name_owned(self) -> Option { self.graph_name } + + /// Returns the underlying [triple](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-triple) + pub fn into_triple(self) -> Triple { + Triple::new(self.subject, self.predicate, self.object) + } } impl fmt::Display for Quad { diff --git a/lib/src/repository.rs b/lib/src/repository.rs new file mode 100644 index 00000000..273818f3 --- /dev/null +++ b/lib/src/repository.rs @@ -0,0 +1,126 @@ +use crate::model::*; +use crate::sparql::PreparedQuery; +use crate::Result; +use std::io::Read; + +/// A `Repository` stores a [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset) +/// and allows to query and update it using SPARQL. +/// +/// This crate currently provides two implementation of the `Repository` traits: +/// * One in memory: `MemoryRepository` +/// * One disk-based using [RocksDB](https://rocksdb.org/): `RocksDbRepository` +/// +/// Usage example with `MemoryRepository`: +/// ``` +/// use rudf::model::*; +/// use rudf::{Repository, RepositoryConnection, MemoryRepository, Result}; +/// use crate::rudf::sparql::PreparedQuery; +/// use std::str::FromStr; +/// use rudf::sparql::algebra::QueryResult; +/// +/// let repository = MemoryRepository::default(); +/// let connection = repository.connection().unwrap(); +/// +/// // insertion +/// let ex = NamedNode::from_str("http://example.com").unwrap(); +/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); +/// connection.insert(&quad); +/// +/// // quad filter +/// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); +/// assert_eq!(vec![quad], results.unwrap()); +/// +/// // SPARQL query +/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap(); +/// let results = prepared_query.exec().unwrap(); +/// if let QueryResult::Bindings(results) = results { +/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); +/// } +/// ``` +/// +/// The implementation based on RocksDB if disabled by default and requires the `"rocksdb"` feature to be activated. +/// A `RocksDbRepository` could be built using `RocksDbRepository::open` and works just like its in-memory equivalent: +/// ```ignore +/// use rudf::RocksDbRepository; +/// let dataset = RocksDbRepository::open("example.db").unwrap(); +/// ``` +/// +/// Quads insertion and deletion should respect [ACID](https://en.wikipedia.org/wiki/ACID) properties for all implementation. +/// No complex transaction support is provided yet. +pub trait Repository { + type Connection: RepositoryConnection; + + fn connection(self) -> Result; +} + +/// A connection to a `Repository` +pub trait RepositoryConnection: Clone { + type PreparedQuery: PreparedQuery; + + /// Prepares a [SPARQL 1.1](https://www.w3.org/TR/sparql11-query/) query and returns an object that could be used to execute it. + /// + /// The implementation is a work in progress, SPARQL 1.1 specific features are not implemented yet. + /// + /// Usage example: + /// ``` + /// use rudf::model::*; + /// use rudf::{Repository, RepositoryConnection, MemoryRepository}; + /// use rudf::sparql::PreparedQuery; + /// use rudf::sparql::algebra::QueryResult; + /// use std::str::FromStr; + /// + /// let repository = MemoryRepository::default(); + /// let connection = repository.connection().unwrap(); + /// + /// // insertions + /// let ex = NamedNode::from_str("http://example.com").unwrap(); + /// connection.insert(&Quad::new(ex.clone(), ex.clone(), ex.clone(), None)); + /// + /// // SPARQL query + /// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap(); + /// let results = prepared_query.exec().unwrap(); + /// if let QueryResult::Bindings(results) = results { + /// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); + /// } + /// ``` + fn prepare_query(&self, query: impl Read) -> Result; + + /// Retrieves quads with a filter on each quad component + /// + /// Usage example: + /// ``` + /// use rudf::model::*; + /// use rudf::{Repository, RepositoryConnection, MemoryRepository, Result}; + /// use std::str::FromStr; + /// + /// let repository = MemoryRepository::default(); + /// let connection = repository.connection().unwrap(); + /// + /// // insertion + /// let ex = NamedNode::from_str("http://example.com").unwrap(); + /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); + /// connection.insert(&quad); + /// + /// // quad filter + /// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); + /// assert_eq!(vec![quad], results.unwrap()); + /// ``` + fn quads_for_pattern<'a>( + &'a self, + subject: Option<&NamedOrBlankNode>, + predicate: Option<&NamedNode>, + object: Option<&Term>, + graph_name: Option<&NamedOrBlankNode>, + ) -> Box> + 'a> + where + Self: 'a; + + /// Checks if this dataset contains a given quad + fn contains(&self, quad: &Quad) -> Result; + + /// Adds a quad to this dataset + fn insert(&self, quad: &Quad) -> Result<()>; + + /// Removes a quad from this dataset + fn remove(&self, quad: &Quad) -> Result<()>; +} diff --git a/lib/src/sparql/eval.rs b/lib/src/sparql/eval.rs index 3984cbd8..adf39cc3 100644 --- a/lib/src/sparql/eval.rs +++ b/lib/src/sparql/eval.rs @@ -2,8 +2,8 @@ use crate::model::BlankNode; use crate::model::Triple; use crate::sparql::algebra::*; use crate::sparql::plan::*; -use crate::store::encoded::EncodedQuadsStore; use crate::store::numeric_encoder::*; +use crate::store::StoreConnection; use crate::Result; use chrono::prelude::*; use num_traits::identities::Zero; @@ -26,40 +26,38 @@ const REGEX_SIZE_LIMIT: usize = 1_000_000; type EncodedTuplesIterator<'a> = Box> + 'a>; -pub struct SimpleEvaluator { - store: Arc, +#[derive(Clone)] +pub struct SimpleEvaluator { + store: S, bnodes_map: Arc>>, } -impl Clone for SimpleEvaluator { - fn clone(&self) -> Self { - Self { - store: self.store.clone(), - bnodes_map: self.bnodes_map.clone(), - } - } -} - -impl SimpleEvaluator { - pub fn new(store: Arc) -> Self { +impl<'a, S: StoreConnection + 'a> SimpleEvaluator { + pub fn new(store: S) -> Self { Self { store, bnodes_map: Arc::new(Mutex::new(BTreeMap::default())), } } - pub fn evaluate_select_plan<'a>( - &'a self, - plan: &'a PlanNode, + pub fn evaluate_select_plan<'b>( + &'b self, + plan: &'b PlanNode, variables: &[Variable], - ) -> Result> { + ) -> Result> + where + 'a: 'b, + { let iter = self.eval_plan(plan, vec![None; variables.len()]); Ok(QueryResult::Bindings( self.decode_bindings(iter, variables.to_vec()), )) } - pub fn evaluate_ask_plan<'a>(&'a self, plan: &'a PlanNode) -> Result> { + pub fn evaluate_ask_plan<'b>(&'b self, plan: &'b PlanNode) -> Result> + where + 'a: 'b, + { match self.eval_plan(plan, vec![]).next() { Some(Ok(_)) => Ok(QueryResult::Boolean(true)), Some(Err(error)) => Err(error), @@ -67,11 +65,14 @@ impl SimpleEvaluator { } } - pub fn evaluate_construct_plan<'a>( - &'a self, - plan: &'a PlanNode, - construct: &'a [TripleTemplate], - ) -> Result> { + pub fn evaluate_construct_plan<'b>( + &'b self, + plan: &'b PlanNode, + construct: &'b [TripleTemplate], + ) -> Result> + where + 'a: 'b, + { Ok(QueryResult::Graph(Box::new(ConstructIterator { store: self.store.clone(), iter: self.eval_plan(plan, vec![]), @@ -81,15 +82,21 @@ impl SimpleEvaluator { }))) } - pub fn evaluate_describe_plan<'a>(&'a self, plan: &'a PlanNode) -> Result> { + pub fn evaluate_describe_plan<'b>(&'b self, plan: &'b PlanNode) -> Result> + where + 'a: 'b, + { Ok(QueryResult::Graph(Box::new(DescribeIterator { store: self.store.clone(), iter: self.eval_plan(plan, vec![]), - quads_iters: Vec::default(), + quads: Vec::default(), }))) } - fn eval_plan<'a>(&self, node: &'a PlanNode, from: EncodedTuple) -> EncodedTuplesIterator<'a> { + fn eval_plan<'b>(&'b self, node: &'b PlanNode, from: EncodedTuple) -> EncodedTuplesIterator<'b> + where + 'a: 'b, + { match node { PlanNode::Init => Box::new(once(Ok(from))), PlanNode::StaticBindings { tuples } => Box::new(tuples.iter().cloned().map(Ok)), @@ -99,84 +106,72 @@ impl SimpleEvaluator { predicate, object, graph_name, - } => { - let eval = self.clone(); - Box::new( - self.eval_plan(&*child, from) - .flat_map(move |tuple| match tuple { - Ok(tuple) => { - let mut iter = eval.store.quads_for_pattern( - get_pattern_value(&subject, &tuple), - get_pattern_value(&predicate, &tuple), - get_pattern_value(&object, &tuple), - get_pattern_value(&graph_name, &tuple), - ); - if subject.is_var() && subject == predicate { + } => Box::new( + self.eval_plan(&*child, from) + .flat_map(move |tuple| match tuple { + Ok(tuple) => { + let mut iter = self.store.quads_for_pattern( + get_pattern_value(&subject, &tuple), + get_pattern_value(&predicate, &tuple), + get_pattern_value(&object, &tuple), + get_pattern_value(&graph_name, &tuple), + ); + if subject.is_var() && subject == predicate { + iter = Box::new(iter.filter(|quad| match quad { + Err(_) => true, + Ok(quad) => quad.subject == quad.predicate, + })) + } + if subject.is_var() && subject == object { + iter = Box::new(iter.filter(|quad| match quad { + Err(_) => true, + Ok(quad) => quad.subject == quad.object, + })) + } + if predicate.is_var() && predicate == object { + iter = Box::new(iter.filter(|quad| match quad { + Err(_) => true, + Ok(quad) => quad.predicate == quad.object, + })) + } + if graph_name.is_var() { + iter = Box::new(iter.filter(|quad| match quad { + Err(_) => true, + Ok(quad) => quad.graph_name != ENCODED_DEFAULT_GRAPH, + })); + if graph_name == subject { iter = Box::new(iter.filter(|quad| match quad { Err(_) => true, - Ok(quad) => quad.subject == quad.predicate, + Ok(quad) => quad.graph_name == quad.subject, })) } - if subject.is_var() && subject == object { + if graph_name == predicate { iter = Box::new(iter.filter(|quad| match quad { Err(_) => true, - Ok(quad) => quad.subject == quad.object, + Ok(quad) => quad.graph_name == quad.predicate, })) } - if predicate.is_var() && predicate == object { + if graph_name == object { iter = Box::new(iter.filter(|quad| match quad { Err(_) => true, - Ok(quad) => quad.predicate == quad.object, + Ok(quad) => quad.graph_name == quad.object, })) } - if graph_name.is_var() { - iter = Box::new(iter.filter(|quad| match quad { - Err(_) => true, - Ok(quad) => quad.graph_name != ENCODED_DEFAULT_GRAPH, - })); - if graph_name == subject { - iter = Box::new(iter.filter(|quad| match quad { - Err(_) => true, - Ok(quad) => quad.graph_name == quad.subject, - })) - } - if graph_name == predicate { - iter = Box::new(iter.filter(|quad| match quad { - Err(_) => true, - Ok(quad) => quad.graph_name == quad.predicate, - })) - } - if graph_name == object { - iter = Box::new(iter.filter(|quad| match quad { - Err(_) => true, - Ok(quad) => quad.graph_name == quad.object, - })) - } - } - let iter: EncodedTuplesIterator<'_> = - Box::new(iter.map(move |quad| { - let quad = quad?; - let mut new_tuple = tuple.clone(); - put_pattern_value(&subject, quad.subject, &mut new_tuple); - put_pattern_value( - &predicate, - quad.predicate, - &mut new_tuple, - ); - put_pattern_value(&object, quad.object, &mut new_tuple); - put_pattern_value( - &graph_name, - quad.graph_name, - &mut new_tuple, - ); - Ok(new_tuple) - })); - iter } - Err(error) => Box::new(once(Err(error))), - }), - ) - } + let iter: EncodedTuplesIterator<'_> = Box::new(iter.map(move |quad| { + let quad = quad?; + let mut new_tuple = tuple.clone(); + put_pattern_value(&subject, quad.subject, &mut new_tuple); + put_pattern_value(&predicate, quad.predicate, &mut new_tuple); + put_pattern_value(&object, quad.object, &mut new_tuple); + put_pattern_value(&graph_name, quad.graph_name, &mut new_tuple); + Ok(new_tuple) + })); + iter + } + Err(error) => Box::new(once(Err(error))), + }), + ), PlanNode::Join { left, right } => { //TODO: very dumb implementation let left_iter = self.eval_plan(&*left, from.clone()); @@ -208,7 +203,7 @@ impl SimpleEvaluator { eval: self.clone(), right_plan: &*right, left_iter: self.eval_plan(&*left, filtered_from), - current_right_iter: None, + current_right: Vec::default(), }; if problem_vars.is_empty() { Box::new(iter) @@ -236,7 +231,7 @@ impl SimpleEvaluator { eval: self.clone(), children_plan: &children, input_iter: self.eval_plan(&*entry, from), - current_iters: Vec::default(), + current: Vec::default(), }), PlanNode::Extend { child, @@ -853,11 +848,14 @@ impl SimpleEvaluator { } } - fn decode_bindings<'a>( - &self, - iter: EncodedTuplesIterator<'a>, + fn decode_bindings<'b>( + &'b self, + iter: EncodedTuplesIterator<'b>, variables: Vec, - ) -> BindingsIterator<'a> { + ) -> BindingsIterator<'b> + where + 'a: 'b, + { let store = self.store.clone(); BindingsIterator::new( variables, @@ -1124,31 +1122,31 @@ impl<'a> Iterator for JoinIterator<'a> { } } -struct LeftJoinIterator<'a, S: EncodedQuadsStore> { +struct LeftJoinIterator<'a, S: StoreConnection + 'a> { eval: SimpleEvaluator, right_plan: &'a PlanNode, left_iter: EncodedTuplesIterator<'a>, - current_right_iter: Option>, + current_right: Vec>, //TODO: keep using an iterator? } -impl<'a, S: EncodedQuadsStore> Iterator for LeftJoinIterator<'a, S> { +impl<'a, S: StoreConnection> Iterator for LeftJoinIterator<'a, S> { type Item = Result; fn next(&mut self) -> Option> { - if let Some(ref mut right_iter) = self.current_right_iter { - if let Some(tuple) = right_iter.next() { - return Some(tuple); - } + if let Some(tuple) = self.current_right.pop() { + return Some(tuple); } match self.left_iter.next()? { Ok(left_tuple) => { - let mut right_iter = self.eval.eval_plan(self.right_plan, left_tuple.clone()); - match right_iter.next() { - Some(right_tuple) => { - self.current_right_iter = Some(right_iter); - Some(right_tuple) - } - None => Some(Ok(left_tuple)), + let mut current_right: Vec<_> = self + .eval + .eval_plan(self.right_plan, left_tuple.clone()) + .collect(); + if let Some(right_tuple) = current_right.pop() { + self.current_right = current_right; + Some(right_tuple) + } else { + Some(Ok(left_tuple)) } } Err(error) => Some(Err(error)), @@ -1156,13 +1154,13 @@ impl<'a, S: EncodedQuadsStore> Iterator for LeftJoinIterator<'a, S> { } } -struct BadLeftJoinIterator<'a, S: EncodedQuadsStore> { +struct BadLeftJoinIterator<'a, S: StoreConnection> { input: EncodedTuple, iter: LeftJoinIterator<'a, S>, problem_vars: Vec, } -impl<'a, S: EncodedQuadsStore> Iterator for BadLeftJoinIterator<'a, S> { +impl<'a, S: StoreConnection> Iterator for BadLeftJoinIterator<'a, S> { type Item = Result; fn next(&mut self) -> Option> { @@ -1192,28 +1190,25 @@ impl<'a, S: EncodedQuadsStore> Iterator for BadLeftJoinIterator<'a, S> { } } -struct UnionIterator<'a, S: EncodedQuadsStore> { +struct UnionIterator<'a, S: StoreConnection + 'a> { eval: SimpleEvaluator, children_plan: &'a Vec, input_iter: EncodedTuplesIterator<'a>, - current_iters: Vec>, + current: Vec>, //TODO: avoid } -impl<'a, S: EncodedQuadsStore> Iterator for UnionIterator<'a, S> { +impl<'a, S: StoreConnection> Iterator for UnionIterator<'a, S> { type Item = Result; fn next(&mut self) -> Option> { - while let Some(mut iter) = self.current_iters.pop() { - if let Some(tuple) = iter.next() { - self.current_iters.push(iter); - return Some(tuple); - } + while let Some(tuple) = self.current.pop() { + return Some(tuple); } match self.input_iter.next()? { Ok(input_tuple) => { for plan in self.children_plan { - self.current_iters - .push(self.eval.eval_plan(plan, input_tuple.clone())); + self.current + .extend(self.eval.eval_plan(plan, input_tuple.clone())); } } Err(error) => return Some(Err(error)), @@ -1244,15 +1239,15 @@ impl<'a> Iterator for HashDeduplicateIterator<'a> { } } -struct ConstructIterator<'a, S: EncodedQuadsStore> { - store: Arc, +struct ConstructIterator<'a, S: StoreConnection> { + store: S, iter: EncodedTuplesIterator<'a>, template: &'a [TripleTemplate], buffered_results: Vec>, bnodes: Vec, } -impl<'a, S: EncodedQuadsStore> Iterator for ConstructIterator<'a, S> { +impl<'a, S: StoreConnection> Iterator for ConstructIterator<'a, S> { type Item = Result; fn next(&mut self) -> Option> { @@ -1315,42 +1310,39 @@ fn decode_triple( )) } -struct DescribeIterator<'a, S: EncodedQuadsStore> { - store: Arc, +struct DescribeIterator<'a, S: StoreConnection + 'a> { + store: S, iter: EncodedTuplesIterator<'a>, - quads_iters: Vec, + quads: Vec>, } -impl<'a, S: EncodedQuadsStore> Iterator for DescribeIterator<'a, S> { +impl<'a, S: StoreConnection> Iterator for DescribeIterator<'a, S> { type Item = Result; fn next(&mut self) -> Option> { - while let Some(mut quads_iter) = self.quads_iters.pop() { - if let Some(quad) = quads_iter.next() { - self.quads_iters.push(quads_iter); - return Some(quad.and_then(|quad| self.store.encoder().decode_triple(&quad))); - } + while let Some(quad) = self.quads.pop() { + return Some(match quad { + Ok(quad) => self + .store + .encoder() + .decode_quad(&quad) + .map(|q| q.into_triple()), + Err(error) => Err(error), + }); } let tuple = match self.iter.next()? { Ok(tuple) => tuple, Err(error) => return Some(Err(error)), }; - let mut error_to_return = None; for subject in tuple { if let Some(subject) = subject { - match self.store.quads_for_subject(subject) { - Ok(quads_iter) => self.quads_iters.push(quads_iter), - Err(error) => { - error_to_return = Some(error); - } - } + self.quads = self + .store + .quads_for_pattern(Some(subject), None, None, None) + .collect(); } } - if let Some(error) = error_to_return { - Some(Err(error)) - } else { - self.next() - } + self.next() } } diff --git a/lib/src/sparql/mod.rs b/lib/src/sparql/mod.rs index 044e194d..717aebbe 100644 --- a/lib/src/sparql/mod.rs +++ b/lib/src/sparql/mod.rs @@ -1,29 +1,5 @@ //! [SPARQL](https://www.w3.org/TR/sparql11-overview/) implementation. -//! -//! The support of RDF Dataset and SPARQL 1.1 specific features is not done yet. -//! -//! This module adds query capabilities to the `rudf::model::Dataset` implementations. -//! -//! Usage example: -//! ``` -//! use rudf::model::*; -//! use rudf::store::MemoryDataset; -//! use rudf::sparql::SparqlDataset; -//! use rudf::sparql::PreparedQuery; -//! use rudf::sparql::algebra::QueryResult; -//! use std::str::FromStr; -//! -//! let dataset = MemoryDataset::default(); -//! let ex = NamedNode::from_str("http://example.com").unwrap(); -//! dataset.insert(&Quad::new(ex.clone(), ex.clone(), ex.clone(), None)); -//! let prepared_query = dataset.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap(); -//! let results = prepared_query.exec().unwrap(); -//! if let QueryResult::Bindings(results) = results { -//! assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); -//! } -//! ``` -use crate::model::Dataset; use crate::sparql::algebra::Query; use crate::sparql::algebra::QueryResult; use crate::sparql::algebra::Variable; @@ -32,8 +8,7 @@ use crate::sparql::parser::read_sparql_query; use crate::sparql::plan::PlanBuilder; use crate::sparql::plan::PlanNode; use crate::sparql::plan::TripleTemplate; -use crate::store::encoded::EncodedQuadsStore; -use crate::store::encoded::StoreDataset; +use crate::store::StoreConnection; use crate::Result; use std::io::Read; @@ -43,50 +18,58 @@ pub mod parser; mod plan; pub mod xml_results; -/// An extension of the `rudf::model::Dataset` trait to allow SPARQL operations on it. -/// -/// It is implemented by all stores provided by Rudf -pub trait SparqlDataset: Dataset { - type PreparedQuery: PreparedQuery; - - /// Prepares a [SPARQL 1.1](https://www.w3.org/TR/sparql11-query/) query and returns an object that could be used to execute it - /// - /// The implementation is a work in progress, RDF Dataset and SPARQL 1.1 specific features are not implemented yet. - fn prepare_query(&self, query: impl Read) -> Result; -} - /// A prepared [SPARQL 1.1](https://www.w3.org/TR/sparql11-query/) query pub trait PreparedQuery { /// Evaluates the query and returns its results fn exec(&self) -> Result>; } -impl SparqlDataset for StoreDataset { - type PreparedQuery = SimplePreparedQuery; +/// An implementation of `PreparedQuery` for internal use +pub struct SimplePreparedQuery(SimplePreparedQueryOptions); + +enum SimplePreparedQueryOptions { + Select { + plan: PlanNode, + variables: Vec, + evaluator: SimpleEvaluator, + }, + Ask { + plan: PlanNode, + evaluator: SimpleEvaluator, + }, + Construct { + plan: PlanNode, + construct: Vec, + evaluator: SimpleEvaluator, + }, + Describe { + plan: PlanNode, + evaluator: SimpleEvaluator, + }, +} - fn prepare_query(&self, query: impl Read) -> Result> { - Ok(SimplePreparedQuery(match read_sparql_query(query, None)? { +impl SimplePreparedQuery { + pub(crate) fn new(connection: S, query: impl Read) -> Result { + Ok(Self(match read_sparql_query(query, None)? { Query::Select { algebra, dataset: _, } => { - let store = self.encoded(); - let (plan, variables) = PlanBuilder::build(&*store, &algebra)?; + let (plan, variables) = PlanBuilder::build(&connection, &algebra)?; SimplePreparedQueryOptions::Select { plan, variables, - evaluator: SimpleEvaluator::new(store), + evaluator: SimpleEvaluator::new(connection), } } Query::Ask { algebra, dataset: _, } => { - let store = self.encoded(); - let (plan, _) = PlanBuilder::build(&*store, &algebra)?; + let (plan, _) = PlanBuilder::build(&connection, &algebra)?; SimplePreparedQueryOptions::Ask { plan, - evaluator: SimpleEvaluator::new(store), + evaluator: SimpleEvaluator::new(connection), } } Query::Construct { @@ -94,54 +77,32 @@ impl SparqlDataset for StoreDataset { algebra, dataset: _, } => { - let store = self.encoded(); - let (plan, variables) = PlanBuilder::build(&*store, &algebra)?; + let (plan, variables) = PlanBuilder::build(&connection, &algebra)?; SimplePreparedQueryOptions::Construct { plan, - construct: PlanBuilder::build_graph_template(&*store, &construct, variables)?, - evaluator: SimpleEvaluator::new(store), + construct: PlanBuilder::build_graph_template( + &connection, + &construct, + variables, + )?, + evaluator: SimpleEvaluator::new(connection), } } Query::Describe { algebra, dataset: _, } => { - let store = self.encoded(); - let (plan, _) = PlanBuilder::build(&*store, &algebra)?; + let (plan, _) = PlanBuilder::build(&connection, &algebra)?; SimplePreparedQueryOptions::Describe { plan, - evaluator: SimpleEvaluator::new(store), + evaluator: SimpleEvaluator::new(connection), } } })) } } -/// An implementation of `PreparedQuery` for internal use -pub struct SimplePreparedQuery(SimplePreparedQueryOptions); - -enum SimplePreparedQueryOptions { - Select { - plan: PlanNode, - variables: Vec, - evaluator: SimpleEvaluator, - }, - Ask { - plan: PlanNode, - evaluator: SimpleEvaluator, - }, - Construct { - plan: PlanNode, - construct: Vec, - evaluator: SimpleEvaluator, - }, - Describe { - plan: PlanNode, - evaluator: SimpleEvaluator, - }, -} - -impl PreparedQuery for SimplePreparedQuery { +impl PreparedQuery for SimplePreparedQuery { fn exec(&self) -> Result> { match &self.0 { SimplePreparedQueryOptions::Select { diff --git a/lib/src/sparql/plan.rs b/lib/src/sparql/plan.rs index 31df3da7..fad97c49 100644 --- a/lib/src/sparql/plan.rs +++ b/lib/src/sparql/plan.rs @@ -1,9 +1,9 @@ use crate::model::vocab::xsd; use crate::model::Literal; use crate::sparql::algebra::*; -use crate::store::encoded::EncodedQuadsStore; use crate::store::numeric_encoder::EncodedTerm; use crate::store::numeric_encoder::ENCODED_DEFAULT_GRAPH; +use crate::store::StoreConnection; use crate::Result; use failure::format_err; use std::collections::BTreeSet; @@ -355,12 +355,12 @@ pub enum TripleTemplateValue { Variable(usize), } -pub struct PlanBuilder<'a, S: EncodedQuadsStore> { +pub struct PlanBuilder<'a, S: StoreConnection> { store: &'a S, } -impl<'a, S: EncodedQuadsStore> PlanBuilder<'a, S> { - pub fn build(store: &S, pattern: &GraphPattern) -> Result<(PlanNode, Vec)> { +impl<'a, S: StoreConnection> PlanBuilder<'a, S> { + pub fn build(store: &'a S, pattern: &GraphPattern) -> Result<(PlanNode, Vec)> { let mut variables = Vec::default(); let plan = PlanBuilder { store }.build_for_graph_pattern( pattern, diff --git a/lib/src/store/encoded.rs b/lib/src/store/encoded.rs deleted file mode 100644 index a6c0c2c6..00000000 --- a/lib/src/store/encoded.rs +++ /dev/null @@ -1,966 +0,0 @@ -use crate::model::*; -use crate::store::numeric_encoder::*; -use crate::Result; -use failure::format_err; -use std::fmt; -use std::iter::empty; -use std::iter::once; -use std::iter::FromIterator; -use std::iter::Iterator; -use std::sync::Arc; - -/// Defines the Store traits that is used to have efficient binary storage - -pub trait EncodedQuadsStore: StringStore + Sized + 'static { - type QuadsIterator: Iterator> + 'static; - type QuadsForSubjectIterator: Iterator> + 'static; - type QuadsForSubjectPredicateIterator: Iterator> + 'static; - type QuadsForSubjectPredicateObjectIterator: Iterator> + 'static; - type QuadsForSubjectObjectIterator: Iterator> + 'static; - type QuadsForPredicateIterator: Iterator> + 'static; - type QuadsForPredicateObjectIterator: Iterator> + 'static; - type QuadsForObjectIterator: Iterator> + 'static; - type QuadsForGraphIterator: Iterator> + 'static; - type QuadsForSubjectGraphIterator: Iterator> + 'static; - type QuadsForSubjectPredicateGraphIterator: Iterator> + 'static; - type QuadsForSubjectObjectGraphIterator: Iterator> + 'static; - type QuadsForPredicateGraphIterator: Iterator> + 'static; - type QuadsForPredicateObjectGraphIterator: Iterator> + 'static; - type QuadsForObjectGraphIterator: Iterator> + 'static; - - fn encoder(&self) -> Encoder<&Self> { - Encoder::new(&self) - } - - fn quads(&self) -> Result; - fn quads_for_subject(&self, subject: EncodedTerm) -> Result; - fn quads_for_subject_predicate( - &self, - subject: EncodedTerm, - predicate: EncodedTerm, - ) -> Result; - fn quads_for_subject_predicate_object( - &self, - subject: EncodedTerm, - predicate: EncodedTerm, - object: EncodedTerm, - ) -> Result; - fn quads_for_subject_object( - &self, - subject: EncodedTerm, - object: EncodedTerm, - ) -> Result; - fn quads_for_predicate( - &self, - predicate: EncodedTerm, - ) -> Result; - fn quads_for_predicate_object( - &self, - predicate: EncodedTerm, - object: EncodedTerm, - ) -> Result; - fn quads_for_object(&self, object: EncodedTerm) -> Result; - fn quads_for_graph(&self, graph_name: EncodedTerm) -> Result; - fn quads_for_subject_graph( - &self, - subject: EncodedTerm, - graph_name: EncodedTerm, - ) -> Result; - fn quads_for_subject_predicate_graph( - &self, - subject: EncodedTerm, - predicate: EncodedTerm, - graph_name: EncodedTerm, - ) -> Result; - fn quads_for_subject_object_graph( - &self, - subject: EncodedTerm, - object: EncodedTerm, - graph_name: EncodedTerm, - ) -> Result; - fn quads_for_predicate_graph( - &self, - predicate: EncodedTerm, - graph_name: EncodedTerm, - ) -> Result; - fn quads_for_predicate_object_graph( - &self, - predicate: EncodedTerm, - object: EncodedTerm, - graph_name: EncodedTerm, - ) -> Result; - fn quads_for_object_graph( - &self, - object: EncodedTerm, - graph_name: EncodedTerm, - ) -> Result; - fn contains(&self, quad: &EncodedQuad) -> Result; - fn insert(&self, quad: &EncodedQuad) -> Result<()>; - fn remove(&self, quad: &EncodedQuad) -> Result<()>; - fn quads_for_pattern( - &self, - subject: Option, - predicate: Option, - object: Option, - graph_name: Option, - ) -> Box>> { - match subject { - Some(subject) => match predicate { - Some(predicate) => match object { - Some(object) => match graph_name { - Some(graph_name) => { - let quad = EncodedQuad::new(subject, predicate, object, graph_name); - match self.contains(&quad) { - Ok(true) => Box::new(once(Ok(quad))), - Ok(false) => Box::new(empty()), - Err(error) => Box::new(once(Err(error))), - } - } - None => wrap_error( - self.quads_for_subject_predicate_object(subject, predicate, object), - ), - }, - None => match graph_name { - Some(graph_name) => wrap_error( - self.quads_for_subject_predicate_graph(subject, predicate, graph_name), - ), - None => wrap_error(self.quads_for_subject_predicate(subject, predicate)), - }, - }, - None => match object { - Some(object) => match graph_name { - Some(graph_name) => wrap_error( - self.quads_for_subject_object_graph(subject, object, graph_name), - ), - None => wrap_error(self.quads_for_subject_object(subject, object)), - }, - None => match graph_name { - Some(graph_name) => { - wrap_error(self.quads_for_subject_graph(subject, graph_name)) - } - None => wrap_error(self.quads_for_subject(subject)), - }, - }, - }, - None => match predicate { - Some(predicate) => match object { - Some(object) => match graph_name { - Some(graph_name) => wrap_error( - self.quads_for_predicate_object_graph(predicate, object, graph_name), - ), - None => wrap_error(self.quads_for_predicate_object(predicate, object)), - }, - None => match graph_name { - Some(graph_name) => { - wrap_error(self.quads_for_predicate_graph(predicate, graph_name)) - } - None => wrap_error(self.quads_for_predicate(predicate)), - }, - }, - None => match object { - Some(object) => match graph_name { - Some(graph_name) => { - wrap_error(self.quads_for_object_graph(object, graph_name)) - } - None => wrap_error(self.quads_for_object(object)), - }, - None => match graph_name { - Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)), - None => wrap_error(self.quads()), - }, - }, - }, - } - } -} - -fn wrap_error> + 'static>( - iter: Result, -) -> Box>> { - match iter { - Ok(iter) => Box::new(iter), - Err(error) => Box::new(once(Err(error))), - } -} - -pub struct StoreDataset { - store: Arc, -} - -impl StoreDataset { - pub fn new_from_store(store: S) -> Self { - Self { - store: Arc::new(store), - } - } - - pub(crate) fn encoded(&self) -> Arc { - self.store.clone() - } -} - -impl Dataset for StoreDataset { - type NamedGraph = StoreNamedGraph; - type DefaultGraph = StoreDefaultGraph; - type UnionGraph = StoreUnionGraph; - type QuadsIterator = QuadsIterator; - type QuadsForSubjectIterator = QuadsIterator; - type QuadsForSubjectPredicateIterator = QuadsIterator; - type QuadsForSubjectPredicateObjectIterator = - QuadsIterator; - type QuadsForSubjectObjectIterator = QuadsIterator; - type QuadsForPredicateIterator = QuadsIterator; - type QuadsForPredicateObjectIterator = QuadsIterator; - type QuadsForObjectIterator = QuadsIterator; - - fn named_graph(&self, name: &NamedOrBlankNode) -> Result> { - Ok(StoreNamedGraph { - store: self.store.clone(), - name: name.clone(), - encoded_name: self.store.encoder().encode_named_or_blank_node(name)?, - }) - } - - fn default_graph(&self) -> StoreDefaultGraph { - StoreDefaultGraph { - store: self.store.clone(), - } - } - - fn union_graph(&self) -> StoreUnionGraph { - StoreUnionGraph { - store: self.store.clone(), - } - } - - fn quads(&self) -> Result> { - Ok(QuadsIterator { - iter: self.store.quads()?, - store: self.store.clone(), - }) - } - - fn quads_for_subject( - &self, - subject: &NamedOrBlankNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(QuadsIterator { - iter: self - .store - .quads_for_subject(encoder.encode_named_or_blank_node(subject)?)?, - store: self.store.clone(), - }) - } - - fn quads_for_subject_predicate( - &self, - subject: &NamedOrBlankNode, - predicate: &NamedNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(QuadsIterator { - iter: self.store.quads_for_subject_predicate( - encoder.encode_named_or_blank_node(subject)?, - encoder.encode_named_node(predicate)?, - )?, - store: self.store.clone(), - }) - } - - fn quads_for_subject_predicate_object( - &self, - subject: &NamedOrBlankNode, - predicate: &NamedNode, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(QuadsIterator { - iter: self.store.quads_for_subject_predicate_object( - encoder.encode_named_or_blank_node(subject)?, - encoder.encode_named_node(predicate)?, - encoder.encode_term(object)?, - )?, - store: self.store.clone(), - }) - } - - fn quads_for_subject_object( - &self, - subject: &NamedOrBlankNode, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(QuadsIterator { - iter: self.store.quads_for_subject_object( - encoder.encode_named_or_blank_node(subject)?, - encoder.encode_term(object)?, - )?, - store: self.store.clone(), - }) - } - - fn quads_for_predicate( - &self, - predicate: &NamedNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(QuadsIterator { - iter: self - .store - .quads_for_predicate(encoder.encode_named_node(predicate)?)?, - store: self.store.clone(), - }) - } - - fn quads_for_predicate_object( - &self, - predicate: &NamedNode, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(QuadsIterator { - iter: self.store.quads_for_predicate_object( - encoder.encode_named_node(predicate)?, - encoder.encode_term(object)?, - )?, - store: self.store.clone(), - }) - } - - fn quads_for_object( - &self, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(QuadsIterator { - iter: self.store.quads_for_object(encoder.encode_term(object)?)?, - store: self.store.clone(), - }) - } - - fn contains(&self, quad: &Quad) -> Result { - self.store - .contains(&self.store.encoder().encode_quad(quad)?) - } - - fn insert(&self, quad: &Quad) -> Result<()> { - self.store.insert(&self.store.encoder().encode_quad(quad)?) - } - - fn remove(&self, quad: &Quad) -> Result<()> { - self.store.remove(&self.store.encoder().encode_quad(quad)?) - } - - fn len(&self) -> Result { - Ok(self.store.quads()?.count()) - } - - fn is_empty(&self) -> Result { - Ok(self.store.quads()?.any(|_| true)) - } -} - -impl fmt::Display for StoreDataset { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - for quad in self.iter().map_err(|_| fmt::Error)? { - writeln!(fmt, "{}", quad.map_err(|_| fmt::Error)?)?; - } - Ok(()) - } -} - -impl Default for StoreDataset { - fn default() -> Self { - Self::new_from_store(S::default()) - } -} - -impl FromIterator for StoreDataset { - fn from_iter>(iter: I) -> Self { - let dataset = Self::default(); - for quad in iter { - dataset.insert(&quad).unwrap(); - } - dataset - } -} - -impl<'a, S: EncodedQuadsStore + Default> FromIterator<&'a Quad> for StoreDataset { - fn from_iter>(iter: I) -> Self { - let dataset = Self::default(); - for quad in iter { - dataset.insert(quad).unwrap(); - } - dataset - } -} - -pub struct StoreNamedGraph { - store: Arc, - name: NamedOrBlankNode, - encoded_name: EncodedTerm, -} - -impl Graph for StoreNamedGraph { - type TriplesIterator = TriplesIterator; - type TriplesForSubjectIterator = TriplesIterator; - type ObjectsForSubjectPredicateIterator = - ObjectsIterator; - type PredicatesForSubjectObjectIterator = - PredicatesIterator; - type TriplesForPredicateIterator = TriplesIterator; - type SubjectsForPredicateObjectIterator = - SubjectsIterator; - type TriplesForObjectIterator = TriplesIterator; - - fn triples(&self) -> Result> { - Ok(TriplesIterator { - iter: self.store.quads_for_graph(self.encoded_name)?, - store: self.store.clone(), - }) - } - - fn triples_for_subject( - &self, - subject: &NamedOrBlankNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(TriplesIterator { - iter: self.store.quads_for_subject_graph( - encoder.encode_named_or_blank_node(subject)?, - self.encoded_name, - )?, - store: self.store.clone(), - }) - } - fn objects_for_subject_predicate( - &self, - subject: &NamedOrBlankNode, - predicate: &NamedNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(ObjectsIterator { - iter: self.store.quads_for_subject_predicate_graph( - encoder.encode_named_or_blank_node(subject)?, - encoder.encode_named_node(predicate)?, - self.encoded_name, - )?, - store: self.store.clone(), - }) - } - fn predicates_for_subject_object( - &self, - subject: &NamedOrBlankNode, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(PredicatesIterator { - iter: self.store.quads_for_subject_object_graph( - encoder.encode_named_or_blank_node(subject)?, - encoder.encode_term(object)?, - self.encoded_name, - )?, - store: self.store.clone(), - }) - } - fn triples_for_predicate( - &self, - predicate: &NamedNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(TriplesIterator { - iter: self.store.quads_for_predicate_graph( - encoder.encode_named_node(predicate)?, - self.encoded_name, - )?, - store: self.store.clone(), - }) - } - fn subjects_for_predicate_object( - &self, - predicate: &NamedNode, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(SubjectsIterator { - iter: self.store.quads_for_predicate_object_graph( - encoder.encode_named_node(predicate)?, - encoder.encode_term(object)?, - self.encoded_name, - )?, - store: self.store.clone(), - }) - } - fn triples_for_object( - &self, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(TriplesIterator { - iter: self - .store - .quads_for_object_graph(encoder.encode_term(object)?, self.encoded_name)?, - store: self.store.clone(), - }) - } - - fn contains(&self, triple: &Triple) -> Result { - self.store.contains( - &self - .store - .encoder() - .encode_triple_in_graph(triple, self.encoded_name)?, - ) - } - - fn insert(&self, triple: &Triple) -> Result<()> { - self.store.insert( - &self - .store - .encoder() - .encode_triple_in_graph(triple, self.encoded_name)?, - ) - } - - fn remove(&self, triple: &Triple) -> Result<()> { - self.store.remove( - &self - .store - .encoder() - .encode_triple_in_graph(triple, self.encoded_name)?, - ) - } - - fn len(&self) -> Result { - Ok(self.store.quads_for_graph(self.encoded_name)?.count()) - } - - fn is_empty(&self) -> Result { - Ok(self.store.quads_for_graph(self.encoded_name)?.any(|_| true)) - } -} - -impl NamedGraph for StoreNamedGraph { - fn name(&self) -> &NamedOrBlankNode { - &self.name - } -} - -impl fmt::Display for StoreNamedGraph { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - for triple in self.iter().map_err(|_| fmt::Error)? { - writeln!(fmt, "{}", triple.map_err(|_| fmt::Error)?)?; - } - Ok(()) - } -} - -pub struct StoreDefaultGraph { - store: Arc, -} - -impl Graph for StoreDefaultGraph { - type TriplesIterator = TriplesIterator; - type TriplesForSubjectIterator = TriplesIterator; - type ObjectsForSubjectPredicateIterator = - ObjectsIterator; - type PredicatesForSubjectObjectIterator = - PredicatesIterator; - type TriplesForPredicateIterator = TriplesIterator; - type SubjectsForPredicateObjectIterator = - SubjectsIterator; - type TriplesForObjectIterator = TriplesIterator; - - fn triples(&self) -> Result> { - Ok(TriplesIterator { - iter: self.store.quads_for_graph(ENCODED_DEFAULT_GRAPH)?, - store: self.store.clone(), - }) - } - - fn triples_for_subject( - &self, - subject: &NamedOrBlankNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(TriplesIterator { - iter: self.store.quads_for_subject_graph( - encoder.encode_named_or_blank_node(subject)?, - ENCODED_DEFAULT_GRAPH, - )?, - store: self.store.clone(), - }) - } - fn objects_for_subject_predicate( - &self, - subject: &NamedOrBlankNode, - predicate: &NamedNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(ObjectsIterator { - iter: self.store.quads_for_subject_predicate_graph( - encoder.encode_named_or_blank_node(subject)?, - encoder.encode_named_node(predicate)?, - ENCODED_DEFAULT_GRAPH, - )?, - store: self.store.clone(), - }) - } - fn predicates_for_subject_object( - &self, - subject: &NamedOrBlankNode, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(PredicatesIterator { - iter: self.store.quads_for_subject_object_graph( - encoder.encode_named_or_blank_node(subject)?, - encoder.encode_term(object)?, - ENCODED_DEFAULT_GRAPH, - )?, - store: self.store.clone(), - }) - } - fn triples_for_predicate( - &self, - predicate: &NamedNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(TriplesIterator { - iter: self.store.quads_for_predicate_graph( - encoder.encode_named_node(predicate)?, - ENCODED_DEFAULT_GRAPH, - )?, - store: self.store.clone(), - }) - } - fn subjects_for_predicate_object( - &self, - predicate: &NamedNode, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(SubjectsIterator { - iter: self.store.quads_for_predicate_object_graph( - encoder.encode_named_node(predicate)?, - encoder.encode_term(object)?, - ENCODED_DEFAULT_GRAPH, - )?, - store: self.store.clone(), - }) - } - fn triples_for_object( - &self, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(TriplesIterator { - iter: self - .store - .quads_for_object_graph(encoder.encode_term(object)?, ENCODED_DEFAULT_GRAPH)?, - store: self.store.clone(), - }) - } - - fn contains(&self, triple: &Triple) -> Result { - self.store.contains( - &self - .store - .encoder() - .encode_triple_in_graph(triple, ENCODED_DEFAULT_GRAPH)?, - ) - } - - fn insert(&self, triple: &Triple) -> Result<()> { - self.store.insert( - &self - .store - .encoder() - .encode_triple_in_graph(triple, ENCODED_DEFAULT_GRAPH)?, - ) - } - - fn remove(&self, triple: &Triple) -> Result<()> { - self.store.remove( - &self - .store - .encoder() - .encode_triple_in_graph(triple, ENCODED_DEFAULT_GRAPH)?, - ) - } - - fn len(&self) -> Result { - Ok(self.store.quads_for_graph(ENCODED_DEFAULT_GRAPH)?.count()) - } - - fn is_empty(&self) -> Result { - Ok(self - .store - .quads_for_graph(ENCODED_DEFAULT_GRAPH)? - .any(|_| true)) - } -} - -impl fmt::Display for StoreDefaultGraph { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - for triple in self.iter().map_err(|_| fmt::Error)? { - writeln!(fmt, "{}", triple.map_err(|_| fmt::Error)?)?; - } - Ok(()) - } -} - -impl Default for StoreDefaultGraph { - fn default() -> Self { - StoreDataset::default().default_graph() - } -} - -impl FromIterator for StoreDefaultGraph { - fn from_iter>(iter: I) -> Self { - let graph = Self::default(); - for triple in iter { - graph.insert(&triple).unwrap(); - } - graph - } -} - -impl<'a, S: EncodedQuadsStore + Default> FromIterator<&'a Triple> for StoreDefaultGraph { - fn from_iter>(iter: I) -> Self { - let graph = Self::default(); - for triple in iter { - graph.insert(triple).unwrap(); - } - graph - } -} - -pub struct StoreUnionGraph { - store: Arc, -} - -impl Graph for StoreUnionGraph { - type TriplesIterator = TriplesIterator; - type TriplesForSubjectIterator = TriplesIterator; - type ObjectsForSubjectPredicateIterator = - ObjectsIterator; - type PredicatesForSubjectObjectIterator = - PredicatesIterator; - type TriplesForPredicateIterator = TriplesIterator; - type SubjectsForPredicateObjectIterator = - SubjectsIterator; - type TriplesForObjectIterator = TriplesIterator; - - fn triples(&self) -> Result> { - Ok(TriplesIterator { - iter: self.store.quads()?, - store: self.store.clone(), - }) - } - - fn triples_for_subject( - &self, - subject: &NamedOrBlankNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(TriplesIterator { - iter: self - .store - .quads_for_subject(encoder.encode_named_or_blank_node(subject)?)?, - store: self.store.clone(), - }) - } - fn objects_for_subject_predicate( - &self, - subject: &NamedOrBlankNode, - predicate: &NamedNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(ObjectsIterator { - iter: self.store.quads_for_subject_predicate( - encoder.encode_named_or_blank_node(subject)?, - encoder.encode_named_node(predicate)?, - )?, - store: self.store.clone(), - }) - } - fn predicates_for_subject_object( - &self, - subject: &NamedOrBlankNode, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(PredicatesIterator { - iter: self.store.quads_for_subject_object( - encoder.encode_named_or_blank_node(subject)?, - encoder.encode_term(object)?, - )?, - store: self.store.clone(), - }) - } - fn triples_for_predicate( - &self, - predicate: &NamedNode, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(TriplesIterator { - iter: self - .store - .quads_for_predicate(encoder.encode_named_node(predicate)?)?, - store: self.store.clone(), - }) - } - fn subjects_for_predicate_object( - &self, - predicate: &NamedNode, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(SubjectsIterator { - iter: self.store.quads_for_predicate_object( - encoder.encode_named_node(predicate)?, - encoder.encode_term(object)?, - )?, - store: self.store.clone(), - }) - } - - fn triples_for_object( - &self, - object: &Term, - ) -> Result> { - let encoder = self.store.encoder(); - Ok(TriplesIterator { - iter: self.store.quads_for_object(encoder.encode_term(object)?)?, - store: self.store.clone(), - }) - } - - fn contains(&self, triple: &Triple) -> Result { - let encoder = self.store.encoder(); - Ok(self - .store - .quads_for_subject_predicate_object( - encoder.encode_named_or_blank_node(triple.subject())?, - encoder.encode_named_node(triple.predicate())?, - encoder.encode_term(triple.object())?, - )? - .any(|_| true)) - } - - fn insert(&self, _triple: &Triple) -> Result<()> { - Err(format_err!("Union graph is not writable")) - } - - fn remove(&self, _triple: &Triple) -> Result<()> { - Err(format_err!("Union graph is not writable")) - } - - fn len(&self) -> Result { - Ok(self.store.quads()?.count()) - } - - fn is_empty(&self) -> Result { - Ok(self.store.quads()?.any(|_| true)) - } -} - -impl fmt::Display for StoreUnionGraph { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - for triple in self.iter().map_err(|_| fmt::Error)? { - writeln!(fmt, "{}", triple.map_err(|_| fmt::Error)?)?; - } - Ok(()) - } -} - -pub struct QuadsIterator>, S: EncodedQuadsStore> { - iter: I, - store: Arc, -} - -impl>, S: EncodedQuadsStore> Iterator - for QuadsIterator -{ - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter - .next() - .map(|k| k.and_then(|quad| self.store.encoder().decode_quad(&quad))) - } -} - -pub struct TriplesIterator>, S: EncodedQuadsStore> { - iter: I, - store: Arc, -} - -impl>, S: EncodedQuadsStore> Iterator - for TriplesIterator -{ - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter - .next() - .map(|k| k.and_then(|quad| self.store.encoder().decode_triple(&quad))) - } -} - -pub struct SubjectsIterator>, S: EncodedQuadsStore> { - iter: I, - store: Arc, -} - -impl>, S: EncodedQuadsStore> Iterator - for SubjectsIterator -{ - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter.next().map(|k| { - k.and_then(|quad| { - self.store - .encoder() - .decode_named_or_blank_node(quad.subject) - }) - }) - } -} - -pub struct PredicatesIterator>, S: EncodedQuadsStore> { - iter: I, - store: Arc, -} - -impl>, S: EncodedQuadsStore> Iterator - for PredicatesIterator -{ - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter - .next() - .map(|k| k.and_then(|quad| self.store.encoder().decode_named_node(quad.predicate))) - } -} - -pub struct ObjectsIterator>, S: EncodedQuadsStore> { - iter: I, - store: Arc, -} - -impl>, S: EncodedQuadsStore> Iterator - for ObjectsIterator -{ - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter - .next() - .map(|k| k.and_then(|quad| self.store.encoder().decode_term(quad.object))) - } -} diff --git a/lib/src/store/memory.rs b/lib/src/store/memory.rs index a8e575b3..62ba7828 100644 --- a/lib/src/store/memory.rs +++ b/lib/src/store/memory.rs @@ -1,64 +1,58 @@ use crate::model::LanguageTag; -use crate::store::encoded::*; use crate::store::numeric_encoder::*; -use crate::Result; +use crate::store::*; +use crate::{Repository, Result}; use std::collections::BTreeMap; use std::collections::BTreeSet; +use std::iter::empty; +use std::iter::once; use std::sync::RwLock; use std::sync::RwLockReadGuard; use std::sync::RwLockWriteGuard; -/// Memory based implementation of the `rudf::model::Dataset` trait. -/// They are cheap to build using the `MemoryDataset::default()` method. +/// Memory based implementation of the `Repository` trait. +/// They are cheap to build using the `MemoryRepository::default()` method. /// /// Usage example: /// ``` /// use rudf::model::*; -/// use rudf::store::MemoryDataset; +/// use rudf::{Repository, RepositoryConnection, MemoryRepository, Result}; +/// use crate::rudf::sparql::PreparedQuery; /// use std::str::FromStr; +/// use rudf::sparql::algebra::QueryResult; /// -/// let dataset = MemoryDataset::default(); -/// let default_graph = dataset.default_graph(); +/// let repository = MemoryRepository::default(); +/// let connection = repository.connection().unwrap(); +/// +/// // insertion /// let ex = NamedNode::from_str("http://example.com").unwrap(); -/// let triple = Triple::new(ex.clone(), ex.clone(), ex.clone()); -/// default_graph.insert(&triple); -/// let results: Vec = dataset.quads_for_subject(&ex.into()).unwrap().map(|t| t.unwrap()).collect(); -/// assert_eq!(vec![triple.in_graph(None)], results); -/// ``` -pub type MemoryDataset = StoreDataset; - -/// Memory based implementation of the `rudf::model::Graph` trait. -/// They are cheap to build using the `MemoryGraph::default()` method. +/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); +/// connection.insert(&quad); /// -/// Usage example: -/// ``` -/// use rudf::model::*; -/// use rudf::store::MemoryGraph; -/// use std::str::FromStr; +/// // quad filter +/// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); +/// assert_eq!(vec![quad], results.unwrap()); /// -/// let graph = MemoryGraph::default(); -/// let ex = NamedNode::from_str("http://example.com").unwrap(); -/// let triple = Triple::new(ex.clone(), ex.clone(), ex.clone()); -/// graph.insert(&triple); -/// let results: Vec = graph.triples_for_subject(&ex.into()).unwrap().map(|t| t.unwrap()).collect(); -/// assert_eq!(vec![triple], results); +/// // SPARQL query +/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap(); +/// let results = prepared_query.exec().unwrap(); +/// if let QueryResult::Bindings(results) = results { +/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); +/// } /// ``` -pub type MemoryGraph = StoreDefaultGraph; +#[derive(Default)] +pub struct MemoryRepository { + inner: MemoryStore, +} + +pub type MemoryRepositoryConnection<'a> = StoreRepositoryConnection<&'a MemoryStore>; +#[derive(Default)] pub struct MemoryStore { string_store: MemoryStringStore, graph_indexes: RwLock>, } -impl Default for MemoryStore { - fn default() -> Self { - Self { - string_store: MemoryStringStore::default(), - graph_indexes: RwLock::default(), - } - } -} - #[derive(Default)] struct MemoryGraphIndexes { spo: BTreeMap>>, @@ -66,6 +60,22 @@ struct MemoryGraphIndexes { osp: BTreeMap>>, } +impl<'a> Repository for &'a MemoryRepository { + type Connection = MemoryRepositoryConnection<'a>; + + fn connection(self) -> Result> { + Ok(self.inner.connection()?.into()) + } +} + +impl<'a> Store for &'a MemoryStore { + type Connection = &'a MemoryStore; + + fn connection(self) -> Result { + Ok(self) + } +} + impl StringStore for MemoryStore { type StringType = String; @@ -82,25 +92,203 @@ impl StringStore for MemoryStore { } } -impl EncodedQuadsStore for MemoryStore { - type QuadsIterator = > as IntoIterator>::IntoIter; - type QuadsForSubjectIterator = > as IntoIterator>::IntoIter; - type QuadsForSubjectPredicateIterator = > as IntoIterator>::IntoIter; - type QuadsForSubjectPredicateObjectIterator = - > as IntoIterator>::IntoIter; - type QuadsForSubjectObjectIterator = > as IntoIterator>::IntoIter; - type QuadsForPredicateIterator = > as IntoIterator>::IntoIter; - type QuadsForPredicateObjectIterator = > as IntoIterator>::IntoIter; - type QuadsForObjectIterator = > as IntoIterator>::IntoIter; - type QuadsForGraphIterator = > as IntoIterator>::IntoIter; - type QuadsForSubjectGraphIterator = > as IntoIterator>::IntoIter; - type QuadsForSubjectPredicateGraphIterator = - > as IntoIterator>::IntoIter; - type QuadsForSubjectObjectGraphIterator = > as IntoIterator>::IntoIter; - type QuadsForPredicateGraphIterator = > as IntoIterator>::IntoIter; - type QuadsForPredicateObjectGraphIterator = - > as IntoIterator>::IntoIter; - type QuadsForObjectGraphIterator = > as IntoIterator>::IntoIter; +impl<'a> StoreConnection for &'a MemoryStore { + fn contains(&self, quad: &EncodedQuad) -> Result { + Ok(self + .graph_indexes()? + .get(&quad.graph_name) + .map_or(false, |graph| { + graph.spo.get(&quad.subject).map_or(false, |po| { + po.get(&quad.predicate) + .map_or(false, |o| o.contains(&quad.object)) + }) + })) + } + + fn insert(&self, quad: &EncodedQuad) -> Result<()> { + let mut graph_indexes = self.graph_indexes_mut()?; + let graph = graph_indexes + .entry(quad.graph_name) + .or_insert_with(MemoryGraphIndexes::default); + graph + .spo + .entry(quad.subject) + .or_default() + .entry(quad.predicate) + .or_default() + .insert(quad.object); + graph + .pos + .entry(quad.predicate) + .or_default() + .entry(quad.object) + .or_default() + .insert(quad.subject); + graph + .osp + .entry(quad.object) + .or_default() + .entry(quad.subject) + .or_default() + .insert(quad.predicate); + Ok(()) + } + + fn remove(&self, quad: &EncodedQuad) -> Result<()> { + let mut graph_indexes = self.graph_indexes_mut()?; + let mut empty_graph = false; + if let Some(graph) = graph_indexes.get_mut(&quad.graph_name) { + { + let mut empty_pos = false; + if let Some(pos) = graph.spo.get_mut(&quad.subject) { + let mut empty_os = false; + if let Some(os) = pos.get_mut(&quad.predicate) { + os.remove(&quad.object); + empty_os = os.is_empty(); + } + if empty_os { + pos.remove(&quad.predicate); + } + empty_pos = pos.is_empty(); + } + if empty_pos { + graph.spo.remove(&quad.subject); + } + } + + { + let mut empty_oss = false; + if let Some(oss) = graph.pos.get_mut(&quad.predicate) { + let mut empty_ss = false; + if let Some(ss) = oss.get_mut(&quad.object) { + ss.remove(&quad.subject); + empty_ss = ss.is_empty(); + } + if empty_ss { + oss.remove(&quad.object); + } + empty_oss = oss.is_empty(); + } + if empty_oss { + graph.pos.remove(&quad.predicate); + } + } + + { + let mut empty_sps = false; + if let Some(sps) = graph.osp.get_mut(&quad.object) { + let mut empty_ps = false; + if let Some(ps) = sps.get_mut(&quad.subject) { + ps.remove(&quad.predicate); + empty_ps = ps.is_empty(); + } + if empty_ps { + sps.remove(&quad.subject); + } + empty_sps = sps.is_empty(); + } + if empty_sps { + graph.osp.remove(&quad.object); + } + } + + empty_graph = graph.spo.is_empty(); + } + if empty_graph { + graph_indexes.remove(&quad.graph_name); + } + Ok(()) + } + + fn quads_for_pattern<'b>( + &'b self, + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, + ) -> Box> + 'b> { + match subject { + Some(subject) => match predicate { + Some(predicate) => match object { + Some(object) => match graph_name { + Some(graph_name) => { + let quad = EncodedQuad::new(subject, predicate, object, graph_name); + match self.contains(&quad) { + Ok(true) => Box::new(once(Ok(quad))), + Ok(false) => Box::new(empty()), + Err(error) => Box::new(once(Err(error))), + } + } + None => wrap_error( + self.quads_for_subject_predicate_object(subject, predicate, object), + ), + }, + None => match graph_name { + Some(graph_name) => wrap_error( + self.quads_for_subject_predicate_graph(subject, predicate, graph_name), + ), + None => wrap_error(self.quads_for_subject_predicate(subject, predicate)), + }, + }, + None => match object { + Some(object) => match graph_name { + Some(graph_name) => wrap_error( + self.quads_for_subject_object_graph(subject, object, graph_name), + ), + None => wrap_error(self.quads_for_subject_object(subject, object)), + }, + None => match graph_name { + Some(graph_name) => { + wrap_error(self.quads_for_subject_graph(subject, graph_name)) + } + None => wrap_error(self.quads_for_subject(subject)), + }, + }, + }, + None => match predicate { + Some(predicate) => match object { + Some(object) => match graph_name { + Some(graph_name) => wrap_error( + self.quads_for_predicate_object_graph(predicate, object, graph_name), + ), + None => wrap_error(self.quads_for_predicate_object(predicate, object)), + }, + None => match graph_name { + Some(graph_name) => { + wrap_error(self.quads_for_predicate_graph(predicate, graph_name)) + } + None => wrap_error(self.quads_for_predicate(predicate)), + }, + }, + None => match object { + Some(object) => match graph_name { + Some(graph_name) => { + wrap_error(self.quads_for_object_graph(object, graph_name)) + } + None => wrap_error(self.quads_for_object(object)), + }, + None => match graph_name { + Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)), + None => wrap_error(self.quads()), + }, + }, + }, + } + } +} + +impl MemoryStore { + fn graph_indexes( + &self, + ) -> Result>> { + Ok(self.graph_indexes.read().map_err(MutexPoisonError::from)?) + } + + fn graph_indexes_mut( + &self, + ) -> Result>> { + Ok(self.graph_indexes.write().map_err(MutexPoisonError::from)?) + } fn quads(&self) -> Result<> as IntoIterator>::IntoIter> { let mut result = Vec::default(); @@ -372,125 +560,13 @@ impl EncodedQuadsStore for MemoryStore { } Ok(result.into_iter()) } - - fn contains(&self, quad: &EncodedQuad) -> Result { - Ok(self - .graph_indexes()? - .get(&quad.graph_name) - .map_or(false, |graph| { - graph.spo.get(&quad.subject).map_or(false, |po| { - po.get(&quad.predicate) - .map_or(false, |o| o.contains(&quad.object)) - }) - })) - } - - fn insert(&self, quad: &EncodedQuad) -> Result<()> { - let mut graph_indexes = self.graph_indexes_mut()?; - let graph = graph_indexes - .entry(quad.graph_name) - .or_insert_with(MemoryGraphIndexes::default); - graph - .spo - .entry(quad.subject) - .or_default() - .entry(quad.predicate) - .or_default() - .insert(quad.object); - graph - .pos - .entry(quad.predicate) - .or_default() - .entry(quad.object) - .or_default() - .insert(quad.subject); - graph - .osp - .entry(quad.object) - .or_default() - .entry(quad.subject) - .or_default() - .insert(quad.predicate); - Ok(()) - } - - fn remove(&self, quad: &EncodedQuad) -> Result<()> { - let mut graph_indexes = self.graph_indexes_mut()?; - let mut empty_graph = false; - if let Some(graph) = graph_indexes.get_mut(&quad.graph_name) { - { - let mut empty_pos = false; - if let Some(pos) = graph.spo.get_mut(&quad.subject) { - let mut empty_os = false; - if let Some(os) = pos.get_mut(&quad.predicate) { - os.remove(&quad.object); - empty_os = os.is_empty(); - } - if empty_os { - pos.remove(&quad.predicate); - } - empty_pos = pos.is_empty(); - } - if empty_pos { - graph.spo.remove(&quad.subject); - } - } - - { - let mut empty_oss = false; - if let Some(oss) = graph.pos.get_mut(&quad.predicate) { - let mut empty_ss = false; - if let Some(ss) = oss.get_mut(&quad.object) { - ss.remove(&quad.subject); - empty_ss = ss.is_empty(); - } - if empty_ss { - oss.remove(&quad.object); - } - empty_oss = oss.is_empty(); - } - if empty_oss { - graph.pos.remove(&quad.predicate); - } - } - - { - let mut empty_sps = false; - if let Some(sps) = graph.osp.get_mut(&quad.object) { - let mut empty_ps = false; - if let Some(ps) = sps.get_mut(&quad.subject) { - ps.remove(&quad.predicate); - empty_ps = ps.is_empty(); - } - if empty_ps { - sps.remove(&quad.subject); - } - empty_sps = sps.is_empty(); - } - if empty_sps { - graph.osp.remove(&quad.object); - } - } - - empty_graph = graph.spo.is_empty(); - } - if empty_graph { - graph_indexes.remove(&quad.graph_name); - } - Ok(()) - } } -impl MemoryStore { - fn graph_indexes( - &self, - ) -> Result>> { - Ok(self.graph_indexes.read().map_err(MutexPoisonError::from)?) - } - - fn graph_indexes_mut( - &self, - ) -> Result>> { - Ok(self.graph_indexes.write().map_err(MutexPoisonError::from)?) +fn wrap_error> + 'static>( + iter: Result, +) -> Box>> { + match iter { + Ok(iter) => Box::new(iter), + Err(error) => Box::new(once(Err(error))), } } diff --git a/lib/src/store/mod.rs b/lib/src/store/mod.rs index 9a8b9298..9601e80a 100644 --- a/lib/src/store/mod.rs +++ b/lib/src/store/mod.rs @@ -1,12 +1,125 @@ -//! Provides implementations of the `rudf::model::Graph` and `rudf::model::Dataset` traits. +//! Provides implementations of the `rudf::Repository` trait. -pub(crate) mod encoded; mod memory; pub(crate) mod numeric_encoder; #[cfg(feature = "rocksdb")] mod rocksdb; -pub use crate::store::memory::MemoryDataset; -pub use crate::store::memory::MemoryGraph; +pub use crate::store::memory::MemoryRepository; #[cfg(feature = "rocksdb")] -pub use crate::store::rocksdb::RocksDbDataset; +pub use crate::store::rocksdb::RocksDbRepository; + +use crate::model::*; +use crate::sparql::SimplePreparedQuery; +use crate::store::numeric_encoder::*; +use crate::{RepositoryConnection, Result}; +use std::io::Read; +use std::iter::{once, Iterator}; + +/// Defines the `Store` traits that is used to have efficient binary storage +pub trait Store { + type Connection: StoreConnection; + + fn connection(self) -> Result; +} + +/// A connection to a `Store` +pub trait StoreConnection: StringStore + Sized + Clone { + fn contains(&self, quad: &EncodedQuad) -> Result; + fn insert(&self, quad: &EncodedQuad) -> Result<()>; + fn remove(&self, quad: &EncodedQuad) -> Result<()>; + fn quads_for_pattern<'a>( + &'a self, + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, + ) -> Box> + 'a>; + fn encoder(&self) -> Encoder<&Self> { + Encoder::new(&self) + } +} + +/// A `RepositoryConnection` from a `StoreConnection` +#[derive(Clone)] +pub struct StoreRepositoryConnection { + inner: S, +} + +impl From for StoreRepositoryConnection { + fn from(inner: S) -> Self { + Self { inner } + } +} + +impl RepositoryConnection for StoreRepositoryConnection { + type PreparedQuery = SimplePreparedQuery; + + fn prepare_query(&self, query: impl Read) -> Result> { + SimplePreparedQuery::new(self.inner.clone(), query) //TODO: avoid clone + } + + fn quads_for_pattern<'a>( + &'a self, + subject: Option<&NamedOrBlankNode>, + predicate: Option<&NamedNode>, + object: Option<&Term>, + graph_name: Option<&NamedOrBlankNode>, + ) -> Box> + 'a> + where + Self: 'a, + { + let encoder = self.inner.encoder(); + let subject = if let Some(subject) = subject { + match encoder.encode_named_or_blank_node(subject) { + Ok(subject) => Some(subject), + Err(error) => return Box::new(once(Err(error))), + } + } else { + None + }; + let predicate = if let Some(predicate) = predicate { + match encoder.encode_named_node(predicate) { + Ok(predicate) => Some(predicate), + Err(error) => return Box::new(once(Err(error))), + } + } else { + None + }; + let object = if let Some(object) = object { + match encoder.encode_term(object) { + Ok(object) => Some(object), + Err(error) => return Box::new(once(Err(error))), + } + } else { + None + }; + let graph_name = if let Some(graph_name) = graph_name { + match encoder.encode_named_or_blank_node(graph_name) { + Ok(subject) => Some(subject), + Err(error) => return Box::new(once(Err(error))), + } + } else { + None + }; + + Box::new( + self.inner + .quads_for_pattern(subject, predicate, object, graph_name) + .map(move |quad| self.inner.encoder().decode_quad(&quad?)), + ) + } + + fn contains(&self, quad: &Quad) -> Result { + self.inner + .contains(&self.inner.encoder().encode_quad(quad)?) + } + + fn insert(&self, quad: &Quad) -> Result<()> { + self.inner.insert(&self.inner.encoder().encode_quad(quad)?) + } + + fn remove(&self, quad: &Quad) -> Result<()> { + self.inner.remove(&self.inner.encoder().encode_quad(quad)?) + } +} diff --git a/lib/src/store/rocksdb.rs b/lib/src/store/rocksdb.rs index 004a3b2d..04e87321 100644 --- a/lib/src/store/rocksdb.rs +++ b/lib/src/store/rocksdb.rs @@ -1,8 +1,7 @@ use crate::model::LanguageTag; -use crate::store::encoded::EncodedQuadsStore; -use crate::store::encoded::StoreDataset; use crate::store::numeric_encoder::*; -use crate::Result; +use crate::store::{Store, StoreConnection, StoreRepositoryConnection}; +use crate::{Repository, Result}; use byteorder::ByteOrder; use byteorder::LittleEndian; use failure::format_err; @@ -14,28 +13,49 @@ use rocksdb::Options; use rocksdb::WriteBatch; use rocksdb::DB; use std::io::Cursor; +use std::iter::{empty, once}; use std::ops::Deref; use std::path::Path; use std::str; use std::sync::Mutex; -/// `rudf::model::Dataset` trait implementation based on the [RocksDB](https://rocksdb.org/) key-value store +/// `Repository` implementation based on the [RocksDB](https://rocksdb.org/) key-value store /// /// To use it, the `"rocksdb"` feature need to be activated. /// /// Usage example: +/// ```ignored +/// use rudf::model::*; +/// use rudf::{Repository, RepositoryConnection, RocksDbRepository, Result}; +/// use crate::rudf::sparql::PreparedQuery; +/// use std::str::FromStr; +/// use rudf::sparql::algebra::QueryResult; +/// +/// let repository = RocksDbRepository::open("example.db").unwrap(); +/// let connection = repository.connection().unwrap(); +/// +/// // insertion +/// let ex = NamedNode::from_str("http://example.com").unwrap(); +/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); +/// connection.insert(&quad); +/// +/// // quad filter +/// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); +/// assert_eq!(vec![quad], results.unwrap()); +/// +/// // SPARQL query +/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap(); +/// let results = prepared_query.exec().unwrap(); +/// if let QueryResult::Bindings(results) = results { +/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); +/// } /// ``` -/// use rudf::store::RocksDbDataset; -/// let dataset = RocksDbDataset::open("example.db").unwrap(); -/// ``` -pub type RocksDbDataset = StoreDataset; - -impl RocksDbDataset { - pub fn open(path: impl AsRef) -> Result { - Ok(Self::new_from_store(RocksDbStore::open(path)?)) - } +pub struct RocksDbRepository { + inner: RocksDbStore, } +pub type RocksDbRepositoryConnection<'a> = StoreRepositoryConnection>; + const ID2STR_CF: &str = "id2str"; const STR2ID_CF: &str = "id2str"; const SPOG_CF: &str = "spog"; @@ -48,14 +68,35 @@ const EMPTY_BUF: [u8; 0] = [0 as u8; 0]; const COLUMN_FAMILIES: [&str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG_CF]; -pub struct RocksDbStore { +struct RocksDbStore { db: DB, str_id_counter: Mutex, - id2str_cf: SendColumnFamily, - str2id_cf: SendColumnFamily, - spog_cf: SendColumnFamily, - posg_cf: SendColumnFamily, - ospg_cf: SendColumnFamily, +} + +#[derive(Clone)] +pub struct RocksDbStoreConnection<'a> { + store: &'a RocksDbStore, + id2str_cf: ColumnFamily<'a>, + str2id_cf: ColumnFamily<'a>, + spog_cf: ColumnFamily<'a>, + posg_cf: ColumnFamily<'a>, + ospg_cf: ColumnFamily<'a>, +} + +impl RocksDbRepository { + pub fn open(path: impl AsRef) -> Result { + Ok(Self { + inner: RocksDbStore::open(path)?, + }) + } +} + +impl<'a> Repository for &'a RocksDbRepository { + type Connection = RocksDbRepositoryConnection<'a>; + + fn connection(self) -> Result>> { + Ok(self.inner.connection()?.into()) + } } impl RocksDbStore { @@ -65,51 +106,57 @@ impl RocksDbStore { options.create_missing_column_families(true); options.set_compaction_style(DBCompactionStyle::Universal); - let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?; - let id2str_cf = SendColumnFamily(get_cf(&db, STR2ID_CF)?); - let str2id_cf = SendColumnFamily(get_cf(&db, ID2STR_CF)?); - let spog_cf = SendColumnFamily(get_cf(&db, SPOG_CF)?); - let posg_cf = SendColumnFamily(get_cf(&db, POSG_CF)?); - let ospg_cf = SendColumnFamily(get_cf(&db, OSPG_CF)?); - let new = Self { - db, + db: DB::open_cf(&options, path, &COLUMN_FAMILIES)?, str_id_counter: Mutex::new(RocksDBCounter::new("bsc")), - id2str_cf, - str2id_cf, - spog_cf, - posg_cf, - ospg_cf, }; - new.set_first_strings()?; + (&new).connection()?.set_first_strings()?; Ok(new) } } -impl StringStore for RocksDbStore { +impl<'a> Store for &'a RocksDbStore { + type Connection = RocksDbStoreConnection<'a>; + + fn connection(self) -> Result> { + Ok(RocksDbStoreConnection { + store: self, + id2str_cf: get_cf(&self.db, ID2STR_CF)?, + str2id_cf: get_cf(&self.db, STR2ID_CF)?, + spog_cf: get_cf(&self.db, SPOG_CF)?, + posg_cf: get_cf(&self.db, POSG_CF)?, + ospg_cf: get_cf(&self.db, OSPG_CF)?, + }) + } +} + +impl StringStore for RocksDbStoreConnection<'_> { type StringType = RocksString; fn insert_str(&self, value: &str) -> Result { let value = value.as_bytes(); - Ok(if let Some(id) = self.db.get_cf(*self.str2id_cf, value)? { - LittleEndian::read_u64(&id) - } else { - let id = self - .str_id_counter - .lock() - .map_err(MutexPoisonError::from)? - .get_and_increment(&self.db)? as u64; - let id_bytes = to_bytes(id); - let mut batch = WriteBatch::default(); - batch.put_cf(*self.id2str_cf, &id_bytes, value)?; - batch.put_cf(*self.str2id_cf, value, &id_bytes)?; - self.db.write(batch)?; - id - }) + Ok( + if let Some(id) = self.store.db.get_cf(self.str2id_cf, value)? { + LittleEndian::read_u64(&id) + } else { + let id = self + .store + .str_id_counter + .lock() + .map_err(MutexPoisonError::from)? + .get_and_increment(&self.store.db)? as u64; + let id_bytes = to_bytes(id); + let mut batch = WriteBatch::default(); + batch.put_cf(self.id2str_cf, &id_bytes, value)?; + batch.put_cf(self.str2id_cf, value, &id_bytes)?; + self.store.db.write(batch)?; + id + }, + ) } fn get_str(&self, id: u64) -> Result { - let value = self.db.get_cf(*self.id2str_cf, &to_bytes(id))?; + let value = self.store.db.get_cf(self.id2str_cf, &to_bytes(id))?; if let Some(value) = value { Ok(RocksString { vec: value }) } else { @@ -122,31 +169,113 @@ impl StringStore for RocksDbStore { } } -impl EncodedQuadsStore for RocksDbStore { - type QuadsIterator = SPOGIndexIterator; - type QuadsForSubjectIterator = FilteringEncodedQuadsIterator; - type QuadsForSubjectPredicateIterator = FilteringEncodedQuadsIterator; - type QuadsForSubjectPredicateObjectIterator = FilteringEncodedQuadsIterator; - type QuadsForSubjectObjectIterator = FilteringEncodedQuadsIterator; - type QuadsForPredicateIterator = FilteringEncodedQuadsIterator; - type QuadsForPredicateObjectIterator = FilteringEncodedQuadsIterator; - type QuadsForObjectIterator = FilteringEncodedQuadsIterator; - type QuadsForGraphIterator = InGraphQuadsIterator; - type QuadsForSubjectGraphIterator = - InGraphQuadsIterator>; - type QuadsForSubjectPredicateGraphIterator = - InGraphQuadsIterator>; - type QuadsForSubjectObjectGraphIterator = - InGraphQuadsIterator>; - type QuadsForPredicateGraphIterator = - InGraphQuadsIterator>; - type QuadsForPredicateObjectGraphIterator = - InGraphQuadsIterator>; - type QuadsForObjectGraphIterator = - InGraphQuadsIterator>; +impl<'a> StoreConnection for RocksDbStoreConnection<'a> { + fn contains(&self, quad: &EncodedQuad) -> Result { + Ok(self + .store + .db + .get_cf(self.spog_cf, &encode_spog_quad(quad)?)? + .is_some()) + } + + fn insert(&self, quad: &EncodedQuad) -> Result<()> { + let mut batch = WriteBatch::default(); + batch.put_cf(self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?; + batch.put_cf(self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?; + batch.put_cf(self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?; + self.store.db.write(batch)?; //TODO: check what's going on if the key already exists + Ok(()) + } + fn remove(&self, quad: &EncodedQuad) -> Result<()> { + let mut batch = WriteBatch::default(); + batch.delete_cf(self.spog_cf, &encode_spog_quad(quad)?)?; + batch.delete_cf(self.posg_cf, &encode_posg_quad(quad)?)?; + batch.delete_cf(self.ospg_cf, &encode_ospg_quad(quad)?)?; + self.store.db.write(batch)?; + Ok(()) + } + + fn quads_for_pattern<'b>( + &'b self, + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, + ) -> Box> + 'b> { + match subject { + Some(subject) => match predicate { + Some(predicate) => match object { + Some(object) => match graph_name { + Some(graph_name) => { + let quad = EncodedQuad::new(subject, predicate, object, graph_name); + match self.contains(&quad) { + Ok(true) => Box::new(once(Ok(quad))), + Ok(false) => Box::new(empty()), + Err(error) => Box::new(once(Err(error))), + } + } + None => wrap_error( + self.quads_for_subject_predicate_object(subject, predicate, object), + ), + }, + None => match graph_name { + Some(graph_name) => wrap_error( + self.quads_for_subject_predicate_graph(subject, predicate, graph_name), + ), + None => wrap_error(self.quads_for_subject_predicate(subject, predicate)), + }, + }, + None => match object { + Some(object) => match graph_name { + Some(graph_name) => wrap_error( + self.quads_for_subject_object_graph(subject, object, graph_name), + ), + None => wrap_error(self.quads_for_subject_object(subject, object)), + }, + None => match graph_name { + Some(graph_name) => { + wrap_error(self.quads_for_subject_graph(subject, graph_name)) + } + None => wrap_error(self.quads_for_subject(subject)), + }, + }, + }, + None => match predicate { + Some(predicate) => match object { + Some(object) => match graph_name { + Some(graph_name) => wrap_error( + self.quads_for_predicate_object_graph(predicate, object, graph_name), + ), + None => wrap_error(self.quads_for_predicate_object(predicate, object)), + }, + None => match graph_name { + Some(graph_name) => { + wrap_error(self.quads_for_predicate_graph(predicate, graph_name)) + } + None => wrap_error(self.quads_for_predicate(predicate)), + }, + }, + None => match object { + Some(object) => match graph_name { + Some(graph_name) => { + wrap_error(self.quads_for_object_graph(object, graph_name)) + } + None => wrap_error(self.quads_for_object(object)), + }, + None => match graph_name { + Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)), + None => wrap_error(self.quads()), + }, + }, + }, + } + } +} + +impl<'a> RocksDbStoreConnection<'a> { fn quads(&self) -> Result { - let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; + let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; iter.seek_to_first(); Ok(SPOGIndexIterator { iter }) } @@ -155,7 +284,7 @@ impl EncodedQuadsStore for RocksDbStore { &self, subject: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; + let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; iter.seek(&encode_term(subject)?); Ok(FilteringEncodedQuadsIterator { iter: SPOGIndexIterator { iter }, @@ -168,7 +297,7 @@ impl EncodedQuadsStore for RocksDbStore { subject: EncodedTerm, predicate: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; + let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; iter.seek(&encode_term_pair(subject, predicate)?); Ok(FilteringEncodedQuadsIterator { iter: SPOGIndexIterator { iter }, @@ -182,7 +311,7 @@ impl EncodedQuadsStore for RocksDbStore { predicate: EncodedTerm, object: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; + let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; iter.seek(&encode_term_triple(subject, predicate, object)?); Ok(FilteringEncodedQuadsIterator { iter: SPOGIndexIterator { iter }, @@ -195,7 +324,7 @@ impl EncodedQuadsStore for RocksDbStore { subject: EncodedTerm, object: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; + let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; iter.seek(&encode_term_pair(object, subject)?); Ok(FilteringEncodedQuadsIterator { iter: OSPGIndexIterator { iter }, @@ -207,7 +336,7 @@ impl EncodedQuadsStore for RocksDbStore { &self, predicate: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(*self.posg_cf)?; + let mut iter = self.store.db.raw_iterator_cf(self.posg_cf)?; iter.seek(&encode_term(predicate)?); Ok(FilteringEncodedQuadsIterator { iter: POSGIndexIterator { iter }, @@ -220,7 +349,7 @@ impl EncodedQuadsStore for RocksDbStore { predicate: EncodedTerm, object: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; + let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; iter.seek(&encode_term_pair(predicate, object)?); Ok(FilteringEncodedQuadsIterator { iter: POSGIndexIterator { iter }, @@ -232,7 +361,7 @@ impl EncodedQuadsStore for RocksDbStore { &self, object: EncodedTerm, ) -> Result> { - let mut iter = self.db.raw_iterator_cf(*self.ospg_cf)?; + let mut iter = self.store.db.raw_iterator_cf(self.ospg_cf)?; iter.seek(&encode_term(object)?); Ok(FilteringEncodedQuadsIterator { iter: OSPGIndexIterator { iter }, @@ -318,36 +447,20 @@ impl EncodedQuadsStore for RocksDbStore { graph_name, }) } - - fn contains(&self, quad: &EncodedQuad) -> Result { - Ok(self - .db - .get_cf(*self.spog_cf, &encode_spog_quad(quad)?)? - .is_some()) - } - - fn insert(&self, quad: &EncodedQuad) -> Result<()> { - let mut batch = WriteBatch::default(); - batch.put_cf(*self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?; - batch.put_cf(*self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?; - batch.put_cf(*self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?; - self.db.write(batch)?; //TODO: check what's going on if the key already exists - Ok(()) - } - - fn remove(&self, quad: &EncodedQuad) -> Result<()> { - let mut batch = WriteBatch::default(); - batch.delete_cf(*self.spog_cf, &encode_spog_quad(quad)?)?; - batch.delete_cf(*self.posg_cf, &encode_posg_quad(quad)?)?; - batch.delete_cf(*self.ospg_cf, &encode_ospg_quad(quad)?)?; - self.db.write(batch)?; - Ok(()) - } } -pub fn get_cf(db: &DB, name: &str) -> Result { +fn get_cf<'a>(db: &'a DB, name: &str) -> Result> { db.cf_handle(name) - .ok_or_else(|| format_err!("column family not found")) + .ok_or_else(|| format_err!("column family {} not found", name)) +} + +fn wrap_error<'a, E: 'a, I: Iterator> + 'a>( + iter: Result, +) -> Box> + 'a> { + match iter { + Ok(iter) => Box::new(iter), + Err(error) => Box::new(once(Err(error))), + } } struct RocksDBCounter { @@ -454,11 +567,11 @@ fn encode_ospg_quad(quad: &EncodedQuad) -> Result> { Ok(vec) } -pub struct SPOGIndexIterator { - iter: DBRawIterator, +struct SPOGIndexIterator<'a> { + iter: DBRawIterator<'a>, } -impl Iterator for SPOGIndexIterator { +impl<'a> Iterator for SPOGIndexIterator<'a> { type Item = Result; fn next(&mut self) -> Option> { @@ -472,11 +585,11 @@ impl Iterator for SPOGIndexIterator { } } -pub struct POSGIndexIterator { - iter: DBRawIterator, +struct POSGIndexIterator<'a> { + iter: DBRawIterator<'a>, } -impl Iterator for POSGIndexIterator { +impl<'a> Iterator for POSGIndexIterator<'a> { type Item = Result; fn next(&mut self) -> Option> { @@ -490,11 +603,11 @@ impl Iterator for POSGIndexIterator { } } -pub struct OSPGIndexIterator { - iter: DBRawIterator, +struct OSPGIndexIterator<'a> { + iter: DBRawIterator<'a>, } -impl Iterator for OSPGIndexIterator { +impl<'a> Iterator for OSPGIndexIterator<'a> { type Item = Result; fn next(&mut self) -> Option> { @@ -508,7 +621,7 @@ impl Iterator for OSPGIndexIterator { } } -pub struct FilteringEncodedQuadsIterator>> { +struct FilteringEncodedQuadsIterator>> { iter: I, filter: EncodedQuadPattern, } @@ -524,7 +637,7 @@ impl>> Iterator for FilteringEncodedQuads } } -pub struct InGraphQuadsIterator>> { +struct InGraphQuadsIterator>> { iter: I, graph_name: EncodedTerm, } @@ -570,16 +683,3 @@ impl From for String { val.deref().to_owned() } } - -// TODO: very bad but I believe it is fine -#[derive(Clone, Copy)] -struct SendColumnFamily(ColumnFamily); -unsafe impl Sync for SendColumnFamily {} - -impl Deref for SendColumnFamily { - type Target = ColumnFamily; - - fn deref(&self) -> &ColumnFamily { - &self.0 - } -} diff --git a/lib/tests/sparql_test_cases.rs b/lib/tests/sparql_test_cases.rs index 7dcfb5fc..acf02bdf 100644 --- a/lib/tests/sparql_test_cases.rs +++ b/lib/tests/sparql_test_cases.rs @@ -10,9 +10,7 @@ use rudf::sparql::algebra::QueryResult; use rudf::sparql::parser::read_sparql_query; use rudf::sparql::xml_results::read_xml_results; use rudf::sparql::PreparedQuery; -use rudf::sparql::SparqlDataset; -use rudf::store::MemoryDataset; -use rudf::Result; +use rudf::{MemoryRepository, Repository, RepositoryConnection, Result}; use std::fmt; use std::fs::File; use std::io::{BufRead, BufReader}; @@ -125,10 +123,22 @@ fn sparql_w3c_query_evaluation_testsuite() { ).unwrap(), //Simple literal vs xsd:string. We apply RDF 1.1 NamedNode::from_str("http://www.w3.org/2001/sw/DataAccess/tests/data-r2/distinct/manifest#distinct-2").unwrap(), - //URI normalization: we are normalizing more strongly + //URI normalization: we are not normalizing well + NamedNode::from_str( + "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#normalization-1", + ).unwrap(), + NamedNode::from_str( + "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#normalization-2", + ).unwrap(), NamedNode::from_str( "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#normalization-3", ).unwrap(), + NamedNode::from_str( + "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#kanji-1", + ).unwrap(), + NamedNode::from_str( + "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#kanji-2", + ).unwrap(), //Test on curly brace scoping with OPTIONAL filter NamedNode::from_str( "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/optional-filter/manifest#dawg-optional-filter-005-not-simplified", @@ -148,28 +158,35 @@ fn sparql_w3c_query_evaluation_testsuite() { continue; } if test.kind == "QueryEvaluationTest" { - let data = match &test.data { + let repository = match &test.data { Some(data) => { - let dataset = MemoryDataset::default(); - let dataset_default = dataset.default_graph(); + let repository = MemoryRepository::default(); + let connection = repository.connection().unwrap(); load_graph(&data) .unwrap() - .iter() - .for_each(|triple| dataset_default.insert(triple).unwrap()); - dataset + .into_iter() + .for_each(|triple| connection.insert(&triple.in_graph(None)).unwrap()); + repository } - None => MemoryDataset::default(), + None => MemoryRepository::default(), }; for graph_data in &test.graph_data { - let named_graph = data - .named_graph(&NamedNode::new(graph_data.clone()).into()) - .unwrap(); + let graph_name = NamedNode::new(graph_data); + let connection = repository.connection().unwrap(); load_graph(&graph_data) .unwrap() - .iter() - .for_each(|triple| named_graph.insert(triple).unwrap()); + .into_iter() + .for_each(move |triple| { + connection + .insert(&triple.in_graph(Some(graph_name.clone().into()))) + .unwrap() + }); } - match data.prepare_query(read_file(&test.query).unwrap()) { + match repository + .connection() + .unwrap() + .prepare_query(read_file(&test.query).unwrap()) + { Err(error) => assert!( false, "Failure to parse query of {} with error: {}", @@ -196,7 +213,7 @@ fn sparql_w3c_query_evaluation_testsuite() { expected_graph, actual_graph, load_sparql_query(&test.query).unwrap(), - data + repository_to_string(&repository) ) } }, @@ -207,6 +224,15 @@ fn sparql_w3c_query_evaluation_testsuite() { } } +fn repository_to_string(repository: impl Repository) -> String { + repository + .connection() + .unwrap() + .quads_for_pattern(None, None, None, None) + .map(|q| q.unwrap().to_string() + "\n") + .collect() +} + fn load_graph(url: &str) -> Result { if url.ends_with(".ttl") { read_turtle(read_file(url)?, Some(url))?.collect() diff --git a/server/src/main.rs b/server/src/main.rs index bd00293e..bd839893 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -27,16 +27,15 @@ use hyper::StatusCode; use lazy_static::lazy_static; use mime; use mime::Mime; -use rudf::model::Graph; use rudf::rio::read_ntriples; use rudf::sparql::algebra::QueryResult; use rudf::sparql::xml_results::write_xml_results; use rudf::sparql::PreparedQuery; -use rudf::sparql::SparqlDataset; -use rudf::store::MemoryDataset; -use rudf::store::MemoryGraph; -use rudf::store::RocksDbDataset; +use rudf::Repository; +use rudf::RepositoryConnection; +use rudf::{MemoryRepository, RocksDbRepository}; use serde_derive::Deserialize; +use std::fmt::Write; use std::fs::File; use std::io::BufReader; use std::panic::RefUnwindSafe; @@ -87,21 +86,27 @@ pub fn main() -> Result<(), failure::Error> { let file = matches.value_of("file").map(|v| v.to_string()); if let Some(file) = file { - main_with_dataset(Arc::new(RocksDbDataset::open(file)?), &matches) + main_with_dataset(Arc::new(RocksDbRepository::open(file)?), &matches) } else { - main_with_dataset(Arc::new(MemoryDataset::default()), &matches) + main_with_dataset(Arc::new(MemoryRepository::default()), &matches) } } -fn main_with_dataset( +fn main_with_dataset( dataset: Arc, matches: &ArgMatches<'_>, -) -> Result<(), failure::Error> { +) -> Result<(), failure::Error> +where + for<'a> &'a D: Repository, +{ if let Some(nt_file) = matches.value_of("ntriples") { println!("Loading NTriples file {}", nt_file); - let default_graph = dataset.default_graph(); - for quad in read_ntriples(BufReader::new(File::open(nt_file)?))? { - default_graph.insert(&quad?)? + let connection = dataset.connection()?; + if let Some(nt_file) = matches.value_of("ntriples") { + println!("Loading NTriples file {}", nt_file); + for triple in read_ntriples(BufReader::new(File::open(nt_file)?))? { + connection.insert(&triple?.in_graph(None))? + } } } @@ -111,10 +116,10 @@ fn main_with_dataset( Ok(()) } -fn router( - dataset: Arc, - base: String, -) -> Router { +fn router(dataset: Arc, base: String) -> Router +where + for<'a> &'a D: Repository, +{ let middleware = StateMiddleware::new(GothamState { dataset, base }); let pipeline = single_middleware(middleware); let (chain, pipelines) = single_pipeline(pipeline); @@ -149,48 +154,48 @@ fn router( .concat2() .then(|body| match body { Ok(body) => { - let content_type: Option> = HeaderMap::borrow_from(&state) + let content_type: Option> = HeaderMap::borrow_from(&state) .get(CONTENT_TYPE) .map(|content_type| Ok(Mime::from_str(content_type.to_str()?)?)); let response = match content_type { - Some(Ok(content_type)) => match (content_type.type_(), content_type.subtype()) { - (mime::APPLICATION, subtype) if subtype == APPLICATION_SPARQL_QUERY_UTF_8.subtype() => { - evaluate_sparql_query::( - &mut state, - &body.into_bytes(), - ) - }, - (mime::APPLICATION, mime::WWW_FORM_URLENCODED) => { - match parse_urlencoded_query_request(&body.into_bytes()) - { - Ok(parsed_request) => evaluate_sparql_query::( - &mut state, - &parsed_request.query.as_bytes(), - ), - Err(error) => error_to_response( - &state, - &error, - StatusCode::BAD_REQUEST, - ), - } - }, - _ => error_to_response( - &state, - &format_err!("Unsupported Content-Type: {:?}", content_type), - StatusCode::BAD_REQUEST, - ) + Some(Ok(content_type)) => match (content_type.type_(), content_type.subtype()) { + (mime::APPLICATION, subtype) if subtype == APPLICATION_SPARQL_QUERY_UTF_8.subtype() => { + evaluate_sparql_query::( + &mut state, + &body.into_bytes(), + ) } - Some(Err(error)) => error_to_response( - &state, - &format_err!("The request contains an invalid Content-Type header: {}", error), - StatusCode::BAD_REQUEST, - ), - None => error_to_response( + (mime::APPLICATION, mime::WWW_FORM_URLENCODED) => { + match parse_urlencoded_query_request(&body.into_bytes()) + { + Ok(parsed_request) => evaluate_sparql_query::( + &mut state, + &parsed_request.query.as_bytes(), + ), + Err(error) => error_to_response( + &state, + &error, + StatusCode::BAD_REQUEST, + ), + } + } + _ => error_to_response( &state, - &format_err!("The request should contain a Content-Type header"), + &format_err!("Unsupported Content-Type: {:?}", content_type), StatusCode::BAD_REQUEST, - ), - }; + ) + } + Some(Err(error)) => error_to_response( + &state, + &format_err!("The request contains an invalid Content-Type header: {}", error), + StatusCode::BAD_REQUEST, + ), + None => error_to_response( + &state, + &format_err!("The request should contain a Content-Type header"), + StatusCode::BAD_REQUEST, + ), + }; future::ok((state, response)) } Err(e) => future::err((state, e.into_handler_error())), @@ -202,12 +207,18 @@ fn router( } #[derive(StateData)] -struct GothamState { +struct GothamState +where + for<'a> &'a D: Repository, +{ dataset: Arc, base: String, } -impl Clone for GothamState { +impl Clone for GothamState +where + for<'a> &'a D: Repository, +{ fn clone(&self) -> Self { Self { dataset: self.dataset.clone(), @@ -230,20 +241,39 @@ fn parse_urlencoded_query_request(query: &[u8]) -> Result( +fn evaluate_sparql_query( state: &mut State, query: &[u8], -) -> Response { +) -> Response +where + for<'a> &'a D: Repository, +{ let gotham_state: GothamState = GothamState::take_from(state); - match gotham_state.dataset.prepare_query(query) { + let connection = match gotham_state.dataset.connection() { + Ok(connection) => connection, + Err(error) => return error_to_response(&state, &error, StatusCode::INTERNAL_SERVER_ERROR), + }; + let result = match connection.prepare_query(query) { Ok(query) => match query.exec() { Ok(QueryResult::Graph(triples)) => { - let triples: Result = triples.collect(); + let mut result = String::default(); + for triple in triples { + match triple { + Ok(triple) => write!(&mut result, "{}\n", triple).unwrap(), + Err(error) => { + return error_to_response( + &state, + &error, + StatusCode::INTERNAL_SERVER_ERROR, + ); + } + } + } create_response( &state, StatusCode::OK, APPLICATION_N_TRIPLES_UTF_8.clone(), - triples.unwrap().to_string(), + result, ) } Ok(result) => create_response( @@ -255,7 +285,8 @@ fn evaluate_sparql_query error_to_response(&state, &error, StatusCode::INTERNAL_SERVER_ERROR), }, Err(error) => error_to_response(&state, &error, StatusCode::BAD_REQUEST), - } + }; + result } fn error_to_response(state: &State, error: &failure::Error, code: StatusCode) -> Response { @@ -271,8 +302,11 @@ mod tests { #[test] fn get_ui() { - let test_server = - TestServer::new(router(Arc::new(MemoryDataset::default()), "".to_string())).unwrap(); + let test_server = TestServer::new(router( + Arc::new(MemoryRepository::default()), + "".to_string(), + )) + .unwrap(); let response = test_server .client() .get("http://localhost/") @@ -283,8 +317,11 @@ mod tests { #[test] fn get_query() { - let test_server = - TestServer::new(router(Arc::new(MemoryDataset::default()), "".to_string())).unwrap(); + let test_server = TestServer::new(router( + Arc::new(MemoryRepository::default()), + "".to_string(), + )) + .unwrap(); let response = test_server .client() .get("http://localhost/query?query=SELECT+*+WHERE+{+?s+?p+?o+}") @@ -295,8 +332,11 @@ mod tests { #[test] fn post_query() { - let test_server = - TestServer::new(router(Arc::new(MemoryDataset::default()), "".to_string())).unwrap(); + let test_server = TestServer::new(router( + Arc::new(MemoryRepository::default()), + "".to_string(), + )) + .unwrap(); let response = test_server .client() .post(