Introduces Repository and RepositoryConnection and simplifies API

Allows to upgrades to the latest versions of RocksDB and avoid some behaviors that could cause unexpected crashes
pull/10/head
Tpt 5 years ago
parent b9bd6e66d3
commit 21ad76c7cf
  1. 11
      README.md
  2. 3
      lib/Cargo.toml
  3. 78
      lib/src/lib.rs
  4. 212
      lib/src/model/dataset.rs
  5. 13
      lib/src/model/graph.rs
  6. 251
      lib/src/model/isomorphism.rs
  7. 4
      lib/src/model/mod.rs
  8. 5
      lib/src/model/triple.rs
  9. 126
      lib/src/repository.rs
  10. 186
      lib/src/sparql/eval.rs
  11. 121
      lib/src/sparql/mod.rs
  12. 8
      lib/src/sparql/plan.rs
  13. 966
      lib/src/store/encoded.rs
  14. 424
      lib/src/store/memory.rs
  15. 123
      lib/src/store/mod.rs
  16. 344
      lib/src/store/rocksdb.rs
  17. 62
      lib/tests/sparql_test_cases.rs
  18. 106
      server/src/main.rs

@ -1,12 +1,11 @@
# Rudf Rudf is a work in progress graph database implementing the [SPARQL](https://www.w3.org/TR/sparql11-overview/) standard.
This library is a work in progress of a [RDF](https://www.w3.org/RDF/) stack implementation in [Rust](https://www.rust-lang.org). There is no released version yet.
There is no released version yet but [automated documentation for the master branch is available](https://tpt.github.io/rudf/). Its goal is to provide a compliant, safe and fast graph database.
It is written in Rust.
Its goal is to provide a compliant, safe and fast implementation of W3C specifications in Rust. The `lib` directory contains the database written as a Rust library.
The `lib` directory contains the Rust library code and the `python` directory a beginning of Python bindings.
[![Build Status](https://travis-ci.org/Tpt/rudf.svg?branch=master)](https://travis-ci.org/Tpt/rudf) [![Build Status](https://travis-ci.org/Tpt/rudf.svg?branch=master)](https://travis-ci.org/Tpt/rudf)
[![dependency status](https://deps.rs/repo/github/Tpt/rudf/status.svg)](https://deps.rs/repo/github/Tpt/rudf) [![dependency status](https://deps.rs/repo/github/Tpt/rudf/status.svg)](https://deps.rs/repo/github/Tpt/rudf)

@ -17,7 +17,7 @@ travis-ci = { repository = "Tpt/rudf" }
[dependencies] [dependencies]
lazy_static = "1" lazy_static = "1"
rocksdb = { version = "0.11", optional = true } rocksdb = { version = "0.12", optional = true }
url = "2" url = "2"
uuid = { version = "0.7", features = ["v4"] } uuid = { version = "0.7", features = ["v4"] }
bzip2 = "0.3" bzip2 = "0.3"
@ -32,7 +32,6 @@ regex = "1"
rio_api = "0.2" rio_api = "0.2"
rio_turtle = "0.2" rio_turtle = "0.2"
rio_xml = "0.2" rio_xml = "0.2"
permutohedron = "0.2"
[build-dependencies] [build-dependencies]
peg = "0.5" peg = "0.5"

@ -1,46 +1,50 @@
//! This crate is a work in progress of implementation of an RDF and SPARQL software stack in Rust. //! Rudf is a work in progress graph database implementing the [SPARQL](https://www.w3.org/TR/sparql11-overview/) standard.
//! //!
//! Its goal is to provide a compliant, safe and fast implementation of W3C specifications. //! Its goal is to provide a compliant, safe and fast graph database.
//! //!
//! It currently provides: //! It currently provides two `Repository` implementation providing [SPARQL 1.0 query](https://www.w3.org/TR/rdf-sparql-query/) capability:
//! * Basic RDF data structures in the `model` package //! * `MemoryRepository`: a simple in memory implementation.
//! * Parsers for XML, Turtle and N-Triples syntaxes in the `rio` package //! * `RocksDbRepository`: a file system implementation based on the [RocksDB](https://rocksdb.org/) key-value store.
//! * A memory based and a disk based stores in the `store` package //!
//! * A work in progress SPARQL implementation in the `sparql` package //! Usage example with the `MemoryRepository`:
//!
#![warn( //! ```
clippy::cast_possible_wrap, //! use rudf::model::*;
clippy::cast_precision_loss, //! use rudf::{Repository, RepositoryConnection, MemoryRepository, Result};
clippy::cast_sign_loss, //! use crate::rudf::sparql::PreparedQuery;
clippy::default_trait_access, //! use std::str::FromStr;
clippy::empty_enum, //! use rudf::sparql::algebra::QueryResult;
clippy::enum_glob_use, //!
clippy::expl_impl_clone_on_copy, //! let repository = MemoryRepository::default();
clippy::explicit_into_iter_loop, //! let connection = repository.connection().unwrap();
clippy::filter_map, //!
clippy::if_not_else, //! // insertion
clippy::inline_always, //! let ex = NamedNode::from_str("http://example.com").unwrap();
clippy::invalid_upcast_comparisons, //! let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
clippy::items_after_statements, //! connection.insert(&quad);
clippy::linkedlist, //!
//TODO match_same_arms, //! // quad filter
clippy::maybe_infinite_iter, //! let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
clippy::mut_mut, //! assert_eq!(vec![quad], results.unwrap());
clippy::needless_continue, //!
clippy::option_map_unwrap_or, //! // SPARQL query
//TODO option_map_unwrap_or_else, //! let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap();
clippy::pub_enum_variant_names, //! let results = prepared_query.exec().unwrap();
clippy::replace_consts, //! if let QueryResult::Bindings(results) = results {
clippy::result_map_unwrap_or_else, //! assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
//TODO single_match_else, //! }
clippy::string_add_assign, //! ```
clippy::unicode_not_nfc
)]
pub mod model; pub mod model;
mod repository;
pub mod rio; pub mod rio;
pub mod sparql; pub mod sparql;
pub mod store; pub(crate) mod store;
pub use failure::Error; pub use failure::Error;
pub type Result<T> = ::std::result::Result<T, failure::Error>; pub type Result<T> = ::std::result::Result<T, failure::Error>;
pub use crate::store::MemoryRepository;
#[cfg(feature = "rocksdb")]
pub use crate::store::RocksDbRepository;
pub use repository::Repository;
pub use repository::RepositoryConnection;

@ -1,212 +0,0 @@
use crate::model::*;
use crate::Result;
/// Trait for [RDF graphs](https://www.w3.org/TR/rdf11-concepts/#dfn-graph)
///
/// This crate currently provides a simple stand-alone in memory implementation of the `Graph` trait.
///
/// Usage example:
/// ```
/// use rudf::model::*;
/// use rudf::store::MemoryGraph;
/// use std::str::FromStr;
///
/// let graph = MemoryGraph::default();
/// let ex = NamedNode::from_str("http://example.com").unwrap();
/// let triple = Triple::new(ex.clone(), ex.clone(), ex.clone());
/// graph.insert(&triple);
/// let results: Vec<Triple> = graph.triples_for_subject(&ex.into()).unwrap().map(|t| t.unwrap()).collect();
/// assert_eq!(vec![triple], results);
/// ```
pub trait Graph {
type TriplesIterator: Iterator<Item = Result<Triple>>;
type TriplesForSubjectIterator: Iterator<Item = Result<Triple>>;
type ObjectsForSubjectPredicateIterator: Iterator<Item = Result<Term>>;
type PredicatesForSubjectObjectIterator: Iterator<Item = Result<NamedNode>>;
type TriplesForPredicateIterator: Iterator<Item = Result<Triple>>;
type SubjectsForPredicateObjectIterator: Iterator<Item = Result<NamedOrBlankNode>>;
type TriplesForObjectIterator: Iterator<Item = Result<Triple>>;
/// Returns all triples contained by the graph
fn iter(&self) -> Result<Self::TriplesIterator> {
self.triples()
}
/// Returns all triples contained by the graph
fn triples(&self) -> Result<Self::TriplesIterator>;
fn triples_for_subject(
&self,
subject: &NamedOrBlankNode,
) -> Result<Self::TriplesForSubjectIterator>;
fn objects_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<Self::ObjectsForSubjectPredicateIterator>;
fn object_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<Option<Term>> {
//TODO use transpose when stable
match self
.objects_for_subject_predicate(subject, predicate)?
.nth(0)
{
Some(object) => Ok(Some(object?)),
None => Ok(None),
}
}
fn predicates_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<Self::PredicatesForSubjectObjectIterator>;
fn triples_for_predicate(
&self,
predicate: &NamedNode,
) -> Result<Self::TriplesForPredicateIterator>;
fn subjects_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<Self::SubjectsForPredicateObjectIterator>;
fn triples_for_object(&self, object: &Term) -> Result<Self::TriplesForObjectIterator>;
/// Checks if the graph contains the given triple
fn contains(&self, triple: &Triple) -> Result<bool>;
/// Adds a triple to the graph
fn insert(&self, triple: &Triple) -> Result<()>;
/// Removes a concrete triple from the graph
fn remove(&self, triple: &Triple) -> Result<()>;
/// Returns the number of triples in this graph
fn len(&self) -> Result<usize>;
/// Checks if this graph contains a triple
fn is_empty(&self) -> Result<bool>;
}
/// Trait for [RDF named graphs](https://www.w3.org/TR/rdf11-concepts/#dfn-named-graph) i.e. RDF graphs identified by an IRI
pub trait NamedGraph: Graph {
fn name(&self) -> &NamedOrBlankNode;
}
/// Trait for [RDF datasets](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset)
///
/// This crate currently provides two implementation of the `Dataset` traits:
/// * One in memory: `rudf::store::MemoryDataset`
/// * One disk-based using [RocksDB](https://rocksdb.org/): `rudf::store::RocksDbDataset`
///
/// Usage example with `rudf::store::MemoryDataset`:
/// ```
/// use rudf::model::*;
/// use rudf::store::MemoryDataset;
/// use std::str::FromStr;
///
/// let dataset = MemoryDataset::default();
/// let default_graph = dataset.default_graph();
/// let ex = NamedNode::from_str("http://example.com").unwrap();
/// let triple = Triple::new(ex.clone(), ex.clone(), ex.clone());
/// default_graph.insert(&triple);
/// let results: Vec<Quad> = dataset.quads_for_subject(&ex.into()).unwrap().map(|t| t.unwrap()).collect();
/// assert_eq!(vec![triple.in_graph(None)], results);
/// ```
///
/// The implementation backed by RocksDB if disabled by default and requires the `"rocksdb"` feature to be activated.
/// A `RocksDbDataset` could be built using `RocksDbDataset::open` and works just like its in-memory equivalent:
/// ```ignore
/// use rudf::store::RocksDbDataset;
/// let dataset = RocksDbDataset::open("foo.db");
/// ```
pub trait Dataset {
type NamedGraph: NamedGraph;
type DefaultGraph: Graph;
type UnionGraph: Graph;
type QuadsIterator: Iterator<Item = Result<Quad>>;
type QuadsForSubjectIterator: Iterator<Item = Result<Quad>>;
type QuadsForSubjectPredicateIterator: Iterator<Item = Result<Quad>>;
type QuadsForSubjectPredicateObjectIterator: Iterator<Item = Result<Quad>>;
type QuadsForSubjectObjectIterator: Iterator<Item = Result<Quad>>;
type QuadsForPredicateIterator: Iterator<Item = Result<Quad>>;
type QuadsForPredicateObjectIterator: Iterator<Item = Result<Quad>>;
type QuadsForObjectIterator: Iterator<Item = Result<Quad>>;
/// Returns an object for a [named graph](https://www.w3.org/TR/rdf11-concepts/#dfn-named-graph) of this dataset.
///
/// This named graph may be empty if no triple is in the graph yet.
fn named_graph(&self, name: &NamedOrBlankNode) -> Result<Self::NamedGraph>;
/// Returns an object for the [default graph](https://www.w3.org/TR/rdf11-concepts/#dfn-default-graph) of this dataset
fn default_graph(&self) -> Self::DefaultGraph;
/// Returns a graph that is the union of all graphs contained in this dataset, including the default graph
fn union_graph(&self) -> Self::UnionGraph;
/// Returns all quads contained by the graph
fn iter(&self) -> Result<Self::QuadsIterator> {
self.quads()
}
/// Returns all quads contained by the graph
fn quads(&self) -> Result<Self::QuadsIterator>;
fn quads_for_subject(
&self,
subject: &NamedOrBlankNode,
) -> Result<Self::QuadsForSubjectIterator>;
fn quads_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<Self::QuadsForSubjectPredicateIterator>;
fn quads_for_subject_predicate_object(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
object: &Term,
) -> Result<Self::QuadsForSubjectPredicateObjectIterator>;
fn quads_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<Self::QuadsForSubjectObjectIterator>;
fn quads_for_predicate(&self, predicate: &NamedNode)
-> Result<Self::QuadsForPredicateIterator>;
fn quads_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<Self::QuadsForPredicateObjectIterator>;
fn quads_for_object(&self, object: &Term) -> Result<Self::QuadsForObjectIterator>;
/// Checks if this dataset contains a given quad
fn contains(&self, quad: &Quad) -> Result<bool>;
/// Adds a quad to this dataset
fn insert(&self, quad: &Quad) -> Result<()>;
/// Removes a quad from this dataset
fn remove(&self, quad: &Quad) -> Result<()>;
/// Returns the number of quads in this dataset
fn len(&self) -> Result<usize>;
/// Checks if this dataset contains a quad
fn is_empty(&self) -> Result<bool>;
}

@ -4,7 +4,7 @@ use std::collections::HashSet;
use std::fmt; use std::fmt;
use std::iter::FromIterator; use std::iter::FromIterator;
/// Simple data structure [RDF graphs](https://www.w3.org/TR/rdf11-concepts/#dfn-graph). /// A simple implementation of [RDF graphs](https://www.w3.org/TR/rdf11-concepts/#dfn-graph).
/// ///
/// It is not done to hold big graphs. /// It is not done to hold big graphs.
/// ///
@ -118,6 +118,8 @@ impl SimpleGraph {
} }
/// Checks if the current graph is [isomorphic](https://www.w3.org/TR/rdf11-concepts/#dfn-graph-isomorphism) with an other one /// Checks if the current graph is [isomorphic](https://www.w3.org/TR/rdf11-concepts/#dfn-graph-isomorphism) with an other one
///
/// Warning: This algorithm as a worst case complexity in n!
pub fn is_isomorphic(&self, other: &SimpleGraph) -> bool { pub fn is_isomorphic(&self, other: &SimpleGraph) -> bool {
are_graphs_isomorphic(self, other) are_graphs_isomorphic(self, other)
} }
@ -132,6 +134,15 @@ impl IntoIterator for SimpleGraph {
} }
} }
impl<'a> IntoIterator for &'a SimpleGraph {
type Item = &'a Triple;
type IntoIter = <&'a HashSet<Triple> as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.triples.iter()
}
}
impl FromIterator<Triple> for SimpleGraph { impl FromIterator<Triple> for SimpleGraph {
fn from_iter<I: IntoIterator<Item = Triple>>(iter: I) -> Self { fn from_iter<I: IntoIterator<Item = Triple>>(iter: I) -> Self {
Self { Self {

@ -1,8 +1,6 @@
use crate::model::*; use crate::model::*;
use permutohedron::LexicalPermutation; use std::collections::hash_map::{DefaultHasher, RandomState};
use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::HashSet;
use std::collections::{BTreeSet, HashMap};
use std::hash::Hash; use std::hash::Hash;
use std::hash::Hasher; use std::hash::Hasher;
@ -38,20 +36,44 @@ fn predicate_objects_for_subject<'a>(
}) })
} }
fn hash_blank_nodes<'a>( fn split_hash_buckets<'a>(
bnodes: HashSet<&'a BlankNode>, bnodes_by_hash: HashMap<u64, Vec<&'a BlankNode>>,
graph: &'a SimpleGraph, graph: &'a SimpleGraph,
distance: usize,
) -> HashMap<u64, Vec<&'a BlankNode>> { ) -> HashMap<u64, Vec<&'a BlankNode>> {
let mut bnodes_by_hash = HashMap::default(); let mut new_bnodes_by_hash = HashMap::default();
// NB: we need to sort the triples to have the same hash for (hash, bnodes) in bnodes_by_hash {
if bnodes.len() == 1 {
new_bnodes_by_hash.insert(hash, bnodes); // Nothing to improve
} else {
for bnode in bnodes { for bnode in bnodes {
let mut hasher = DefaultHasher::new(); let mut starts = vec![NamedOrBlankNode::from(bnode.clone())];
for _ in 0..distance {
let mut new_starts = Vec::default();
for s in starts {
for t in graph.triples_for_subject(&s) {
match t.object() {
Term::NamedNode(t) => new_starts.push(t.clone().into()),
Term::BlankNode(t) => new_starts.push(t.clone().into()),
Term::Literal(_) => (),
}
}
for t in graph.triples_for_object(&s.into()) {
new_starts.push(t.subject().clone());
}
}
starts = new_starts;
}
{ // We do the hashing
let subject = NamedOrBlankNode::from(bnode.clone()); let mut hasher = DefaultHasher::default();
hash.hash(&mut hasher); // We start with the previous hash
// NB: we need to sort the triples to have the same hash
let mut po_set: BTreeSet<PredicateObject> = BTreeSet::default(); let mut po_set: BTreeSet<PredicateObject> = BTreeSet::default();
for po in predicate_objects_for_subject(graph, &subject) { for start in &starts {
for po in predicate_objects_for_subject(graph, start) {
match &po.object { match &po.object {
Term::BlankNode(_) => (), Term::BlankNode(_) => (),
_ => { _ => {
@ -59,15 +81,15 @@ fn hash_blank_nodes<'a>(
} }
} }
} }
for po in po_set {
po.hash(&mut hasher);
} }
for po in &po_set {
po.hash(&mut hasher);
} }
{
let object = Term::from(bnode.clone());
let mut sp_set: BTreeSet<SubjectPredicate> = BTreeSet::default(); let mut sp_set: BTreeSet<SubjectPredicate> = BTreeSet::default();
for sp in subject_predicates_for_object(graph, &object) { let term_starts: Vec<_> = starts.into_iter().map(|t| t.into()).collect();
for start in &term_starts {
for sp in subject_predicates_for_object(graph, start) {
match &sp.subject { match &sp.subject {
NamedOrBlankNode::BlankNode(_) => (), NamedOrBlankNode::BlankNode(_) => (),
_ => { _ => {
@ -75,103 +97,115 @@ fn hash_blank_nodes<'a>(
} }
} }
} }
for sp in sp_set {
sp.hash(&mut hasher);
} }
for sp in &sp_set {
sp.hash(&mut hasher);
} }
bnodes_by_hash new_bnodes_by_hash
.entry(hasher.finish()) .entry(hasher.finish())
.or_insert_with(Vec::default) .or_insert_with(Vec::default)
.push(bnode); .push(bnode);
} }
bnodes_by_hash }
}
new_bnodes_by_hash
} }
fn build_and_check_containment_from_hashes<'a>( fn build_and_check_containment_from_hashes<'a>(
hashes_to_see: &mut Vec<&u64>, a_bnodes_by_hash: &mut Vec<(u64, Vec<&'a BlankNode>)>,
a_bnodes_by_hash: &'a HashMap<u64, Vec<&'a BlankNode>>,
b_bnodes_by_hash: &'a HashMap<u64, Vec<&'a BlankNode>>, b_bnodes_by_hash: &'a HashMap<u64, Vec<&'a BlankNode>>,
a_to_b_mapping: &mut HashMap<&'a BlankNode, &'a BlankNode>, a_to_b_mapping: &mut HashMap<&'a BlankNode, &'a BlankNode>,
a: &SimpleGraph, a: &'a SimpleGraph,
b: &SimpleGraph, b: &'a SimpleGraph,
current_a_nodes: &[&'a BlankNode],
current_b_nodes: &mut BTreeSet<&'a BlankNode>,
) -> bool { ) -> bool {
let hash = match hashes_to_see.pop() { if let Some((a_node, remaining_a_node)) = current_a_nodes.split_last() {
Some(h) => h, let b_nodes = current_b_nodes.iter().cloned().collect::<Vec<_>>();
None => return check_is_contained(a_to_b_mapping, a, b), for b_node in b_nodes.into_iter() {
}; current_b_nodes.remove(b_node);
a_to_b_mapping.insert(a_node, b_node);
let a_nodes = a_bnodes_by_hash if check_is_contained_focused(a_to_b_mapping, a_node, a, b)
.get(hash) && build_and_check_containment_from_hashes(
.map_or(&[] as &[&BlankNode], |v| v.as_slice());
let b_nodes = b_bnodes_by_hash
.get(hash)
.map_or(&[] as &[&BlankNode], |v| v.as_slice());
if a_nodes.len() != b_nodes.len() {
return false;
}
if a_nodes.len() == 1 {
// Avoid allocation for len == 1
a_to_b_mapping.insert(a_nodes[0], b_nodes[0]);
let result = build_and_check_containment_from_hashes(
hashes_to_see,
a_bnodes_by_hash, a_bnodes_by_hash,
b_bnodes_by_hash, b_bnodes_by_hash,
a_to_b_mapping, a_to_b_mapping,
a, a,
b, b,
); remaining_a_node,
a_to_b_mapping.remove(a_nodes[0]); current_b_nodes,
hashes_to_see.push(hash); )
result {
return true;
}
current_b_nodes.insert(b_node);
}
a_to_b_mapping.remove(a_node);
false
} else { } else {
// We compute all the rotations of a_nodes and then zip it with b_nodes to have all the possible pairs (a,b) let (hash, new_a_nodes) = match a_bnodes_by_hash.pop() {
let mut a_nodes_rotated = a_nodes.to_vec(); Some(v) => v,
a_nodes_rotated.sort(); None => return true,
loop { };
for (a_node, b_node) in a_nodes_rotated.iter().zip(b_nodes.iter()) {
a_to_b_mapping.insert(a_node, b_node); let mut new_b_nodes = b_bnodes_by_hash
.get(&hash)
.map_or(BTreeSet::default(), |v| v.into_iter().cloned().collect());
if new_a_nodes.len() != new_b_nodes.len() {
return false;
} }
let result = if build_and_check_containment_from_hashes(
hashes_to_see, if new_a_nodes.len() > 10 {
eprintln!("Too big instance, aborting");
return true; //TODO: Very very very bad
}
if build_and_check_containment_from_hashes(
a_bnodes_by_hash, a_bnodes_by_hash,
b_bnodes_by_hash, b_bnodes_by_hash,
a_to_b_mapping, a_to_b_mapping,
a, a,
b, b,
&new_a_nodes,
&mut new_b_nodes,
) { ) {
Some(true) true
} else if a_nodes_rotated.next_permutation() {
None //keep going
} else { } else {
Some(false) // No more permutation a_bnodes_by_hash.push((hash, new_a_nodes));
}; false
if let Some(result) = result {
for a_node in &a_nodes_rotated {
a_to_b_mapping.remove(a_node);
}
hashes_to_see.push(hash);
return result;
}
} }
} }
} }
fn check_is_contained<'a>( fn check_is_contained_focused<'a>(
a_to_b_mapping: &mut HashMap<&'a BlankNode, &'a BlankNode>, a_to_b_mapping: &mut HashMap<&'a BlankNode, &'a BlankNode>,
a: &SimpleGraph, a_bnode_focus: &'a BlankNode,
b: &SimpleGraph, a: &'a SimpleGraph,
b: &'a SimpleGraph,
) -> bool { ) -> bool {
for t_a in a.iter() { let a_bnode_subject = a_bnode_focus.clone().into();
let subject = if let NamedOrBlankNode::BlankNode(s_a) = &t_a.subject() { let a_bnode_object = a_bnode_focus.clone().into();
a_to_b_mapping[s_a].clone().into() let ts_a = a
.triples_for_subject(&a_bnode_subject)
.chain(a.triples_for_object(&a_bnode_object));
for t_a in ts_a {
let subject: NamedOrBlankNode = if let NamedOrBlankNode::BlankNode(s_a) = &t_a.subject() {
if let Some(s_a) = a_to_b_mapping.get(s_a) {
(*s_a).clone().into()
} else {
continue; // We skip for now
}
} else { } else {
t_a.subject().clone() t_a.subject().clone()
}; };
let predicate = t_a.predicate().clone(); let predicate = t_a.predicate().clone();
let object = if let Term::BlankNode(o_a) = &t_a.object() { let object: Term = if let Term::BlankNode(o_a) = &t_a.object() {
a_to_b_mapping[o_a].clone().into() if let Some(o_a) = a_to_b_mapping.get(o_a) {
(*o_a).clone().into()
} else {
continue; // We skip for now
}
} else { } else {
t_a.object().clone() t_a.object().clone()
}; };
@ -183,9 +217,9 @@ fn check_is_contained<'a>(
true true
} }
fn graph_blank_nodes(graph: &SimpleGraph) -> HashSet<&BlankNode> { fn graph_blank_nodes(graph: &SimpleGraph) -> Vec<&BlankNode> {
let mut blank_nodes = HashSet::default(); let mut blank_nodes: HashSet<&BlankNode, RandomState> = HashSet::default();
for t in graph.iter() { for t in graph {
if let NamedOrBlankNode::BlankNode(subject) = t.subject() { if let NamedOrBlankNode::BlankNode(subject) = t.subject() {
blank_nodes.insert(subject); blank_nodes.insert(subject);
} }
@ -193,7 +227,7 @@ fn graph_blank_nodes(graph: &SimpleGraph) -> HashSet<&BlankNode> {
blank_nodes.insert(object); blank_nodes.insert(object);
} }
} }
blank_nodes blank_nodes.into_iter().collect()
} }
pub fn are_graphs_isomorphic(a: &SimpleGraph, b: &SimpleGraph) -> bool { pub fn are_graphs_isomorphic(a: &SimpleGraph, b: &SimpleGraph) -> bool {
@ -201,23 +235,60 @@ pub fn are_graphs_isomorphic(a: &SimpleGraph, b: &SimpleGraph) -> bool {
return false; return false;
} }
let a_bnodes = graph_blank_nodes(a); // We check containment of everything buts triples with blank nodes
let a_bnodes_by_hash = hash_blank_nodes(a_bnodes, a); let mut a_bnodes_triples = SimpleGraph::default();
for t in a {
if t.subject().is_blank_node() || t.object().is_blank_node() {
a_bnodes_triples.insert(t.clone());
} else if !b.contains(t) {
return false; // Triple in a not in b without blank nodes
}
}
let mut b_bnodes_triples = SimpleGraph::default();
for t in b {
if t.subject().is_blank_node() || t.object().is_blank_node() {
b_bnodes_triples.insert(t.clone());
} else if !a.contains(t) {
return false; // Triple in a not in b without blank nodes
}
}
let mut a_bnodes_by_hash = HashMap::default();
a_bnodes_by_hash.insert(0, graph_blank_nodes(&a_bnodes_triples));
let mut b_bnodes_by_hash = HashMap::default();
b_bnodes_by_hash.insert(0, graph_blank_nodes(&b_bnodes_triples));
for distance in 0..5 {
let max_size = a_bnodes_by_hash
.values()
.into_iter()
.map(|l| l.len())
.max()
.unwrap_or(0);
if max_size < 2 {
break; // We only have small buckets
}
let b_bnodes = graph_blank_nodes(b); a_bnodes_by_hash = split_hash_buckets(a_bnodes_by_hash, a, distance);
let b_bnodes_by_hash = hash_blank_nodes(b_bnodes, b); b_bnodes_by_hash = split_hash_buckets(b_bnodes_by_hash, b, distance);
// Hashes should have the same size everywhere // Hashes should have the same size
if a_bnodes_by_hash.len() != b_bnodes_by_hash.len() { if a_bnodes_by_hash.len() != b_bnodes_by_hash.len() {
return false; return false;
} }
}
let mut sorted_a_bnodes_by_hash: Vec<_> = a_bnodes_by_hash.into_iter().collect();
sorted_a_bnodes_by_hash.sort_by(|(_, l1), (_, l2)| l1.len().cmp(&l2.len()));
build_and_check_containment_from_hashes( build_and_check_containment_from_hashes(
&mut a_bnodes_by_hash.keys().collect(), &mut sorted_a_bnodes_by_hash,
&a_bnodes_by_hash,
&b_bnodes_by_hash, &b_bnodes_by_hash,
&mut HashMap::default(), &mut HashMap::default(),
a, &a_bnodes_triples,
b, &b_bnodes_triples,
&[],
&mut BTreeSet::default(),
) )
} }

@ -3,7 +3,6 @@
//! Inspired by [RDFjs](http://rdf.js.org/) and [Apache Commons RDF](http://commons.apache.org/proper/commons-rdf/) //! Inspired by [RDFjs](http://rdf.js.org/) and [Apache Commons RDF](http://commons.apache.org/proper/commons-rdf/)
mod blank_node; mod blank_node;
mod dataset;
mod graph; mod graph;
mod isomorphism; mod isomorphism;
mod language_tag; mod language_tag;
@ -13,9 +12,6 @@ mod triple;
pub mod vocab; pub mod vocab;
pub use crate::model::blank_node::BlankNode; pub use crate::model::blank_node::BlankNode;
pub use crate::model::dataset::Dataset;
pub use crate::model::dataset::Graph;
pub use crate::model::dataset::NamedGraph;
pub use crate::model::graph::SimpleGraph; pub use crate::model::graph::SimpleGraph;
pub use crate::model::language_tag::LanguageTag; pub use crate::model::language_tag::LanguageTag;
pub use crate::model::literal::Literal; pub use crate::model::literal::Literal;

@ -260,6 +260,11 @@ impl Quad {
pub fn graph_name_owned(self) -> Option<NamedOrBlankNode> { pub fn graph_name_owned(self) -> Option<NamedOrBlankNode> {
self.graph_name self.graph_name
} }
/// Returns the underlying [triple](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-triple)
pub fn into_triple(self) -> Triple {
Triple::new(self.subject, self.predicate, self.object)
}
} }
impl fmt::Display for Quad { impl fmt::Display for Quad {

@ -0,0 +1,126 @@
use crate::model::*;
use crate::sparql::PreparedQuery;
use crate::Result;
use std::io::Read;
/// A `Repository` stores a [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset)
/// and allows to query and update it using SPARQL.
///
/// This crate currently provides two implementation of the `Repository` traits:
/// * One in memory: `MemoryRepository`
/// * One disk-based using [RocksDB](https://rocksdb.org/): `RocksDbRepository`
///
/// Usage example with `MemoryRepository`:
/// ```
/// use rudf::model::*;
/// use rudf::{Repository, RepositoryConnection, MemoryRepository, Result};
/// use crate::rudf::sparql::PreparedQuery;
/// use std::str::FromStr;
/// use rudf::sparql::algebra::QueryResult;
///
/// let repository = MemoryRepository::default();
/// let connection = repository.connection().unwrap();
///
/// // insertion
/// let ex = NamedNode::from_str("http://example.com").unwrap();
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results.unwrap());
///
/// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap();
/// let results = prepared_query.exec().unwrap();
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
/// }
/// ```
///
/// The implementation based on RocksDB if disabled by default and requires the `"rocksdb"` feature to be activated.
/// A `RocksDbRepository` could be built using `RocksDbRepository::open` and works just like its in-memory equivalent:
/// ```ignore
/// use rudf::RocksDbRepository;
/// let dataset = RocksDbRepository::open("example.db").unwrap();
/// ```
///
/// Quads insertion and deletion should respect [ACID](https://en.wikipedia.org/wiki/ACID) properties for all implementation.
/// No complex transaction support is provided yet.
pub trait Repository {
type Connection: RepositoryConnection;
fn connection(self) -> Result<Self::Connection>;
}
/// A connection to a `Repository`
pub trait RepositoryConnection: Clone {
type PreparedQuery: PreparedQuery;
/// Prepares a [SPARQL 1.1](https://www.w3.org/TR/sparql11-query/) query and returns an object that could be used to execute it.
///
/// The implementation is a work in progress, SPARQL 1.1 specific features are not implemented yet.
///
/// Usage example:
/// ```
/// use rudf::model::*;
/// use rudf::{Repository, RepositoryConnection, MemoryRepository};
/// use rudf::sparql::PreparedQuery;
/// use rudf::sparql::algebra::QueryResult;
/// use std::str::FromStr;
///
/// let repository = MemoryRepository::default();
/// let connection = repository.connection().unwrap();
///
/// // insertions
/// let ex = NamedNode::from_str("http://example.com").unwrap();
/// connection.insert(&Quad::new(ex.clone(), ex.clone(), ex.clone(), None));
///
/// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap();
/// let results = prepared_query.exec().unwrap();
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
/// }
/// ```
fn prepare_query(&self, query: impl Read) -> Result<Self::PreparedQuery>;
/// Retrieves quads with a filter on each quad component
///
/// Usage example:
/// ```
/// use rudf::model::*;
/// use rudf::{Repository, RepositoryConnection, MemoryRepository, Result};
/// use std::str::FromStr;
///
/// let repository = MemoryRepository::default();
/// let connection = repository.connection().unwrap();
///
/// // insertion
/// let ex = NamedNode::from_str("http://example.com").unwrap();
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results.unwrap());
/// ```
fn quads_for_pattern<'a>(
&'a self,
subject: Option<&NamedOrBlankNode>,
predicate: Option<&NamedNode>,
object: Option<&Term>,
graph_name: Option<&NamedOrBlankNode>,
) -> Box<dyn Iterator<Item = Result<Quad>> + 'a>
where
Self: 'a;
/// Checks if this dataset contains a given quad
fn contains(&self, quad: &Quad) -> Result<bool>;
/// Adds a quad to this dataset
fn insert(&self, quad: &Quad) -> Result<()>;
/// Removes a quad from this dataset
fn remove(&self, quad: &Quad) -> Result<()>;
}

@ -2,8 +2,8 @@ use crate::model::BlankNode;
use crate::model::Triple; use crate::model::Triple;
use crate::sparql::algebra::*; use crate::sparql::algebra::*;
use crate::sparql::plan::*; use crate::sparql::plan::*;
use crate::store::encoded::EncodedQuadsStore;
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::store::StoreConnection;
use crate::Result; use crate::Result;
use chrono::prelude::*; use chrono::prelude::*;
use num_traits::identities::Zero; use num_traits::identities::Zero;
@ -26,40 +26,38 @@ const REGEX_SIZE_LIMIT: usize = 1_000_000;
type EncodedTuplesIterator<'a> = Box<dyn Iterator<Item = Result<EncodedTuple>> + 'a>; type EncodedTuplesIterator<'a> = Box<dyn Iterator<Item = Result<EncodedTuple>> + 'a>;
pub struct SimpleEvaluator<S: EncodedQuadsStore> { #[derive(Clone)]
store: Arc<S>, pub struct SimpleEvaluator<S: StoreConnection> {
store: S,
bnodes_map: Arc<Mutex<BTreeMap<u64, BlankNode>>>, bnodes_map: Arc<Mutex<BTreeMap<u64, BlankNode>>>,
} }
impl<S: EncodedQuadsStore> Clone for SimpleEvaluator<S> { impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
fn clone(&self) -> Self { pub fn new(store: S) -> Self {
Self {
store: self.store.clone(),
bnodes_map: self.bnodes_map.clone(),
}
}
}
impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
pub fn new(store: Arc<S>) -> Self {
Self { Self {
store, store,
bnodes_map: Arc::new(Mutex::new(BTreeMap::default())), bnodes_map: Arc::new(Mutex::new(BTreeMap::default())),
} }
} }
pub fn evaluate_select_plan<'a>( pub fn evaluate_select_plan<'b>(
&'a self, &'b self,
plan: &'a PlanNode, plan: &'b PlanNode,
variables: &[Variable], variables: &[Variable],
) -> Result<QueryResult<'a>> { ) -> Result<QueryResult<'b>>
where
'a: 'b,
{
let iter = self.eval_plan(plan, vec![None; variables.len()]); let iter = self.eval_plan(plan, vec![None; variables.len()]);
Ok(QueryResult::Bindings( Ok(QueryResult::Bindings(
self.decode_bindings(iter, variables.to_vec()), self.decode_bindings(iter, variables.to_vec()),
)) ))
} }
pub fn evaluate_ask_plan<'a>(&'a self, plan: &'a PlanNode) -> Result<QueryResult<'a>> { pub fn evaluate_ask_plan<'b>(&'b self, plan: &'b PlanNode) -> Result<QueryResult<'b>>
where
'a: 'b,
{
match self.eval_plan(plan, vec![]).next() { match self.eval_plan(plan, vec![]).next() {
Some(Ok(_)) => Ok(QueryResult::Boolean(true)), Some(Ok(_)) => Ok(QueryResult::Boolean(true)),
Some(Err(error)) => Err(error), Some(Err(error)) => Err(error),
@ -67,11 +65,14 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
} }
} }
pub fn evaluate_construct_plan<'a>( pub fn evaluate_construct_plan<'b>(
&'a self, &'b self,
plan: &'a PlanNode, plan: &'b PlanNode,
construct: &'a [TripleTemplate], construct: &'b [TripleTemplate],
) -> Result<QueryResult<'a>> { ) -> Result<QueryResult<'b>>
where
'a: 'b,
{
Ok(QueryResult::Graph(Box::new(ConstructIterator { Ok(QueryResult::Graph(Box::new(ConstructIterator {
store: self.store.clone(), store: self.store.clone(),
iter: self.eval_plan(plan, vec![]), iter: self.eval_plan(plan, vec![]),
@ -81,15 +82,21 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
}))) })))
} }
pub fn evaluate_describe_plan<'a>(&'a self, plan: &'a PlanNode) -> Result<QueryResult<'a>> { pub fn evaluate_describe_plan<'b>(&'b self, plan: &'b PlanNode) -> Result<QueryResult<'b>>
where
'a: 'b,
{
Ok(QueryResult::Graph(Box::new(DescribeIterator { Ok(QueryResult::Graph(Box::new(DescribeIterator {
store: self.store.clone(), store: self.store.clone(),
iter: self.eval_plan(plan, vec![]), iter: self.eval_plan(plan, vec![]),
quads_iters: Vec::default(), quads: Vec::default(),
}))) })))
} }
fn eval_plan<'a>(&self, node: &'a PlanNode, from: EncodedTuple) -> EncodedTuplesIterator<'a> { fn eval_plan<'b>(&'b self, node: &'b PlanNode, from: EncodedTuple) -> EncodedTuplesIterator<'b>
where
'a: 'b,
{
match node { match node {
PlanNode::Init => Box::new(once(Ok(from))), PlanNode::Init => Box::new(once(Ok(from))),
PlanNode::StaticBindings { tuples } => Box::new(tuples.iter().cloned().map(Ok)), PlanNode::StaticBindings { tuples } => Box::new(tuples.iter().cloned().map(Ok)),
@ -99,13 +106,11 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
predicate, predicate,
object, object,
graph_name, graph_name,
} => { } => Box::new(
let eval = self.clone();
Box::new(
self.eval_plan(&*child, from) self.eval_plan(&*child, from)
.flat_map(move |tuple| match tuple { .flat_map(move |tuple| match tuple {
Ok(tuple) => { Ok(tuple) => {
let mut iter = eval.store.quads_for_pattern( let mut iter = self.store.quads_for_pattern(
get_pattern_value(&subject, &tuple), get_pattern_value(&subject, &tuple),
get_pattern_value(&predicate, &tuple), get_pattern_value(&predicate, &tuple),
get_pattern_value(&object, &tuple), get_pattern_value(&object, &tuple),
@ -153,30 +158,20 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
})) }))
} }
} }
let iter: EncodedTuplesIterator<'_> = let iter: EncodedTuplesIterator<'_> = Box::new(iter.map(move |quad| {
Box::new(iter.map(move |quad| {
let quad = quad?; let quad = quad?;
let mut new_tuple = tuple.clone(); let mut new_tuple = tuple.clone();
put_pattern_value(&subject, quad.subject, &mut new_tuple); put_pattern_value(&subject, quad.subject, &mut new_tuple);
put_pattern_value( put_pattern_value(&predicate, quad.predicate, &mut new_tuple);
&predicate,
quad.predicate,
&mut new_tuple,
);
put_pattern_value(&object, quad.object, &mut new_tuple); put_pattern_value(&object, quad.object, &mut new_tuple);
put_pattern_value( put_pattern_value(&graph_name, quad.graph_name, &mut new_tuple);
&graph_name,
quad.graph_name,
&mut new_tuple,
);
Ok(new_tuple) Ok(new_tuple)
})); }));
iter iter
} }
Err(error) => Box::new(once(Err(error))), Err(error) => Box::new(once(Err(error))),
}), }),
) ),
}
PlanNode::Join { left, right } => { PlanNode::Join { left, right } => {
//TODO: very dumb implementation //TODO: very dumb implementation
let left_iter = self.eval_plan(&*left, from.clone()); let left_iter = self.eval_plan(&*left, from.clone());
@ -208,7 +203,7 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
eval: self.clone(), eval: self.clone(),
right_plan: &*right, right_plan: &*right,
left_iter: self.eval_plan(&*left, filtered_from), left_iter: self.eval_plan(&*left, filtered_from),
current_right_iter: None, current_right: Vec::default(),
}; };
if problem_vars.is_empty() { if problem_vars.is_empty() {
Box::new(iter) Box::new(iter)
@ -236,7 +231,7 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
eval: self.clone(), eval: self.clone(),
children_plan: &children, children_plan: &children,
input_iter: self.eval_plan(&*entry, from), input_iter: self.eval_plan(&*entry, from),
current_iters: Vec::default(), current: Vec::default(),
}), }),
PlanNode::Extend { PlanNode::Extend {
child, child,
@ -853,11 +848,14 @@ impl<S: EncodedQuadsStore> SimpleEvaluator<S> {
} }
} }
fn decode_bindings<'a>( fn decode_bindings<'b>(
&self, &'b self,
iter: EncodedTuplesIterator<'a>, iter: EncodedTuplesIterator<'b>,
variables: Vec<Variable>, variables: Vec<Variable>,
) -> BindingsIterator<'a> { ) -> BindingsIterator<'b>
where
'a: 'b,
{
let store = self.store.clone(); let store = self.store.clone();
BindingsIterator::new( BindingsIterator::new(
variables, variables,
@ -1124,31 +1122,31 @@ impl<'a> Iterator for JoinIterator<'a> {
} }
} }
struct LeftJoinIterator<'a, S: EncodedQuadsStore> { struct LeftJoinIterator<'a, S: StoreConnection + 'a> {
eval: SimpleEvaluator<S>, eval: SimpleEvaluator<S>,
right_plan: &'a PlanNode, right_plan: &'a PlanNode,
left_iter: EncodedTuplesIterator<'a>, left_iter: EncodedTuplesIterator<'a>,
current_right_iter: Option<EncodedTuplesIterator<'a>>, current_right: Vec<Result<EncodedTuple>>, //TODO: keep using an iterator?
} }
impl<'a, S: EncodedQuadsStore> Iterator for LeftJoinIterator<'a, S> { impl<'a, S: StoreConnection> Iterator for LeftJoinIterator<'a, S> {
type Item = Result<EncodedTuple>; type Item = Result<EncodedTuple>;
fn next(&mut self) -> Option<Result<EncodedTuple>> { fn next(&mut self) -> Option<Result<EncodedTuple>> {
if let Some(ref mut right_iter) = self.current_right_iter { if let Some(tuple) = self.current_right.pop() {
if let Some(tuple) = right_iter.next() {
return Some(tuple); return Some(tuple);
} }
}
match self.left_iter.next()? { match self.left_iter.next()? {
Ok(left_tuple) => { Ok(left_tuple) => {
let mut right_iter = self.eval.eval_plan(self.right_plan, left_tuple.clone()); let mut current_right: Vec<_> = self
match right_iter.next() { .eval
Some(right_tuple) => { .eval_plan(self.right_plan, left_tuple.clone())
self.current_right_iter = Some(right_iter); .collect();
if let Some(right_tuple) = current_right.pop() {
self.current_right = current_right;
Some(right_tuple) Some(right_tuple)
} } else {
None => Some(Ok(left_tuple)), Some(Ok(left_tuple))
} }
} }
Err(error) => Some(Err(error)), Err(error) => Some(Err(error)),
@ -1156,13 +1154,13 @@ impl<'a, S: EncodedQuadsStore> Iterator for LeftJoinIterator<'a, S> {
} }
} }
struct BadLeftJoinIterator<'a, S: EncodedQuadsStore> { struct BadLeftJoinIterator<'a, S: StoreConnection> {
input: EncodedTuple, input: EncodedTuple,
iter: LeftJoinIterator<'a, S>, iter: LeftJoinIterator<'a, S>,
problem_vars: Vec<usize>, problem_vars: Vec<usize>,
} }
impl<'a, S: EncodedQuadsStore> Iterator for BadLeftJoinIterator<'a, S> { impl<'a, S: StoreConnection> Iterator for BadLeftJoinIterator<'a, S> {
type Item = Result<EncodedTuple>; type Item = Result<EncodedTuple>;
fn next(&mut self) -> Option<Result<EncodedTuple>> { fn next(&mut self) -> Option<Result<EncodedTuple>> {
@ -1192,28 +1190,25 @@ impl<'a, S: EncodedQuadsStore> Iterator for BadLeftJoinIterator<'a, S> {
} }
} }
struct UnionIterator<'a, S: EncodedQuadsStore> { struct UnionIterator<'a, S: StoreConnection + 'a> {
eval: SimpleEvaluator<S>, eval: SimpleEvaluator<S>,
children_plan: &'a Vec<PlanNode>, children_plan: &'a Vec<PlanNode>,
input_iter: EncodedTuplesIterator<'a>, input_iter: EncodedTuplesIterator<'a>,
current_iters: Vec<EncodedTuplesIterator<'a>>, current: Vec<Result<EncodedTuple>>, //TODO: avoid
} }
impl<'a, S: EncodedQuadsStore> Iterator for UnionIterator<'a, S> { impl<'a, S: StoreConnection> Iterator for UnionIterator<'a, S> {
type Item = Result<EncodedTuple>; type Item = Result<EncodedTuple>;
fn next(&mut self) -> Option<Result<EncodedTuple>> { fn next(&mut self) -> Option<Result<EncodedTuple>> {
while let Some(mut iter) = self.current_iters.pop() { while let Some(tuple) = self.current.pop() {
if let Some(tuple) = iter.next() {
self.current_iters.push(iter);
return Some(tuple); return Some(tuple);
} }
}
match self.input_iter.next()? { match self.input_iter.next()? {
Ok(input_tuple) => { Ok(input_tuple) => {
for plan in self.children_plan { for plan in self.children_plan {
self.current_iters self.current
.push(self.eval.eval_plan(plan, input_tuple.clone())); .extend(self.eval.eval_plan(plan, input_tuple.clone()));
} }
} }
Err(error) => return Some(Err(error)), Err(error) => return Some(Err(error)),
@ -1244,15 +1239,15 @@ impl<'a> Iterator for HashDeduplicateIterator<'a> {
} }
} }
struct ConstructIterator<'a, S: EncodedQuadsStore> { struct ConstructIterator<'a, S: StoreConnection> {
store: Arc<S>, store: S,
iter: EncodedTuplesIterator<'a>, iter: EncodedTuplesIterator<'a>,
template: &'a [TripleTemplate], template: &'a [TripleTemplate],
buffered_results: Vec<Result<Triple>>, buffered_results: Vec<Result<Triple>>,
bnodes: Vec<BlankNode>, bnodes: Vec<BlankNode>,
} }
impl<'a, S: EncodedQuadsStore> Iterator for ConstructIterator<'a, S> { impl<'a, S: StoreConnection> Iterator for ConstructIterator<'a, S> {
type Item = Result<Triple>; type Item = Result<Triple>;
fn next(&mut self) -> Option<Result<Triple>> { fn next(&mut self) -> Option<Result<Triple>> {
@ -1315,43 +1310,40 @@ fn decode_triple<S: StringStore>(
)) ))
} }
struct DescribeIterator<'a, S: EncodedQuadsStore> { struct DescribeIterator<'a, S: StoreConnection + 'a> {
store: Arc<S>, store: S,
iter: EncodedTuplesIterator<'a>, iter: EncodedTuplesIterator<'a>,
quads_iters: Vec<S::QuadsForSubjectIterator>, quads: Vec<Result<EncodedQuad>>,
} }
impl<'a, S: EncodedQuadsStore> Iterator for DescribeIterator<'a, S> { impl<'a, S: StoreConnection> Iterator for DescribeIterator<'a, S> {
type Item = Result<Triple>; type Item = Result<Triple>;
fn next(&mut self) -> Option<Result<Triple>> { fn next(&mut self) -> Option<Result<Triple>> {
while let Some(mut quads_iter) = self.quads_iters.pop() { while let Some(quad) = self.quads.pop() {
if let Some(quad) = quads_iter.next() { return Some(match quad {
self.quads_iters.push(quads_iter); Ok(quad) => self
return Some(quad.and_then(|quad| self.store.encoder().decode_triple(&quad))); .store
} .encoder()
.decode_quad(&quad)
.map(|q| q.into_triple()),
Err(error) => Err(error),
});
} }
let tuple = match self.iter.next()? { let tuple = match self.iter.next()? {
Ok(tuple) => tuple, Ok(tuple) => tuple,
Err(error) => return Some(Err(error)), Err(error) => return Some(Err(error)),
}; };
let mut error_to_return = None;
for subject in tuple { for subject in tuple {
if let Some(subject) = subject { if let Some(subject) = subject {
match self.store.quads_for_subject(subject) { self.quads = self
Ok(quads_iter) => self.quads_iters.push(quads_iter), .store
Err(error) => { .quads_for_pattern(Some(subject), None, None, None)
error_to_return = Some(error); .collect();
}
}
} }
} }
if let Some(error) = error_to_return {
Some(Err(error))
} else {
self.next() self.next()
} }
}
} }
struct ZipLongest<T1, T2, I1: Iterator<Item = T1>, I2: Iterator<Item = T2>> { struct ZipLongest<T1, T2, I1: Iterator<Item = T1>, I2: Iterator<Item = T2>> {

@ -1,29 +1,5 @@
//! [SPARQL](https://www.w3.org/TR/sparql11-overview/) implementation. //! [SPARQL](https://www.w3.org/TR/sparql11-overview/) implementation.
//!
//! The support of RDF Dataset and SPARQL 1.1 specific features is not done yet.
//!
//! This module adds query capabilities to the `rudf::model::Dataset` implementations.
//!
//! Usage example:
//! ```
//! use rudf::model::*;
//! use rudf::store::MemoryDataset;
//! use rudf::sparql::SparqlDataset;
//! use rudf::sparql::PreparedQuery;
//! use rudf::sparql::algebra::QueryResult;
//! use std::str::FromStr;
//!
//! let dataset = MemoryDataset::default();
//! let ex = NamedNode::from_str("http://example.com").unwrap();
//! dataset.insert(&Quad::new(ex.clone(), ex.clone(), ex.clone(), None));
//! let prepared_query = dataset.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap();
//! let results = prepared_query.exec().unwrap();
//! if let QueryResult::Bindings(results) = results {
//! assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
//! }
//! ```
use crate::model::Dataset;
use crate::sparql::algebra::Query; use crate::sparql::algebra::Query;
use crate::sparql::algebra::QueryResult; use crate::sparql::algebra::QueryResult;
use crate::sparql::algebra::Variable; use crate::sparql::algebra::Variable;
@ -32,8 +8,7 @@ use crate::sparql::parser::read_sparql_query;
use crate::sparql::plan::PlanBuilder; use crate::sparql::plan::PlanBuilder;
use crate::sparql::plan::PlanNode; use crate::sparql::plan::PlanNode;
use crate::sparql::plan::TripleTemplate; use crate::sparql::plan::TripleTemplate;
use crate::store::encoded::EncodedQuadsStore; use crate::store::StoreConnection;
use crate::store::encoded::StoreDataset;
use crate::Result; use crate::Result;
use std::io::Read; use std::io::Read;
@ -43,50 +18,58 @@ pub mod parser;
mod plan; mod plan;
pub mod xml_results; pub mod xml_results;
/// An extension of the `rudf::model::Dataset` trait to allow SPARQL operations on it.
///
/// It is implemented by all stores provided by Rudf
pub trait SparqlDataset: Dataset {
type PreparedQuery: PreparedQuery;
/// Prepares a [SPARQL 1.1](https://www.w3.org/TR/sparql11-query/) query and returns an object that could be used to execute it
///
/// The implementation is a work in progress, RDF Dataset and SPARQL 1.1 specific features are not implemented yet.
fn prepare_query(&self, query: impl Read) -> Result<Self::PreparedQuery>;
}
/// A prepared [SPARQL 1.1](https://www.w3.org/TR/sparql11-query/) query /// A prepared [SPARQL 1.1](https://www.w3.org/TR/sparql11-query/) query
pub trait PreparedQuery { pub trait PreparedQuery {
/// Evaluates the query and returns its results /// Evaluates the query and returns its results
fn exec(&self) -> Result<QueryResult<'_>>; fn exec(&self) -> Result<QueryResult<'_>>;
} }
impl<S: EncodedQuadsStore> SparqlDataset for StoreDataset<S> { /// An implementation of `PreparedQuery` for internal use
type PreparedQuery = SimplePreparedQuery<S>; pub struct SimplePreparedQuery<S: StoreConnection>(SimplePreparedQueryOptions<S>);
enum SimplePreparedQueryOptions<S: StoreConnection> {
Select {
plan: PlanNode,
variables: Vec<Variable>,
evaluator: SimpleEvaluator<S>,
},
Ask {
plan: PlanNode,
evaluator: SimpleEvaluator<S>,
},
Construct {
plan: PlanNode,
construct: Vec<TripleTemplate>,
evaluator: SimpleEvaluator<S>,
},
Describe {
plan: PlanNode,
evaluator: SimpleEvaluator<S>,
},
}
fn prepare_query(&self, query: impl Read) -> Result<SimplePreparedQuery<S>> { impl<S: StoreConnection> SimplePreparedQuery<S> {
Ok(SimplePreparedQuery(match read_sparql_query(query, None)? { pub(crate) fn new(connection: S, query: impl Read) -> Result<Self> {
Ok(Self(match read_sparql_query(query, None)? {
Query::Select { Query::Select {
algebra, algebra,
dataset: _, dataset: _,
} => { } => {
let store = self.encoded(); let (plan, variables) = PlanBuilder::build(&connection, &algebra)?;
let (plan, variables) = PlanBuilder::build(&*store, &algebra)?;
SimplePreparedQueryOptions::Select { SimplePreparedQueryOptions::Select {
plan, plan,
variables, variables,
evaluator: SimpleEvaluator::new(store), evaluator: SimpleEvaluator::new(connection),
} }
} }
Query::Ask { Query::Ask {
algebra, algebra,
dataset: _, dataset: _,
} => { } => {
let store = self.encoded(); let (plan, _) = PlanBuilder::build(&connection, &algebra)?;
let (plan, _) = PlanBuilder::build(&*store, &algebra)?;
SimplePreparedQueryOptions::Ask { SimplePreparedQueryOptions::Ask {
plan, plan,
evaluator: SimpleEvaluator::new(store), evaluator: SimpleEvaluator::new(connection),
} }
} }
Query::Construct { Query::Construct {
@ -94,54 +77,32 @@ impl<S: EncodedQuadsStore> SparqlDataset for StoreDataset<S> {
algebra, algebra,
dataset: _, dataset: _,
} => { } => {
let store = self.encoded(); let (plan, variables) = PlanBuilder::build(&connection, &algebra)?;
let (plan, variables) = PlanBuilder::build(&*store, &algebra)?;
SimplePreparedQueryOptions::Construct { SimplePreparedQueryOptions::Construct {
plan, plan,
construct: PlanBuilder::build_graph_template(&*store, &construct, variables)?, construct: PlanBuilder::build_graph_template(
evaluator: SimpleEvaluator::new(store), &connection,
&construct,
variables,
)?,
evaluator: SimpleEvaluator::new(connection),
} }
} }
Query::Describe { Query::Describe {
algebra, algebra,
dataset: _, dataset: _,
} => { } => {
let store = self.encoded(); let (plan, _) = PlanBuilder::build(&connection, &algebra)?;
let (plan, _) = PlanBuilder::build(&*store, &algebra)?;
SimplePreparedQueryOptions::Describe { SimplePreparedQueryOptions::Describe {
plan, plan,
evaluator: SimpleEvaluator::new(store), evaluator: SimpleEvaluator::new(connection),
} }
} }
})) }))
} }
} }
/// An implementation of `PreparedQuery` for internal use impl<S: StoreConnection> PreparedQuery for SimplePreparedQuery<S> {
pub struct SimplePreparedQuery<S: EncodedQuadsStore>(SimplePreparedQueryOptions<S>);
enum SimplePreparedQueryOptions<S: EncodedQuadsStore> {
Select {
plan: PlanNode,
variables: Vec<Variable>,
evaluator: SimpleEvaluator<S>,
},
Ask {
plan: PlanNode,
evaluator: SimpleEvaluator<S>,
},
Construct {
plan: PlanNode,
construct: Vec<TripleTemplate>,
evaluator: SimpleEvaluator<S>,
},
Describe {
plan: PlanNode,
evaluator: SimpleEvaluator<S>,
},
}
impl<S: EncodedQuadsStore> PreparedQuery for SimplePreparedQuery<S> {
fn exec(&self) -> Result<QueryResult<'_>> { fn exec(&self) -> Result<QueryResult<'_>> {
match &self.0 { match &self.0 {
SimplePreparedQueryOptions::Select { SimplePreparedQueryOptions::Select {

@ -1,9 +1,9 @@
use crate::model::vocab::xsd; use crate::model::vocab::xsd;
use crate::model::Literal; use crate::model::Literal;
use crate::sparql::algebra::*; use crate::sparql::algebra::*;
use crate::store::encoded::EncodedQuadsStore;
use crate::store::numeric_encoder::EncodedTerm; use crate::store::numeric_encoder::EncodedTerm;
use crate::store::numeric_encoder::ENCODED_DEFAULT_GRAPH; use crate::store::numeric_encoder::ENCODED_DEFAULT_GRAPH;
use crate::store::StoreConnection;
use crate::Result; use crate::Result;
use failure::format_err; use failure::format_err;
use std::collections::BTreeSet; use std::collections::BTreeSet;
@ -355,12 +355,12 @@ pub enum TripleTemplateValue {
Variable(usize), Variable(usize),
} }
pub struct PlanBuilder<'a, S: EncodedQuadsStore> { pub struct PlanBuilder<'a, S: StoreConnection> {
store: &'a S, store: &'a S,
} }
impl<'a, S: EncodedQuadsStore> PlanBuilder<'a, S> { impl<'a, S: StoreConnection> PlanBuilder<'a, S> {
pub fn build(store: &S, pattern: &GraphPattern) -> Result<(PlanNode, Vec<Variable>)> { pub fn build(store: &'a S, pattern: &GraphPattern) -> Result<(PlanNode, Vec<Variable>)> {
let mut variables = Vec::default(); let mut variables = Vec::default();
let plan = PlanBuilder { store }.build_for_graph_pattern( let plan = PlanBuilder { store }.build_for_graph_pattern(
pattern, pattern,

@ -1,966 +0,0 @@
use crate::model::*;
use crate::store::numeric_encoder::*;
use crate::Result;
use failure::format_err;
use std::fmt;
use std::iter::empty;
use std::iter::once;
use std::iter::FromIterator;
use std::iter::Iterator;
use std::sync::Arc;
/// Defines the Store traits that is used to have efficient binary storage
pub trait EncodedQuadsStore: StringStore + Sized + 'static {
type QuadsIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForSubjectIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForSubjectPredicateIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForSubjectPredicateObjectIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForSubjectObjectIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForPredicateIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForPredicateObjectIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForObjectIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForGraphIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForSubjectGraphIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForSubjectPredicateGraphIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForSubjectObjectGraphIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForPredicateGraphIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForPredicateObjectGraphIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
type QuadsForObjectGraphIterator: Iterator<Item = Result<EncodedQuad>> + 'static;
fn encoder(&self) -> Encoder<&Self> {
Encoder::new(&self)
}
fn quads(&self) -> Result<Self::QuadsIterator>;
fn quads_for_subject(&self, subject: EncodedTerm) -> Result<Self::QuadsForSubjectIterator>;
fn quads_for_subject_predicate(
&self,
subject: EncodedTerm,
predicate: EncodedTerm,
) -> Result<Self::QuadsForSubjectPredicateIterator>;
fn quads_for_subject_predicate_object(
&self,
subject: EncodedTerm,
predicate: EncodedTerm,
object: EncodedTerm,
) -> Result<Self::QuadsForSubjectPredicateObjectIterator>;
fn quads_for_subject_object(
&self,
subject: EncodedTerm,
object: EncodedTerm,
) -> Result<Self::QuadsForSubjectObjectIterator>;
fn quads_for_predicate(
&self,
predicate: EncodedTerm,
) -> Result<Self::QuadsForPredicateIterator>;
fn quads_for_predicate_object(
&self,
predicate: EncodedTerm,
object: EncodedTerm,
) -> Result<Self::QuadsForPredicateObjectIterator>;
fn quads_for_object(&self, object: EncodedTerm) -> Result<Self::QuadsForObjectIterator>;
fn quads_for_graph(&self, graph_name: EncodedTerm) -> Result<Self::QuadsForGraphIterator>;
fn quads_for_subject_graph(
&self,
subject: EncodedTerm,
graph_name: EncodedTerm,
) -> Result<Self::QuadsForSubjectGraphIterator>;
fn quads_for_subject_predicate_graph(
&self,
subject: EncodedTerm,
predicate: EncodedTerm,
graph_name: EncodedTerm,
) -> Result<Self::QuadsForSubjectPredicateGraphIterator>;
fn quads_for_subject_object_graph(
&self,
subject: EncodedTerm,
object: EncodedTerm,
graph_name: EncodedTerm,
) -> Result<Self::QuadsForSubjectObjectGraphIterator>;
fn quads_for_predicate_graph(
&self,
predicate: EncodedTerm,
graph_name: EncodedTerm,
) -> Result<Self::QuadsForPredicateGraphIterator>;
fn quads_for_predicate_object_graph(
&self,
predicate: EncodedTerm,
object: EncodedTerm,
graph_name: EncodedTerm,
) -> Result<Self::QuadsForPredicateObjectGraphIterator>;
fn quads_for_object_graph(
&self,
object: EncodedTerm,
graph_name: EncodedTerm,
) -> Result<Self::QuadsForObjectGraphIterator>;
fn contains(&self, quad: &EncodedQuad) -> Result<bool>;
fn insert(&self, quad: &EncodedQuad) -> Result<()>;
fn remove(&self, quad: &EncodedQuad) -> Result<()>;
fn quads_for_pattern(
&self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>>> {
match subject {
Some(subject) => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => {
let quad = EncodedQuad::new(subject, predicate, object, graph_name);
match self.contains(&quad) {
Ok(true) => Box::new(once(Ok(quad))),
Ok(false) => Box::new(empty()),
Err(error) => Box::new(once(Err(error))),
}
}
None => wrap_error(
self.quads_for_subject_predicate_object(subject, predicate, object),
),
},
None => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_predicate_graph(subject, predicate, graph_name),
),
None => wrap_error(self.quads_for_subject_predicate(subject, predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_object_graph(subject, object, graph_name),
),
None => wrap_error(self.quads_for_subject_object(subject, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_subject_graph(subject, graph_name))
}
None => wrap_error(self.quads_for_subject(subject)),
},
},
},
None => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_predicate_object_graph(predicate, object, graph_name),
),
None => wrap_error(self.quads_for_predicate_object(predicate, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_predicate_graph(predicate, graph_name))
}
None => wrap_error(self.quads_for_predicate(predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_object_graph(object, graph_name))
}
None => wrap_error(self.quads_for_object(object)),
},
None => match graph_name {
Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)),
None => wrap_error(self.quads()),
},
},
},
}
}
}
fn wrap_error<E: 'static, I: Iterator<Item = Result<E>> + 'static>(
iter: Result<I>,
) -> Box<dyn Iterator<Item = Result<E>>> {
match iter {
Ok(iter) => Box::new(iter),
Err(error) => Box::new(once(Err(error))),
}
}
pub struct StoreDataset<S: EncodedQuadsStore> {
store: Arc<S>,
}
impl<S: EncodedQuadsStore> StoreDataset<S> {
pub fn new_from_store(store: S) -> Self {
Self {
store: Arc::new(store),
}
}
pub(crate) fn encoded(&self) -> Arc<S> {
self.store.clone()
}
}
impl<S: EncodedQuadsStore> Dataset for StoreDataset<S> {
type NamedGraph = StoreNamedGraph<S>;
type DefaultGraph = StoreDefaultGraph<S>;
type UnionGraph = StoreUnionGraph<S>;
type QuadsIterator = QuadsIterator<S::QuadsIterator, S>;
type QuadsForSubjectIterator = QuadsIterator<S::QuadsForSubjectIterator, S>;
type QuadsForSubjectPredicateIterator = QuadsIterator<S::QuadsForSubjectPredicateIterator, S>;
type QuadsForSubjectPredicateObjectIterator =
QuadsIterator<S::QuadsForSubjectPredicateObjectIterator, S>;
type QuadsForSubjectObjectIterator = QuadsIterator<S::QuadsForSubjectObjectIterator, S>;
type QuadsForPredicateIterator = QuadsIterator<S::QuadsForPredicateIterator, S>;
type QuadsForPredicateObjectIterator = QuadsIterator<S::QuadsForPredicateObjectIterator, S>;
type QuadsForObjectIterator = QuadsIterator<S::QuadsForObjectIterator, S>;
fn named_graph(&self, name: &NamedOrBlankNode) -> Result<StoreNamedGraph<S>> {
Ok(StoreNamedGraph {
store: self.store.clone(),
name: name.clone(),
encoded_name: self.store.encoder().encode_named_or_blank_node(name)?,
})
}
fn default_graph(&self) -> StoreDefaultGraph<S> {
StoreDefaultGraph {
store: self.store.clone(),
}
}
fn union_graph(&self) -> StoreUnionGraph<S> {
StoreUnionGraph {
store: self.store.clone(),
}
}
fn quads(&self) -> Result<QuadsIterator<S::QuadsIterator, S>> {
Ok(QuadsIterator {
iter: self.store.quads()?,
store: self.store.clone(),
})
}
fn quads_for_subject(
&self,
subject: &NamedOrBlankNode,
) -> Result<QuadsIterator<S::QuadsForSubjectIterator, S>> {
let encoder = self.store.encoder();
Ok(QuadsIterator {
iter: self
.store
.quads_for_subject(encoder.encode_named_or_blank_node(subject)?)?,
store: self.store.clone(),
})
}
fn quads_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<QuadsIterator<S::QuadsForSubjectPredicateIterator, S>> {
let encoder = self.store.encoder();
Ok(QuadsIterator {
iter: self.store.quads_for_subject_predicate(
encoder.encode_named_or_blank_node(subject)?,
encoder.encode_named_node(predicate)?,
)?,
store: self.store.clone(),
})
}
fn quads_for_subject_predicate_object(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
object: &Term,
) -> Result<QuadsIterator<S::QuadsForSubjectPredicateObjectIterator, S>> {
let encoder = self.store.encoder();
Ok(QuadsIterator {
iter: self.store.quads_for_subject_predicate_object(
encoder.encode_named_or_blank_node(subject)?,
encoder.encode_named_node(predicate)?,
encoder.encode_term(object)?,
)?,
store: self.store.clone(),
})
}
fn quads_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<QuadsIterator<S::QuadsForSubjectObjectIterator, S>> {
let encoder = self.store.encoder();
Ok(QuadsIterator {
iter: self.store.quads_for_subject_object(
encoder.encode_named_or_blank_node(subject)?,
encoder.encode_term(object)?,
)?,
store: self.store.clone(),
})
}
fn quads_for_predicate(
&self,
predicate: &NamedNode,
) -> Result<QuadsIterator<S::QuadsForPredicateIterator, S>> {
let encoder = self.store.encoder();
Ok(QuadsIterator {
iter: self
.store
.quads_for_predicate(encoder.encode_named_node(predicate)?)?,
store: self.store.clone(),
})
}
fn quads_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<QuadsIterator<S::QuadsForPredicateObjectIterator, S>> {
let encoder = self.store.encoder();
Ok(QuadsIterator {
iter: self.store.quads_for_predicate_object(
encoder.encode_named_node(predicate)?,
encoder.encode_term(object)?,
)?,
store: self.store.clone(),
})
}
fn quads_for_object(
&self,
object: &Term,
) -> Result<QuadsIterator<S::QuadsForObjectIterator, S>> {
let encoder = self.store.encoder();
Ok(QuadsIterator {
iter: self.store.quads_for_object(encoder.encode_term(object)?)?,
store: self.store.clone(),
})
}
fn contains(&self, quad: &Quad) -> Result<bool> {
self.store
.contains(&self.store.encoder().encode_quad(quad)?)
}
fn insert(&self, quad: &Quad) -> Result<()> {
self.store.insert(&self.store.encoder().encode_quad(quad)?)
}
fn remove(&self, quad: &Quad) -> Result<()> {
self.store.remove(&self.store.encoder().encode_quad(quad)?)
}
fn len(&self) -> Result<usize> {
Ok(self.store.quads()?.count())
}
fn is_empty(&self) -> Result<bool> {
Ok(self.store.quads()?.any(|_| true))
}
}
impl<S: EncodedQuadsStore> fmt::Display for StoreDataset<S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
for quad in self.iter().map_err(|_| fmt::Error)? {
writeln!(fmt, "{}", quad.map_err(|_| fmt::Error)?)?;
}
Ok(())
}
}
impl<S: EncodedQuadsStore + Default> Default for StoreDataset<S> {
fn default() -> Self {
Self::new_from_store(S::default())
}
}
impl<S: EncodedQuadsStore + Default> FromIterator<Quad> for StoreDataset<S> {
fn from_iter<I: IntoIterator<Item = Quad>>(iter: I) -> Self {
let dataset = Self::default();
for quad in iter {
dataset.insert(&quad).unwrap();
}
dataset
}
}
impl<'a, S: EncodedQuadsStore + Default> FromIterator<&'a Quad> for StoreDataset<S> {
fn from_iter<I: IntoIterator<Item = &'a Quad>>(iter: I) -> Self {
let dataset = Self::default();
for quad in iter {
dataset.insert(quad).unwrap();
}
dataset
}
}
pub struct StoreNamedGraph<S: EncodedQuadsStore> {
store: Arc<S>,
name: NamedOrBlankNode,
encoded_name: EncodedTerm,
}
impl<S: EncodedQuadsStore> Graph for StoreNamedGraph<S> {
type TriplesIterator = TriplesIterator<S::QuadsForGraphIterator, S>;
type TriplesForSubjectIterator = TriplesIterator<S::QuadsForSubjectGraphIterator, S>;
type ObjectsForSubjectPredicateIterator =
ObjectsIterator<S::QuadsForSubjectPredicateGraphIterator, S>;
type PredicatesForSubjectObjectIterator =
PredicatesIterator<S::QuadsForSubjectObjectGraphIterator, S>;
type TriplesForPredicateIterator = TriplesIterator<S::QuadsForPredicateGraphIterator, S>;
type SubjectsForPredicateObjectIterator =
SubjectsIterator<S::QuadsForPredicateObjectGraphIterator, S>;
type TriplesForObjectIterator = TriplesIterator<S::QuadsForObjectGraphIterator, S>;
fn triples(&self) -> Result<TriplesIterator<S::QuadsForGraphIterator, S>> {
Ok(TriplesIterator {
iter: self.store.quads_for_graph(self.encoded_name)?,
store: self.store.clone(),
})
}
fn triples_for_subject(
&self,
subject: &NamedOrBlankNode,
) -> Result<TriplesIterator<S::QuadsForSubjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
iter: self.store.quads_for_subject_graph(
encoder.encode_named_or_blank_node(subject)?,
self.encoded_name,
)?,
store: self.store.clone(),
})
}
fn objects_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<ObjectsIterator<S::QuadsForSubjectPredicateGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(ObjectsIterator {
iter: self.store.quads_for_subject_predicate_graph(
encoder.encode_named_or_blank_node(subject)?,
encoder.encode_named_node(predicate)?,
self.encoded_name,
)?,
store: self.store.clone(),
})
}
fn predicates_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<PredicatesIterator<S::QuadsForSubjectObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(PredicatesIterator {
iter: self.store.quads_for_subject_object_graph(
encoder.encode_named_or_blank_node(subject)?,
encoder.encode_term(object)?,
self.encoded_name,
)?,
store: self.store.clone(),
})
}
fn triples_for_predicate(
&self,
predicate: &NamedNode,
) -> Result<TriplesIterator<S::QuadsForPredicateGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
iter: self.store.quads_for_predicate_graph(
encoder.encode_named_node(predicate)?,
self.encoded_name,
)?,
store: self.store.clone(),
})
}
fn subjects_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<SubjectsIterator<S::QuadsForPredicateObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(SubjectsIterator {
iter: self.store.quads_for_predicate_object_graph(
encoder.encode_named_node(predicate)?,
encoder.encode_term(object)?,
self.encoded_name,
)?,
store: self.store.clone(),
})
}
fn triples_for_object(
&self,
object: &Term,
) -> Result<TriplesIterator<S::QuadsForObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
iter: self
.store
.quads_for_object_graph(encoder.encode_term(object)?, self.encoded_name)?,
store: self.store.clone(),
})
}
fn contains(&self, triple: &Triple) -> Result<bool> {
self.store.contains(
&self
.store
.encoder()
.encode_triple_in_graph(triple, self.encoded_name)?,
)
}
fn insert(&self, triple: &Triple) -> Result<()> {
self.store.insert(
&self
.store
.encoder()
.encode_triple_in_graph(triple, self.encoded_name)?,
)
}
fn remove(&self, triple: &Triple) -> Result<()> {
self.store.remove(
&self
.store
.encoder()
.encode_triple_in_graph(triple, self.encoded_name)?,
)
}
fn len(&self) -> Result<usize> {
Ok(self.store.quads_for_graph(self.encoded_name)?.count())
}
fn is_empty(&self) -> Result<bool> {
Ok(self.store.quads_for_graph(self.encoded_name)?.any(|_| true))
}
}
impl<S: EncodedQuadsStore> NamedGraph for StoreNamedGraph<S> {
fn name(&self) -> &NamedOrBlankNode {
&self.name
}
}
impl<S: EncodedQuadsStore> fmt::Display for StoreNamedGraph<S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
for triple in self.iter().map_err(|_| fmt::Error)? {
writeln!(fmt, "{}", triple.map_err(|_| fmt::Error)?)?;
}
Ok(())
}
}
pub struct StoreDefaultGraph<S: EncodedQuadsStore> {
store: Arc<S>,
}
impl<S: EncodedQuadsStore> Graph for StoreDefaultGraph<S> {
type TriplesIterator = TriplesIterator<S::QuadsForGraphIterator, S>;
type TriplesForSubjectIterator = TriplesIterator<S::QuadsForSubjectGraphIterator, S>;
type ObjectsForSubjectPredicateIterator =
ObjectsIterator<S::QuadsForSubjectPredicateGraphIterator, S>;
type PredicatesForSubjectObjectIterator =
PredicatesIterator<S::QuadsForSubjectObjectGraphIterator, S>;
type TriplesForPredicateIterator = TriplesIterator<S::QuadsForPredicateGraphIterator, S>;
type SubjectsForPredicateObjectIterator =
SubjectsIterator<S::QuadsForPredicateObjectGraphIterator, S>;
type TriplesForObjectIterator = TriplesIterator<S::QuadsForObjectGraphIterator, S>;
fn triples(&self) -> Result<TriplesIterator<S::QuadsForGraphIterator, S>> {
Ok(TriplesIterator {
iter: self.store.quads_for_graph(ENCODED_DEFAULT_GRAPH)?,
store: self.store.clone(),
})
}
fn triples_for_subject(
&self,
subject: &NamedOrBlankNode,
) -> Result<TriplesIterator<S::QuadsForSubjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
iter: self.store.quads_for_subject_graph(
encoder.encode_named_or_blank_node(subject)?,
ENCODED_DEFAULT_GRAPH,
)?,
store: self.store.clone(),
})
}
fn objects_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<ObjectsIterator<S::QuadsForSubjectPredicateGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(ObjectsIterator {
iter: self.store.quads_for_subject_predicate_graph(
encoder.encode_named_or_blank_node(subject)?,
encoder.encode_named_node(predicate)?,
ENCODED_DEFAULT_GRAPH,
)?,
store: self.store.clone(),
})
}
fn predicates_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<PredicatesIterator<S::QuadsForSubjectObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(PredicatesIterator {
iter: self.store.quads_for_subject_object_graph(
encoder.encode_named_or_blank_node(subject)?,
encoder.encode_term(object)?,
ENCODED_DEFAULT_GRAPH,
)?,
store: self.store.clone(),
})
}
fn triples_for_predicate(
&self,
predicate: &NamedNode,
) -> Result<TriplesIterator<S::QuadsForPredicateGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
iter: self.store.quads_for_predicate_graph(
encoder.encode_named_node(predicate)?,
ENCODED_DEFAULT_GRAPH,
)?,
store: self.store.clone(),
})
}
fn subjects_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<SubjectsIterator<S::QuadsForPredicateObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(SubjectsIterator {
iter: self.store.quads_for_predicate_object_graph(
encoder.encode_named_node(predicate)?,
encoder.encode_term(object)?,
ENCODED_DEFAULT_GRAPH,
)?,
store: self.store.clone(),
})
}
fn triples_for_object(
&self,
object: &Term,
) -> Result<TriplesIterator<S::QuadsForObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
iter: self
.store
.quads_for_object_graph(encoder.encode_term(object)?, ENCODED_DEFAULT_GRAPH)?,
store: self.store.clone(),
})
}
fn contains(&self, triple: &Triple) -> Result<bool> {
self.store.contains(
&self
.store
.encoder()
.encode_triple_in_graph(triple, ENCODED_DEFAULT_GRAPH)?,
)
}
fn insert(&self, triple: &Triple) -> Result<()> {
self.store.insert(
&self
.store
.encoder()
.encode_triple_in_graph(triple, ENCODED_DEFAULT_GRAPH)?,
)
}
fn remove(&self, triple: &Triple) -> Result<()> {
self.store.remove(
&self
.store
.encoder()
.encode_triple_in_graph(triple, ENCODED_DEFAULT_GRAPH)?,
)
}
fn len(&self) -> Result<usize> {
Ok(self.store.quads_for_graph(ENCODED_DEFAULT_GRAPH)?.count())
}
fn is_empty(&self) -> Result<bool> {
Ok(self
.store
.quads_for_graph(ENCODED_DEFAULT_GRAPH)?
.any(|_| true))
}
}
impl<S: EncodedQuadsStore> fmt::Display for StoreDefaultGraph<S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
for triple in self.iter().map_err(|_| fmt::Error)? {
writeln!(fmt, "{}", triple.map_err(|_| fmt::Error)?)?;
}
Ok(())
}
}
impl<S: EncodedQuadsStore + Default> Default for StoreDefaultGraph<S> {
fn default() -> Self {
StoreDataset::default().default_graph()
}
}
impl<S: EncodedQuadsStore + Default> FromIterator<Triple> for StoreDefaultGraph<S> {
fn from_iter<I: IntoIterator<Item = Triple>>(iter: I) -> Self {
let graph = Self::default();
for triple in iter {
graph.insert(&triple).unwrap();
}
graph
}
}
impl<'a, S: EncodedQuadsStore + Default> FromIterator<&'a Triple> for StoreDefaultGraph<S> {
fn from_iter<I: IntoIterator<Item = &'a Triple>>(iter: I) -> Self {
let graph = Self::default();
for triple in iter {
graph.insert(triple).unwrap();
}
graph
}
}
pub struct StoreUnionGraph<S: EncodedQuadsStore> {
store: Arc<S>,
}
impl<S: EncodedQuadsStore> Graph for StoreUnionGraph<S> {
type TriplesIterator = TriplesIterator<S::QuadsIterator, S>;
type TriplesForSubjectIterator = TriplesIterator<S::QuadsForSubjectIterator, S>;
type ObjectsForSubjectPredicateIterator =
ObjectsIterator<S::QuadsForSubjectPredicateIterator, S>;
type PredicatesForSubjectObjectIterator =
PredicatesIterator<S::QuadsForSubjectObjectIterator, S>;
type TriplesForPredicateIterator = TriplesIterator<S::QuadsForPredicateIterator, S>;
type SubjectsForPredicateObjectIterator =
SubjectsIterator<S::QuadsForPredicateObjectIterator, S>;
type TriplesForObjectIterator = TriplesIterator<S::QuadsForObjectIterator, S>;
fn triples(&self) -> Result<TriplesIterator<S::QuadsIterator, S>> {
Ok(TriplesIterator {
iter: self.store.quads()?,
store: self.store.clone(),
})
}
fn triples_for_subject(
&self,
subject: &NamedOrBlankNode,
) -> Result<TriplesIterator<S::QuadsForSubjectIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
iter: self
.store
.quads_for_subject(encoder.encode_named_or_blank_node(subject)?)?,
store: self.store.clone(),
})
}
fn objects_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<ObjectsIterator<S::QuadsForSubjectPredicateIterator, S>> {
let encoder = self.store.encoder();
Ok(ObjectsIterator {
iter: self.store.quads_for_subject_predicate(
encoder.encode_named_or_blank_node(subject)?,
encoder.encode_named_node(predicate)?,
)?,
store: self.store.clone(),
})
}
fn predicates_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<PredicatesIterator<S::QuadsForSubjectObjectIterator, S>> {
let encoder = self.store.encoder();
Ok(PredicatesIterator {
iter: self.store.quads_for_subject_object(
encoder.encode_named_or_blank_node(subject)?,
encoder.encode_term(object)?,
)?,
store: self.store.clone(),
})
}
fn triples_for_predicate(
&self,
predicate: &NamedNode,
) -> Result<TriplesIterator<S::QuadsForPredicateIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
iter: self
.store
.quads_for_predicate(encoder.encode_named_node(predicate)?)?,
store: self.store.clone(),
})
}
fn subjects_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<SubjectsIterator<S::QuadsForPredicateObjectIterator, S>> {
let encoder = self.store.encoder();
Ok(SubjectsIterator {
iter: self.store.quads_for_predicate_object(
encoder.encode_named_node(predicate)?,
encoder.encode_term(object)?,
)?,
store: self.store.clone(),
})
}
fn triples_for_object(
&self,
object: &Term,
) -> Result<TriplesIterator<S::QuadsForObjectIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
iter: self.store.quads_for_object(encoder.encode_term(object)?)?,
store: self.store.clone(),
})
}
fn contains(&self, triple: &Triple) -> Result<bool> {
let encoder = self.store.encoder();
Ok(self
.store
.quads_for_subject_predicate_object(
encoder.encode_named_or_blank_node(triple.subject())?,
encoder.encode_named_node(triple.predicate())?,
encoder.encode_term(triple.object())?,
)?
.any(|_| true))
}
fn insert(&self, _triple: &Triple) -> Result<()> {
Err(format_err!("Union graph is not writable"))
}
fn remove(&self, _triple: &Triple) -> Result<()> {
Err(format_err!("Union graph is not writable"))
}
fn len(&self) -> Result<usize> {
Ok(self.store.quads()?.count())
}
fn is_empty(&self) -> Result<bool> {
Ok(self.store.quads()?.any(|_| true))
}
}
impl<S: EncodedQuadsStore> fmt::Display for StoreUnionGraph<S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
for triple in self.iter().map_err(|_| fmt::Error)? {
writeln!(fmt, "{}", triple.map_err(|_| fmt::Error)?)?;
}
Ok(())
}
}
pub struct QuadsIterator<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> {
iter: I,
store: Arc<S>,
}
impl<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> Iterator
for QuadsIterator<I, S>
{
type Item = Result<Quad>;
fn next(&mut self) -> Option<Result<Quad>> {
self.iter
.next()
.map(|k| k.and_then(|quad| self.store.encoder().decode_quad(&quad)))
}
}
pub struct TriplesIterator<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> {
iter: I,
store: Arc<S>,
}
impl<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> Iterator
for TriplesIterator<I, S>
{
type Item = Result<Triple>;
fn next(&mut self) -> Option<Result<Triple>> {
self.iter
.next()
.map(|k| k.and_then(|quad| self.store.encoder().decode_triple(&quad)))
}
}
pub struct SubjectsIterator<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> {
iter: I,
store: Arc<S>,
}
impl<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> Iterator
for SubjectsIterator<I, S>
{
type Item = Result<NamedOrBlankNode>;
fn next(&mut self) -> Option<Result<NamedOrBlankNode>> {
self.iter.next().map(|k| {
k.and_then(|quad| {
self.store
.encoder()
.decode_named_or_blank_node(quad.subject)
})
})
}
}
pub struct PredicatesIterator<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> {
iter: I,
store: Arc<S>,
}
impl<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> Iterator
for PredicatesIterator<I, S>
{
type Item = Result<NamedNode>;
fn next(&mut self) -> Option<Result<NamedNode>> {
self.iter
.next()
.map(|k| k.and_then(|quad| self.store.encoder().decode_named_node(quad.predicate)))
}
}
pub struct ObjectsIterator<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> {
iter: I,
store: Arc<S>,
}
impl<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> Iterator
for ObjectsIterator<I, S>
{
type Item = Result<Term>;
fn next(&mut self) -> Option<Result<Term>> {
self.iter
.next()
.map(|k| k.and_then(|quad| self.store.encoder().decode_term(quad.object)))
}
}

@ -1,64 +1,58 @@
use crate::model::LanguageTag; use crate::model::LanguageTag;
use crate::store::encoded::*;
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::Result; use crate::store::*;
use crate::{Repository, Result};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::iter::empty;
use std::iter::once;
use std::sync::RwLock; use std::sync::RwLock;
use std::sync::RwLockReadGuard; use std::sync::RwLockReadGuard;
use std::sync::RwLockWriteGuard; use std::sync::RwLockWriteGuard;
/// Memory based implementation of the `rudf::model::Dataset` trait. /// Memory based implementation of the `Repository` trait.
/// They are cheap to build using the `MemoryDataset::default()` method. /// They are cheap to build using the `MemoryRepository::default()` method.
/// ///
/// Usage example: /// Usage example:
/// ``` /// ```
/// use rudf::model::*; /// use rudf::model::*;
/// use rudf::store::MemoryDataset; /// use rudf::{Repository, RepositoryConnection, MemoryRepository, Result};
/// use crate::rudf::sparql::PreparedQuery;
/// use std::str::FromStr; /// use std::str::FromStr;
/// use rudf::sparql::algebra::QueryResult;
/// ///
/// let dataset = MemoryDataset::default(); /// let repository = MemoryRepository::default();
/// let default_graph = dataset.default_graph(); /// let connection = repository.connection().unwrap();
///
/// // insertion
/// let ex = NamedNode::from_str("http://example.com").unwrap(); /// let ex = NamedNode::from_str("http://example.com").unwrap();
/// let triple = Triple::new(ex.clone(), ex.clone(), ex.clone()); /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// default_graph.insert(&triple); /// connection.insert(&quad);
/// let results: Vec<Quad> = dataset.quads_for_subject(&ex.into()).unwrap().map(|t| t.unwrap()).collect();
/// assert_eq!(vec![triple.in_graph(None)], results);
/// ```
pub type MemoryDataset = StoreDataset<MemoryStore>;
/// Memory based implementation of the `rudf::model::Graph` trait.
/// They are cheap to build using the `MemoryGraph::default()` method.
/// ///
/// Usage example: /// // quad filter
/// ``` /// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// use rudf::model::*; /// assert_eq!(vec![quad], results.unwrap());
/// use rudf::store::MemoryGraph;
/// use std::str::FromStr;
/// ///
/// let graph = MemoryGraph::default(); /// // SPARQL query
/// let ex = NamedNode::from_str("http://example.com").unwrap(); /// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap();
/// let triple = Triple::new(ex.clone(), ex.clone(), ex.clone()); /// let results = prepared_query.exec().unwrap();
/// graph.insert(&triple); /// if let QueryResult::Bindings(results) = results {
/// let results: Vec<Triple> = graph.triples_for_subject(&ex.into()).unwrap().map(|t| t.unwrap()).collect(); /// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
/// assert_eq!(vec![triple], results); /// }
/// ``` /// ```
pub type MemoryGraph = StoreDefaultGraph<MemoryStore>; #[derive(Default)]
pub struct MemoryRepository {
inner: MemoryStore,
}
pub type MemoryRepositoryConnection<'a> = StoreRepositoryConnection<&'a MemoryStore>;
#[derive(Default)]
pub struct MemoryStore { pub struct MemoryStore {
string_store: MemoryStringStore, string_store: MemoryStringStore,
graph_indexes: RwLock<BTreeMap<EncodedTerm, MemoryGraphIndexes>>, graph_indexes: RwLock<BTreeMap<EncodedTerm, MemoryGraphIndexes>>,
} }
impl Default for MemoryStore {
fn default() -> Self {
Self {
string_store: MemoryStringStore::default(),
graph_indexes: RwLock::default(),
}
}
}
#[derive(Default)] #[derive(Default)]
struct MemoryGraphIndexes { struct MemoryGraphIndexes {
spo: BTreeMap<EncodedTerm, BTreeMap<EncodedTerm, BTreeSet<EncodedTerm>>>, spo: BTreeMap<EncodedTerm, BTreeMap<EncodedTerm, BTreeSet<EncodedTerm>>>,
@ -66,6 +60,22 @@ struct MemoryGraphIndexes {
osp: BTreeMap<EncodedTerm, BTreeMap<EncodedTerm, BTreeSet<EncodedTerm>>>, osp: BTreeMap<EncodedTerm, BTreeMap<EncodedTerm, BTreeSet<EncodedTerm>>>,
} }
impl<'a> Repository for &'a MemoryRepository {
type Connection = MemoryRepositoryConnection<'a>;
fn connection(self) -> Result<StoreRepositoryConnection<&'a MemoryStore>> {
Ok(self.inner.connection()?.into())
}
}
impl<'a> Store for &'a MemoryStore {
type Connection = &'a MemoryStore;
fn connection(self) -> Result<Self::Connection> {
Ok(self)
}
}
impl StringStore for MemoryStore { impl StringStore for MemoryStore {
type StringType = String; type StringType = String;
@ -82,25 +92,203 @@ impl StringStore for MemoryStore {
} }
} }
impl EncodedQuadsStore for MemoryStore { impl<'a> StoreConnection for &'a MemoryStore {
type QuadsIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
type QuadsForSubjectIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; Ok(self
type QuadsForSubjectPredicateIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; .graph_indexes()?
type QuadsForSubjectPredicateObjectIterator = .get(&quad.graph_name)
<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; .map_or(false, |graph| {
type QuadsForSubjectObjectIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; graph.spo.get(&quad.subject).map_or(false, |po| {
type QuadsForPredicateIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; po.get(&quad.predicate)
type QuadsForPredicateObjectIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; .map_or(false, |o| o.contains(&quad.object))
type QuadsForObjectIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; })
type QuadsForGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; }))
type QuadsForSubjectGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; }
type QuadsForSubjectPredicateGraphIterator =
<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; fn insert(&self, quad: &EncodedQuad) -> Result<()> {
type QuadsForSubjectObjectGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; let mut graph_indexes = self.graph_indexes_mut()?;
type QuadsForPredicateGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; let graph = graph_indexes
type QuadsForPredicateObjectGraphIterator = .entry(quad.graph_name)
<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; .or_insert_with(MemoryGraphIndexes::default);
type QuadsForObjectGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter; graph
.spo
.entry(quad.subject)
.or_default()
.entry(quad.predicate)
.or_default()
.insert(quad.object);
graph
.pos
.entry(quad.predicate)
.or_default()
.entry(quad.object)
.or_default()
.insert(quad.subject);
graph
.osp
.entry(quad.object)
.or_default()
.entry(quad.subject)
.or_default()
.insert(quad.predicate);
Ok(())
}
fn remove(&self, quad: &EncodedQuad) -> Result<()> {
let mut graph_indexes = self.graph_indexes_mut()?;
let mut empty_graph = false;
if let Some(graph) = graph_indexes.get_mut(&quad.graph_name) {
{
let mut empty_pos = false;
if let Some(pos) = graph.spo.get_mut(&quad.subject) {
let mut empty_os = false;
if let Some(os) = pos.get_mut(&quad.predicate) {
os.remove(&quad.object);
empty_os = os.is_empty();
}
if empty_os {
pos.remove(&quad.predicate);
}
empty_pos = pos.is_empty();
}
if empty_pos {
graph.spo.remove(&quad.subject);
}
}
{
let mut empty_oss = false;
if let Some(oss) = graph.pos.get_mut(&quad.predicate) {
let mut empty_ss = false;
if let Some(ss) = oss.get_mut(&quad.object) {
ss.remove(&quad.subject);
empty_ss = ss.is_empty();
}
if empty_ss {
oss.remove(&quad.object);
}
empty_oss = oss.is_empty();
}
if empty_oss {
graph.pos.remove(&quad.predicate);
}
}
{
let mut empty_sps = false;
if let Some(sps) = graph.osp.get_mut(&quad.object) {
let mut empty_ps = false;
if let Some(ps) = sps.get_mut(&quad.subject) {
ps.remove(&quad.predicate);
empty_ps = ps.is_empty();
}
if empty_ps {
sps.remove(&quad.subject);
}
empty_sps = sps.is_empty();
}
if empty_sps {
graph.osp.remove(&quad.object);
}
}
empty_graph = graph.spo.is_empty();
}
if empty_graph {
graph_indexes.remove(&quad.graph_name);
}
Ok(())
}
fn quads_for_pattern<'b>(
&'b self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'b> {
match subject {
Some(subject) => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => {
let quad = EncodedQuad::new(subject, predicate, object, graph_name);
match self.contains(&quad) {
Ok(true) => Box::new(once(Ok(quad))),
Ok(false) => Box::new(empty()),
Err(error) => Box::new(once(Err(error))),
}
}
None => wrap_error(
self.quads_for_subject_predicate_object(subject, predicate, object),
),
},
None => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_predicate_graph(subject, predicate, graph_name),
),
None => wrap_error(self.quads_for_subject_predicate(subject, predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_object_graph(subject, object, graph_name),
),
None => wrap_error(self.quads_for_subject_object(subject, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_subject_graph(subject, graph_name))
}
None => wrap_error(self.quads_for_subject(subject)),
},
},
},
None => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_predicate_object_graph(predicate, object, graph_name),
),
None => wrap_error(self.quads_for_predicate_object(predicate, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_predicate_graph(predicate, graph_name))
}
None => wrap_error(self.quads_for_predicate(predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_object_graph(object, graph_name))
}
None => wrap_error(self.quads_for_object(object)),
},
None => match graph_name {
Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)),
None => wrap_error(self.quads()),
},
},
},
}
}
}
impl MemoryStore {
fn graph_indexes(
&self,
) -> Result<RwLockReadGuard<'_, BTreeMap<EncodedTerm, MemoryGraphIndexes>>> {
Ok(self.graph_indexes.read().map_err(MutexPoisonError::from)?)
}
fn graph_indexes_mut(
&self,
) -> Result<RwLockWriteGuard<'_, BTreeMap<EncodedTerm, MemoryGraphIndexes>>> {
Ok(self.graph_indexes.write().map_err(MutexPoisonError::from)?)
}
fn quads(&self) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> { fn quads(&self) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
let mut result = Vec::default(); let mut result = Vec::default();
@ -372,125 +560,13 @@ impl EncodedQuadsStore for MemoryStore {
} }
Ok(result.into_iter()) Ok(result.into_iter())
} }
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self
.graph_indexes()?
.get(&quad.graph_name)
.map_or(false, |graph| {
graph.spo.get(&quad.subject).map_or(false, |po| {
po.get(&quad.predicate)
.map_or(false, |o| o.contains(&quad.object))
})
}))
}
fn insert(&self, quad: &EncodedQuad) -> Result<()> {
let mut graph_indexes = self.graph_indexes_mut()?;
let graph = graph_indexes
.entry(quad.graph_name)
.or_insert_with(MemoryGraphIndexes::default);
graph
.spo
.entry(quad.subject)
.or_default()
.entry(quad.predicate)
.or_default()
.insert(quad.object);
graph
.pos
.entry(quad.predicate)
.or_default()
.entry(quad.object)
.or_default()
.insert(quad.subject);
graph
.osp
.entry(quad.object)
.or_default()
.entry(quad.subject)
.or_default()
.insert(quad.predicate);
Ok(())
}
fn remove(&self, quad: &EncodedQuad) -> Result<()> {
let mut graph_indexes = self.graph_indexes_mut()?;
let mut empty_graph = false;
if let Some(graph) = graph_indexes.get_mut(&quad.graph_name) {
{
let mut empty_pos = false;
if let Some(pos) = graph.spo.get_mut(&quad.subject) {
let mut empty_os = false;
if let Some(os) = pos.get_mut(&quad.predicate) {
os.remove(&quad.object);
empty_os = os.is_empty();
}
if empty_os {
pos.remove(&quad.predicate);
}
empty_pos = pos.is_empty();
}
if empty_pos {
graph.spo.remove(&quad.subject);
}
}
{
let mut empty_oss = false;
if let Some(oss) = graph.pos.get_mut(&quad.predicate) {
let mut empty_ss = false;
if let Some(ss) = oss.get_mut(&quad.object) {
ss.remove(&quad.subject);
empty_ss = ss.is_empty();
}
if empty_ss {
oss.remove(&quad.object);
}
empty_oss = oss.is_empty();
}
if empty_oss {
graph.pos.remove(&quad.predicate);
}
}
{
let mut empty_sps = false;
if let Some(sps) = graph.osp.get_mut(&quad.object) {
let mut empty_ps = false;
if let Some(ps) = sps.get_mut(&quad.subject) {
ps.remove(&quad.predicate);
empty_ps = ps.is_empty();
}
if empty_ps {
sps.remove(&quad.subject);
}
empty_sps = sps.is_empty();
}
if empty_sps {
graph.osp.remove(&quad.object);
}
}
empty_graph = graph.spo.is_empty();
}
if empty_graph {
graph_indexes.remove(&quad.graph_name);
}
Ok(())
}
} }
impl MemoryStore { fn wrap_error<E: 'static, I: Iterator<Item = Result<E>> + 'static>(
fn graph_indexes( iter: Result<I>,
&self, ) -> Box<dyn Iterator<Item = Result<E>>> {
) -> Result<RwLockReadGuard<'_, BTreeMap<EncodedTerm, MemoryGraphIndexes>>> { match iter {
Ok(self.graph_indexes.read().map_err(MutexPoisonError::from)?) Ok(iter) => Box::new(iter),
} Err(error) => Box::new(once(Err(error))),
fn graph_indexes_mut(
&self,
) -> Result<RwLockWriteGuard<'_, BTreeMap<EncodedTerm, MemoryGraphIndexes>>> {
Ok(self.graph_indexes.write().map_err(MutexPoisonError::from)?)
} }
} }

@ -1,12 +1,125 @@
//! Provides implementations of the `rudf::model::Graph` and `rudf::model::Dataset` traits. //! Provides implementations of the `rudf::Repository` trait.
pub(crate) mod encoded;
mod memory; mod memory;
pub(crate) mod numeric_encoder; pub(crate) mod numeric_encoder;
#[cfg(feature = "rocksdb")] #[cfg(feature = "rocksdb")]
mod rocksdb; mod rocksdb;
pub use crate::store::memory::MemoryDataset; pub use crate::store::memory::MemoryRepository;
pub use crate::store::memory::MemoryGraph;
#[cfg(feature = "rocksdb")] #[cfg(feature = "rocksdb")]
pub use crate::store::rocksdb::RocksDbDataset; pub use crate::store::rocksdb::RocksDbRepository;
use crate::model::*;
use crate::sparql::SimplePreparedQuery;
use crate::store::numeric_encoder::*;
use crate::{RepositoryConnection, Result};
use std::io::Read;
use std::iter::{once, Iterator};
/// Defines the `Store` traits that is used to have efficient binary storage
pub trait Store {
type Connection: StoreConnection;
fn connection(self) -> Result<Self::Connection>;
}
/// A connection to a `Store`
pub trait StoreConnection: StringStore + Sized + Clone {
fn contains(&self, quad: &EncodedQuad) -> Result<bool>;
fn insert(&self, quad: &EncodedQuad) -> Result<()>;
fn remove(&self, quad: &EncodedQuad) -> Result<()>;
fn quads_for_pattern<'a>(
&'a self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a>;
fn encoder(&self) -> Encoder<&Self> {
Encoder::new(&self)
}
}
/// A `RepositoryConnection` from a `StoreConnection`
#[derive(Clone)]
pub struct StoreRepositoryConnection<S: StoreConnection> {
inner: S,
}
impl<S: StoreConnection> From<S> for StoreRepositoryConnection<S> {
fn from(inner: S) -> Self {
Self { inner }
}
}
impl<S: StoreConnection> RepositoryConnection for StoreRepositoryConnection<S> {
type PreparedQuery = SimplePreparedQuery<S>;
fn prepare_query(&self, query: impl Read) -> Result<SimplePreparedQuery<S>> {
SimplePreparedQuery::new(self.inner.clone(), query) //TODO: avoid clone
}
fn quads_for_pattern<'a>(
&'a self,
subject: Option<&NamedOrBlankNode>,
predicate: Option<&NamedNode>,
object: Option<&Term>,
graph_name: Option<&NamedOrBlankNode>,
) -> Box<dyn Iterator<Item = Result<Quad>> + 'a>
where
Self: 'a,
{
let encoder = self.inner.encoder();
let subject = if let Some(subject) = subject {
match encoder.encode_named_or_blank_node(subject) {
Ok(subject) => Some(subject),
Err(error) => return Box::new(once(Err(error))),
}
} else {
None
};
let predicate = if let Some(predicate) = predicate {
match encoder.encode_named_node(predicate) {
Ok(predicate) => Some(predicate),
Err(error) => return Box::new(once(Err(error))),
}
} else {
None
};
let object = if let Some(object) = object {
match encoder.encode_term(object) {
Ok(object) => Some(object),
Err(error) => return Box::new(once(Err(error))),
}
} else {
None
};
let graph_name = if let Some(graph_name) = graph_name {
match encoder.encode_named_or_blank_node(graph_name) {
Ok(subject) => Some(subject),
Err(error) => return Box::new(once(Err(error))),
}
} else {
None
};
Box::new(
self.inner
.quads_for_pattern(subject, predicate, object, graph_name)
.map(move |quad| self.inner.encoder().decode_quad(&quad?)),
)
}
fn contains(&self, quad: &Quad) -> Result<bool> {
self.inner
.contains(&self.inner.encoder().encode_quad(quad)?)
}
fn insert(&self, quad: &Quad) -> Result<()> {
self.inner.insert(&self.inner.encoder().encode_quad(quad)?)
}
fn remove(&self, quad: &Quad) -> Result<()> {
self.inner.remove(&self.inner.encoder().encode_quad(quad)?)
}
}

@ -1,8 +1,7 @@
use crate::model::LanguageTag; use crate::model::LanguageTag;
use crate::store::encoded::EncodedQuadsStore;
use crate::store::encoded::StoreDataset;
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::Result; use crate::store::{Store, StoreConnection, StoreRepositoryConnection};
use crate::{Repository, Result};
use byteorder::ByteOrder; use byteorder::ByteOrder;
use byteorder::LittleEndian; use byteorder::LittleEndian;
use failure::format_err; use failure::format_err;
@ -14,28 +13,49 @@ use rocksdb::Options;
use rocksdb::WriteBatch; use rocksdb::WriteBatch;
use rocksdb::DB; use rocksdb::DB;
use std::io::Cursor; use std::io::Cursor;
use std::iter::{empty, once};
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::str; use std::str;
use std::sync::Mutex; use std::sync::Mutex;
/// `rudf::model::Dataset` trait implementation based on the [RocksDB](https://rocksdb.org/) key-value store /// `Repository` implementation based on the [RocksDB](https://rocksdb.org/) key-value store
/// ///
/// To use it, the `"rocksdb"` feature need to be activated. /// To use it, the `"rocksdb"` feature need to be activated.
/// ///
/// Usage example: /// Usage example:
/// ```ignored
/// use rudf::model::*;
/// use rudf::{Repository, RepositoryConnection, RocksDbRepository, Result};
/// use crate::rudf::sparql::PreparedQuery;
/// use std::str::FromStr;
/// use rudf::sparql::algebra::QueryResult;
///
/// let repository = RocksDbRepository::open("example.db").unwrap();
/// let connection = repository.connection().unwrap();
///
/// // insertion
/// let ex = NamedNode::from_str("http://example.com").unwrap();
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results.unwrap());
///
/// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }".as_bytes()).unwrap();
/// let results = prepared_query.exec().unwrap();
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
/// }
/// ``` /// ```
/// use rudf::store::RocksDbDataset; pub struct RocksDbRepository {
/// let dataset = RocksDbDataset::open("example.db").unwrap(); inner: RocksDbStore,
/// ```
pub type RocksDbDataset = StoreDataset<RocksDbStore>;
impl RocksDbDataset {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
Ok(Self::new_from_store(RocksDbStore::open(path)?))
}
} }
pub type RocksDbRepositoryConnection<'a> = StoreRepositoryConnection<RocksDbStoreConnection<'a>>;
const ID2STR_CF: &str = "id2str"; const ID2STR_CF: &str = "id2str";
const STR2ID_CF: &str = "id2str"; const STR2ID_CF: &str = "id2str";
const SPOG_CF: &str = "spog"; const SPOG_CF: &str = "spog";
@ -48,14 +68,35 @@ const EMPTY_BUF: [u8; 0] = [0 as u8; 0];
const COLUMN_FAMILIES: [&str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG_CF]; const COLUMN_FAMILIES: [&str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG_CF];
pub struct RocksDbStore { struct RocksDbStore {
db: DB, db: DB,
str_id_counter: Mutex<RocksDBCounter>, str_id_counter: Mutex<RocksDBCounter>,
id2str_cf: SendColumnFamily, }
str2id_cf: SendColumnFamily,
spog_cf: SendColumnFamily, #[derive(Clone)]
posg_cf: SendColumnFamily, pub struct RocksDbStoreConnection<'a> {
ospg_cf: SendColumnFamily, store: &'a RocksDbStore,
id2str_cf: ColumnFamily<'a>,
str2id_cf: ColumnFamily<'a>,
spog_cf: ColumnFamily<'a>,
posg_cf: ColumnFamily<'a>,
ospg_cf: ColumnFamily<'a>,
}
impl RocksDbRepository {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
inner: RocksDbStore::open(path)?,
})
}
}
impl<'a> Repository for &'a RocksDbRepository {
type Connection = RocksDbRepositoryConnection<'a>;
fn connection(self) -> Result<StoreRepositoryConnection<RocksDbStoreConnection<'a>>> {
Ok(self.inner.connection()?.into())
}
} }
impl RocksDbStore { impl RocksDbStore {
@ -65,51 +106,57 @@ impl RocksDbStore {
options.create_missing_column_families(true); options.create_missing_column_families(true);
options.set_compaction_style(DBCompactionStyle::Universal); options.set_compaction_style(DBCompactionStyle::Universal);
let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?;
let id2str_cf = SendColumnFamily(get_cf(&db, STR2ID_CF)?);
let str2id_cf = SendColumnFamily(get_cf(&db, ID2STR_CF)?);
let spog_cf = SendColumnFamily(get_cf(&db, SPOG_CF)?);
let posg_cf = SendColumnFamily(get_cf(&db, POSG_CF)?);
let ospg_cf = SendColumnFamily(get_cf(&db, OSPG_CF)?);
let new = Self { let new = Self {
db, db: DB::open_cf(&options, path, &COLUMN_FAMILIES)?,
str_id_counter: Mutex::new(RocksDBCounter::new("bsc")), str_id_counter: Mutex::new(RocksDBCounter::new("bsc")),
id2str_cf,
str2id_cf,
spog_cf,
posg_cf,
ospg_cf,
}; };
new.set_first_strings()?; (&new).connection()?.set_first_strings()?;
Ok(new) Ok(new)
} }
} }
impl StringStore for RocksDbStore { impl<'a> Store for &'a RocksDbStore {
type Connection = RocksDbStoreConnection<'a>;
fn connection(self) -> Result<RocksDbStoreConnection<'a>> {
Ok(RocksDbStoreConnection {
store: self,
id2str_cf: get_cf(&self.db, ID2STR_CF)?,
str2id_cf: get_cf(&self.db, STR2ID_CF)?,
spog_cf: get_cf(&self.db, SPOG_CF)?,
posg_cf: get_cf(&self.db, POSG_CF)?,
ospg_cf: get_cf(&self.db, OSPG_CF)?,
})
}
}
impl StringStore for RocksDbStoreConnection<'_> {
type StringType = RocksString; type StringType = RocksString;
fn insert_str(&self, value: &str) -> Result<u64> { fn insert_str(&self, value: &str) -> Result<u64> {
let value = value.as_bytes(); let value = value.as_bytes();
Ok(if let Some(id) = self.db.get_cf(*self.str2id_cf, value)? { Ok(
if let Some(id) = self.store.db.get_cf(self.str2id_cf, value)? {
LittleEndian::read_u64(&id) LittleEndian::read_u64(&id)
} else { } else {
let id = self let id = self
.store
.str_id_counter .str_id_counter
.lock() .lock()
.map_err(MutexPoisonError::from)? .map_err(MutexPoisonError::from)?
.get_and_increment(&self.db)? as u64; .get_and_increment(&self.store.db)? as u64;
let id_bytes = to_bytes(id); let id_bytes = to_bytes(id);
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
batch.put_cf(*self.id2str_cf, &id_bytes, value)?; batch.put_cf(self.id2str_cf, &id_bytes, value)?;
batch.put_cf(*self.str2id_cf, value, &id_bytes)?; batch.put_cf(self.str2id_cf, value, &id_bytes)?;
self.db.write(batch)?; self.store.db.write(batch)?;
id id
}) },
)
} }
fn get_str(&self, id: u64) -> Result<RocksString> { fn get_str(&self, id: u64) -> Result<RocksString> {
let value = self.db.get_cf(*self.id2str_cf, &to_bytes(id))?; let value = self.store.db.get_cf(self.id2str_cf, &to_bytes(id))?;
if let Some(value) = value { if let Some(value) = value {
Ok(RocksString { vec: value }) Ok(RocksString { vec: value })
} else { } else {
@ -122,31 +169,113 @@ impl StringStore for RocksDbStore {
} }
} }
impl EncodedQuadsStore for RocksDbStore { impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
type QuadsIterator = SPOGIndexIterator; fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
type QuadsForSubjectIterator = FilteringEncodedQuadsIterator<SPOGIndexIterator>; Ok(self
type QuadsForSubjectPredicateIterator = FilteringEncodedQuadsIterator<SPOGIndexIterator>; .store
type QuadsForSubjectPredicateObjectIterator = FilteringEncodedQuadsIterator<SPOGIndexIterator>; .db
type QuadsForSubjectObjectIterator = FilteringEncodedQuadsIterator<OSPGIndexIterator>; .get_cf(self.spog_cf, &encode_spog_quad(quad)?)?
type QuadsForPredicateIterator = FilteringEncodedQuadsIterator<POSGIndexIterator>; .is_some())
type QuadsForPredicateObjectIterator = FilteringEncodedQuadsIterator<POSGIndexIterator>; }
type QuadsForObjectIterator = FilteringEncodedQuadsIterator<OSPGIndexIterator>;
type QuadsForGraphIterator = InGraphQuadsIterator<SPOGIndexIterator>; fn insert(&self, quad: &EncodedQuad) -> Result<()> {
type QuadsForSubjectGraphIterator = let mut batch = WriteBatch::default();
InGraphQuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>; batch.put_cf(self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?;
type QuadsForSubjectPredicateGraphIterator = batch.put_cf(self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?;
InGraphQuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>; batch.put_cf(self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?;
type QuadsForSubjectObjectGraphIterator = self.store.db.write(batch)?; //TODO: check what's going on if the key already exists
InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>; Ok(())
type QuadsForPredicateGraphIterator = }
InGraphQuadsIterator<FilteringEncodedQuadsIterator<POSGIndexIterator>>;
type QuadsForPredicateObjectGraphIterator = fn remove(&self, quad: &EncodedQuad) -> Result<()> {
InGraphQuadsIterator<FilteringEncodedQuadsIterator<POSGIndexIterator>>; let mut batch = WriteBatch::default();
type QuadsForObjectGraphIterator = batch.delete_cf(self.spog_cf, &encode_spog_quad(quad)?)?;
InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>; batch.delete_cf(self.posg_cf, &encode_posg_quad(quad)?)?;
batch.delete_cf(self.ospg_cf, &encode_ospg_quad(quad)?)?;
self.store.db.write(batch)?;
Ok(())
}
fn quads_for_pattern<'b>(
&'b self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'b> {
match subject {
Some(subject) => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => {
let quad = EncodedQuad::new(subject, predicate, object, graph_name);
match self.contains(&quad) {
Ok(true) => Box::new(once(Ok(quad))),
Ok(false) => Box::new(empty()),
Err(error) => Box::new(once(Err(error))),
}
}
None => wrap_error(
self.quads_for_subject_predicate_object(subject, predicate, object),
),
},
None => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_predicate_graph(subject, predicate, graph_name),
),
None => wrap_error(self.quads_for_subject_predicate(subject, predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_object_graph(subject, object, graph_name),
),
None => wrap_error(self.quads_for_subject_object(subject, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_subject_graph(subject, graph_name))
}
None => wrap_error(self.quads_for_subject(subject)),
},
},
},
None => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_predicate_object_graph(predicate, object, graph_name),
),
None => wrap_error(self.quads_for_predicate_object(predicate, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_predicate_graph(predicate, graph_name))
}
None => wrap_error(self.quads_for_predicate(predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_object_graph(object, graph_name))
}
None => wrap_error(self.quads_for_object(object)),
},
None => match graph_name {
Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)),
None => wrap_error(self.quads()),
},
},
},
}
}
}
impl<'a> RocksDbStoreConnection<'a> {
fn quads(&self) -> Result<SPOGIndexIterator> { fn quads(&self) -> Result<SPOGIndexIterator> {
let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?;
iter.seek_to_first(); iter.seek_to_first();
Ok(SPOGIndexIterator { iter }) Ok(SPOGIndexIterator { iter })
} }
@ -155,7 +284,7 @@ impl EncodedQuadsStore for RocksDbStore {
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term(subject)?); iter.seek(&encode_term(subject)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter }, iter: SPOGIndexIterator { iter },
@ -168,7 +297,7 @@ impl EncodedQuadsStore for RocksDbStore {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_pair(subject, predicate)?); iter.seek(&encode_term_pair(subject, predicate)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter }, iter: SPOGIndexIterator { iter },
@ -182,7 +311,7 @@ impl EncodedQuadsStore for RocksDbStore {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_triple(subject, predicate, object)?); iter.seek(&encode_term_triple(subject, predicate, object)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter }, iter: SPOGIndexIterator { iter },
@ -195,7 +324,7 @@ impl EncodedQuadsStore for RocksDbStore {
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_pair(object, subject)?); iter.seek(&encode_term_pair(object, subject)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: OSPGIndexIterator { iter }, iter: OSPGIndexIterator { iter },
@ -207,7 +336,7 @@ impl EncodedQuadsStore for RocksDbStore {
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(*self.posg_cf)?; let mut iter = self.store.db.raw_iterator_cf(self.posg_cf)?;
iter.seek(&encode_term(predicate)?); iter.seek(&encode_term(predicate)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: POSGIndexIterator { iter }, iter: POSGIndexIterator { iter },
@ -220,7 +349,7 @@ impl EncodedQuadsStore for RocksDbStore {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_pair(predicate, object)?); iter.seek(&encode_term_pair(predicate, object)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: POSGIndexIterator { iter }, iter: POSGIndexIterator { iter },
@ -232,7 +361,7 @@ impl EncodedQuadsStore for RocksDbStore {
&self, &self,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(*self.ospg_cf)?; let mut iter = self.store.db.raw_iterator_cf(self.ospg_cf)?;
iter.seek(&encode_term(object)?); iter.seek(&encode_term(object)?);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: OSPGIndexIterator { iter }, iter: OSPGIndexIterator { iter },
@ -318,36 +447,20 @@ impl EncodedQuadsStore for RocksDbStore {
graph_name, graph_name,
}) })
} }
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self
.db
.get_cf(*self.spog_cf, &encode_spog_quad(quad)?)?
.is_some())
}
fn insert(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
batch.put_cf(*self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?;
batch.put_cf(*self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?;
batch.put_cf(*self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?;
self.db.write(batch)?; //TODO: check what's going on if the key already exists
Ok(())
}
fn remove(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
batch.delete_cf(*self.spog_cf, &encode_spog_quad(quad)?)?;
batch.delete_cf(*self.posg_cf, &encode_posg_quad(quad)?)?;
batch.delete_cf(*self.ospg_cf, &encode_ospg_quad(quad)?)?;
self.db.write(batch)?;
Ok(())
}
} }
pub fn get_cf(db: &DB, name: &str) -> Result<ColumnFamily> { fn get_cf<'a>(db: &'a DB, name: &str) -> Result<ColumnFamily<'a>> {
db.cf_handle(name) db.cf_handle(name)
.ok_or_else(|| format_err!("column family not found")) .ok_or_else(|| format_err!("column family {} not found", name))
}
fn wrap_error<'a, E: 'a, I: Iterator<Item = Result<E>> + 'a>(
iter: Result<I>,
) -> Box<dyn Iterator<Item = Result<E>> + 'a> {
match iter {
Ok(iter) => Box::new(iter),
Err(error) => Box::new(once(Err(error))),
}
} }
struct RocksDBCounter { struct RocksDBCounter {
@ -454,11 +567,11 @@ fn encode_ospg_quad(quad: &EncodedQuad) -> Result<Vec<u8>> {
Ok(vec) Ok(vec)
} }
pub struct SPOGIndexIterator { struct SPOGIndexIterator<'a> {
iter: DBRawIterator, iter: DBRawIterator<'a>,
} }
impl Iterator for SPOGIndexIterator { impl<'a> Iterator for SPOGIndexIterator<'a> {
type Item = Result<EncodedQuad>; type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> { fn next(&mut self) -> Option<Result<EncodedQuad>> {
@ -472,11 +585,11 @@ impl Iterator for SPOGIndexIterator {
} }
} }
pub struct POSGIndexIterator { struct POSGIndexIterator<'a> {
iter: DBRawIterator, iter: DBRawIterator<'a>,
} }
impl Iterator for POSGIndexIterator { impl<'a> Iterator for POSGIndexIterator<'a> {
type Item = Result<EncodedQuad>; type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> { fn next(&mut self) -> Option<Result<EncodedQuad>> {
@ -490,11 +603,11 @@ impl Iterator for POSGIndexIterator {
} }
} }
pub struct OSPGIndexIterator { struct OSPGIndexIterator<'a> {
iter: DBRawIterator, iter: DBRawIterator<'a>,
} }
impl Iterator for OSPGIndexIterator { impl<'a> Iterator for OSPGIndexIterator<'a> {
type Item = Result<EncodedQuad>; type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> { fn next(&mut self) -> Option<Result<EncodedQuad>> {
@ -508,7 +621,7 @@ impl Iterator for OSPGIndexIterator {
} }
} }
pub struct FilteringEncodedQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> { struct FilteringEncodedQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> {
iter: I, iter: I,
filter: EncodedQuadPattern, filter: EncodedQuadPattern,
} }
@ -524,7 +637,7 @@ impl<I: Iterator<Item = Result<EncodedQuad>>> Iterator for FilteringEncodedQuads
} }
} }
pub struct InGraphQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> { struct InGraphQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> {
iter: I, iter: I,
graph_name: EncodedTerm, graph_name: EncodedTerm,
} }
@ -570,16 +683,3 @@ impl From<RocksString> for String {
val.deref().to_owned() val.deref().to_owned()
} }
} }
// TODO: very bad but I believe it is fine
#[derive(Clone, Copy)]
struct SendColumnFamily(ColumnFamily);
unsafe impl Sync for SendColumnFamily {}
impl Deref for SendColumnFamily {
type Target = ColumnFamily;
fn deref(&self) -> &ColumnFamily {
&self.0
}
}

@ -10,9 +10,7 @@ use rudf::sparql::algebra::QueryResult;
use rudf::sparql::parser::read_sparql_query; use rudf::sparql::parser::read_sparql_query;
use rudf::sparql::xml_results::read_xml_results; use rudf::sparql::xml_results::read_xml_results;
use rudf::sparql::PreparedQuery; use rudf::sparql::PreparedQuery;
use rudf::sparql::SparqlDataset; use rudf::{MemoryRepository, Repository, RepositoryConnection, Result};
use rudf::store::MemoryDataset;
use rudf::Result;
use std::fmt; use std::fmt;
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
@ -125,10 +123,22 @@ fn sparql_w3c_query_evaluation_testsuite() {
).unwrap(), ).unwrap(),
//Simple literal vs xsd:string. We apply RDF 1.1 //Simple literal vs xsd:string. We apply RDF 1.1
NamedNode::from_str("http://www.w3.org/2001/sw/DataAccess/tests/data-r2/distinct/manifest#distinct-2").unwrap(), NamedNode::from_str("http://www.w3.org/2001/sw/DataAccess/tests/data-r2/distinct/manifest#distinct-2").unwrap(),
//URI normalization: we are normalizing more strongly //URI normalization: we are not normalizing well
NamedNode::from_str(
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#normalization-1",
).unwrap(),
NamedNode::from_str(
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#normalization-2",
).unwrap(),
NamedNode::from_str( NamedNode::from_str(
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#normalization-3", "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#normalization-3",
).unwrap(), ).unwrap(),
NamedNode::from_str(
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#kanji-1",
).unwrap(),
NamedNode::from_str(
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/i18n/manifest#kanji-2",
).unwrap(),
//Test on curly brace scoping with OPTIONAL filter //Test on curly brace scoping with OPTIONAL filter
NamedNode::from_str( NamedNode::from_str(
"http://www.w3.org/2001/sw/DataAccess/tests/data-r2/optional-filter/manifest#dawg-optional-filter-005-not-simplified", "http://www.w3.org/2001/sw/DataAccess/tests/data-r2/optional-filter/manifest#dawg-optional-filter-005-not-simplified",
@ -148,28 +158,35 @@ fn sparql_w3c_query_evaluation_testsuite() {
continue; continue;
} }
if test.kind == "QueryEvaluationTest" { if test.kind == "QueryEvaluationTest" {
let data = match &test.data { let repository = match &test.data {
Some(data) => { Some(data) => {
let dataset = MemoryDataset::default(); let repository = MemoryRepository::default();
let dataset_default = dataset.default_graph(); let connection = repository.connection().unwrap();
load_graph(&data) load_graph(&data)
.unwrap() .unwrap()
.iter() .into_iter()
.for_each(|triple| dataset_default.insert(triple).unwrap()); .for_each(|triple| connection.insert(&triple.in_graph(None)).unwrap());
dataset repository
} }
None => MemoryDataset::default(), None => MemoryRepository::default(),
}; };
for graph_data in &test.graph_data { for graph_data in &test.graph_data {
let named_graph = data let graph_name = NamedNode::new(graph_data);
.named_graph(&NamedNode::new(graph_data.clone()).into()) let connection = repository.connection().unwrap();
.unwrap();
load_graph(&graph_data) load_graph(&graph_data)
.unwrap() .unwrap()
.iter() .into_iter()
.for_each(|triple| named_graph.insert(triple).unwrap()); .for_each(move |triple| {
connection
.insert(&triple.in_graph(Some(graph_name.clone().into())))
.unwrap()
});
} }
match data.prepare_query(read_file(&test.query).unwrap()) { match repository
.connection()
.unwrap()
.prepare_query(read_file(&test.query).unwrap())
{
Err(error) => assert!( Err(error) => assert!(
false, false,
"Failure to parse query of {} with error: {}", "Failure to parse query of {} with error: {}",
@ -196,7 +213,7 @@ fn sparql_w3c_query_evaluation_testsuite() {
expected_graph, expected_graph,
actual_graph, actual_graph,
load_sparql_query(&test.query).unwrap(), load_sparql_query(&test.query).unwrap(),
data repository_to_string(&repository)
) )
} }
}, },
@ -207,6 +224,15 @@ fn sparql_w3c_query_evaluation_testsuite() {
} }
} }
fn repository_to_string(repository: impl Repository) -> String {
repository
.connection()
.unwrap()
.quads_for_pattern(None, None, None, None)
.map(|q| q.unwrap().to_string() + "\n")
.collect()
}
fn load_graph(url: &str) -> Result<SimpleGraph> { fn load_graph(url: &str) -> Result<SimpleGraph> {
if url.ends_with(".ttl") { if url.ends_with(".ttl") {
read_turtle(read_file(url)?, Some(url))?.collect() read_turtle(read_file(url)?, Some(url))?.collect()

@ -27,16 +27,15 @@ use hyper::StatusCode;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use mime; use mime;
use mime::Mime; use mime::Mime;
use rudf::model::Graph;
use rudf::rio::read_ntriples; use rudf::rio::read_ntriples;
use rudf::sparql::algebra::QueryResult; use rudf::sparql::algebra::QueryResult;
use rudf::sparql::xml_results::write_xml_results; use rudf::sparql::xml_results::write_xml_results;
use rudf::sparql::PreparedQuery; use rudf::sparql::PreparedQuery;
use rudf::sparql::SparqlDataset; use rudf::Repository;
use rudf::store::MemoryDataset; use rudf::RepositoryConnection;
use rudf::store::MemoryGraph; use rudf::{MemoryRepository, RocksDbRepository};
use rudf::store::RocksDbDataset;
use serde_derive::Deserialize; use serde_derive::Deserialize;
use std::fmt::Write;
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::panic::RefUnwindSafe; use std::panic::RefUnwindSafe;
@ -87,21 +86,27 @@ pub fn main() -> Result<(), failure::Error> {
let file = matches.value_of("file").map(|v| v.to_string()); let file = matches.value_of("file").map(|v| v.to_string());
if let Some(file) = file { if let Some(file) = file {
main_with_dataset(Arc::new(RocksDbDataset::open(file)?), &matches) main_with_dataset(Arc::new(RocksDbRepository::open(file)?), &matches)
} else { } else {
main_with_dataset(Arc::new(MemoryDataset::default()), &matches) main_with_dataset(Arc::new(MemoryRepository::default()), &matches)
} }
} }
fn main_with_dataset<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>( fn main_with_dataset<D: Send + Sync + RefUnwindSafe + 'static>(
dataset: Arc<D>, dataset: Arc<D>,
matches: &ArgMatches<'_>, matches: &ArgMatches<'_>,
) -> Result<(), failure::Error> { ) -> Result<(), failure::Error>
where
for<'a> &'a D: Repository,
{
if let Some(nt_file) = matches.value_of("ntriples") { if let Some(nt_file) = matches.value_of("ntriples") {
println!("Loading NTriples file {}", nt_file); println!("Loading NTriples file {}", nt_file);
let default_graph = dataset.default_graph(); let connection = dataset.connection()?;
for quad in read_ntriples(BufReader::new(File::open(nt_file)?))? { if let Some(nt_file) = matches.value_of("ntriples") {
default_graph.insert(&quad?)? println!("Loading NTriples file {}", nt_file);
for triple in read_ntriples(BufReader::new(File::open(nt_file)?))? {
connection.insert(&triple?.in_graph(None))?
}
} }
} }
@ -111,10 +116,10 @@ fn main_with_dataset<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>(
Ok(()) Ok(())
} }
fn router<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>( fn router<D: Send + Sync + RefUnwindSafe + 'static>(dataset: Arc<D>, base: String) -> Router
dataset: Arc<D>, where
base: String, for<'a> &'a D: Repository,
) -> Router { {
let middleware = StateMiddleware::new(GothamState { dataset, base }); let middleware = StateMiddleware::new(GothamState { dataset, base });
let pipeline = single_middleware(middleware); let pipeline = single_middleware(middleware);
let (chain, pipelines) = single_pipeline(pipeline); let (chain, pipelines) = single_pipeline(pipeline);
@ -149,7 +154,7 @@ fn router<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>(
.concat2() .concat2()
.then(|body| match body { .then(|body| match body {
Ok(body) => { Ok(body) => {
let content_type: Option<Result<Mime,failure::Error>> = HeaderMap::borrow_from(&state) let content_type: Option<Result<Mime, failure::Error>> = HeaderMap::borrow_from(&state)
.get(CONTENT_TYPE) .get(CONTENT_TYPE)
.map(|content_type| Ok(Mime::from_str(content_type.to_str()?)?)); .map(|content_type| Ok(Mime::from_str(content_type.to_str()?)?));
let response = match content_type { let response = match content_type {
@ -159,7 +164,7 @@ fn router<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>(
&mut state, &mut state,
&body.into_bytes(), &body.into_bytes(),
) )
}, }
(mime::APPLICATION, mime::WWW_FORM_URLENCODED) => { (mime::APPLICATION, mime::WWW_FORM_URLENCODED) => {
match parse_urlencoded_query_request(&body.into_bytes()) match parse_urlencoded_query_request(&body.into_bytes())
{ {
@ -173,7 +178,7 @@ fn router<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>(
StatusCode::BAD_REQUEST, StatusCode::BAD_REQUEST,
), ),
} }
}, }
_ => error_to_response( _ => error_to_response(
&state, &state,
&format_err!("Unsupported Content-Type: {:?}", content_type), &format_err!("Unsupported Content-Type: {:?}", content_type),
@ -202,12 +207,18 @@ fn router<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>(
} }
#[derive(StateData)] #[derive(StateData)]
struct GothamState<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static> { struct GothamState<D: Send + Sync + RefUnwindSafe + 'static>
where
for<'a> &'a D: Repository,
{
dataset: Arc<D>, dataset: Arc<D>,
base: String, base: String,
} }
impl<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static> Clone for GothamState<D> { impl<D: Send + Sync + RefUnwindSafe + 'static> Clone for GothamState<D>
where
for<'a> &'a D: Repository,
{
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
dataset: self.dataset.clone(), dataset: self.dataset.clone(),
@ -230,20 +241,39 @@ fn parse_urlencoded_query_request(query: &[u8]) -> Result<QueryRequest, failure:
.ok_or_else(|| format_err!("'query' parameter not found")) .ok_or_else(|| format_err!("'query' parameter not found"))
} }
fn evaluate_sparql_query<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>( fn evaluate_sparql_query<D: Send + Sync + RefUnwindSafe + 'static>(
state: &mut State, state: &mut State,
query: &[u8], query: &[u8],
) -> Response<Body> { ) -> Response<Body>
where
for<'a> &'a D: Repository,
{
let gotham_state: GothamState<D> = GothamState::take_from(state); let gotham_state: GothamState<D> = GothamState::take_from(state);
match gotham_state.dataset.prepare_query(query) { let connection = match gotham_state.dataset.connection() {
Ok(connection) => connection,
Err(error) => return error_to_response(&state, &error, StatusCode::INTERNAL_SERVER_ERROR),
};
let result = match connection.prepare_query(query) {
Ok(query) => match query.exec() { Ok(query) => match query.exec() {
Ok(QueryResult::Graph(triples)) => { Ok(QueryResult::Graph(triples)) => {
let triples: Result<MemoryGraph, failure::Error> = triples.collect(); let mut result = String::default();
for triple in triples {
match triple {
Ok(triple) => write!(&mut result, "{}\n", triple).unwrap(),
Err(error) => {
return error_to_response(
&state,
&error,
StatusCode::INTERNAL_SERVER_ERROR,
);
}
}
}
create_response( create_response(
&state, &state,
StatusCode::OK, StatusCode::OK,
APPLICATION_N_TRIPLES_UTF_8.clone(), APPLICATION_N_TRIPLES_UTF_8.clone(),
triples.unwrap().to_string(), result,
) )
} }
Ok(result) => create_response( Ok(result) => create_response(
@ -255,7 +285,8 @@ fn evaluate_sparql_query<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'stati
Err(error) => error_to_response(&state, &error, StatusCode::INTERNAL_SERVER_ERROR), Err(error) => error_to_response(&state, &error, StatusCode::INTERNAL_SERVER_ERROR),
}, },
Err(error) => error_to_response(&state, &error, StatusCode::BAD_REQUEST), Err(error) => error_to_response(&state, &error, StatusCode::BAD_REQUEST),
} };
result
} }
fn error_to_response(state: &State, error: &failure::Error, code: StatusCode) -> Response<Body> { fn error_to_response(state: &State, error: &failure::Error, code: StatusCode) -> Response<Body> {
@ -271,8 +302,11 @@ mod tests {
#[test] #[test]
fn get_ui() { fn get_ui() {
let test_server = let test_server = TestServer::new(router(
TestServer::new(router(Arc::new(MemoryDataset::default()), "".to_string())).unwrap(); Arc::new(MemoryRepository::default()),
"".to_string(),
))
.unwrap();
let response = test_server let response = test_server
.client() .client()
.get("http://localhost/") .get("http://localhost/")
@ -283,8 +317,11 @@ mod tests {
#[test] #[test]
fn get_query() { fn get_query() {
let test_server = let test_server = TestServer::new(router(
TestServer::new(router(Arc::new(MemoryDataset::default()), "".to_string())).unwrap(); Arc::new(MemoryRepository::default()),
"".to_string(),
))
.unwrap();
let response = test_server let response = test_server
.client() .client()
.get("http://localhost/query?query=SELECT+*+WHERE+{+?s+?p+?o+}") .get("http://localhost/query?query=SELECT+*+WHERE+{+?s+?p+?o+}")
@ -295,8 +332,11 @@ mod tests {
#[test] #[test]
fn post_query() { fn post_query() {
let test_server = let test_server = TestServer::new(router(
TestServer::new(router(Arc::new(MemoryDataset::default()), "".to_string())).unwrap(); Arc::new(MemoryRepository::default()),
"".to_string(),
))
.unwrap();
let response = test_server let response = test_server
.client() .client()
.post( .post(

Loading…
Cancel
Save